3 from json import dumps, loads
6 from Queue import Queue
14 from backends.bitcoind import deserialize
15 from processor import Processor, print_log
19 class BlockchainProcessor(Processor):
# NOTE(review): this source is a corrupted capture — every line carries a
# stray leading integer (its original file line number fused into the text)
# and many in-between lines (try/except scaffolding, loop headers, returns,
# initializers) are missing, so the text below is NOT runnable as-is.
# Comments added here document apparent intent only; no code token has been
# changed.  The code is Python 2 throughout (print statement, `except X, e`,
# str.encode('hex')/.decode('hex') on byte strings, `Queue` module).
# TODO(review): recover the complete original file before any behavioral edit.

# Constructor: sets up caches and locks, opens the LevelDB store at the
# configured path, builds the bitcoind JSON-RPC URL from config, restores
# the chain tip (last_hash/height) from the DB 'height' key, then starts
# header-file initialization, a background catch_up thread, and the
# periodic main_iteration timer.
21 def __init__(self, config, shared):
22 Processor.__init__(self)
26 self.up_to_date = False
# addresses with active 'blockchain.address.subscribe' sessions
27 self.watched_addresses = []
# addr -> history result cache, guarded by cache_lock
28 self.history_cache = {}
30 self.cache_lock = threading.Lock()
# in-memory buffer of header bytes not yet flushed to the headers file
31 self.headers_data = ''
# mempool state: tx_hash -> [addresses], addr -> [tx_hashes], ordered txids
33 self.mempool_addresses = {}
34 self.mempool_hist = {}
35 self.mempool_hashes = []
36 self.mempool_lock = threading.Lock()
# queue of addresses whose status changed; drained by main_iteration
38 self.address_queue = Queue()
39 self.dbpath = config.get('leveldb', 'path')
41 self.dblock = threading.Lock()
# NOTE(review): the enclosing try/except for the DB open is missing from
# this capture; the print_exc below presumably belongs to it.
43 self.db = leveldb.LevelDB(self.dbpath)
45 traceback.print_exc(file=sys.stdout)
48 self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
49 config.get('bitcoind', 'user'),
50 config.get('bitcoind', 'password'),
51 config.get('bitcoind', 'host'),
52 config.get('bitcoind', 'port'))
57 self.sent_header = None
# deprecation probe: looks up a known address (the genesis coinbase
# address) — presumably to detect an old DB schema; the surrounding
# try/except is missing from this capture.
60 hash_160 = bc_address_to_hash_160("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa")
62 print_log("Your database '%s' is deprecated. Please create a new database"%self.dbpath)
# restore chain tip from DB; on failure (missing lines) fall back to the
# genesis block hash and re-initialize.
69 hist = self.deserialize(self.db.Get('height'))
70 self.last_hash, self.height, _ = hist[0]
71 print_log("hist", hist)
73 #traceback.print_exc(file=sys.stdout)
74 print_log('initializing database')
# Bitcoin mainnet genesis block hash
76 self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
79 self.init_headers(self.height)
# run the first catch_up asynchronously; then busy-wait (missing sleep
# lines presumed) until synced or the shared stop flag is set.
81 threading.Timer(0, lambda: self.catch_up(sync=False)).start()
82 while not shared.stopped() and not self.up_to_date:
86 print "keyboard interrupt: stopping threads"
90 print_log("blockchain is up to date.")
92 threading.Timer(10, self.main_iteration).start()

# JSON-RPC call to bitcoind; returns the 'result' field, raises on a
# non-None 'error' field.
# NOTE(review): `params=[]` is a mutable default argument — a classic
# Python pitfall; harmless only if no caller mutates it.  TODO confirm
# and change to `params=None` in the recovered source.
94 def bitcoind(self, method, params=[]):
95 postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
# the `loads(respdata)` assignment producing `r` is missing from this capture
97 respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
99 traceback.print_exc(file=sys.stdout)
103 if r['error'] is not None:
104 raise BaseException(r['error'])
105 return r.get('result')

