From 5c89878db307ef2c0687a801751eeec2997d6db6 Mon Sep 17 00:00:00 2001 From: genjix Date: Sat, 24 Mar 2012 21:19:05 +0000 Subject: [PATCH] reorganised and tidied files up. --- begin.py | 8 ++ composed.py | 181 ----------------------------------- modules/__init__.pyc | Bin 0 -> 106 bytes modules/python_bitcoin/__init__.py | 170 ++++++++++++++++++++++++++++++++ modules/python_bitcoin/__init__.pyc | Bin 0 -> 8060 bytes modules/python_bitcoin/composed.py | 181 +++++++++++++++++++++++++++++++++++ server-genjix.py | 170 -------------------------------- stratum.py | 2 +- 8 files changed, 360 insertions(+), 352 deletions(-) create mode 100644 begin.py delete mode 100644 composed.py create mode 100644 modules/__init__.py create mode 100644 modules/__init__.pyc create mode 100644 modules/python_bitcoin/__init__.py create mode 100644 modules/python_bitcoin/__init__.pyc create mode 100644 modules/python_bitcoin/composed.py delete mode 100644 server-genjix.py diff --git a/begin.py b/begin.py new file mode 100644 index 0000000..04eeef7 --- /dev/null +++ b/begin.py @@ -0,0 +1,8 @@ +import sys +import stratum + +if __name__ == "__main__": + backend = __import__("modules." + sys.argv[1], fromlist=["run"]) + stratum_frontend = stratum.Stratum() + backend.run(stratum_frontend) + diff --git a/composed.py b/composed.py deleted file mode 100644 index 4ba18d0..0000000 --- a/composed.py +++ /dev/null @@ -1,181 +0,0 @@ -import bitcoin -import threading -import time - -class ExpiryQueue(threading.Thread): - - def __init__(self): - self.lock = threading.Lock() - self.items = [] - threading.Thread.__init__(self) - self.daemon = True - - def run(self): - # Garbage collection - while True: - with self.lock: - self.items = [i for i in self.items if not i.stopped()] - time.sleep(0.1) - - def add(self, item): - with self.lock: - self.items.append(item) - -expiry_queue = ExpiryQueue() - -class StatementLine: - - def __init__(self, output_point): - self.lock = threading.Lock() - self.output_point = output_point - self.output_loaded = None - self.input_point = None - self.input_loaded = None - - def is_loaded(self): - with self.lock: - if self.output_loaded is None: - return False - elif (self.input_point is not False and - self.input_loaded is None): - return False - return True - -class PaymentHistory: - - def __init__(self, chain): - self.chain = chain - self.lock = threading.Lock() - self.statement = [] - self._stopped = False - - def run(self, address, handle_finish): - self.address = address - self.handle_finish = handle_finish - - pubkey_hash = bitcoin.address_to_short_hash(address) - self.chain.fetch_outputs(pubkey_hash, self.start_loading) - - def start_loading(self, ec, output_points): - with self.lock: - for outpoint in output_points: - statement_line = StatementLine(outpoint) - self.statement.append(statement_line) - self.chain.fetch_spend(outpoint, - bitcoin.bind(self.load_spend, - bitcoin._1, bitcoin._2, statement_line)) - self.load_tx_info(outpoint, statement_line, False) - - def load_spend(self, ec, inpoint, statement_line): - with statement_line.lock: - if ec: - statement_line.input_point = False - else: - statement_line.input_point = inpoint - self.finish_if_done() - self.load_tx_info(inpoint, statement_line, True) - - def finish_if_done(self): - with self.lock: - if any(not line.is_loaded() for line in self.statement): - return - result = [] - for line in self.statement: - line.input_loaded["value"] = -line.output_loaded["value"] - result.append(line.input_loaded) - result.append(line.output_loaded) - self.handle_finish(result) - self.stop() - - def stop(self): - with self.lock: - self._stopped = True - - def stopped(self): - with self.lock: - return self._stopped - - def load_tx_info(self, point, statement_line, is_input): - info = {} - info["tx_hash"] = str(point.hash) - info["pos"] = point.index - info["is_in"] = 1 if is_input else 0 - self.chain.fetch_transaction_index(point.hash, - bitcoin.bind(self.tx_index, bitcoin._1, bitcoin._2, bitcoin._3, - statement_line, info)) - - def tx_index(self, ec, block_depth, offset, statement_line, info): - info["height"] = block_depth - self.chain.fetch_block_header_by_depth(block_depth, - bitcoin.bind(self.block_header, bitcoin._1, bitcoin._2, - statement_line, info)) - - def block_header(self, ec, blk_head, statement_line, info): - info["time"] = blk_head.timestamp - info["blk_hash"] = str(bitcoin.hash_block_header(blk_head)) - tx_hash = bitcoin.hash_digest(info["tx_hash"]) - self.chain.fetch_transaction(tx_hash, - bitcoin.bind(self.load_tx, bitcoin._1, bitcoin._2, - statement_line, info)) - - def load_tx(self, ec, tx, statement_line, info): - outputs = [] - for tx_out in tx.outputs: - script = tx_out.output_script - if script.type() == bitcoin.payment_type.pubkey_hash: - pkh = bitcoin.short_hash(str(script.operations()[2].data)) - outputs.append(bitcoin.public_key_hash_to_address(pkh)) - else: - outputs.append("Unknown") - info["outputs"] = outputs - info["inputs"] = [None for i in range(len(tx.inputs))] - if info["is_in"] == 1: - info["inputs"][info["pos"]] = self.address - else: - info["value"] = tx.outputs[info["pos"]].value - if not [empty_in for empty_in in info["inputs"] if empty_in is None]: - # We have the sole input - assert(info["is_in"] == 1) - with statement_line.lock: - statement_line.input_loaded = info - self.finish_if_done() - for tx_idx, tx_in in enumerate(tx.inputs): - if info["is_in"] == 1 and info["pos"] == tx_idx: - continue - self.chain.fetch_transaction(tx_in.previous_output.hash, - bitcoin.bind(self.load_input, bitcoin._1, bitcoin._2, - tx_in.previous_output.index, statement_line, info, tx_idx)) - - def load_input(self, ec, tx, index, statement_line, info, inputs_index): - script = tx.outputs[index].output_script - if script.type() == bitcoin.payment_type.pubkey_hash: - pkh = bitcoin.short_hash(str(script.operations()[2].data)) - info["inputs"][inputs_index] = \ - bitcoin.public_key_hash_to_address(pkh) - else: - info["inputs"][inputs_index] = "Unknown" - if not [empty_in for empty_in in info["inputs"] if empty_in is None]: - with statement_line.lock: - if info["is_in"] == 1: - statement_line.input_loaded = info - else: - statement_line.output_loaded = info - self.finish_if_done() - -def payment_history(chain, address, handle_finish): - ph = PaymentHistory(chain) - expiry_queue.add(ph) - ph.run(address, handle_finish) - -if __name__ == "__main__": - def finish(result): - print result - - service = bitcoin.async_service(1) - prefix = "/home/genjix/libbitcoin/database" - chain = bitcoin.bdb_blockchain(service, prefix) - address = "1FpES68UNcxnXeoaFciqvUSGiKGZ33gbfQ" - print "Looking up", address - payment_history(chain, address, finish) - raw_input() - diff --git a/modules/__init__.py b/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/__init__.pyc b/modules/__init__.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f6cbf88c89e298a753820d181c36a5043d68197f GIT binary patch literal 106 zcmZSn%**9xp68#;00oRd+5w1*S%5?e14FO|NW@PANHCxg#lk?b-29Z%oYZ3d`1s7c Y%#!$cy@JXT4xqRVM8XcFyBLTW00fT_9smFU literal 0 HcmV?d00001 diff --git a/modules/python_bitcoin/__init__.py b/modules/python_bitcoin/__init__.py new file mode 100644 index 0000000..fd3afd8 --- /dev/null +++ b/modules/python_bitcoin/__init__.py @@ -0,0 +1,170 @@ +import bitcoin +import stratum +import threading +import time + +import composed + +class Backend: + + def __init__(self): + # Create 3 thread-pools each with 1 thread + self.network_service = bitcoin.async_service(1) + self.disk_service = bitcoin.async_service(1) + self.mempool_service = bitcoin.async_service(1) + + self.hosts = bitcoin.hosts(self.network_service) + self.handshake = bitcoin.handshake(self.network_service) + self.network = bitcoin.network(self.network_service) + self.protocol = bitcoin.protocol(self.network_service, self.hosts, + self.handshake, self.network) + + db_prefix = "/home/genjix/libbitcoin/database" + self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix) + self.poller = bitcoin.poller(self.blockchain) + self.transaction_pool = \ + bitcoin.transaction_pool(self.mempool_service, self.blockchain) + + self.protocol.subscribe_channel(self.monitor_tx) + self.session = \ + bitcoin.session(self.hosts, self.handshake, self.network, + self.protocol, self.blockchain, self.poller, + self.transaction_pool) + self.session.start(self.handle_start) + + def handle_start(self, ec): + if ec: + print "Error starting backend:", ec + + def stop(self): + self.session.stop(self.handle_stop) + + def handle_stop(self, ec): + if ec: + print "Error stopping backend:", ec + print "Stopped backend" + + def monitor_tx(self, node): + # We will be notified here when connected to new bitcoin nodes + # Here we subscribe to new transactions from them which we + # add to the transaction_pool. That way we can track which + # transactions we are interested in. + node.subscribe_transaction( + bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) + # Re-subscribe to next new node + self.protocol.subscribe_channel(self.monitor_tx) + + def recv_tx(self, ec, tx, node): + if ec: + print "Error with new transaction:", ec + return + tx_hash = bitcoin.hash_transaction(tx) + # If we want to ignore this transaction, we can set + # the 2 handlers to be null handlers that do nothing. + self.transaction_pool.store(tx, + bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash), + bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash)) + # Re-subscribe to new transactions from node + node.subscribe_transaction( + bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) + + def handle_mempool_store(self, ec, tx_hash): + if ec: + print "Error storing memory pool transaction", tx_hash, ec + else: + print "Accepted transaction", tx_hash + + def tx_confirmed(self, ec, tx_hash): + if ec: + print "Problem confirming transaction", tx_hash, ec + else: + print "Confirmed", tx_hash + +class GhostValue: + + def __init__(self): + self.event = threading.Event() + self.value = None + + def get(self): + self.event.wait() + return self.value + + def set(self, value): + self.value = value + self.event.set() + +class NumblocksSubscribe: + + def __init__(self, backend): + self.backend = backend + self.lock = threading.Lock() + self.backend.blockchain.subscribe_reorganize(self.reorganize) + self.backend.blockchain.fetch_last_depth(self.set_last_depth) + self.latest = GhostValue() + self.subscribed = [] + + def subscribe(self, session, request): + last = self.latest.get() + session.push_response({"id": request["id"], "result": last}) + with self.lock: + self.subscribed.append((session, request)) + + def set_last_depth(self, ec, last_depth): + if ec: + print "Error retrieving last depth", ec + else: + self.latest.set(last_depth) + + def reorganize(self, ec, fork_point, arrivals, replaced): + latest = fork_point + len(arrivals) + self.latest.set(latest) + subscribed = self.spring_clean() + for session, request in subscribed: + session.push_response({"id": request["id"], "result": latest}) + + def spring_clean(self): + with self.lock: + self.subscribed = [sub for sub in self.subscribed + if not sub[0].stopped()] + return self.subscribed[:] + +class AddressGetHistory: + + def __init__(self, backend): + self.backend = backend + + def get(self, session, request): + address = str(request["params"]) + composed.payment_history(self.backend.blockchain, address, + bitcoin.bind(self.respond, session, request, bitcoin._1)) + + def respond(self, session, request, result): + session.push_response({"id": request["id"], "result": result}) + +class LibbitcoinProcessor(stratum.Processor): + + def __init__(self): + self.backend = Backend() + self.numblocks_subscribe = NumblocksSubscribe(self.backend) + self.address_get_history = AddressGetHistory(self.backend) + stratum.Processor.__init__(self) + + def stop(self): + self.backend.stop() + + def process(self, session): + request = session.pop_request() + print "New request (lib)", request + if request["method"] == "numblocks.subscribe": + self.numblocks_subscribe.subscribe(session, request) + elif request["method"] == "address.get_history": + self.address_get_history.get(session, request) + # Execute and when ready, you call + # session.push_response(response) + +def run(stratum): + print "Warning: pre-alpha prototype. Full of bugs." + processor = LibbitcoinProcessor() + stratum.start(processor) + diff --git a/modules/python_bitcoin/__init__.pyc b/modules/python_bitcoin/__init__.pyc new file mode 100644 index 0000000000000000000000000000000000000000..ecdac5c61cdc39b740da3b3728e5c0ec785d15e0 GIT binary patch literal 8060 zcmcIp?Q;{y8Q(j}vSkc*a3D}_68(p#A-xJ>AJbhM8KGyzAX(_x7ImmsRWEN0%>N%|A|5 z^=aY%FVO72(53hb)JUnVdK{>&U?c!B|dK#$5N=4ywkk)gMwm=rRe>a>yxFEpJL10IK ziwA*=0$i$qJj5U6pIuZ>+gyr3L8zVtvbigpTk0|J3}tm$;70_$JjLON09OPkhwNg> z@AtqZwR==nkMWU!#IMTyAO^(AfY|QmNn}m=AWMu}1td4_ zVNu?zVJm>8nQZ{vG&bW&QH<(M3xEv^>#XZ?6z8cO#`lc8t~$_B6((ijili91W0-8G z+tK!@NbV)WILqBKLCv(o?Zs$hO1FlgE92b8iOY&S;;S8X-0p4Lq|CNWgd_6YjO37U zk!P+bBlk$&Z;iEh$`MEF;?g;iL>xyZ5e^g4qa7gus7*?U(IG7DPd;G(-_3qWC!mAPS;z=*4aA!uethhl?IQ3hu8h-i9LUs9o<_ zml_A;MMi^nC(t#!MaYZP z%!xZZeFM#NXQ30E3lqMA-hr`FxbqhbDDw5j$S7(bzaC2%dg6V4sK7Qrxa1*qYy$Iq zIP-8Hi4b=Lj-}vni=MxEvMXaB#t;iyQ~ryVky`^xUG&ENFmuCRZXWh#n0>WRWAQAf z5e;KIjAqs~J$~f}i_++Ca7t(B9z{u!?_}lJr1~rYucE80uJ-ltsYaj1f+iaJo9Oyv zo5)EtScK^w>DK`jS>s?LF(=vawLd|#^flD%ZqN=^TPtvyXW=>%S|S^DPULtV1IqEF zmm^FTGdRpH!~HFY?SKh?2fL@9vnXl7AiXH}dyF@;e75AywIneU2aBFzu&0wKG(6GY z#&k{$IESNdq1C3n79N&pit3ZH*dCd2uhIchK0}nm!|PM^_s+Bp zfdZc!fUJO%u%?BK&==52EO{7bPG7`q-k3VR=dN*I;9rI8(I$G*2Me-vobV*%xW`&z zD6yVlzo1McJ>HyJ|r)>81WYPPxMuP?!pS(L3fhG0{wJyoE6C$<}!q0?*n+ z6Q2Y_Xm{I={&}!BmTcZ`PLuQmQ2Eqm@Iy>rY6j2OpD+ySYMmU{av=+9x%k8eLZO%t zq`h*rn+lLsSp%I-Z(udmbIaKmkgs(Y+aNhx(uaGg4WD{G#T!g$cc`nj;5ZyQLKk)^_e(%8L`>ysUtsez`BOJzIR zL-K>=Ns(Klf55e?T;tXlsd%3DiMJ2b(w?0vq25 zmYc+-R%9A36PMX4&tCwD+XrvRIAIB&nN18!C|RtCN+LFrnEp)8~KDmK_X0jYE%oH&CxV@wX$sC|RiM4=h5JI_l9o z>TyTixN|}M4i`3y>Itq+7MXfeyKsvWD6d_(W2?05{yT~aRNa^8ckx2~9y%W$sZb-6 zd)*;xBpYY@z znq}9V*n0&>Jw&qb`%vA-0AABq>4AEEbH)Zm;W8XrsZnEE^a0k_xs4l=kP6+O6WKTLjz97}q))+k z)?h+?zFE^~W}@9SPN{^NtkQh2!9hlca*3C5UW_LEN8DTLcJN)4d#P(_3eRJ28FxDa zL(2UG8|71nzVn#AH1tg~K(c(=M5Zj{>Od#j&4UL%9TLCK?cTS|ISl*>2T$)xS{S#{ zC=0>sCI+-+@8@C9@cy$G46Km>S>S~NbrL~WJUt<}S&-6r#;u97p2TH5wqk)^ms}|} zmK3;yv?i5X%SpUHMp_#UJ-;=Hmhdapu&;EfBt9VcBKH~pFUBcBAok*GwB=BW!ie-| zw8{^z;bKp29LP8}VcdBPzzU?lUmGSx%;flB#)(nv-b3-ICYT8^-p>I(hYa&$oWg67 zO7vf!Uvgu^xx?GpuW;U%d~pXI>e$zC5n$h;-qY;5owL29YNwcIb& zau;|d@=G%!_sI&&iAga*Rx77!jk&(YZj&7y%}i#;oOX^0hiDyszoDu914mb-zzT|i zHIxEtFD^OdK87X1JYB-4IPb4Sp>heGZY!A@Zf(48U;}?5x$P~OsgBQ`#aL&8jJ<%J zpT%X4l<#T}zxrH^N0VWU5q_(3`xA4i_shM}s8{Utw)gJZOFp$B4w_>2(_dpGwpekf zC>w$g6Fns2m3#R=aefs*7Fgb6`YcQPtj&D5BQ{M<>awxXyi=pTD9vZ}Wp;Ejn)fnV rsM*5knS&37`V!82&)?DgjN*|yo)u)N_=YRt`UmUB*3YjWJ$>T8-mz)I literal 0 HcmV?d00001 diff --git a/modules/python_bitcoin/composed.py b/modules/python_bitcoin/composed.py new file mode 100644 index 0000000..5f17d70 --- /dev/null +++ b/modules/python_bitcoin/composed.py @@ -0,0 +1,181 @@ +import bitcoin +import threading +import time + +class ExpiryQueue(threading.Thread): + + def __init__(self): + self.lock = threading.Lock() + self.items = [] + threading.Thread.__init__(self) + self.daemon = True + + def run(self): + # Garbage collection + while True: + with self.lock: + self.items = [i for i in self.items if not i.stopped()] + time.sleep(0.1) + + def add(self, item): + with self.lock: + self.items.append(item) + +expiry_queue = ExpiryQueue() + +class StatementLine: + + def __init__(self, output_point): + self.lock = threading.Lock() + self.output_point = output_point + self.output_loaded = None + self.input_point = None + self.input_loaded = None + + def is_loaded(self): + with self.lock: + if self.output_loaded is None: + return False + elif (self.input_point is not False and + self.input_loaded is None): + return False + return True + +class PaymentHistory: + + def __init__(self, chain): + self.chain = chain + self.lock = threading.Lock() + self.statement = [] + self._stopped = False + + def run(self, address, handle_finish): + self.address = address + self.handle_finish = handle_finish + + pubkey_hash = bitcoin.address_to_short_hash(address) + self.chain.fetch_outputs(pubkey_hash, self.start_loading) + + def start_loading(self, ec, output_points): + with self.lock: + for outpoint in output_points: + statement_line = StatementLine(outpoint) + self.statement.append(statement_line) + self.chain.fetch_spend(outpoint, + bitcoin.bind(self.load_spend, + bitcoin._1, bitcoin._2, statement_line)) + self.load_tx_info(outpoint, statement_line, False) + + def load_spend(self, ec, inpoint, statement_line): + with statement_line.lock: + if ec: + statement_line.input_point = False + else: + statement_line.input_point = inpoint + self.finish_if_done() + self.load_tx_info(inpoint, statement_line, True) + + def finish_if_done(self): + with self.lock: + if any(not line.is_loaded() for line in self.statement): + return + result = [] + for line in self.statement: + line.input_loaded["value"] = -line.output_loaded["value"] + result.append(line.input_loaded) + result.append(line.output_loaded) + self.handle_finish(result) + self.stop() + + def stop(self): + with self.lock: + self._stopped = True + + def stopped(self): + with self.lock: + return self._stopped + + def load_tx_info(self, point, statement_line, is_input): + info = {} + info["tx_hash"] = str(point.hash) + info["pos"] = point.index + info["is_in"] = 1 if is_input else 0 + self.chain.fetch_transaction_index(point.hash, + bitcoin.bind(self.tx_index, bitcoin._1, bitcoin._2, bitcoin._3, + statement_line, info)) + + def tx_index(self, ec, block_depth, offset, statement_line, info): + info["height"] = block_depth + self.chain.fetch_block_header_by_depth(block_depth, + bitcoin.bind(self.block_header, bitcoin._1, bitcoin._2, + statement_line, info)) + + def block_header(self, ec, blk_head, statement_line, info): + info["time"] = blk_head.timestamp + info["blk_hash"] = str(bitcoin.hash_block_header(blk_head)) + tx_hash = bitcoin.hash_digest(info["tx_hash"]) + self.chain.fetch_transaction(tx_hash, + bitcoin.bind(self.load_tx, bitcoin._1, bitcoin._2, + statement_line, info)) + + def load_tx(self, ec, tx, statement_line, info): + outputs = [] + for tx_out in tx.outputs: + script = tx_out.output_script + if script.type() == bitcoin.payment_type.pubkey_hash: + pkh = bitcoin.short_hash(str(script.operations()[2].data)) + outputs.append(bitcoin.public_key_hash_to_address(pkh)) + else: + outputs.append("Unknown") + info["outputs"] = outputs + info["inputs"] = [None for i in range(len(tx.inputs))] + if info["is_in"] == 1: + info["inputs"][info["pos"]] = self.address + else: + info["value"] = tx.outputs[info["pos"]].value + if not [empty_in for empty_in in info["inputs"] if empty_in is None]: + # We have the sole input + assert(info["is_in"] == 1) + with statement_line.lock: + statement_line.input_loaded = info + self.finish_if_done() + for tx_idx, tx_in in enumerate(tx.inputs): + if info["is_in"] == 1 and info["pos"] == tx_idx: + continue + self.chain.fetch_transaction(tx_in.previous_output.hash, + bitcoin.bind(self.load_input, bitcoin._1, bitcoin._2, + tx_in.previous_output.index, statement_line, info, tx_idx)) + + def load_input(self, ec, tx, index, statement_line, info, inputs_index): + script = tx.outputs[index].output_script + if script.type() == bitcoin.payment_type.pubkey_hash: + pkh = bitcoin.short_hash(str(script.operations()[2].data)) + info["inputs"][inputs_index] = \ + bitcoin.public_key_hash_to_address(pkh) + else: + info["inputs"][inputs_index] = "Unknown" + if not [empty_in for empty_in in info["inputs"] if empty_in is None]: + with statement_line.lock: + if info["is_in"] == 1: + statement_line.input_loaded = info + else: + statement_line.output_loaded = info + self.finish_if_done() + +def payment_history(chain, address, handle_finish): + ph = PaymentHistory(chain) + expiry_queue.add(ph) + ph.run(address, handle_finish) + +if __name__ == "__main__": + def finish(result): + print result + + service = bitcoin.async_service(1) + prefix = "/home/genjix/libbitcoin/database" + chain = bitcoin.bdb_blockchain(service, prefix) + address = "1LzBzVqEeuQyjD2mRWHes3dgWrT9titxvq" + print "Looking up", address + payment_history(chain, address, finish) + raw_input() + diff --git a/server-genjix.py b/server-genjix.py deleted file mode 100644 index 6df53f6..0000000 --- a/server-genjix.py +++ /dev/null @@ -1,170 +0,0 @@ -import bitcoin -import stratum -import threading -import time - -import composed - -class Backend: - - def __init__(self): - # Create 3 thread-pools each with 1 thread - self.network_service = bitcoin.async_service(1) - self.disk_service = bitcoin.async_service(1) - self.mempool_service = bitcoin.async_service(1) - - self.hosts = bitcoin.hosts(self.network_service) - self.handshake = bitcoin.handshake(self.network_service) - self.network = bitcoin.network(self.network_service) - self.protocol = bitcoin.protocol(self.network_service, self.hosts, - self.handshake, self.network) - - db_prefix = "/home/genjix/libbitcoin/database" - self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix) - self.poller = bitcoin.poller(self.blockchain) - self.transaction_pool = \ - bitcoin.transaction_pool(self.mempool_service, self.blockchain) - - self.protocol.subscribe_channel(self.monitor_tx) - self.session = \ - bitcoin.session(self.hosts, self.handshake, self.network, - self.protocol, self.blockchain, self.poller, - self.transaction_pool) - self.session.start(self.handle_start) - - def handle_start(self, ec): - if ec: - print "Error starting backend:", ec - - def stop(self): - self.session.stop(self.handle_stop) - - def handle_stop(self, ec): - if ec: - print "Error stopping backend:", ec - print "Stopped backend" - - def monitor_tx(self, node): - # We will be notified here when connected to new bitcoin nodes - # Here we subscribe to new transactions from them which we - # add to the transaction_pool. That way we can track which - # transactions we are interested in. - node.subscribe_transaction( - bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) - # Re-subscribe to next new node - self.protocol.subscribe_channel(self.monitor_tx) - - def recv_tx(self, ec, tx, node): - if ec: - print "Error with new transaction:", ec - return - tx_hash = bitcoin.hash_transaction(tx) - # If we want to ignore this transaction, we can set - # the 2 handlers to be null handlers that do nothing. - self.transaction_pool.store(tx, - bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash), - bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash)) - # Re-subscribe to new transactions from node - node.subscribe_transaction( - bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) - - def handle_mempool_store(self, ec, tx_hash): - if ec: - print "Error storing memory pool transaction", tx_hash, ec - else: - print "Accepted transaction", tx_hash - - def tx_confirmed(self, ec, tx_hash): - if ec: - print "Problem confirming transaction", tx_hash, ec - else: - print "Confirmed", tx_hash - -class GhostValue: - - def __init__(self): - self.event = threading.Event() - self.value = None - - def get(self): - self.event.wait() - return self.value - - def set(self, value): - self.value = value - self.event.set() - -class NumblocksSubscribe: - - def __init__(self, backend): - self.backend = backend - self.lock = threading.Lock() - self.backend.blockchain.subscribe_reorganize(self.reorganize) - self.backend.blockchain.fetch_last_depth(self.set_last_depth) - self.latest = GhostValue() - self.subscribed = [] - - def subscribe(self, session, request): - last = self.latest.get() - session.push_response({"id": request["id"], "result": last}) - with self.lock: - self.subscribed.append((session, request)) - - def set_last_depth(self, ec, last_depth): - if ec: - print "Error retrieving last depth", ec - else: - self.latest.set(last_depth) - - def reorganize(self, ec, fork_point, arrivals, replaced): - latest = fork_point + len(arrivals) - self.latest.set(latest) - subscribed = self.spring_clean() - for session, request in subscribed: - session.push_response({"id": request["id"], "result": latest}) - - def spring_clean(self): - with self.lock: - self.subscribed = [sub for sub in self.subscribed - if not sub[0].stopped()] - return self.subscribed[:] - -class AddressGetHistory: - - def __init__(self, backend): - self.backend = backend - - def get(self, session, request): - address = str(request["params"]) - composed.payment_history(self.backend.blockchain, address, - bitcoin.bind(self.respond, session, request, bitcoin._1)) - - def respond(self, session, request, result): - session.push_response({"id": request["id"], "result": result}) - -class LibbitcoinProcessor(stratum.Processor): - - def __init__(self): - self.backend = Backend() - self.numblocks_subscribe = NumblocksSubscribe(self.backend) - self.address_get_history = AddressGetHistory(self.backend) - stratum.Processor.__init__(self) - - def stop(self): - self.backend.stop() - - def process(self, session): - request = session.pop_request() - print "New request (lib)", request - if request["method"] == "numblocks.subscribe": - self.numblocks_subscribe.subscribe(session, request) - elif request["method"] == "address.get_history": - self.address_get_history.get(session, request) - # Execute and when ready, you call - # session.push_response(response) - -if __name__ == "__main__": - processor = LibbitcoinProcessor() - app = stratum.Stratum() - app.start(processor) - diff --git a/stratum.py b/stratum.py index 949fb9f..6643815 100644 --- a/stratum.py +++ b/stratum.py @@ -178,7 +178,7 @@ class TcpServer(threading.Thread): print "TCP server started." sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(("localhost", 50001)) + sock.bind(("176.31.24.241", 50001)) sock.listen(1) while not self.shared.stopped(): session = Session(*sock.accept()) -- 1.7.1