2 from bitcoin import bind, _1, _2, _3
3 from processor import Processor
12 self.lock = threading.Lock()
15 def store(self, address, result):
17 self.cache[address] = result
19 def fetch(self, address):
22 return self.cache[address]
26 def clear(self, addresses):
28 for address in addresses:
29 if self.cache.has_key(address):
30 del self.cache[address]
34 def __init__(self, processor, cache):
35 self.processor = processor
37 self.lock = threading.Lock()
38 # key is hash:index, value is address
39 self.monitor_output = {}
41 self.monitor_address = set()
45 def monitor(self, address, result):
47 if not info.has_key("raw_output_script"):
49 assert info["is_input"] == 0
50 tx_hash = info["tx_hash"]
51 output_index = info["index"]
52 outpoint = "%s:%s" % (tx_hash, output_index)
54 self.monitor_output[outpoint] = address
56 self.monitor_address.add(address)
58 def tx_stored(self, tx_desc):
59 tx_hash, prevouts, addrs = tx_desc
60 affected_addrs = set()
61 for prevout_hash, prevout_index in prevouts:
62 prevout = "%s:%s" % (prevout_hash, prevout_index)
64 if self.monitor_output.has_key(prevout):
65 affected_addrs.add(self.monitor_output[prevout])
66 for idx, address in addrs:
68 if address in self.monitor_address:
69 affected_addrs.add(address)
71 self.affected[tx_hash] = affected_addrs
72 self.cache.clear(affected_addrs)
73 self.notify(affected_addrs)
75 def tx_confirmed(self, tx_desc):
76 tx_hash, prevouts, addrs = tx_desc
78 affected_addrs = self.affected[tx_hash]
79 del self.affected[tx_hash]
80 self.cache.clear(affected_addrs)
81 self.notify(affected_addrs)
82 # add new outputs to monitor
83 for idx, address in addrs:
84 outpoint = "%s:%s" % (tx_hash, idx)
86 if address in affected_addrs:
87 self.monitor_output[outpoint] = address
88 # delete spent outpoints
89 for prevout_hash, prevout_index in prevouts:
90 prevout = "%s:%s" % (prevout_hash, prevout_index)
92 if self.monitor_output.has_key(prevout):
93 del self.monitor_output[prevout]
95 def notify(self, affected_addrs):
96 templ_response = {"id": None,
97 "method": "blockchain.address.subscribe",
99 chain = self.backend.blockchain
100 txpool = self.backend.transaction_pool
101 membuf = self.backend.pool_buffer
102 for address in affected_addrs:
103 response = templ_response.copy()
104 response["params"].append(address)
105 history.payment_history(chain, txpool, membuf, address,
106 bind(self.send_notify, _1, response))
108 def mempool_n(self, result):
109 assert result is not None
113 # Order by time, grab last item (latest)
114 last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
115 if last_info["block_hash"] == "mempool":
116 last_id = "mempool:%s" % len(result)
118 last_id = last_info["block_hash"]
121 def send_notify(self, result, response):
122 response["params"].append(self.mempool_n(result))
123 self.processor.push_response(response)
127 def __init__(self, monitor):
128 # Create 3 thread-pools each with 1 thread
129 self.network_service = bitcoin.async_service(1)
130 self.disk_service = bitcoin.async_service(1)
131 self.mempool_service = bitcoin.async_service(1)
133 self.hosts = bitcoin.hosts(self.network_service)
134 self.handshake = bitcoin.handshake(self.network_service)
135 self.network = bitcoin.network(self.network_service)
136 self.protocol = bitcoin.protocol(self.network_service, self.hosts,
137 self.handshake, self.network)
139 db_prefix = "/home/genjix/libbitcoin/database"
140 self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
141 self.blockchain_started)
142 self.poller = bitcoin.poller(self.blockchain)
143 self.transaction_pool = \
144 bitcoin.transaction_pool(self.mempool_service, self.blockchain)
146 self.protocol.subscribe_channel(self.monitor_tx)
148 bitcoin.session(self.network_service, self.hosts, self.handshake,
149 self.network, self.protocol, self.blockchain,
150 self.poller, self.transaction_pool)
151 self.session.start(self.handle_start)
154 history.MemoryPoolBuffer(self.transaction_pool,
155 self.blockchain, monitor)
157 def handle_start(self, ec):
159 print "Error starting backend:", ec
161 def blockchain_started(self, ec, chain):
162 print "Blockchain initialisation:", ec
165 self.session.stop(self.handle_stop)
167 def handle_stop(self, ec):
169 print "Error stopping backend:", ec
170 print "Stopped backend"
172 def monitor_tx(self, node):
173 # We will be notified here when connected to new bitcoin nodes
174 # Here we subscribe to new transactions from them which we
175 # add to the transaction_pool. That way we can track which
176 # transactions we are interested in.
177 node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
178 # Re-subscribe to next new node
179 self.protocol.subscribe_channel(self.monitor_tx)
181 def recv_tx(self, ec, tx, node):
183 print "Error with new transaction:", ec
185 tx_hash = bitcoin.hash_transaction(tx)
186 self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash))
187 # Re-subscribe to new transactions from node
188 node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
190 def store_tx(self, ec, tx_hash):
192 print "Error storing memory pool transaction", tx_hash, ec
194 print "Accepted transaction", tx_hash
199 self.event = threading.Event()
206 def set(self, value):
210 class NumblocksSubscribe:
212 def __init__(self, backend, processor):
213 self.backend = backend
214 self.processor = processor
215 self.lock = threading.Lock()
216 self.backend.blockchain.subscribe_reorganize(self.reorganize)
217 self.backend.blockchain.fetch_last_depth(self.set_last_depth)
218 self.latest = GhostValue()
220 def set_last_depth(self, ec, last_depth):
222 print "Error retrieving last depth", ec
224 self.latest.set(last_depth)
226 def reorganize(self, ec, fork_point, arrivals, replaced):
227 latest = fork_point + len(arrivals)
228 self.latest.set(latest)
229 response = {"id": None, "method": "blockchain.numblocks.subscribe",
231 self.processor.push_response(response)
232 self.backend.blockchain.subscribe_reorganize(self.reorganize)
234 def subscribe(self, request):
235 latest = self.latest.get()
236 response = {"id": request["id"],
237 "method": "blockchain.numblocks.subscribe",
240 self.processor.push_response(response)
242 class AddressGetHistory:
244 def __init__(self, backend, processor):
245 self.backend = backend
246 self.processor = processor
248 def get(self, request):
249 address = str(request["params"][0])
250 chain = self.backend.blockchain
251 txpool = self.backend.transaction_pool
252 membuf = self.backend.pool_buffer
253 history.payment_history(chain, txpool, membuf, address,
254 bind(self.respond, _1, request))
256 def respond(self, result, request):
258 response = {"id": request["id"], "result": None,
259 "error": {"message": "Error", "code": -4}}
261 response = {"id": request["id"], "result": result, "error": None}
262 self.processor.push_response(response)
264 class AddressSubscribe:
266 def __init__(self, backend, processor, cache, monitor):
267 self.backend = backend
268 self.processor = processor
270 self.monitor = monitor
272 self.backend.pool_buffer.cheat = self
274 def subscribe(self, request):
275 address = str(request["params"][0])
276 chain = self.backend.blockchain
277 txpool = self.backend.transaction_pool
278 membuf = self.backend.pool_buffer
279 history.payment_history(chain, txpool, membuf, address,
280 bind(self.construct, _1, request))
282 def construct(self, result, request):
284 response = {"id": request["id"], "result": None,
285 "error": {"message": "Error", "code": -4}}
287 last_id = self.monitor.mempool_n(result)
288 response = {"id": request["id"], "result": last_id, "error": None}
289 self.processor.push_response(response)
290 address = request["params"][0]
291 self.monitor.monitor(address, result)
292 # Cache result for get_history
293 self.cache.store(address, result)
295 def fetch_cached(self, request):
296 address = request["params"][0]
297 cached = self.cache.fetch(address)
300 response = {"id": request["id"], "result": cached, "error": None}
301 self.processor.push_response(response)
304 class BlockchainProcessor(Processor):
306 def __init__(self, config):
307 Processor.__init__(self)
308 cache = HistoryCache()
309 monitor = MonitorAddress(self, cache)
310 self.backend = Backend(monitor)
311 monitor.backend = self.backend
312 self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
313 self.address_get_history = AddressGetHistory(self.backend, self)
314 self.address_subscribe = \
315 AddressSubscribe(self.backend, self, cache, monitor)
320 def process(self, request):
321 print "New request (lib)", request
322 if request["method"] == "blockchain.numblocks.subscribe":
323 self.numblocks_subscribe.subscribe(request)
324 elif request["method"] == "blockchain.address.subscribe":
325 self.address_subscribe.subscribe(request)
326 elif request["method"] == "blockchain.address.get_history":
327 if not self.address_subscribe.fetch_cached(request):
328 self.address_get_history.get(request)
329 elif request["method"] == "blockchain.transaction.broadcast":
330 self.broadcast_transaction(request)
332 def broadcast_transaction(self, request):
333 raw_tx = bitcoin.data_chunk(str(request["params"]))
334 exporter = bitcoin.satoshi_exporter()
336 tx = exporter.load_transaction(raw_tx)
338 response = {"id": request["id"], "result": None,
340 "Exception while parsing the transaction data.",
343 self.backend.protocol.broadcast_transaction(tx)
344 tx_hash = str(bitcoin.hash_transaction(tx))
345 response = {"id": request["id"], "result": tx_hash, "error": None}
346 self.push_response(response)
349 print "Warning: pre-alpha prototype. Full of bugs."
350 while not self.shared.stopped():