# Pack a history list of (txid, txpos, height) into the on-disk format:
# per entry 32 bytes txid + 4 bytes txpos + 4 bytes height (hex built
# little-endian via int_to_hex, then hex-decoded to raw bytes).
# NOTE(review): the `s = ''` initializer is missing from this capture.
107 def serialize(self, h):
109 for txid, txpos, height in h:
110 s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4)
111 return s.decode('hex')

# Inverse of serialize: walk 40-byte records, decoding txid (32 bytes)
# and little-endian txpos/height (4 bytes each via rev_hex).
# NOTE(review): the `h = []` initializer, the record loop header, the
# 40-byte advance and the `return h` are missing from this capture.
113 def deserialize(self, s):
116 txid = s[0:32].encode('hex')
117 txpos = int(rev_hex(s[32:36].encode('hex')), 16)
118 height = int(rev_hex(s[36:40].encode('hex')), 16)
119 h.append((txid, txpos, height))

# Map a bitcoind getblock dict to the electrum header dict (the enclosing
# `return { ... }` braces are missing from this capture).
123 def block2header(self, b):
125 "block_height": b.get('height'),
126 "version": b.get('version'),
127 "prev_block_hash": b.get('previousblockhash'),
128 "merkle_root": b.get('merkleroot'),
129 "timestamp": b.get('time'),
# 'bits' arrives as a hex string from bitcoind
130 "bits": int(b.get('bits'), 16),
131 "nonce": b.get('nonce'),

# Fetch a block header at `height` via two RPC round-trips.
134 def get_header(self, height):
135 block_hash = self.bitcoind('getblockhash', [height])
136 b = self.bitcoind('getblock', [block_hash])
137 return self.block2header(b)

# Ensure the flat 'blockchain_headers' file (80 bytes per header) covers
# every block up to db_height, appending missing headers from bitcoind.
139 def init_headers(self, db_height):
140 self.chunk_cache = {}
141 self.headers_filename = os.path.join(self.dbpath, 'blockchain_headers')
143 if os.path.exists(self.headers_filename):
# file size / 80 gives the count of stored headers
144 height = os.path.getsize(self.headers_filename)/80 - 1 # the current height
146 prev_hash = self.hash_header(self.read_header(height))
# no file yet: create it empty (height/prev_hash presumably reset in
# lines missing from this capture)
150 open(self.headers_filename, 'wb').close()
154 if height < db_height:
155 print_log("catching up missing headers:", height, db_height)
# the try: wrapper and the height increment inside the loop are missing
# from this capture
158 while height < db_height:
160 header = self.get_header(height)
# sanity check: each appended header must chain onto the previous one
162 assert prev_hash == header.get('prev_block_hash')
163 self.write_header(header, sync=False)
164 prev_hash = self.hash_header(header)
165 if (height % 1000) == 0:
166 print_log("headers file:", height)
167 except KeyboardInterrupt:

# Double-SHA256 of the 80-byte serialized header, byte-reversed and
# hex-encoded (standard Bitcoin block-hash display form).
173 def hash_header(self, header):
174 return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))

# Read one 80-byte header record by block height and parse it.
# NOTE(review): the `h = f.read(80)` line and the final `return h` are
# missing from this capture.
176 def read_header(self, block_height):
177 if os.path.exists(self.headers_filename):
178 with open(self.headers_filename, 'rb') as f:
179 f.seek(block_height * 80)
182 h = header_from_string(h)

# Read one chunk of 2016 consecutive headers (one difficulty period),
# returned hex-encoded.
185 def read_chunk(self, index):
186 with open(self.headers_filename, 'rb') as f:
187 f.seek(index*2016*80)
188 chunk = f.read(2016*80)
189 return chunk.encode('hex')

