5 from bitcoin import bind, _1, _2, _3
7 from processor import Processor
8 import history1 as history
# Creates a threading.Lock and keeps it on the instance as `lock`.
# NOTE(review): the `def` line and class header this statement belongs to are
# not visible in the reviewed text, so what the lock protects cannot be stated.
15 self.lock = threading.Lock()
# Stores `result` in `self.cache` under the key `address`, replacing any
# previous entry for that key.
# NOTE(review): the embedded numbering jumps from 18 to 20, so a statement
# between the signature and the assignment may be missing from this text.
18 def store(self, address, result):
20 self.cache[address] = result
# Returns the value stored in `self.cache` for `address`.
# NOTE(review): how an address that was never stored is handled is not
# visible here (the embedded numbering jumps 22 -> 25 and again after 25);
# confirm whether callers get a KeyError or some sentinel value.
22 def fetch(self, address):
25 return self.cache[address]
# Drops every address in `addresses` from `self.cache`; addresses that are
# not currently cached are skipped.
# NOTE(review): the embedded numbering jumps 29 -> 31, so a statement right
# after the signature may be missing from this text.
29 def clear(self, addresses):
31 for address in addresses:
# Membership is tested first so deleting an absent key cannot raise.
32 if address in self.cache:
33 del self.cache[address]
# Sets up the address monitor.
#   processor -- stored as self.processor
#   cache     -- NOTE(review): no assignment of `cache` is visible in this
#                text, although effect_notify reads self.cache; confirm.
#   backend   -- stored as self.backend; its memory_buffer is handed this
#                object's tx_stored and tx_confirmed methods.
38 def __init__(self, processor, cache, backend):
39 self.processor = processor
41 self.backend = backend
# NOTE(review): the statements executed while holding this lock are not
# visible in the reviewed text.
42 self.lock = threading.Lock()
43 # key is hash:index, value is address
44 self.monitor_output = {}
# Set of addresses added through monitor().
46 self.monitor_address = set()
# Pass the two transaction handlers to the backend's memory buffer.
48 backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
# Records `address` and the outpoints described by `result` in the monitor
# tables (self.monitor_output and self.monitor_address).
# NOTE(review): `info` is never bound in the visible lines -- presumably it is
# each entry of `result`, with the loop header missing from this text; confirm.
50 def monitor(self, address, result):
# NOTE(review): what happens to entries lacking "raw_output_script" is not
# visible (the line after this test is missing).
52 if "raw_output_script" not in info:
# NOTE(review): asserts are stripped under `python -O`.
54 assert info["is_input"] == 0
55 tx_hash = info["tx_hash"]
56 output_index = info["index"]
# Outpoint key has the form "<tx_hash>:<index>".
57 outpoint = "%s:%s" % (tx_hash, output_index)
59 self.monitor_output[outpoint] = address
61 self.monitor_address.add(address)
# NOTE(review): the `def` line for this body is not visible. It is presumably
# `unpack`, which effect_notify calls and unpacks into the same three values
# returned below -- confirm.
# Computes the transaction hash, collects a "<hash>:<index>" string for the
# previous output of every input, and collects an (output_index, address
# string) pair for every output whose script yields an address.
# NOTE(review): `previous_outputs` and `addrs` are appended to, but their
# initialisation is not visible in this text.
64 tx_hash = bitcoin.hash_transaction(tx)
# NOTE(review): the loop variable shadows the builtin `input`.
66 for input in tx.inputs:
67 prevout = input.previous_output
68 prevout = "%s:%s" % (prevout.hash, prevout.index)
69 previous_outputs.append(prevout)
71 for output_index, output in enumerate(tx.outputs):
72 address = bitcoin.payment_address()
# Only outputs for which extract() returns a true value are recorded.
73 if address.extract(output.output_script):
74 addrs.append((output_index, str(address)))
75 return tx_hash, previous_outputs, addrs
# Works out which monitored addresses a transaction touches, clears their
# cache entries and sends notifications for them.
#   tx          -- transaction, decomposed with self.unpack()
#   delete_outs -- NOTE(review): never referenced in the visible lines;
#                  presumably it guards the `del` below -- confirm.
# Returns the tuple (tx_hash, addrs, affected_addrs).
77 def effect_notify(self, tx, delete_outs):
78 affected_addrs = set()
79 tx_hash, previous_outputs, addrs = self.unpack(tx)
80 for prevout in previous_outputs:
# The address recorded for this previous output becomes affected.
# NOTE(review): handling of a prevout that is not in monitor_output is not
# visible (embedded numbering jumps 80 -> 83 and 85 -> 88).
83 affected_addrs.add(self.monitor_output[prevout])
85 del self.monitor_output[prevout]
88 for idx, address in addrs:
# Outputs paying an address in the monitored set also mark it affected.
90 if address in self.monitor_address:
91 affected_addrs.add(address)
# Cached entries are cleared before the notifications are started.
92 self.cache.clear(affected_addrs)
93 self.notify(affected_addrs)
94 # Used in confirmed txs
95 return tx_hash, addrs, affected_addrs
def tx_stored(self, tx):
    """Handle a stored transaction.

    Delegates to ``effect_notify`` with ``delete_outs`` set to False and
    discards its return value.

    tx -- the transaction object passed on unchanged to ``effect_notify``
    """
    self.effect_notify(tx, False)
