2 from bitcoin import bind, _1, _2, _3
3 from processor import Processor
7 import history1 as history
13 self.lock = threading.Lock()
16 def store(self, address, result):
18 self.cache[address] = result
20 def fetch(self, address):
23 return self.cache[address]
27 def clear(self, addresses):
29 for address in addresses:
30 if self.cache.has_key(address):
31 del self.cache[address]
35 def __init__(self, processor, cache, backend):
36 self.processor = processor
38 self.backend = backend
39 self.lock = threading.Lock()
40 # key is hash:index, value is address
41 self.monitor_output = {}
43 self.monitor_address = set()
47 backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
49 def monitor(self, address, result):
51 if not info.has_key("raw_output_script"):
53 assert info["is_input"] == 0
54 tx_hash = info["tx_hash"]
55 output_index = info["index"]
56 outpoint = "%s:%s" % (tx_hash, output_index)
58 self.monitor_output[outpoint] = address
60 self.monitor_address.add(address)
63 tx_hash = bitcoin.hash_transaction(tx)
65 for input in tx.inputs:
66 prevout = input.previous_output
67 prevout = "%s:%s" % (prevout.hash, prevout.index)
68 previous_outputs.append(prevout)
70 for output_index, output in enumerate(tx.outputs):
71 address = bitcoin.payment_address()
72 if address.extract(output.output_script):
73 addrs.append((output_index, str(address)))
74 return tx_hash, previous_outputs, addrs
76 def tx_stored(self, tx):
77 affected_addrs = set()
78 tx_hash, previous_outputs, addrs = self.unpack(tx)
79 for prevout in previous_outputs:
81 if self.monitor_output.has_key(prevout):
82 affected_addrs.add(self.monitor_output[prevout])
83 for idx, address in addrs:
85 if address in self.monitor_address:
86 affected_addrs.add(address)
88 self.affected[tx_hash] = affected_addrs
89 self.cache.clear(affected_addrs)
90 self.notify(affected_addrs)
92 def tx_confirmed(self, tx):
93 tx_hash, previous_outputs, addrs = self.unpack(tx)
95 affected_addrs = self.affected[tx_hash]
96 del self.affected[tx_hash]
97 self.cache.clear(affected_addrs)
98 self.notify(affected_addrs)
99 # add new outputs to monitor
100 for idx, address in addrs:
101 outpoint = "%s:%s" % (tx_hash, idx)
103 if address in affected_addrs:
104 self.monitor_output[outpoint] = address
105 # delete spent outpoints
106 for prevout in previous_outputs:
108 if self.monitor_output.has_key(prevout):
109 del self.monitor_output[prevout]
111 def notify(self, affected_addrs):
112 service = self.backend.mempool_service
113 chain = self.backend.blockchain
114 txpool = self.backend.transaction_pool
115 memory_buff = self.backend.memory_buffer
116 for address in affected_addrs:
117 response = {"id": None,
118 "method": "blockchain.address.subscribe",
119 "params": [str(address)]}
120 history.payment_history(service, chain, txpool, memory_buff,
121 address, bind(self.send_notify, _1, _2, response))
123 def mempool_n(self, result):
124 assert result is not None
128 # Order by time, grab last item (latest)
129 last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
130 if last_info["block_hash"] == "mempool":
131 last_id = "mempool:%s" % len(result)
133 last_id = last_info["block_hash"]
136 def send_notify(self, ec, result, response):
138 print "Error: Monitor.send_notify()", ec
140 assert len(response["params"]) == 1
141 response["params"].append(self.mempool_n(result))
142 self.processor.push_response(response)
147 # Create 3 thread-pools each with 1 thread
148 self.network_service = bitcoin.async_service(1)
149 self.disk_service = bitcoin.async_service(1)
150 self.mempool_service = bitcoin.async_service(1)
152 self.hosts = bitcoin.hosts(self.network_service)
153 self.handshake = bitcoin.handshake(self.network_service)
154 self.network = bitcoin.network(self.network_service)
155 self.protocol = bitcoin.protocol(self.network_service, self.hosts,
156 self.handshake, self.network)
158 db_prefix = "/home/genjix/libbitcoin/database"
159 self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
160 self.blockchain_started)
161 self.poller = bitcoin.poller(self.mempool_service, self.blockchain)
162 self.transaction_pool = \
163 bitcoin.transaction_pool(self.mempool_service, self.blockchain)
165 self.protocol.subscribe_channel(self.monitor_tx)
167 bitcoin.session(self.network_service, self.hosts, self.handshake,
168 self.network, self.protocol, self.blockchain,
169 self.poller, self.transaction_pool)
170 self.session.start(self.handle_start)
172 self.memory_buffer = \
173 membuf.memory_buffer(self.mempool_service.internal_ptr,
174 self.blockchain.internal_ptr,
175 self.transaction_pool.internal_ptr)
177 def handle_start(self, ec):
179 print "Error starting backend:", ec
181 def blockchain_started(self, ec, chain):
182 print "Blockchain initialisation:", ec
185 self.session.stop(self.handle_stop)
187 def handle_stop(self, ec):
189 print "Error stopping backend:", ec
190 print "Stopped backend"
192 def monitor_tx(self, node):
193 # We will be notified here when connected to new bitcoin nodes
194 # Here we subscribe to new transactions from them which we
195 # add to the transaction_pool. That way we can track which
196 # transactions we are interested in.
197 node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
198 # Re-subscribe to next new node
199 self.protocol.subscribe_channel(self.monitor_tx)
201 def recv_tx(self, ec, tx, node):
203 print "Error with new transaction:", ec
205 tx_hash = bitcoin.hash_transaction(tx)
206 self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
207 # Re-subscribe to new transactions from node
208 node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
210 def store_tx(self, ec, tx_hash):
212 print "Error storing memory pool transaction", tx_hash, ec
214 print "Accepted transaction", tx_hash
219 self.event = threading.Event()
226 def set(self, value):
230 class NumblocksSubscribe:
232 def __init__(self, backend, processor):
233 self.backend = backend
234 self.processor = processor
235 self.lock = threading.Lock()
236 self.backend.blockchain.subscribe_reorganize(self.reorganize)
237 self.backend.blockchain.fetch_last_depth(self.set_last_depth)
238 self.latest = GhostValue()
240 def set_last_depth(self, ec, last_depth):
242 print "Error retrieving last depth", ec
244 self.latest.set(last_depth)
246 def reorganize(self, ec, fork_point, arrivals, replaced):
247 latest = fork_point + len(arrivals)
248 self.latest.set(latest)
249 response = {"id": None, "method": "blockchain.numblocks.subscribe",
251 self.processor.push_response(response)
252 self.backend.blockchain.subscribe_reorganize(self.reorganize)
254 def subscribe(self, request):
255 latest = self.latest.get()
256 response = {"id": request["id"],
259 self.processor.push_response(response)
261 class AddressGetHistory:
263 def __init__(self, backend, processor):
264 self.backend = backend
265 self.processor = processor
267 def get(self, request):
268 address = str(request["params"][0])
269 service = self.backend.mempool_service
270 chain = self.backend.blockchain
271 txpool = self.backend.transaction_pool
272 memory_buff = self.backend.memory_buffer
273 history.payment_history(service, chain, txpool, memory_buff,
274 address, bind(self.respond, _1, _2, request))
276 def respond(self, ec, result, request):
278 response = {"id": request["id"], "result": None,
279 "error": {"message": str(ec), "code": -4}}
281 response = {"id": request["id"], "result": result, "error": None}
282 self.processor.push_response(response)
284 class AddressSubscribe:
286 def __init__(self, backend, processor, cache, monitor):
287 self.backend = backend
288 self.processor = processor
290 self.monitor = monitor
292 def subscribe(self, request):
293 address = str(request["params"][0])
294 service = self.backend.mempool_service
295 chain = self.backend.blockchain
296 txpool = self.backend.transaction_pool
297 memory_buff = self.backend.memory_buffer
298 history.payment_history(service, chain, txpool, memory_buff,
299 address, bind(self.construct, _1, _2, request))
301 def construct(self, ec, result, request):
303 response = {"id": request["id"], "result": None,
304 "error": {"message": str(ec), "code": -4}}
305 self.processor.push_response(response)
307 last_id = self.monitor.mempool_n(result)
308 response = {"id": request["id"], "result": last_id, "error": None}
309 self.processor.push_response(response)
310 address = request["params"][0]
311 self.monitor.monitor(address, result)
312 # Cache result for get_history
313 self.cache.store(address, result)
315 def fetch_cached(self, request):
316 address = request["params"][0]
317 cached = self.cache.fetch(address)
320 response = {"id": request["id"], "result": cached, "error": None}
321 self.processor.push_response(response)
324 class BlockchainProcessor(Processor):
326 def __init__(self, config):
327 Processor.__init__(self)
328 cache = HistoryCache()
329 self.backend = Backend()
330 monitor = MonitorAddress(self, cache, self.backend)
331 self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
332 self.address_get_history = AddressGetHistory(self.backend, self)
333 self.address_subscribe = \
334 AddressSubscribe(self.backend, self, cache, monitor)
339 def process(self, request):
340 print "New request (lib)", request
341 if request["method"] == "blockchain.numblocks.subscribe":
342 self.numblocks_subscribe.subscribe(request)
343 elif request["method"] == "blockchain.address.subscribe":
344 self.address_subscribe.subscribe(request)
345 elif request["method"] == "blockchain.address.get_history":
346 if not self.address_subscribe.fetch_cached(request):
347 self.address_get_history.get(request)
348 elif request["method"] == "blockchain.transaction.broadcast":
349 self.broadcast_transaction(request)
351 def broadcast_transaction(self, request):
352 raw_tx = bitcoin.data_chunk(str(request["params"][0]))
353 exporter = bitcoin.satoshi_exporter()
355 tx = exporter.load_transaction(raw_tx)
357 response = {"id": request["id"], "result": None,
359 "Exception while parsing the transaction data.",
362 self.backend.protocol.broadcast_transaction(tx)
363 tx_hash = str(bitcoin.hash_transaction(tx))
364 response = {"id": request["id"], "result": tx_hash, "error": None}
365 self.push_response(response)
368 print "Warning: pre-alpha prototype. Full of bugs."
369 while not self.shared.stopped():