# Buffer a serialized header in memory; flush when asked (`sync`) or when
# the buffer grows past the threshold, and invalidate the chunk cache
# entry the header falls into.
191 def write_header(self, header, sync=True):
192 if not self.headers_data:
# remember which height the buffered run starts at, for the file seek
193 self.headers_offset = header.get('block_height')
195 self.headers_data += header_to_string(header).decode('hex')
# NOTE(review): headers are 80 bytes each elsewhere in this class, yet
# this threshold is written as 40*100 — looks suspicious (40 vs 80);
# confirm against the uncorrupted original.
196 if sync or len(self.headers_data) > 40*100:
199 with self.cache_lock:
# integer division: 2016 headers per chunk
200 chunk_index = header.get('block_height')/2016
201 if self.chunk_cache.get(chunk_index):
202 self.chunk_cache.pop(chunk_index)

# Drop the most recently buffered header (used on reorg before flush).
# NOTE(review): truncates 40 bytes, but header records appear to be
# 80 bytes — same 40-vs-80 concern as in write_header; verify.
204 def pop_header(self):
205 # we need to do this only if we have not flushed
206 if self.headers_data:
207 self.headers_data = self.headers_data[:-40]

# Write the buffered header bytes at their offset and clear the buffer.
209 def flush_headers(self):
210 if not self.headers_data:
212 with open(self.headers_filename, 'rb+') as f:
213 f.seek(self.headers_offset*80)
214 f.write(self.headers_data)
215 self.headers_data = ''

# Return chunk `i` of headers, memoizing it in chunk_cache (the final
# `return chunk` line is missing from this capture).
217 def get_chunk(self, i):
218 # store them on disk; store the current chunk in memory
219 with self.cache_lock:
220 chunk = self.chunk_cache.get(i)
222 chunk = self.read_chunk(i)
223 self.chunk_cache[i] = chunk

# Fetch and deserialize a mempool transaction; the try/except around the
# RPC call is missing from this capture.
227 def get_mempool_transaction(self, txid):
229 raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1])
233 vds = deserialize.BCDataStream()
234 vds.write(raw_tx.decode('hex'))
236 return deserialize.parse_Transaction(vds, is_coinbase=False)

# Build the address history: cached result if available, else the
# confirmed history from LevelDB plus unconfirmed mempool entries
# (height 0).  `cache_only=True` presumably returns -1 on a cache miss
# (see get_status); those lines are missing from this capture.
238 def get_history(self, addr, cache_only=False):
239 with self.cache_lock:
240 hist = self.history_cache.get(addr)
248 hist = self.deserialize(self.db.Get(addr))
254 # should not be necessary
255 hist.sort(key=lambda tup: tup[2])
256 # check uniqueness too...
259 with self.mempool_lock:
260 for txid in self.mempool_hist.get(addr, []):
# mempool entries carry txpos 0 and height 0 (unconfirmed)
261 hist.append((txid, 0, 0))
263 hist = map(lambda x: {'tx_hash': x[0], 'height': x[2]}, hist)
264 # add something to distinguish between unused and empty addresses
265 if hist == [] and is_known:
268 with self.cache_lock:
269 self.history_cache[addr] = hist
# the final `return hist` is missing from this capture

# Electrum status hash: sha256 over the concatenation of
# "txid:height:" for every history entry (the status initializer and the
# loop over tx_points are missing from this capture).
272 def get_status(self, addr, cache_only=False):
273 tx_points = self.get_history(addr, cache_only)
274 if cache_only and tx_points == -1:
# '*' marks the has-history-but-pruned sentinel, presumably
279 if tx_points == ['*']:
283 status += tx.get('tx_hash') + ':%d:' % tx.get('height')
284 return hashlib.sha256(status).digest().encode('hex')

# Compute the merkle branch for tx_hash within its block: repeatedly
# pair-and-hash the tx list, recording the sibling hash whenever the
# target is in the current pair.  The odd-length duplication guard and
# the list-halving step are missing from this capture.
286 def get_merkle(self, tx_hash, height):
288 block_hash = self.bitcoind('getblockhash', [height])
289 b = self.bitcoind('getblock', [block_hash])
290 tx_list = b.get('tx')
291 tx_pos = tx_list.index(tx_hash)
293 merkle = map(hash_decode, tx_list)
294 target_hash = hash_decode(tx_hash)
296 while len(merkle) != 1:
# duplicate the last hash when the level has odd length (Bitcoin rule)
298 merkle.append(merkle[-1])
301 new_hash = Hash(merkle[0] + merkle[1])
302 if merkle[0] == target_hash:
303 s.append(hash_encode(merkle[1]))
304 target_hash = new_hash
305 elif merkle[1] == target_hash:
306 s.append(hash_encode(merkle[0]))
307 target_hash = new_hash
312 return {"block_height": height, "merkle": s, "pos": tx_pos}

