From: genjix Date: Wed, 21 Mar 2012 22:28:55 +0000 (+0000) Subject: numblocks.subscribe responds with latest block. X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=c7afa218d4302644ce883b776b943762529664e3 numblocks.subscribe responds with latest block. --- diff --git a/server-genjix.py b/server-genjix.py index dab3020..a6cfb1e 100644 --- a/server-genjix.py +++ b/server-genjix.py @@ -1,9 +1,127 @@ +import bitcoin import stratum +import threading +import time + +class Backend: + + def __init__(self): + self.network_service = bitcoin.async_service(1) + self.disk_service = bitcoin.async_service(1) + self.mempool_service = bitcoin.async_service(1) + + self.hosts = bitcoin.hosts(self.network_service) + self.handshake = bitcoin.handshake(self.network_service) + self.network = bitcoin.network(self.network_service) + self.protocol = bitcoin.protocol(self.network_service, self.hosts, + self.handshake, self.network) + + db_prefix = "/home/genjix/libbitcoin/database" + self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix) + self.poller = bitcoin.poller(self.blockchain) + self.transaction_pool = \ + bitcoin.transaction_pool(self.mempool_service, self.blockchain) + + self.protocol.subscribe_channel(self.monitor_tx) + self.session = \ + bitcoin.session(self.hosts, self.handshake, self.network, + self.protocol, self.blockchain, self.poller, + self.transaction_pool) + self.session.start(self.handle_start) + + def handle_start(self, ec): + if ec: + print "Error starting backend:", ec + + def stop(self): + self.session.stop(self.handle_stop) + + def handle_stop(self, ec): + if ec: + print "Error stopping backend:", ec + print "Stopped backend" + + def monitor_tx(self, node): + # We will be notified here when connected to new bitcoin nodes + # Here we subscribe to new transactions from them which we + # add to the transaction_pool. That way we can track which + # transactions we are interested in. + node.subscribe_transaction( + bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) + # Re-subscribe to next new node + self.protocol.subscribe_channel(self.monitor_tx) + + def recv_tx(self, ec, tx, node): + if ec: + print "Error with new transaction:", ec + return + tx_hash = bitcoin.hash_transaction(tx) + self.transaction_pool.store(tx, + bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash), + bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash)) + # Re-subscribe to new transactions from node + node.subscribe_transaction( + bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) + + def handle_mempool_store(self, ec, tx_hash): + if ec: + print "Error storing memory pool transaction", tx_hash, ec + else: + print "Accepted transaction", tx_hash + + def tx_confirmed(self, ec, tx_hash): + if ec: + print "Problem confirming transaction", tx_hash, ec + else: + print "Confirmed", tx_hash + +class NumblocksSubscribe: + + def __init__(self, backend): + self.backend = backend + self.lock = threading.Lock() + self.backend.blockchain.subscribe_reorganize(self.reorganize) + self.backend.blockchain.fetch_last_depth(self.set_last_depth) + self.latest = None + + def subscribe(self, session, request): + last = self.get_last_depth() + session.push_response({"id": request["id"], "result": last}) + + def get_last_depth(self): + # Stall until last depth has been set... + # Should use condition variable here instead + while True: + with self.lock: + last = self.latest + if last is not None: + return last + time.sleep(0.1) + + def set_last_depth(self, ec, last_depth): + if ec: + print "Error retrieving last depth", ec + else: + with self.lock: + self.latest = last_depth + + def reorganize(self, ec, fork_point, arrivals, replaced): + pass class LibbitcoinProcessor(stratum.Processor): + def __init__(self): + self.backend = Backend() + self.numblocks_subscribe = NumblocksSubscribe(self.backend) + stratum.Processor.__init__(self) + + def stop(self): + self.backend.stop() + def process(self, session): request = session.pop_request() + if request["method"] == "numblocks.subscribe": + self.numblocks_subscribe.subscribe(session, request) print "New request (lib)", request # Execute and when ready, you call # session.push_response(response) diff --git a/stratum.py b/stratum.py index fab33f9..949fb9f 100644 --- a/stratum.py +++ b/stratum.py @@ -32,6 +32,10 @@ class Processor(threading.Thread): # to our internal register self.add_session(session) self.process(session) + self.stop() + + def stop(self): + pass def process(self, session): request = session.pop_request()