# Handler for a confirmed transaction: runs effect_notify with delete_outs
# set to True, then records outputs of the transaction in monitor_output.
100 def tx_confirmed(self, tx):
101 tx_hash, addrs, affected_addrs = self.effect_notify(tx, True)
102 # add new outputs to monitor
103 for idx, address in addrs:
# Same "<tx_hash>:<index>" key format as the other monitor_output entries.
104 outpoint = "%s:%s" % (tx_hash, idx)
# Only outputs paying an address reported as affected are recorded.
# NOTE(review): embedded numbering jumps 105 -> 107; a statement between the
# test and the assignment may be missing from this text.
105 if address in affected_addrs:
107 self.monitor_output[outpoint] = address
def notify(self, affected_addrs):
    """Start an address-subscription notification for each given address.

    For every address a history lookup is started through
    ``history.payment_history``; its completion callback is
    ``self.send_notify`` bound to a notification dict that carries the
    address (as a string) as its only ``params`` entry.

    affected_addrs -- iterable of addresses; an empty iterable does nothing
    """
    backend = self.backend
    service = backend.mempool_service
    chain = backend.blockchain
    txpool = backend.transaction_pool
    memory_buff = backend.memory_buffer
    for address in affected_addrs:
        # A separate dict per address: each callback gets its own response.
        response = {"id": None,
                    "method": "blockchain.address.subscribe",
                    "params": [str(address)]}
        history.payment_history(service, chain, txpool, memory_buff, address,
                                bind(self.send_notify, _1, _2, response))