# Insert a (txid, pos, height) record into the address's packed history
# in batch_list, keeping records ordered by height, and register the
# output point -> address mapping in batch_txio.
314 def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
# 40-byte packed record, same layout as serialize()
316 s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
318 serialized_hist = self.batch_list[addr]
# scan records from newest to oldest to find the insertion point
320 l = len(serialized_hist)/40
321 for i in range(l-1, -1, -1):
322 item = serialized_hist[40*i:40*(i+1)]
323 item_height = int(rev_hex(item[36:40].encode('hex')), 16)
324 if item_height < tx_height:
325 serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):]
# fell through the loop: record is the oldest, prepend it
328 serialized_hist = s + serialized_hist
330 self.batch_list[addr] = serialized_hist
# store new outpoint (txid + 4-byte index) -> address for spend lookups
333 txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
334 self.batch_txio[txo] = addr

# Remove the record spending outpoint (tx_hash, tx_pos) from the
# address's packed history; when addr is None it is resolved via
# batch_txio.  Presumably returns (height, addr) — see the revert path
# in import_block; the return line is missing from this capture.
336 def remove_from_history(self, addr, tx_hash, tx_pos):
337 txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
341 addr = self.batch_txio[txi]
343 raise BaseException(tx_hash, tx_pos)
345 serialized_hist = self.batch_list[addr]
347 l = len(serialized_hist)/40
# the loop header over record indices is missing from this capture
349 item = serialized_hist[40*i:40*(i+1)]
350 if item[0:36] == txi:
351 height = int(rev_hex(item[36:40].encode('hex')), 16)
352 serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
# record not found: dump the deserialized history in the error
355 hist = self.deserialize(serialized_hist)
356 raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
358 self.batch_list[addr] = serialized_hist

# Parse every raw tx in a block into (ordered txid list, txid -> parsed
# tx dict).  NOTE(review): `is_coinbase` is referenced but its
# assignment (presumably True for the first tx, then False) is missing
# from this capture, as is the txdict insertion line.
361 def deserialize_block(self, block):
362 txlist = block.get('tx')
363 tx_hashes = [] # ordered txids
364 txdict = {} # deserialized tx
366 for raw_tx in txlist:
# txid = double-SHA256 of the raw tx bytes
367 tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
368 tx_hashes.append(tx_hash)
369 vds = deserialize.BCDataStream()
370 vds.write(raw_tx.decode('hex'))
371 tx = deserialize.parse_Transaction(vds, is_coinbase)
374 return tx_hashes, txdict

# Undo info lives in a 100-slot ring keyed by height % 100 — only the
# last ~100 blocks can be reverted.  Presumably eval()'d back from the
# repr() written below; the return line is missing from this capture.
376 def get_undo_info(self, height):
377 s = self.db.Get("undo%d" % (height % 100))

# Persist undo info for recent blocks only (within 100 of the bitcoind
# tip, or always under test), serialized via repr().
380 def write_undo_info(self, batch, height, undo_info):
381 if self.is_test or height > self.bitcoind_height - 100:
382 batch.Put("undo%d" % (height % 100), repr(undo_info))

