9 # Create 3 thread-pools each with 1 thread
10 self.network_service = bitcoin.async_service(1)
11 self.disk_service = bitcoin.async_service(1)
12 self.mempool_service = bitcoin.async_service(1)
14 self.hosts = bitcoin.hosts(self.network_service)
15 self.handshake = bitcoin.handshake(self.network_service)
16 self.network = bitcoin.network(self.network_service)
17 self.protocol = bitcoin.protocol(self.network_service, self.hosts,
18 self.handshake, self.network)
20 db_prefix = "/home/genjix/libbitcoin/database"
21 self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix)
22 self.poller = bitcoin.poller(self.blockchain)
23 self.transaction_pool = \
24 bitcoin.transaction_pool(self.mempool_service, self.blockchain)
26 self.protocol.subscribe_channel(self.monitor_tx)
28 bitcoin.session(self.hosts, self.handshake, self.network,
29 self.protocol, self.blockchain, self.poller,
30 self.transaction_pool)
31 self.session.start(self.handle_start)
def handle_start(self, ec):
    """Completion handler given to ``self.session.start``.

    ec -- status reported by the session start; an error is printed only
          when it is truthy (assumed falsy on success -- TODO confirm
          against the bitcoin bindings).
    """
    if ec:
        print("Error starting backend: %s" % (ec,))

# NOTE(review): the source listing lost the header line that preceded the
# session.stop call. The method name ``stop`` is reconstructed from the
# start/handle_start pairing -- confirm against the original file.
def stop(self):
    """Ask the session to shut down; completion is reported to handle_stop."""
    self.session.stop(self.handle_stop)
def handle_stop(self, ec):
    """Completion handler given to ``self.session.stop``.

    ec -- status reported by the session stop; an error is printed only
          when it is truthy (assumed falsy on success -- TODO confirm).

    "Stopped backend" is printed in every case.
    """
    if ec:
        print("Error stopping backend: %s" % (ec,))
    print("Stopped backend")
def monitor_tx(self, node):
    """Start listening for transactions on a newly connected node.

    Registered through ``self.protocol.subscribe_channel``; it is called
    when we connect to a new bitcoin node. Transactions announced by
    ``node`` are routed to ``recv_tx``, which adds them to the
    transaction pool so we can track the ones we are interested in.
    """
    # Bind the node as a third argument so recv_tx can renew its own
    # subscription on the same node.
    on_transaction = bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)
    node.subscribe_transaction(on_transaction)
    # Ask to be notified about the next new node as well.
    self.protocol.subscribe_channel(self.monitor_tx)
def recv_tx(self, ec, tx, node):
    """Handle a transaction announced by ``node`` and hand it to the pool.

    ec   -- status of the notification; when truthy the error is printed
            and nothing else happens (the node is not re-subscribed).
    tx   -- the announced transaction.
    node -- the channel the transaction came from.
    """
    # NOTE(review): guard and early return reconstructed -- the source
    # listing appears to have lost the lines around the print; confirm.
    if ec:
        print("Error with new transaction: %s" % (ec,))
        return
    tx_hash = bitcoin.hash_transaction(tx)
    # If we want to ignore this transaction, we can set
    # the 2 handlers to be null handlers that do nothing.
    self.transaction_pool.store(tx,
        bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash),
        bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash))
    # Re-subscribe to new transactions from node
    node.subscribe_transaction(
        bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))
def handle_mempool_store(self, ec, tx_hash):
    """Report the outcome of storing a transaction in the memory pool.

    ec      -- status passed by ``transaction_pool.store``; truthy is
               treated as failure (assumed falsy on success -- TODO confirm).
    tx_hash -- hash of the transaction, bound in by recv_tx.
    """
    if ec:
        print("Error storing memory pool transaction %s %s" % (tx_hash, ec))
    else:
        print("Accepted transaction %s" % (tx_hash,))
def tx_confirmed(self, ec, tx_hash):
    """Report the confirmation outcome of a pooled transaction.

    ec      -- status passed by the transaction pool; truthy is treated
               as failure (assumed falsy on success -- TODO confirm).
    tx_hash -- hash of the transaction, bound in by recv_tx.
    """
    if ec:
        print("Problem confirming transaction %s %s" % (tx_hash, ec))
    else:
        print("Confirmed %s" % (tx_hash,))
84 self.event = threading.Event()
class NumblocksSubscribe:
    """Pushes the latest block count to sessions that subscribed to it.

    The count is seeded by ``fetch_last_depth`` and updated from
    reorganize notifications of the backend's blockchain.
    """

    def __init__(self, backend):
        """Set up state, then register the blockchain callbacks.

        backend -- object exposing a ``blockchain`` attribute.
        """
        self.backend = backend
        # Guards self.subscribed, which is touched by subscribe() and by
        # spring_clean() (called from the reorganize callback).
        self.lock = threading.Lock()
        # State is created before the callbacks are registered so that a
        # callback can never run against a half-built object.
        self.latest = GhostValue()
        self.subscribed = []
        self.backend.blockchain.subscribe_reorganize(self.reorganize)
        self.backend.blockchain.fetch_last_depth(self.set_last_depth)

    def subscribe(self, session, request):
        """Answer ``request`` with the current count and remember the session.

        NOTE(review): ``self.latest.get()`` presumably waits until a value
        has been set -- confirm against GhostValue.
        """
        last = self.latest.get()
        session.push_response({"id": request["id"], "result": last})
        with self.lock:
            self.subscribed.append((session, request))

    def set_last_depth(self, ec, last_depth):
        """Callback for ``fetch_last_depth``: store the depth unless ec is set."""
        if ec:
            print("Error retrieving last depth %s" % (ec,))
        else:
            self.latest.set(last_depth)

    def reorganize(self, ec, fork_point, arrivals, replaced):
        """Callback for ``subscribe_reorganize``: publish the new count.

        The new count is ``fork_point + len(arrivals)``; ``replaced`` is
        not used.

        NOTE(review): ``ec`` is not inspected here, and this callback does
        not register itself again, unlike monitor_tx/recv_tx in this file.
        Confirm whether subscribe_reorganize needs renewing.
        """
        latest = fork_point + len(arrivals)
        self.latest.set(latest)
        subscribed = self.spring_clean()
        for session, request in subscribed:
            session.push_response({"id": request["id"], "result": latest})

    def spring_clean(self):
        """Drop stopped sessions and return a copy of the remaining list."""
        with self.lock:
            self.subscribed = [sub for sub in self.subscribed
                               if not sub[0].stopped()]
            # Return a copy so the caller can iterate without the lock.
            return self.subscribed[:]
class LibbitcoinProcessor(stratum.Processor):
    """Stratum processor that answers requests from a libbitcoin Backend."""

    # NOTE(review): the ``def __init__`` header is reconstructed -- the
    # source listing had these statements directly in the class body.
    def __init__(self):
        """Create the backend and its subscription helper, then init the base."""
        self.backend = Backend()
        self.numblocks_subscribe = NumblocksSubscribe(self.backend)
        stratum.Processor.__init__(self)

    def process(self, session):
        """Pop one request from ``session`` and dispatch it by method name.

        Only "numblocks.subscribe" is handled; every request is printed.
        """
        request = session.pop_request()
        if request["method"] == "numblocks.subscribe":
            self.numblocks_subscribe.subscribe(session, request)
        print("New request (lib) %s" % (request,))
        # Execute and when ready, you call
        # session.push_response(response)
148 if __name__ == "__main__":
149 processor = LibbitcoinProcessor()
150 app = stratum.Stratum()