# Derives an identifier from a history `result`, a sequence of dicts that is
# read through the "timestamp" and "block_hash" keys below.
#   - when the entry with the greatest timestamp has block_hash "mempool",
#     last_id is "mempool:<number of entries in result>"
#   - last_id is also assigned that entry's block_hash (presumably in the
#     alternative branch, whose keyword line is not visible -- confirm).
# NOTE(review): the return statement and any handling of an empty result are
# not visible in this text; sorted(...)[-1] raises IndexError on an empty
# sequence.
121 def mempool_n(self, result):
# NOTE(review): asserts are stripped under `python -O`.
122 assert result is not None
126 # Order by time, grab last item (latest)
127 last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
128 if last_info["block_hash"] == "mempool":
129 last_id = "mempool:%s" % len(result)
131 last_id = last_info["block_hash"]
# Completion callback bound in notify() for a history lookup.
#   ec       -- status value; included in the error message printed below
#   result   -- history handed to mempool_n()
#   response -- notification dict whose "params" list must hold exactly one
#               item on entry
# Appends mempool_n(result) to response["params"] (mutating the dict passed
# in) and pushes the response to the processor.
# NOTE(review): the condition guarding the error print, and whether the
# function returns after it, are not visible in this text.
134 def send_notify(self, ec, result, response):
136 print "Error: Monitor.send_notify()", ec
# NOTE(review): asserts are stripped under `python -O`.
138 assert len(response["params"]) == 1
139 response["params"].append(self.mempool_n(result))
140 self.processor.push_response(response)
# NOTE(review): the class header and `def` line for this body are not visible
# in the reviewed text; it is presumably the constructor of the backend object
# whose attributes (mempool_service, blockchain, transaction_pool,
# memory_buffer, protocol) are read elsewhere in this file -- confirm.
146 # Create 3 thread-pools each with 1 thread
147 self.network_service = bitcoin.async_service(1)
148 self.disk_service = bitcoin.async_service(1)
149 self.mempool_service = bitcoin.async_service(1)
# Networking objects, all created on the network service.
151 self.hosts = bitcoin.hosts(self.network_service)
152 self.handshake = bitcoin.handshake(self.network_service)
153 self.network = bitcoin.network(self.network_service)
154 self.protocol = bitcoin.protocol(self.network_service, self.hosts,
155 self.handshake, self.network)
# NOTE(review): hard-coded, user-specific database path.
157 db_prefix = "/home/genjix/libbitcoin/database"
# The blockchain is created on the disk service; blockchain_started is passed
# as its third argument.
158 self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
159 self.blockchain_started)
# Poller and transaction pool are both created on the mempool service.
160 self.poller = bitcoin.poller(self.mempool_service, self.blockchain)
161 self.transaction_pool = \
162 bitcoin.transaction_pool(self.mempool_service, self.blockchain)
# monitor_tx is registered as the channel-subscription handler.
164 self.protocol.subscribe_channel(self.monitor_tx)
# NOTE(review): the target of this call's result is not visible (embedded
# numbering jumps 164 -> 166), yet self.session is used on the line after it;
# presumably the result is assigned to self.session -- confirm.
166 bitcoin.session(self.network_service, self.hosts, self.handshake,
167 self.network, self.protocol, self.blockchain,
168 self.poller, self.transaction_pool)
169 self.session.start(self.handle_start)
# The memory buffer is built from the internal_ptr attributes of the mempool
# service, the blockchain and the transaction pool.
171 self.memory_buffer = \
172 membuf.memory_buffer(self.mempool_service.internal_ptr,
173 self.blockchain.internal_ptr,
174 self.transaction_pool.internal_ptr)
# Callback passed to self.session.start(); prints an error message that
# includes `ec`.
# NOTE(review): the condition guarding the print is not visible in this text
# (embedded numbering jumps 176 -> 178).
176 def handle_start(self, ec):
178 print "Error starting backend:", ec
def blockchain_started(self, ec, chain):
    """Report the status of blockchain initialisation.

    Passed to ``bitcoin.bdb_blockchain`` as its third argument.

    ec    -- status value; printed after the fixed message
    chain -- unused
    """
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print("Blockchain initialisation: %s" % (ec,))
# Asks the session to stop and passes handle_stop as the callback.
# NOTE(review): the `def` line this statement belongs to is not visible in the
# reviewed text.
184 self.session.stop(self.handle_stop)
# Callback passed to self.session.stop(); prints an error message including
# `ec`, and prints "Stopped backend".
# NOTE(review): the condition guarding the error print is not visible in this
# text (embedded numbering jumps 186 -> 188).
186 def handle_stop(self, ec):
188 print "Error stopping backend:", ec
189 print "Stopped backend"
def monitor_tx(self, node):
    """Channel-subscription handler: start receiving a node's transactions.

    node -- the newly connected node object; it is also bound as the third
            argument of ``recv_tx`` so that handler can re-subscribe on it
    """
    # We will be notified here when connected to new bitcoin nodes.
    # Here we subscribe to new transactions from them which we
    # add to the transaction_pool. That way we can track which
    # transactions we are interested in.
    node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
    # Re-subscribe to next new node
    self.protocol.subscribe_channel(self.monitor_tx)