# Apply (or with revert=True, roll back) one block against LevelDB:
# deserialize txs, resolve input addresses, load touched histories,
# update them in memory, then commit everything in a single WriteBatch.
# Many connective lines (timing vars t0..t3, list initializers, the
# revert branch headers, delete loops) are missing from this capture.
384 def import_block(self, block, block_hash, block_height, sync, revert=False):
386 self.batch_list = {} # address -> history
387 self.batch_txio = {} # transaction i/o -> address
393 # deserialize transactions
395 tx_hashes, txdict = self.deserialize_block(block)
400 # read addresses of tx inputs
401 for tx in txdict.values():
402 for x in tx.get('inputs'):
# outpoint key: 32-byte txid + 4-byte output index
403 txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
404 block_inputs.append(txi)
407 for txi in block_inputs:
408 addr = self.db.Get(txi)
411 # the input could come from the same block
413 self.batch_txio[txi] = addr
414 addr_to_read.append(addr)
417 for txid, tx in txdict.items():
418 for x in tx.get('outputs'):
419 txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
420 block_outputs.append(txo)
422 # read histories of addresses
423 for txid, tx in txdict.items():
424 for x in tx.get('outputs'):
425 addr_to_read.append(x.get('address'))
428 for addr in addr_to_read:
430 self.batch_list[addr] = self.db.Get(addr)
# address unseen in DB: start with empty packed history
432 self.batch_list[addr] = ''
435 undo_info = self.get_undo_info(block_height)
436 # print "undo", block_height, undo_info
# revert path processes transactions in reverse order
444 tx_hashes = tx_hashes[::-1]
445 for txid in tx_hashes: # must be ordered
450 for x in tx.get('inputs'):
# record what each spend removed so the block can be undone later
451 prevout_height, prevout_addr = self.remove_from_history(None, x.get('prevout_hash'), x.get('prevout_n'))
452 undo.append((prevout_height, prevout_addr))
453 undo_info[txid] = undo
455 for x in tx.get('outputs'):
456 self.add_to_history(x.get('address'), txid, x.get('index'), block_height)
# revert: remove this block's outputs, then restore its spent inputs
459 for x in tx.get('outputs'):
460 self.remove_from_history(x.get('address'), txid, x.get('index'))
463 for x in tx.get('inputs'):
464 prevout_height, prevout_addr = undo_info.get(txid)[i]
467 # read the history into batch list
468 if self.batch_list.get(prevout_addr) is None:
469 self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
471 # re-add them to the history
472 self.add_to_history(prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
473 # print_log("new hist for", prevout_addr, self.deserialize(self.batch_list[prevout_addr]) )
# commit phase: all mutations go through one atomic WriteBatch
480 batch = leveldb.WriteBatch()
481 for addr, serialized_hist in self.batch_list.items():
482 batch.Put(addr, serialized_hist)
483 l = len(serialized_hist)
489 # add new created outputs
490 for txio, addr in self.batch_txio.items():
491 batch.Put(txio, addr)
492 # delete spent inputs
493 for txi in block_inputs:
496 self.write_undo_info(batch, block_height, undo_info)
498 # restore spent inputs
499 for txio, addr in self.batch_txio.items():
500 batch.Put(txio, addr)
501 # delete spent outputs
502 for txo in block_outputs:
# persist the new chain tip alongside the data it describes
506 batch.Put('height', self.serialize([(block_hash, block_height, 0)]))
509 self.db.Write(batch, sync=sync)
# log timing breakdown only for slow blocks outside initial sync
512 if t3 - t0 > 10 and not sync:
513 print_log("block", block_height,
514 "parse:%0.2f " % (t00 - t0),
515 "read:%0.2f " % (t1 - t00),
516 "proc:%.2f " % (t2-t1),
517 "write:%.2f " % (t3-t2),
518 "max:", max_len, max_addr)
# every touched address gets its history cache invalidated
520 for addr in self.batch_list.keys():
521 self.invalidate_cache(addr)

# Queue a request unless it can be answered from cache right now.
523 def add_request(self, request):
524 # see if we can get if from cache. if not, add to queue
525 if self.process(request, cache_only=True) == -1:
526 self.queue.put(request)

# Dispatch one electrum JSON-RPC request by method name; -1 as `result`
# signals a cache miss when cache_only is set.  Parameter extraction
# lines (address/height/index/tx_hash) and several result assignments
# are missing from this capture.
528 def process(self, request, cache_only=False):
529 #print "abe process", request
531 message_id = request['id']
532 method = request['method']
533 params = request.get('params', [])
537 if method == 'blockchain.numblocks.subscribe':
540 elif method == 'blockchain.headers.subscribe':
543 elif method == 'blockchain.address.subscribe':
546 result = self.get_status(address, cache_only)
547 self.watch_address(address)
548 except BaseException, e:
549 error = str(e) + ': ' + address
550 print_log("error:", error)
552 elif method == 'blockchain.address.unsubscribe':
# unsubscribe requires the server password, presumably taken from params
556 if password == self.config.get('server', 'password'):
557 self.watched_addresses.remove(address)
558 # print_log('unsubscribed', address)
561 print_log('incorrect password')
562 result = "authentication error"
563 except BaseException, e:
564 error = str(e) + ': ' + address
565 print_log("error:", error)
567 elif method == 'blockchain.address.get_history':
570 result = self.get_history(address, cache_only)
571 except BaseException, e:
572 error = str(e) + ': ' + address
573 print_log("error:", error)
575 elif method == 'blockchain.block.get_header':
581 result = self.get_header(height)
582 except BaseException, e:
583 error = str(e) + ': %d' % height
584 print_log("error:", error)
586 elif method == 'blockchain.block.get_chunk':
592 result = self.get_chunk(index)
593 except BaseException, e:
594 error = str(e) + ': %d' % index
595 print_log("error:", error)
597 elif method == 'blockchain.transaction.broadcast':
599 txo = self.bitcoind('sendrawtransaction', params)
600 print_log("sent tx:", txo)
# broadcast failures are reported as the result, not as an error
602 except BaseException, e:
603 result = str(e) # do not send an error
604 print_log("error:", result, params)
606 elif method == 'blockchain.transaction.get_merkle':
612 tx_height = params[1]
613 result = self.get_merkle(tx_hash, tx_height)
614 except BaseException, e:
615 error = str(e) + ': ' + repr(params)
616 print_log("get_merkle error:", error)
618 elif method == 'blockchain.transaction.get':
622 result = self.bitcoind('getrawtransaction', [tx_hash, 0, height])
623 except BaseException, e:
624 error = str(e) + ': ' + repr(params)
625 print_log("tx get error:", error)
628 error = "unknown method:%s" % method
# cache-only probe: -1 propagates to add_request (the early return
# presumably follows; missing from this capture)
630 if cache_only and result == -1:
634 self.push_response({'id': message_id, 'error': error})
636 self.push_response({'id': message_id, 'result': result})

# Register an address for status-change notifications (idempotent).
638 def watch_address(self, addr):
639 if addr not in self.watched_addresses:
640 self.watched_addresses.append(addr)

# Follow bitcoind's chain tip: import blocks forward one at a time, and
# revert the current block on a reorg (mismatched previousblockhash).
# The timing variables and the reorg height decrement are missing from
# this capture.
642 def catch_up(self, sync=True):
645 while not self.shared.stopped():
647 info = self.bitcoind('getinfo')
648 self.bitcoind_height = info.get('blocks')
649 bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
650 if self.last_hash == bitcoind_block_hash:
651 self.up_to_date = True
655 self.up_to_date = False
656 next_block_hash = self.bitcoind('getblockhash', [self.height + 1])
657 next_block = self.bitcoind('getblock', [next_block_hash, 1])
659 # fixme: this is unsafe, if we revert when the undo info is not yet written
# under test, randomly force a revert 1% of the time to exercise reorgs
660 revert = (random.randint(1, 100) == 1) if self.is_test else False
662 if (next_block.get('previousblockhash') == self.last_hash) and not revert:
664 self.import_block(next_block, next_block_hash, self.height+1, sync)
665 self.height = self.height + 1
666 self.write_header(self.block2header(next_block), sync)
667 self.last_hash = next_block_hash
669 if self.height % 100 == 0 and not sync:
671 print_log("catch_up: block %d (%.3fs)" % (self.height, t2 - t1))
675 # revert current block
676 block = self.bitcoind('getblock', [self.last_hash, 1])
677 print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash)
678 self.import_block(block, self.last_hash, self.height, sync, revert=True)
684 # read previous header from disk
685 self.header = self.read_header(self.height)
686 self.last_hash = self.hash_header(self.header)
688 self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))

