2 from bitcoin import bind, _1, _2, _3
3 from processor import Processor
7 import history1 as history
13 self.lock = threading.Lock()
16 def store(self, address, result):
18 self.cache[address] = result
20 def fetch(self, address):
23 return self.cache[address]
27 def clear(self, addresses):
29 for address in addresses:
30 if self.cache.has_key(address):
31 del self.cache[address]
35 def __init__(self, processor, cache, backend):
36 self.processor = processor
38 self.backend = backend
39 self.lock = threading.Lock()
40 # key is hash:index, value is address
41 self.monitor_output = {}
43 self.monitor_address = set()
45 backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
47 def monitor(self, address, result):
49 if not info.has_key("raw_output_script"):
51 assert info["is_input"] == 0
52 tx_hash = info["tx_hash"]
53 output_index = info["index"]
54 outpoint = "%s:%s" % (tx_hash, output_index)
56 self.monitor_output[outpoint] = address
58 self.monitor_address.add(address)
61 tx_hash = bitcoin.hash_transaction(tx)
63 for input in tx.inputs:
64 prevout = input.previous_output
65 prevout = "%s:%s" % (prevout.hash, prevout.index)
66 previous_outputs.append(prevout)
68 for output_index, output in enumerate(tx.outputs):
69 address = bitcoin.payment_address()
70 if address.extract(output.output_script):
71 addrs.append((output_index, str(address)))
72 return tx_hash, previous_outputs, addrs
74 def effect_notify(self, tx, delete_outs):
75 affected_addrs = set()
76 tx_hash, previous_outputs, addrs = self.unpack(tx)
77 for prevout in previous_outputs:
80 affected_addrs.add(self.monitor_output[prevout])
82 del self.monitor_output[prevout]
85 for idx, address in addrs:
87 if address in self.monitor_address:
88 affected_addrs.add(address)
89 self.cache.clear(affected_addrs)
90 self.notify(affected_addrs)
91 # Used in confirmed txs
92 return tx_hash, addrs, affected_addrs
94 def tx_stored(self, tx):
95 self.effect_notify(tx, False)
97 def tx_confirmed(self, tx):
98 tx_hash, addrs, affected_addrs = self.effect_notify(tx, True)
99 # add new outputs to monitor
100 for idx, address in addrs:
101 outpoint = "%s:%s" % (tx_hash, idx)
102 if address in affected_addrs:
104 self.monitor_output[outpoint] = address
106 def notify(self, affected_addrs):
107 service = self.backend.mempool_service
108 chain = self.backend.blockchain
109 txpool = self.backend.transaction_pool
110 memory_buff = self.backend.memory_buffer
111 for address in affected_addrs:
112 response = {"id": None,
113 "method": "blockchain.address.subscribe",
114 "params": [str(address)]}
115 history.payment_history(service, chain, txpool, memory_buff,
116 address, bind(self.send_notify, _1, _2, response))
118 def mempool_n(self, result):
119 assert result is not None
123 # Order by time, grab last item (latest)
124 last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
125 if last_info["block_hash"] == "mempool":
126 last_id = "mempool:%s" % len(result)
128 last_id = last_info["block_hash"]
131 def send_notify(self, ec, result, response):
133 print "Error: Monitor.send_notify()", ec
135 assert len(response["params"]) == 1
136 response["params"].append(self.mempool_n(result))
137 self.processor.push_response(response)
142 # Create 3 thread-pools each with 1 thread
143 self.network_service = bitcoin.async_service(1)
144 self.disk_service = bitcoin.async_service(1)
145 self.mempool_service = bitcoin.async_service(1)
147 self.hosts = bitcoin.hosts(self.network_service)
148 self.handshake = bitcoin.handshake(self.network_service)
149 self.network = bitcoin.network(self.network_service)
150 self.protocol = bitcoin.protocol(self.network_service, self.hosts,
151 self.handshake, self.network)
153 db_prefix = "/home/genjix/libbitcoin/database"
154 self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
155 self.blockchain_started)
156 self.poller = bitcoin.poller(self.mempool_service, self.blockchain)
157 self.transaction_pool = \
158 bitcoin.transaction_pool(self.mempool_service, self.blockchain)
160 self.protocol.subscribe_channel(self.monitor_tx)
162 bitcoin.session(self.network_service, self.hosts, self.handshake,
163 self.network, self.protocol, self.blockchain,
164 self.poller, self.transaction_pool)
165 self.session.start(self.handle_start)
167 self.memory_buffer = \
168 membuf.memory_buffer(self.mempool_service.internal_ptr,
169 self.blockchain.internal_ptr,
170 self.transaction_pool.internal_ptr)
172 def handle_start(self, ec):
174 print "Error starting backend:", ec
176 def blockchain_started(self, ec, chain):
177 print "Blockchain initialisation:", ec
180 self.session.stop(self.handle_stop)
182 def handle_stop(self, ec):
184 print "Error stopping backend:", ec
185 print "Stopped backend"
187 def monitor_tx(self, node):
188 # We will be notified here when connected to new bitcoin nodes
189 # Here we subscribe to new transactions from them which we
190 # add to the transaction_pool. That way we can track which
191 # transactions we are interested in.
192 node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
193 # Re-subscribe to next new node
194 self.protocol.subscribe_channel(self.monitor_tx)
196 def recv_tx(self, ec, tx, node):
198 print "Error with new transaction:", ec
200 tx_hash = bitcoin.hash_transaction(tx)
201 self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
202 # Re-subscribe to new transactions from node
203 node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
205 def store_tx(self, ec, tx_hash):
207 print "Error storing memory pool transaction", tx_hash, ec
209 print "Accepted transaction", tx_hash
214 self.event = threading.Event()
221 def set(self, value):
225 class NumblocksSubscribe:
227 def __init__(self, backend, processor):
228 self.backend = backend
229 self.processor = processor
230 self.lock = threading.Lock()
231 self.backend.blockchain.subscribe_reorganize(self.reorganize)
232 self.backend.blockchain.fetch_last_depth(self.set_last_depth)
233 self.latest = GhostValue()
235 def set_last_depth(self, ec, last_depth):
237 print "Error retrieving last depth", ec
239 self.latest.set(last_depth)
241 def reorganize(self, ec, fork_point, arrivals, replaced):
242 latest = fork_point + len(arrivals)
243 self.latest.set(latest)
244 response = {"id": None, "method": "blockchain.numblocks.subscribe",
246 self.processor.push_response(response)
247 self.backend.blockchain.subscribe_reorganize(self.reorganize)
249 def subscribe(self, request):
250 latest = self.latest.get()
251 response = {"id": request["id"],
254 self.processor.push_response(response)
256 class AddressGetHistory:
258 def __init__(self, backend, processor):
259 self.backend = backend
260 self.processor = processor
262 def get(self, request):
263 address = str(request["params"][0])
264 service = self.backend.mempool_service
265 chain = self.backend.blockchain
266 txpool = self.backend.transaction_pool
267 memory_buff = self.backend.memory_buffer
268 history.payment_history(service, chain, txpool, memory_buff,
269 address, bind(self.respond, _1, _2, request))
271 def respond(self, ec, result, request):
273 response = {"id": request["id"], "result": None,
274 "error": {"message": str(ec), "code": -4}}
276 response = {"id": request["id"], "result": result, "error": None}
277 self.processor.push_response(response)
279 class AddressSubscribe:
281 def __init__(self, backend, processor, cache, monitor):
282 self.backend = backend
283 self.processor = processor
285 self.monitor = monitor
287 def subscribe(self, request):
288 address = str(request["params"][0])
289 service = self.backend.mempool_service
290 chain = self.backend.blockchain
291 txpool = self.backend.transaction_pool
292 memory_buff = self.backend.memory_buffer
293 history.payment_history(service, chain, txpool, memory_buff,
294 address, bind(self.construct, _1, _2, request))
296 def construct(self, ec, result, request):
298 response = {"id": request["id"], "result": None,
299 "error": {"message": str(ec), "code": -4}}
300 self.processor.push_response(response)
302 last_id = self.monitor.mempool_n(result)
303 response = {"id": request["id"], "result": last_id, "error": None}
304 self.processor.push_response(response)
305 address = request["params"][0]
306 self.monitor.monitor(address, result)
307 # Cache result for get_history
308 self.cache.store(address, result)
310 def fetch_cached(self, request):
311 address = request["params"][0]
312 cached = self.cache.fetch(address)
315 response = {"id": request["id"], "result": cached, "error": None}
316 self.processor.push_response(response)
319 class BlockchainProcessor(Processor):
321 def __init__(self, config):
322 Processor.__init__(self)
323 cache = HistoryCache()
324 self.backend = Backend()
325 monitor = MonitorAddress(self, cache, self.backend)
326 self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
327 self.address_get_history = AddressGetHistory(self.backend, self)
328 self.address_subscribe = \
329 AddressSubscribe(self.backend, self, cache, monitor)
334 def process(self, request):
335 print "New request (lib)", request
336 if request["method"] == "blockchain.numblocks.subscribe":
337 self.numblocks_subscribe.subscribe(request)
338 elif request["method"] == "blockchain.address.subscribe":
339 self.address_subscribe.subscribe(request)
340 elif request["method"] == "blockchain.address.get_history":
341 if not self.address_subscribe.fetch_cached(request):
342 self.address_get_history.get(request)
343 elif request["method"] == "blockchain.transaction.broadcast":
344 self.broadcast_transaction(request)
346 def broadcast_transaction(self, request):
347 raw_tx = bitcoin.data_chunk(str(request["params"][0]))
348 exporter = bitcoin.satoshi_exporter()
350 tx = exporter.load_transaction(raw_tx)
352 response = {"id": request["id"], "result": None,
354 "Exception while parsing the transaction data.",
357 self.backend.protocol.broadcast_transaction(tx)
358 tx_hash = str(bitcoin.hash_transaction(tx))
359 response = {"id": request["id"], "result": tx_hash, "error": None}
360 self.push_response(response)