# Handler bound in monitor_tx for a transaction received from `node`.
#   ec   -- status value; included in the error message printed below
#   tx   -- the received transaction
#   node -- the node the transaction came from
# Hashes the transaction, hands it to the memory buffer with store_tx bound
# to that hash as the callback, then subscribes to the node again.
# NOTE(review): the condition guarding the error print, and whether the
# function returns after it, are not visible in this text.
200 def recv_tx(self, ec, tx, node):
202 print "Error with new transaction:", ec
204 tx_hash = bitcoin.hash_transaction(tx)
205 self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
206 # Re-subscribe to new transactions from node
207 node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
# Callback bound in recv_tx for the memory buffer's receive(); prints either
# an error message (with tx_hash and ec) or an acceptance message.
# NOTE(review): the lines selecting between the two prints are not visible in
# this text (embedded numbering jumps 209 -> 211 -> 213); confirm whether the
# acceptance message is printed only when there is no error.
209 def store_tx(self, ec, tx_hash):
211 print "Error storing memory pool transaction", tx_hash, ec
213 print "Accepted transaction", tx_hash
# NOTE(review): these two lines are fragments; the class header, the `def`
# containing the first line and the body of `set` are not visible in the
# reviewed text. They presumably belong to GhostValue, which is instantiated
# elsewhere in this file and used through set() and get() -- confirm.
# Creates a threading.Event and keeps it on the instance as `event`.
219 self.event = threading.Event()
226 def set(self, value):
# Tracks the latest block depth reported by the backend's blockchain and
# builds "blockchain.numblocks.subscribe" responses from it.
231 class NumblocksSubscribe:
#   backend   -- object whose `blockchain` provides subscribe_reorganize()
#                and fetch_last_depth()
#   processor -- object whose push_response() receives the responses
233 def __init__(self, backend, processor):
234 self.backend = backend
235 self.processor = processor
236 self.lock = threading.Lock()
# Register reorganize() and request the current depth via set_last_depth().
237 self.backend.blockchain.subscribe_reorganize(self.reorganize)
238 self.backend.blockchain.fetch_last_depth(self.set_last_depth)
# NOTE(review): assigned after the two registrations above; if either
# callback can run before this line, self.latest would not exist yet --
# confirm the callback timing.
239 self.latest = GhostValue()
# Callback for fetch_last_depth: stores last_depth in self.latest.
# NOTE(review): the lines selecting between the error print and the set() call
# are not visible in this text (embedded numbering jumps 241 -> 243 -> 245).
241 def set_last_depth(self, ec, last_depth):
243 print "Error retrieving last depth", ec
245 self.latest.set(last_depth)
# Callback for subscribe_reorganize: the new latest value is fork_point plus
# the number of arrivals. `ec` and `replaced` are not used in the visible
# lines.
247 def reorganize(self, ec, fork_point, arrivals, replaced):
248 latest = fork_point + len(arrivals)
249 self.latest.set(latest)
# NOTE(review): the rest of this dict literal is not visible in this text
# (embedded numbering jumps 250 -> 252).
250 response = {"id": None, "method": "blockchain.numblocks.subscribe",
252 self.processor.push_response(response)
# Registers for the next reorganisation (presumably each registration fires
# once -- confirm).
253 self.backend.blockchain.subscribe_reorganize(self.reorganize)
# Answers a subscribe request using the current value of self.latest and the
# request's own "id".
# NOTE(review): the rest of the response dict is not visible in this text
# (embedded numbering jumps 257 -> 260).
255 def subscribe(self, request):
256 latest = self.latest.get()
257 response = {"id": request["id"],
260 self.processor.push_response(response)
class AddressGetHistory:
    """Answer address history requests by querying the backend.

    backend   -- object exposing ``mempool_service``, ``blockchain``,
                 ``transaction_pool`` and ``memory_buffer``
    processor -- object whose ``push_response()`` receives the reply
    """

    def __init__(self, backend, processor):
        self.backend = backend
        self.processor = processor

    def get(self, request):
        """Start a history lookup for the address in ``request["params"][0]``.

        Nothing is returned; the reply is pushed later by ``respond``, which
        is bound as the lookup's completion callback together with the
        originating request.
        """
        address = str(request["params"][0])
        backend = self.backend
        service = backend.mempool_service
        chain = backend.blockchain
        txpool = backend.transaction_pool
        memory_buff = backend.memory_buffer
        history.payment_history(service, chain, txpool, memory_buff, address,
                                bind(self.respond, _1, _2, request))

    def respond(self, ec, result, request):
        """Push the reply for ``request`` to the processor.

        ec      -- status value of the lookup; a true value is an error
        result  -- the history, returned as the reply's ``"result"``
        request -- the originating request; its ``"id"`` is echoed back

        On error the reply carries ``"result": None`` and an ``"error"``
        dict with ``str(ec)`` as message and code -4; otherwise it carries
        the result and ``"error": None``.
        """
        # NOTE(review): the branch keywords were not present in the reviewed
        # text; `if ec:` / `else:` is reconstructed from the two mutually
        # exclusive reply shapes below -- confirm against the original.
        if ec:
            response = {"id": request["id"], "result": None,
                        "error": {"message": str(ec), "code": -4}}
        else:
            response = {"id": request["id"], "result": result, "error": None}
        self.processor.push_response(response)
# Handles address subscription requests: looks up the address history, replies
# with an identifier derived from it, registers the address with the monitor
# and stores the history in the cache.
287 class AddressSubscribe:
#   backend   -- object exposing mempool_service, blockchain,
#                transaction_pool and memory_buffer
#   processor -- object whose push_response() receives replies
#   cache     -- NOTE(review): no assignment of `cache` is visible in this
#                text, although construct() and fetch_cached() read
#                self.cache; confirm.
#   monitor   -- object providing mempool_n() and monitor()
289 def __init__(self, backend, processor, cache, monitor):
290 self.backend = backend
291 self.processor = processor
293 self.monitor = monitor
# Starts a history lookup for the address in request["params"][0]; construct()
# is bound as the completion callback together with the request.
295 def subscribe(self, request):
296 address = str(request["params"][0])
297 service = self.backend.mempool_service
298 chain = self.backend.blockchain
299 txpool = self.backend.transaction_pool
300 memory_buff = self.backend.memory_buffer
301 history.payment_history(service, chain, txpool, memory_buff, address,
302 bind(self.construct, _1, _2, request))
# Completion callback for subscribe().
# NOTE(review): the condition guarding the error reply, and whether the
# function returns after pushing it, are not visible in this text (embedded
# numbering jumps 304 -> 306 and 308 -> 310).
304 def construct(self, ec, result, request):
# Error reply: no result, str(ec) as the message, code -4.
306 response = {"id": request["id"], "result": None,
307 "error": {"message": str(ec), "code": -4}}
308 self.processor.push_response(response)
# Success reply: the identifier computed by the monitor's mempool_n().
310 last_id = self.monitor.mempool_n(result)
311 response = {"id": request["id"], "result": last_id, "error": None}
312 self.processor.push_response(response)
# NOTE(review): subscribe() wraps this parameter in str(); here it is used
# as-is for the monitor and as the cache key.
313 address = request["params"][0]
314 self.monitor.monitor(address, result)
315 # Cache result for get_history
316 self.cache.store(address, result)
# Replies to a request from the cache.
# NOTE(review): the handling of a cache miss and the return statements are not
# visible in this text (embedded numbering jumps 320 -> 323 and stops at 324);
# a caller in this file tests the return value for truth.
318 def fetch_cached(self, request):
319 address = request["params"][0]
320 cached = self.cache.fetch(address)
323 response = {"id": request["id"], "result": cached, "error": None}
324 self.processor.push_response(response)
# Processor that creates the backend and the request handlers and dispatches
# incoming requests to them by their "method" value.
328 class BlockchainProcessor(Processor):
# NOTE(review): `config` is not referenced in the visible lines, and the
# embedded numbering jumps 338 -> 343, so part of this constructor may be
# missing from this text.
330 def __init__(self, config):
331 Processor.__init__(self)
# One cache and one monitor are shared with the address-subscribe handler.
332 cache = HistoryCache()
333 self.backend = Backend()
334 monitor = MonitorAddress(self, cache, self.backend)
335 self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
336 self.address_get_history = AddressGetHistory(self.backend, self)
337 self.address_subscribe = \
338 AddressSubscribe(self.backend, self, cache, monitor)
# Dispatches `request` on request["method"]. No branch for other method names
# is visible in this text.
343 def process(self, request):
344 print "New request (lib)", request
345 if request["method"] == "blockchain.numblocks.subscribe":
346 self.numblocks_subscribe.subscribe(request)
347 elif request["method"] == "blockchain.address.subscribe":
348 self.address_subscribe.subscribe(request)
349 elif request["method"] == "blockchain.address.get_history":
# The cache is tried first; the backend lookup runs only when fetch_cached()
# returns a false value.
350 if not self.address_subscribe.fetch_cached(request):
351 self.address_get_history.get(request)
352 elif request["method"] == "blockchain.transaction.broadcast":
353 self.broadcast_transaction(request)
# Parses the raw transaction in request["params"][0], broadcasts it through
# the backend's protocol and replies with the transaction hash as a string.
# NOTE(review): most of the parse-failure path is not visible in this text
# (embedded numbering jumps 357 -> 359 -> 365 -> 370); only its message string
# can be seen. Confirm which exceptions are caught and whether the function
# returns after the error reply.
355 def broadcast_transaction(self, request):
356 raw_tx = bitcoin.data_chunk(str(request["params"][0]))
357 exporter = bitcoin.satoshi_exporter()
359 tx = exporter.load_transaction(raw_tx)
365 "message": "Exception while parsing the transaction data.",
370 self.backend.protocol.broadcast_transaction(tx)
371 tx_hash = str(bitcoin.hash_transaction(tx))
372 response = {"id": request["id"], "result": tx_hash, "error": None}
373 self.push_response(response)