# Refresh mempool bookkeeping from bitcoind: record input/output
# addresses of new mempool txs, drop txs that left the mempool, rebuild
# addr -> [txids] histories, and invalidate caches for any address whose
# mempool history changed.  Several append lines and an else branch are
# missing from this capture.
690 def memorypool_update(self):
691 mempool_hashes = self.bitcoind('getrawmempool')
693 for tx_hash in mempool_hashes:
# already processed on a previous pass
694 if tx_hash in self.mempool_hashes:
697 tx = self.get_mempool_transaction(tx_hash)
701 for x in tx.get('inputs'):
# resolve each spent outpoint to its owning address via the DB
702 txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
704 addr = self.db.Get(txi)
707 l = self.mempool_addresses.get(tx_hash, [])
710 self.mempool_addresses[tx_hash] = l
712 for x in tx.get('outputs'):
713 addr = x.get('address')
714 l = self.mempool_addresses.get(tx_hash, [])
717 self.mempool_addresses[tx_hash] = l
719 self.mempool_hashes.append(tx_hash)
721 # remove older entries from mempool_hashes
722 self.mempool_hashes = mempool_hashes
724 # remove deprecated entries from mempool_addresses
725 for tx_hash, addresses in self.mempool_addresses.items():
726 if tx_hash not in self.mempool_hashes:
727 self.mempool_addresses.pop(tx_hash)
729 # rebuild mempool histories
730 new_mempool_hist = {}
731 for tx_hash, addresses in self.mempool_addresses.items():
732 for addr in addresses:
733 h = new_mempool_hist.get(addr, [])
736 new_mempool_hist[addr] = h
738 # invalidate cache for mempool addresses whose mempool history has changed
739 for addr in new_mempool_hist.keys():
740 if addr in self.mempool_hist.keys():
741 if self.mempool_hist[addr] != new_mempool_hist[addr]:
742 self.invalidate_cache(addr)
# address newly entered the mempool (else branch header missing)
744 self.invalidate_cache(addr)
746 # invalidate cache for addresses that are removed from mempool ?
747 # this should not be necessary if they go into a block, but they might not
748 for addr in self.mempool_hist.keys():
749 if addr not in new_mempool_hist.keys():
750 self.invalidate_cache(addr)
# swap in the rebuilt history under the mempool lock
753 with self.mempool_lock:
754 self.mempool_hist = new_mempool_hist

