import hashlib
import os
import random
import sys
import threading
import time
import traceback
import urllib

from json import dumps, loads
from Queue import Queue

from backends.bitcoind import deserialize
from processor import Processor, print_log
from storage import Storage
class BlockchainProcessor(Processor):
    """Processor that indexes the bitcoind blockchain and answers the
    Electrum blockchain.* requests (headers, histories, balances, mempool)."""

    def __init__(self, config, shared):
        """Set up storage, caches and locks, then start catching up with bitcoind.

        config -- ConfigParser-style object with 'leveldb' and 'bitcoind' sections
        shared -- shared state object providing stopped()

        NOTE(review): several lines appear to be missing from this excerpt
        (try/except around the bitcoind probe, the catch-up wait loop body,
        sent_height initialisation) -- confirm against the original file.
        """
        Processor.__init__(self)
        self.mtimes = {} # monitoring: cumulative seconds per named phase
        self.up_to_date = False
        # subscription state, guarded by watch_lock
        self.watch_lock = threading.Lock()
        self.watch_blocks = []       # sessions subscribed to numblocks
        self.watch_headers = []      # sessions subscribed to headers
        self.watched_addresses = {}  # address -> list of subscribed sessions
        # per-address history cache, guarded by cache_lock
        self.history_cache = {}
        self.cache_lock = threading.Lock()
        self.headers_data = ''       # buffered, not-yet-flushed header bytes
        self.headers_path = config.get('leveldb', 'path_fulltree')
        # mempool state, guarded by mempool_lock
        self.mempool_addresses = {}  # txid -> addresses touched by the tx
        self.mempool_hist = {}       # address -> unconfirmed history entries
        self.mempool_hashes = set([])
        self.mempool_lock = threading.Lock()
        # (address, sessions) pairs waiting for a status notification
        self.address_queue = Queue()
        self.test_reorgs = config.getboolean('leveldb', 'test_reorgs') # simulate random blockchain reorgs
        # NOTE(review): this unconditionally overrides the config value read
        # just above; presumably an except-fallback whose try/except lines are
        # missing from this excerpt -- confirm against the original file.
        self.test_reorgs = False
        self.storage = Storage(config, shared, self.test_reorgs)
        self.dblock = threading.Lock()
        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind', 'user'),
            config.get('bitcoind', 'password'),
            config.get('bitcoind', 'host'),
            config.get('bitcoind', 'port'))
        # probe the daemon once so we fail fast if it is unreachable
        # NOTE(review): surrounding try/except lines are missing here.
        self.bitcoind('getinfo')
        print_log('cannot contact bitcoind...')
        self.sent_header = None
        # bring the on-disk headers file in line with the database height
        self.init_headers(self.storage.height)
        # catch up with the chain in a background thread
        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        # NOTE(review): loop body (sleep / KeyboardInterrupt handling) missing.
        while not shared.stopped() and not self.up_to_date:
        print "keyboard interrupt: stopping threads"
        print_log("Blockchain is up to date.")
        self.memorypool_update()
        print_log("Memory pool initialized.")
        # kick off the periodic main loop; it reschedules itself every 10s
        threading.Timer(10, self.main_iteration).start()
94 def mtime(self, name):
97 delta = now - self.now
98 t = self.mtimes.get(name, 0)
99 self.mtimes[name] = t + delta
102 def print_mtime(self):
104 for k, v in self.mtimes.items():
105 s += k+':'+"%.2f"%v+' '
    def bitcoind(self, method, params=[]):
        """Synchronous JSON-RPC call to bitcoind; returns the 'result' field.

        Raises BaseException when the reply carries a non-null 'error'.
        NOTE(review): params=[] is a mutable default argument -- safe only
        while it is never mutated in place. The try/except scaffolding and
        the loads() of respdata into r appear to be missing from this excerpt.
        """
        # JSON-RPC 1.0 style request body
        postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
        respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
        traceback.print_exc(file=sys.stdout)
        if r['error'] is not None:
            raise BaseException(r['error'])
        return r.get('result')
