11 # Create 3 thread-pools each with 1 thread
12 self.network_service = bitcoin.async_service(1)
13 self.disk_service = bitcoin.async_service(1)
14 self.mempool_service = bitcoin.async_service(1)
16 self.hosts = bitcoin.hosts(self.network_service)
17 self.handshake = bitcoin.handshake(self.network_service)
18 self.network = bitcoin.network(self.network_service)
19 self.protocol = bitcoin.protocol(self.network_service, self.hosts,
20 self.handshake, self.network)
22 db_prefix = "/home/genjix/libbitcoin/database"
23 self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix)
24 self.poller = bitcoin.poller(self.blockchain)
25 self.transaction_pool = \
26 bitcoin.transaction_pool(self.mempool_service, self.blockchain)
28 self.protocol.subscribe_channel(self.monitor_tx)
30 bitcoin.session(self.hosts, self.handshake, self.network,
31 self.protocol, self.blockchain, self.poller,
32 self.transaction_pool)
33 self.session.start(self.handle_start)
35 def handle_start(self, ec):
37 print "Error starting backend:", ec
40 self.session.stop(self.handle_stop)
42 def handle_stop(self, ec):
44 print "Error stopping backend:", ec
45 print "Stopped backend"
47 def monitor_tx(self, node):
48 # We will be notified here when connected to new bitcoin nodes
49 # Here we subscribe to new transactions from them which we
50 # add to the transaction_pool. That way we can track which
51 # transactions we are interested in.
52 node.subscribe_transaction(
53 bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))
54 # Re-subscribe to next new node
55 self.protocol.subscribe_channel(self.monitor_tx)
57 def recv_tx(self, ec, tx, node):
59 print "Error with new transaction:", ec
61 tx_hash = bitcoin.hash_transaction(tx)
62 # If we want to ignore this transaction, we can set
63 # the 2 handlers to be null handlers that do nothing.
64 self.transaction_pool.store(tx,
65 bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash),
66 bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash))
67 # Re-subscribe to new transactions from node
68 node.subscribe_transaction(
69 bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))
71 def handle_mempool_store(self, ec, tx_hash):
73 print "Error storing memory pool transaction", tx_hash, ec
75 print "Accepted transaction", tx_hash
77 def tx_confirmed(self, ec, tx_hash):
79 print "Problem confirming transaction", tx_hash, ec
81 print "Confirmed", tx_hash
86 self.event = threading.Event()
97 class NumblocksSubscribe:
99 def __init__(self, backend):
100 self.backend = backend
101 self.lock = threading.Lock()
102 self.backend.blockchain.subscribe_reorganize(self.reorganize)
103 self.backend.blockchain.fetch_last_depth(self.set_last_depth)
104 self.latest = GhostValue()
107 def subscribe(self, session, request):
108 last = self.latest.get()
109 session.push_response({"id": request["id"], "result": last})
111 self.subscribed.append((session, request))
113 def set_last_depth(self, ec, last_depth):
115 print "Error retrieving last depth", ec
117 self.latest.set(last_depth)
119 def reorganize(self, ec, fork_point, arrivals, replaced):
120 latest = fork_point + len(arrivals)
121 self.latest.set(latest)
122 subscribed = self.spring_clean()
123 for session, request in subscribed:
124 session.push_response({"id": request["id"], "result": latest})
125 self.backend.blockchain.subscribe_reorganize(self.reorganize)
127 def spring_clean(self):
129 self.subscribed = [sub for sub in self.subscribed
130 if not sub[0].stopped()]
131 return self.subscribed[:]
133 class AddressGetHistory:
135 def __init__(self, backend):
136 self.backend = backend
138 def get(self, session, request):
139 address = str(request["params"])
140 composed.payment_history(self.backend.blockchain, address,
141 bitcoin.bind(self.respond, session, request, bitcoin._1))
143 def respond(self, session, request, result):
144 session.push_response({"id": request["id"], "result": result})
146 class LibbitcoinProcessor(stratum.Processor):
149 self.backend = Backend()
150 self.numblocks_subscribe = NumblocksSubscribe(self.backend)
151 self.address_get_history = AddressGetHistory(self.backend)
152 stratum.Processor.__init__(self)
157 def process(self, session):
158 request = session.pop_request()
159 print "New request (lib)", request
160 if request["method"] == "numblocks.subscribe":
161 self.numblocks_subscribe.subscribe(session, request)
162 elif request["method"] == "address.get_history":
163 self.address_get_history.get(session, request)
164 elif request["method"] == "server.banner":
165 session.push_response({"id": request["id"],
166 "result": "libbitcoin using python-bitcoin bindings"})
167 elif request["method"] == "transaction.broadcast":
168 self.broadcast_transaction(session, request)
169 # Execute and when ready, you call
170 # session.push_response(response)
172 def broadcast_transaction(self, session, request):
173 raw_tx = bitcoin.data_chunk(str(request["params"]))
174 exporter = bitcoin.satoshi_exporter()
176 tx = exporter.load_transaction(raw_tx)
178 response = {"id": request["id"], "result": None,
180 "Exception while parsing the transaction data.",
183 self.backend.protocol.broadcast_transaction(tx)
184 tx_hash = str(bitcoin.hash_transaction(tx))
185 response = {"id": request["id"], "result": tx_hash}
186 session.push_response(response)
189 print "Warning: pre-alpha prototype. Full of bugs."
190 processor = LibbitcoinProcessor()
191 stratum.start(processor)