# Drop an address from the history cache and, if anyone subscribed to
# it, queue it for a status notification.
756 def invalidate_cache(self, address):
757 with self.cache_lock:
758 if address in self.history_cache:
759 print_log("cache: invalidating", address)
760 self.history_cache.pop(address)
762 if address in self.watched_addresses:
763 self.address_queue.put(address)

# Periodic driver (re-armed every 10s via threading.Timer): catch up
# with bitcoind, refresh the mempool, push numblocks/headers
# notifications when the tip changed, and drain the address queue to
# notify subscribed addresses.  The catch_up call, push_response
# wrappers and the queue-drain loop header are missing from this capture.
765 def main_iteration(self):
766 if self.shared.stopped():
767 print_log("blockchain processor terminating")
775 self.memorypool_update()
777 # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)
779 if self.sent_height != self.height:
780 self.sent_height = self.height
783 'method': 'blockchain.numblocks.subscribe',
784 'params': [self.height],
787 if self.sent_header != self.header:
788 print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
789 self.sent_header = self.header
792 'method': 'blockchain.headers.subscribe',
793 'params': [self.header],
# non-blocking get; presumably wrapped in try/except Empty to exit the loop
798 addr = self.address_queue.get(False)
801 if addr in self.watched_addresses:
802 status = self.get_status(addr)
805 'method': 'blockchain.address.subscribe',
806 'params': [addr, status],
# re-arm the timer unless shutdown was requested
809 if not self.shared.stopped():
810 threading.Timer(10, self.main_iteration).start()
812 print_log("blockchain processor terminating")