123 def block2header(self, b):
125 "block_height": b.get('height'),
126 "version": b.get('version'),
127 "prev_block_hash": b.get('previousblockhash'),
128 "merkle_root": b.get('merkleroot'),
129 "timestamp": b.get('time'),
130 "bits": int(b.get('bits'), 16),
131 "nonce": b.get('nonce'),
134 def get_header(self, height):
135 block_hash = self.bitcoind('getblockhash', [height])
136 b = self.bitcoind('getblock', [block_hash])
137 return self.block2header(b)
    def init_headers(self, db_height):
        """Create or extend the flat 'blockchain_headers' file up to *db_height*.

        NOTE(review): several lines appear to be missing from this excerpt
        (the else-branch that initialises height/prev_hash when the file does
        not exist, the try: line paired with the visible KeyboardInterrupt
        handler, height increments, and the final flush).
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.headers_path, 'blockchain_headers')
        if os.path.exists(self.headers_filename):
            # each header is a fixed 80-byte record
            height = os.path.getsize(self.headers_filename)/80 - 1 # the current height
            prev_hash = self.hash_header(self.read_header(height))
        # truncate/create the file
        open(self.headers_filename, 'wb').close()
        if height < db_height:
            print_log("catching up missing headers:", height, db_height)
        while height < db_height:
            header = self.get_header(height)
            # each header must chain to the previous one
            assert prev_hash == header.get('prev_block_hash')
            self.write_header(header, sync=False)
            prev_hash = self.hash_header(header)
            if (height % 1000) == 0:
                print_log("headers file:", height)
        except KeyboardInterrupt:
173 def hash_header(self, header):
174 return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
    def read_header(self, block_height):
        """Read the 80-byte header at *block_height* from the headers file.

        NOTE(review): the f.read(80) and length-check lines appear to be
        missing from this excerpt; h is consumed below without a visible
        source, and the return path is not shown.
        """
        if os.path.exists(self.headers_filename):
            with open(self.headers_filename, 'rb') as f:
                # fixed 80-byte records, indexed by height
                f.seek(block_height * 80)
                h = header_from_string(h)
185 def read_chunk(self, index):
186 with open(self.headers_filename, 'rb') as f:
187 f.seek(index*2016*80)
188 chunk = f.read(2016*80)
189 return chunk.encode('hex')
    def write_header(self, header, sync=True):
        """Buffer one header for the headers file; flush when asked or full.

        NOTE(review): the flush_headers() call that should follow the size
        check appears to be missing from this excerpt.
        """
        if not self.headers_data:
            # remember the height of the first buffered header; used by
            # flush_headers() to compute the file offset
            self.headers_offset = header.get('block_height')
        # append the 80 raw bytes of this header to the buffer
        self.headers_data += header_to_string(header).decode('hex')
        if sync or len(self.headers_data) > 40*100:
        # the chunk containing this header is now stale
        with self.cache_lock:
            chunk_index = header.get('block_height')/2016
            if self.chunk_cache.get(chunk_index):
                self.chunk_cache.pop(chunk_index)
204 def pop_header(self):
205 # we need to do this only if we have not flushed
206 if self.headers_data:
207 self.headers_data = self.headers_data[:-40]
209 def flush_headers(self):
210 if not self.headers_data:
212 with open(self.headers_filename, 'rb+') as f:
213 f.seek(self.headers_offset*80)
214 f.write(self.headers_data)
215 self.headers_data = ''
    def get_chunk(self, i):
        """Return hex-encoded header chunk *i*, via the in-memory chunk cache.

        NOTE(review): the cache-miss guard ('if not chunk:') and the final
        return appear to be missing from this excerpt; as shown, the disk
        read would happen unconditionally.
        """
        # store them on disk; store the current chunk in memory
        with self.cache_lock:
            chunk = self.chunk_cache.get(i)
            chunk = self.read_chunk(i)
            self.chunk_cache[i] = chunk
    def get_mempool_transaction(self, txid):
        """Fetch raw transaction *txid* from bitcoind and return it parsed.

        NOTE(review): the try/except scaffolding (and the None returns on
        failure) appears to be missing from this excerpt; as shown, the
        print_log line is unreachable.
        """
        # verbose=0: ask for the raw hex serialization
        raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
        vds = deserialize.BCDataStream()
        vds.write(raw_tx.decode('hex'))
        # mempool transactions are never coinbase
        return deserialize.parse_Transaction(vds, is_coinbase=False)
        print_log("ERROR: cannot parse", txid)
    def get_history(self, addr, cache_only=False):
        """Return the confirmed + unconfirmed history of *addr*, via the cache.

        NOTE(review): several lines are missing from this excerpt -- the
        cache-hit return, the cache_only -> -1 path, the dblock around the
        storage read, the is_known computation, and the suite of the
        'hist == [] and is_known' branch ('*' marker). Confirm against the
        original file.
        """
        with self.cache_lock:
            hist = self.history_cache.get(addr)
        # cache miss: read confirmed history from storage
        hist = self.storage.get_history(addr)
        # append unconfirmed entries (height 0) from the mempool
        with self.mempool_lock:
            for txid in self.mempool_hist.get(addr, []):
                hist.append({'tx_hash':txid, 'height':0})
        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known:
        with self.cache_lock:
            self.history_cache[addr] = hist
    def get_status(self, addr, cache_only=False):
        """Return the Electrum status of *addr*: sha256 over its history string.

        NOTE(review): lines are missing from this excerpt -- the suites of
        both if-branches, the status-string initialisation, and the loop
        header that binds tx. As visible, status accumulates
        'txid:height:' per history item before hashing.
        """
        tx_points = self.get_history(addr, cache_only)
        if cache_only and tx_points == -1:
        # '*' marks an address whose history was not enumerated
        if tx_points == ['*']:
        status += tx.get('tx_hash') + ':%d:' % tx.get('height')
        return hashlib.sha256(status).digest().encode('hex')
    def get_merkle(self, tx_hash, height):
        """Compute the merkle branch of *tx_hash* in the block at *height*.

        Returns {'block_height', 'merkle' (branch, hex-encoded), 'pos'}.
        NOTE(review): parts of the reduction loop are missing from this
        excerpt -- the initialisation of s, the odd-length condition before
        the padding append, and the inner pairing loop that rebuilds the
        next merkle level.
        """
        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)
        # work on raw little-endian hashes
        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        while len(merkle) != 1:
            # pad the level by duplicating the last hash (for odd lengths)
            merkle.append(merkle[-1])
            new_hash = Hash(merkle[0] + merkle[1])
            # record the sibling of the target and follow the pair upwards
            if merkle[0] == target_hash:
                s.append(hash_encode(merkle[1]))
                target_hash = new_hash
            elif merkle[1] == target_hash:
                s.append(hash_encode(merkle[0]))
                target_hash = new_hash
        return {"block_height": height, "merkle": s, "pos": tx_pos}
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert one history item for *addr* into self.batch_list, sorted by height.

        Also records the output -> address mapping in self.batch_txio.
        NOTE(review): lines are missing from this excerpt (batch_list
        lookup guard, loop break, and the for/else structure deciding
        between insertion and prepend); serialize_item is defined elsewhere.
        """
        # one serialized record, padded with 40 NUL bytes
        s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
        serialized_hist = self.batch_list[addr]
        # records are 80 bytes each
        l = len(serialized_hist)/80
        # scan from the newest entry backwards for the insertion point
        for i in range(l-1, -1, -1):
            item = serialized_hist[80*i:80*(i+1)]
            # bytes 36..38 of a record hold the item height (little-endian hex)
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)
            if item_height <= tx_height:
                serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
        # no older entry found: prepend
        serialized_hist = s + serialized_hist
        self.batch_list[addr] = serialized_hist
        # remember which address this output (txid + 4-byte position) pays to
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
    def deserialize_block(self, block):
        """Deserialize every raw transaction in *block*.

        Returns (tx_hashes, txdict): the ordered txids and a txid -> parsed
        transaction mapping.
        NOTE(review): lines are missing from this excerpt -- the is_coinbase
        initialisation/update, the try/except around parsing (making the
        print_log line unreachable as shown), and the txdict[tx_hash] = tx
        insert.
        """
        txlist = block.get('tx')  # raw hex-encoded transactions
        tx_hashes = []  # ordered txids
        txdict = {}  # deserialized tx
        for raw_tx in txlist:
            # txid = double-SHA256 of the raw transaction bytes
            tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
            vds = deserialize.BCDataStream()
            vds.write(raw_tx.decode('hex'))
            tx = deserialize.parse_Transaction(vds, is_coinbase)
            print_log("ERROR: cannot parse", tx_hash)
            tx_hashes.append(tx_hash)
        return tx_hashes, txdict
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply (revert=False) or undo (revert=True) one block in storage.

        Invalidates the cached history of every touched address afterwards.
        NOTE(review): the forward/revert branch structure is missing from
        this excerpt -- as shown, import, undo-record, and revert statements
        appear sequentially; the original presumably selects between them
        based on *revert*, and initialises undo_info = {} on the forward path.
        """
        touched_addr = set([])
        # deserialize transactions
        tx_hashes, txdict = self.deserialize_block(block)
        # reverting requires the undo information saved at import time
        undo_info = self.storage.get_undo_info(block_height)
        for txid in tx_hashes:  # must be ordered
            undo = self.storage.import_transaction(txid, tx, block_height, touched_addr)
            undo_info[txid] = undo
            undo = undo_info.pop(txid)
            self.storage.revert_transaction(txid, tx, block_height, touched_addr, undo)
        # a full revert must consume every undo entry
        assert undo_info == {}
        self.storage.write_undo_info(block_height, self.bitcoind_height, undo_info)
        # persist the chain tip so a restart can resume consistently
        self.storage.db_undo.put('height', repr( (block_hash, block_height, self.storage.db_version) ))
        # clients watching these addresses must recompute their status
        for addr in touched_addr:
            self.invalidate_cache(addr)
        self.storage.update_hashes()
408 def add_request(self, session, request):
409 # see if we can get if from cache. if not, add to queue
410 if self.process(session, request, cache_only=True) == -1:
411 self.queue.put((session, request))
    def do_subscribe(self, method, params, session):
        """Register *session* for notifications of the given subscription method.

        NOTE(review): lines are missing from this excerpt -- the
        'address = params[0]' extraction, the 'if l is None:' guard before
        creating a new session list, and the append for an already-watched
        address (leaving a dangling elif as shown).
        """
        with self.watch_lock:
            if method == 'blockchain.numblocks.subscribe':
                if session not in self.watch_blocks:
                    self.watch_blocks.append(session)
            elif method == 'blockchain.headers.subscribe':
                if session not in self.watch_headers:
                    self.watch_headers.append(session)
            elif method == 'blockchain.address.subscribe':
                l = self.watched_addresses.get(address)
                # first watcher of this address
                self.watched_addresses[address] = [session]
                elif session not in l:
    def do_unsubscribe(self, method, params, session):
        """Remove *session* from the subscription lists for *method*.

        NOTE(review): lines are missing from this excerpt -- the address
        extraction from params, the guard/removal on the session list, and
        the emptiness check before popping the address entry.
        """
        with self.watch_lock:
            if method == 'blockchain.numblocks.subscribe':
                if session in self.watch_blocks:
                    self.watch_blocks.remove(session)
            elif method == 'blockchain.headers.subscribe':
                if session in self.watch_headers:
                    self.watch_headers.remove(session)
            elif method == "blockchain.address.subscribe":
                l = self.watched_addresses.get(addr)
                # drop the address entirely once no session watches it
                self.watched_addresses.pop(addr)
    def process(self, session, request, cache_only=False):
        """Dispatch one Electrum JSON-RPC *request* and push the response.

        With cache_only=True, returns -1 instead of responding when the
        answer is not cached (used by add_request()).
        NOTE(review): many lines are missing from this excerpt -- the
        result/error initialisation, every 'try:' line paired with the
        visible 'except' clauses, several parameter extractions (pos,
        tx_hash), the params validation in get_header/get_chunk, and the
        else between the two push_response calls.
        """
        message_id = request['id']
        method = request['method']
        params = request.get('params', [])
        if method == 'blockchain.numblocks.subscribe':
            result = self.storage.height
        elif method == 'blockchain.headers.subscribe':
        elif method == 'blockchain.address.subscribe':
            address = str(params[0])
            result = self.get_status(address, cache_only)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)
        elif method == 'blockchain.address.get_history':
            address = str(params[0])
            result = self.get_history(address, cache_only)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)
        elif method == 'blockchain.address.get_balance':
            address = str(params[0])
            result = self.storage.get_balance(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)
        elif method == 'blockchain.address.get_proof':
            address = str(params[0])
            result = self.storage.get_proof(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)
        elif method == 'blockchain.address.listunspent':
            address = str(params[0])
            result = self.storage.listunspent(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)
        elif method == 'blockchain.utxo.get_address':
            txid = str(params[0])
            # outpoint key: txid + 4-byte little-endian position
            txi = (txid + int_to_hex(pos, 4)).decode('hex')
            result = self.storage.get_address(txi)
            except BaseException, e:
                print_log("error:", error, params)
        elif method == 'blockchain.block.get_header':
            height = int(params[0])
            result = self.get_header(height)
            except BaseException, e:
                error = str(e) + ': %d' % height
                print_log("error:", error)
        elif method == 'blockchain.block.get_chunk':
            index = int(params[0])
            result = self.get_chunk(index)
            except BaseException, e:
                error = str(e) + ': %d' % index
                print_log("error:", error)
        elif method == 'blockchain.transaction.broadcast':
            txo = self.bitcoind('sendrawtransaction', params)
            print_log("sent tx:", txo)
            except BaseException, e:
                # do not send an error
                result = str(e) # do not send an error
                print_log("error:", result, params)
        elif method == 'blockchain.transaction.get_merkle':
            tx_height = params[1]
            result = self.get_merkle(tx_hash, tx_height)
            except BaseException, e:
                error = str(e) + ': ' + repr(params)
                print_log("get_merkle error:", error)
        elif method == 'blockchain.transaction.get':
            result = self.bitcoind('getrawtransaction', [tx_hash, 0])
            except BaseException, e:
                error = str(e) + ': ' + repr(params)
                print_log("tx get error:", error)
        error = "unknown method:%s" % method
        # cache miss under cache_only: signal the caller to queue the request
        if cache_only and result == -1:
        self.push_response(session, {'id': message_id, 'error': error})
        self.push_response(session, {'id': message_id, 'result': result})
    def getfullblock(self, block_hash):
        """Fetch a block and replace its txid list with raw transactions.

        Uses one batched JSON-RPC POST for all getrawtransaction calls.
        NOTE(review): lines are missing from this excerpt -- the rawtxreq
        list assembly (the visible dict fragment belongs to it), the
        loads() of respdata, the try/except scaffolding, the loop over the
        batched replies binding ir, and rawtxdata initialisation.
        """
        block = self.bitcoind('getblock', [block_hash])
        # build one batched request entry per txid
        for txid in block['tx']:
            "method": "getrawtransaction",
        postdata = dumps(rawtxreq)
        respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
        traceback.print_exc(file=sys.stdout)
        if ir['error'] is not None:
            # getrawtransaction on arbitrary txids requires txindex=1
            print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
            raise BaseException(ir['error'])
        rawtxdata.append(ir['result'])
        block['tx'] = rawtxdata
    def catch_up(self, sync=True):
        """Import blocks until storage matches bitcoind's tip; handle reorgs.

        NOTE(review): lines are missing from this excerpt -- the mtime()
        instrumentation calls, the break when up to date, the else: line
        introducing the revert branch, the 'if prev_root_hash:' guard before
        the assert, and the final database close. Confirm against the
        original file.
        """
        prev_root_hash = None
        while not self.shared.stopped():
            # poll the daemon for its current tip
            info = self.bitcoind('getinfo')
            self.bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
            if self.storage.last_hash == bitcoind_block_hash:
                self.up_to_date = True
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.storage.height + 1])
            next_block = self.getfullblock(next_block_hash)
            # fixme: this is unsafe, if we revert when the undo info is not yet written
            revert = (random.randint(1, 100) == 1) if self.test_reorgs else False
            # normal case: the next block extends our current tip
            if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:
                prev_root_hash = self.storage.get_root_hash()
                self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
                self.storage.height = self.storage.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.storage.last_hash = next_block_hash
                # periodic progress report while catching up
                if self.storage.height % 1000 == 0 and not sync:
                    t_daemon = self.mtimes.get('daemon')
                    t_import = self.mtimes.get('import')
                    print_log("catch_up: block %d (%.3fs %.3fs)" % (self.storage.height, t_daemon, t_import), self.storage.get_root_hash().encode('hex'))
                    self.mtimes['daemon'] = 0
                    self.mtimes['import'] = 0
                # revert current block
                block = self.getfullblock(self.storage.last_hash)
                print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
                self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
                self.storage.height -= 1
                # read previous header from disk
                self.header = self.read_header(self.storage.height)
                self.storage.last_hash = self.hash_header(self.header)
                # a revert must restore the previous UTXO root exactly
                assert prev_root_hash == self.storage.get_root_hash()
                prev_root_hash = None
        # publish the current tip header, including the UTXO root
        self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
        self.header['utxo_root'] = self.storage.get_root_hash().encode('hex')
        if self.shared.stopped():
            print_log( "closing database" )
    def memorypool_update(self):
        """Synchronise local mempool state with bitcoind's getrawmempool.

        Rebuilds mempool_addresses/mempool_hist and invalidates the cache of
        every touched address.
        NOTE(review): lines are missing from this excerpt -- the 'continue'
        after the already-known check, the None-check on the fetched tx, the
        mpa.append(addr) calls implied by the 'not in mpa' guards, and the
        h.append(tx_hash) in the history rebuild.
        """
        mempool_hashes = set(self.bitcoind('getrawmempool'))
        touched_addresses = set([])
        # add transactions we have not seen yet
        for tx_hash in mempool_hashes:
            if tx_hash in self.mempool_hashes:
            tx = self.get_mempool_transaction(tx_hash)
            mpa = self.mempool_addresses.get(tx_hash, [])
            for x in tx.get('inputs'):
                # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
                addr = x.get('address')
                if addr and addr not in mpa:
                    touched_addresses.add(addr)
            for x in tx.get('outputs'):
                addr = x.get('address')
                if addr and addr not in mpa:
                    touched_addresses.add(addr)
            self.mempool_addresses[tx_hash] = mpa
            self.mempool_hashes.add(tx_hash)
        # remove older entries from mempool_hashes
        self.mempool_hashes = mempool_hashes
        # remove deprecated entries from mempool_addresses
        for tx_hash, addresses in self.mempool_addresses.items():
            if tx_hash not in self.mempool_hashes:
                self.mempool_addresses.pop(tx_hash)
                for addr in addresses:
                    touched_addresses.add(addr)
        # rebuild mempool histories
        new_mempool_hist = {}
        for tx_hash, addresses in self.mempool_addresses.items():
            for addr in addresses:
                h = new_mempool_hist.get(addr, [])
                new_mempool_hist[addr] = h
        # swap in the rebuilt histories atomically
        with self.mempool_lock:
            self.mempool_hist = new_mempool_hist
        # invalidate cache for touched addresses
        for addr in touched_addresses:
            self.invalidate_cache(addr)
    def invalidate_cache(self, address):
        """Drop the cached history of *address* and queue a notification for
        any sessions watching it.

        NOTE(review): an 'if sessions:' guard appears to be missing from this
        excerpt -- sessions may be None when the address is unwatched.
        """
        with self.cache_lock:
            if address in self.history_cache:
                print_log("cache: invalidating", address)
                self.history_cache.pop(address)
        with self.watch_lock:
            sessions = self.watched_addresses.get(address)
        # TODO: update cache here. if new value equals cached value, do not send notification
        self.address_queue.put((address,sessions))
    def main_iteration(self):
        """Periodic loop body: catch up, refresh the mempool, notify subscribers.

        Reschedules itself via threading.Timer every 10 seconds until stopped.
        NOTE(review): lines are missing from this excerpt -- the return after
        the termination message, the dblock/catch_up call and the t1/t2
        timing that the log line below uses, 'id': None entries and closing
        braces of the pushed dicts, and the while/except Queue.Empty loop
        that should wrap the address_queue.get(False) drain.
        """
        if self.shared.stopped():
            print_log("blockchain processor terminating")
        self.memorypool_update()
        # notify numblocks subscribers when the height changed
        if self.sent_height != self.storage.height:
            self.sent_height = self.storage.height
            for session in self.watch_blocks:
                self.push_response(session, {
                    'method': 'blockchain.numblocks.subscribe',
                    'params': [self.storage.height],
        # notify header subscribers when the tip header changed
        if self.sent_header != self.header:
            print_log("blockchain: %d (%.3fs)" % (self.storage.height, t2 - t1))
            self.sent_header = self.header
            for session in self.watch_headers:
                self.push_response(session, {
                    'method': 'blockchain.headers.subscribe',
                    'params': [self.header],
        # drain queued per-address notifications (non-blocking get)
        addr, sessions = self.address_queue.get(False)
        status = self.get_status(addr)
        for session in sessions:
            self.push_response(session, {
                'method': 'blockchain.address.subscribe',
                'params': [addr, status],
        # schedule the next iteration
        if not self.shared.stopped():
            threading.Timer(10, self.main_iteration).start()
        print_log("blockchain processor terminating")