3 from json import dumps, loads
5 from Queue import Queue
13 from backends.bitcoind import deserialize
14 from processor import Processor, print_log
17 from storage import Storage
20 class BlockchainProcessor(Processor):
# Constructor: set up caches, locks, mempool state, the bitcoind RPC URL,
# the local headers file, and start the initial chain catch-up.
# NOTE(review): the embedded numbering shows gaps -- this listing has dropped
# lines (try/except bodies, loop bodies); comments describe only what is visible.
22 def __init__(self, config, shared):
23 Processor.__init__(self)
# Per-stage cumulative timings collected by mtime() and dumped by print_mtime().
25 self.mtimes = {} # monitoring
28 self.up_to_date = False
# watch_lock guards the three subscription registries below.
30 self.watch_lock = threading.Lock()
31 self.watch_blocks = []
32 self.watch_headers = []
33 self.watched_addresses = {}
# Address -> history cache; guarded by cache_lock (also guards chunk_cache).
35 self.history_cache = {}
37 self.cache_lock = threading.Lock()
# In-memory buffer of serialized headers not yet flushed to disk.
38 self.headers_data = ''
39 self.headers_path = config.get('leveldb', 'path_fulltree')
# Unconfirmed-transaction state; all of it guarded by mempool_lock.
41 self.mempool_values = {}
42 self.mempool_addresses = {}
43 self.mempool_hist = {}
44 self.mempool_hashes = set([])
45 self.mempool_lock = threading.Lock()
# Addresses whose subscribers must be re-notified; drained in main_iteration().
47 self.address_queue = Queue()
50 self.test_reorgs = config.getboolean('leveldb', 'test_reorgs') # simulate random blockchain reorgs
# presumably a fallback when the config option is absent -- the guarding
# try/except is not visible in this listing; TODO confirm.
52 self.test_reorgs = False
53 self.storage = Storage(config, shared, self.test_reorgs)
# Serializes access to the database.
55 self.dblock = threading.Lock()
# JSON-RPC endpoint of the local bitcoind, with basic-auth credentials inline.
57 self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
58 config.get('bitcoind', 'user'),
59 config.get('bitcoind', 'password'),
60 config.get('bitcoind', 'host'),
61 config.get('bitcoind', 'port'))
# Probe bitcoind once at startup (surrounding retry/except lines not visible).
65 self.bitcoind('getinfo')
68 print_log('cannot contact bitcoind...')
73 self.sent_header = None
# Make sure the flat headers file matches the database height.
76 self.init_headers(self.storage.height)
# Catch up in a background thread; block here until synced or stopped.
78 threading.Timer(0, lambda: self.catch_up(sync=False)).start()
79 while not shared.stopped() and not self.up_to_date:
83 print "keyboard interrupt: stopping threads"
87 print_log("Blockchain is up to date.")
88 self.memorypool_update()
89 print_log("Memory pool initialized.")
# Periodic maintenance loop, re-armed at the end of each main_iteration().
91 self.timer = threading.Timer(10, self.main_iteration)
# Profiling helper: add the time elapsed since the previous checkpoint to the
# running total for `name`.
# NOTE(review): numbering gaps -- the lines that capture `now` (presumably
# time.time()) and that reset self.now are not visible in this listing.
96 def mtime(self, name):
99 delta = now - self.now
100 t = self.mtimes.get(name, 0)
101 self.mtimes[name] = t + delta
# Log the accumulated per-stage timings gathered by mtime().
# NOTE(review): numbering gaps -- the initialization of `s` and the final
# print_log(s) call are not visible in this listing.
104 def print_mtime(self):
106 for k, v in self.mtimes.items():
107 s += k+':'+"%.2f"%v+' '
# Perform a JSON-RPC call `method(params)` against the local bitcoind.
# Returns the response's 'result' field; raises BaseException when the RPC
# reply carries an error.
# NOTE(review): numbering gaps -- the try/except around the HTTP call and the
# loads() of the response into `r` are not visible in this listing.
111 def bitcoind(self, method, params=[]):
# WARNING: mutable default argument `params=[]`; safe only while no caller
# or callee mutates it.
112 postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
114 respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
116 print_log("error calling bitcoind")
117 traceback.print_exc(file=sys.stdout)
121 if r['error'] is not None:
122 raise BaseException(r['error'])
123 return r.get('result')
# Convert a bitcoind 'getblock' result dict into the electrum header dict.
# NOTE(review): numbering gaps -- the surrounding `return {` and closing `}`
# lines are not visible in this listing.
126 def block2header(self, b):
128 "block_height": b.get('height'),
129 "version": b.get('version'),
130 "prev_block_hash": b.get('previousblockhash'),
131 "merkle_root": b.get('merkleroot'),
132 "timestamp": b.get('time'),
# bitcoind reports 'bits' as a hex string; convert to an int.
133 "bits": int(b.get('bits'), 16),
134 "nonce": b.get('nonce'),
def get_header(self, height):
    """Fetch the block at ``height`` from bitcoind and return it as a header dict."""
    # Two RPC round-trips: height -> block hash, then hash -> full block record.
    hash_at_height = self.bitcoind('getblockhash', [height])
    block = self.bitcoind('getblock', [hash_at_height])
    return self.block2header(block)
# Create or extend the flat 'blockchain_headers' file (80-byte records, one
# per height) so it covers every block up to `db_height`.
# NOTE(review): numbering gaps -- several lines (the else-branch resetting
# `height`/`prev_hash`, the try:, the height increment, the final flush) are
# not visible in this listing.
142 def init_headers(self, db_height):
143 self.chunk_cache = {}
144 self.headers_filename = os.path.join(self.headers_path, 'blockchain_headers')
146 if os.path.exists(self.headers_filename):
# Each record is 80 bytes, so file size fixes the highest stored height.
147 height = os.path.getsize(self.headers_filename)/80 - 1 # the current height
149 prev_hash = self.hash_header(self.read_header(height))
# Otherwise start from an empty file.
153 open(self.headers_filename, 'wb').close()
157 if height < db_height:
158 print_log("catching up missing headers:", height, db_height)
161 while height < db_height:
162 header = self.get_header(height)
# Chain integrity check: each header must link to the previous one.
165 assert prev_hash == header.get('prev_block_hash')
# Buffered write; flushed in batches rather than per header.
166 self.write_header(header, sync=False)
167 prev_hash = self.hash_header(header)
168 if (height % 1000) == 0:
169 print_log("headers file:", height)
170 except KeyboardInterrupt:
def hash_header(self, header):
    """Return the hex, byte-reversed double-SHA256 hash of a header dict."""
    raw = header_to_string(header).decode('hex')
    digest_hex = Hash(raw).encode('hex')
    return rev_hex(digest_hex)
# Read and deserialize the header stored at `block_height` from the flat
# headers file.
# NOTE(review): numbering gaps -- the f.read(80) into `h`, the length check,
# and the return statement(s) are not visible in this listing.
179 def read_header(self, block_height):
180 if os.path.exists(self.headers_filename):
181 with open(self.headers_filename, 'rb') as f:
# Fixed 80-byte records addressed directly by height.
182 f.seek(block_height * 80)
185 h = header_from_string(h)
def read_chunk(self, index):
    """Return chunk ``index`` (2016 headers of 80 bytes) from disk, hex-encoded."""
    record = 2016 * 80
    with open(self.headers_filename, 'rb') as f:
        f.seek(index * record)
        data = f.read(record)
    return data.encode('hex')
# Append `header` to the in-memory write buffer; flush to disk when `sync`
# is requested or the buffer grows large. Invalidate the cached chunk that
# contains this header.
# NOTE(review): numbering gaps -- the flush_headers() call under the size
# check is not visible in this listing.
194 def write_header(self, header, sync=True):
# Remember where the buffered run starts so flush can seek correctly.
195 if not self.headers_data:
196 self.headers_offset = header.get('block_height')
198 self.headers_data += header_to_string(header).decode('hex')
199 if sync or len(self.headers_data) > 40*100:
202 with self.cache_lock:
# 2016 headers per chunk; drop the stale cached chunk, if any.
203 chunk_index = header.get('block_height')/2016
204 if self.chunk_cache.get(chunk_index):
205 self.chunk_cache.pop(chunk_index)
def pop_header(self):
    """Drop the most recently buffered header (used when reverting a block)."""
    # Only meaningful while data is still buffered, i.e. not yet flushed.
    if not self.headers_data:
        return
    # NOTE(review): trims 40 bytes here while reads elsewhere use 80-byte
    # records -- confirm the serialized header width against header_to_string.
    self.headers_data = self.headers_data[:-40]
# Write the buffered headers to their position in the headers file and
# clear the buffer.
# NOTE(review): numbering gaps -- the early `return` under the empty-buffer
# check is not visible in this listing.
212 def flush_headers(self):
213 if not self.headers_data:
# 'rb+' keeps existing contents; seek to the first buffered height.
215 with open(self.headers_filename, 'rb+') as f:
216 f.seek(self.headers_offset*80)
217 f.write(self.headers_data)
218 self.headers_data = ''
# Return the hex-encoded chunk `i` of headers, caching it in memory.
# NOTE(review): numbering gaps -- the cache-miss check and the final return
# are not visible in this listing.
220 def get_chunk(self, i):
221 # store them on disk; store the current chunk in memory
222 with self.cache_lock:
223 chunk = self.chunk_cache.get(i)
225 chunk = self.read_chunk(i)
226 self.chunk_cache[i] = chunk
# Fetch a raw mempool transaction from bitcoind and deserialize it.
# NOTE(review): numbering gaps -- the try/except blocks and the error-path
# returns are not visible in this listing.
230 def get_mempool_transaction(self, txid):
# verbose=0: bitcoind returns the raw hex serialization.
232 raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
236 vds = deserialize.BCDataStream()
237 vds.write(raw_tx.decode('hex'))
239 return deserialize.parse_Transaction(vds, is_coinbase=False)
241 print_log("ERROR: cannot parse", txid)
# Return the (confirmed + mempool) history of `addr`, using/refreshing the
# history cache. With cache_only=True, presumably returns -1 on a cache miss
# instead of hitting storage (see get_status) -- the relevant lines are not
# visible in this listing.
# NOTE(review): numbering gaps -- the cache-hit return, the dblock/try
# around storage access, `is_known`, and the final return are missing here.
245 def get_history(self, addr, cache_only=False):
246 with self.cache_lock:
247 hist = self.history_cache.get(addr)
# Cache miss: read confirmed history from storage.
255 hist = self.storage.get_history(addr)
258 print_log("error get_history")
259 traceback.print_exc(file=sys.stdout)
# Append unconfirmed transactions with height 0.
268 with self.mempool_lock:
269 for txid, delta in self.mempool_hist.get(addr, []):
270 hist.append({'tx_hash':txid, 'height':0})
272 # add something to distinguish between unused and empty addresses
273 if hist == [] and is_known:
# Store the freshly built history back into the cache.
276 with self.cache_lock:
277 self.history_cache[addr] = hist
# Sum the mempool value deltas affecting `addr`.
# NOTE(review): numbering gaps -- the accumulator initialization, the body of
# the loop, and the return are not visible in this listing.
281 def get_unconfirmed_value(self, addr):
283 with self.mempool_lock:
284 for txid, delta in self.mempool_hist.get(addr, []):
# Compute the electrum "status" of an address: sha256 over the concatenated
# "txid:height:" history entries, hex-encoded.
# NOTE(review): numbering gaps -- the early returns (cache miss / empty
# history / '*' case), the status initialization, and the loop header over
# tx_points are not visible in this listing.
289 def get_status(self, addr, cache_only=False):
290 tx_points = self.get_history(addr, cache_only)
# -1 signals a cache miss when only cached answers are allowed.
291 if cache_only and tx_points == -1:
296 if tx_points == ['*']:
300 status += tx.get('tx_hash') + ':%d:' % tx.get('height')
301 return hashlib.sha256(status).digest().encode('hex')
# Build the merkle branch proving that `tx_hash` is in the block at `height`.
# Returns {"block_height", "merkle" (list of hex hashes), "pos" (tx index)}.
# NOTE(review): numbering gaps -- the initialization of `s`, the odd-length
# check before duplicating the last element, the loop that halves `merkle`,
# and the reassignment of `merkle` per round are not visible in this listing.
303 def get_merkle(self, tx_hash, height):
305 block_hash = self.bitcoind('getblockhash', [height])
306 b = self.bitcoind('getblock', [block_hash])
307 tx_list = b.get('tx')
308 tx_pos = tx_list.index(tx_hash)
# Work on decoded (binary) hashes.
310 merkle = map(hash_decode, tx_list)
311 target_hash = hash_decode(tx_hash)
313 while len(merkle) != 1:
# Duplicate the last hash when the level has odd length.
315 merkle.append(merkle[-1])
318 new_hash = Hash(merkle[0] + merkle[1])
# Record the sibling of the target hash and follow the combined hash up.
319 if merkle[0] == target_hash:
320 s.append(hash_encode(merkle[1]))
321 target_hash = new_hash
322 elif merkle[1] == target_hash:
323 s.append(hash_encode(merkle[0]))
324 target_hash = new_hash
329 return {"block_height": height, "merkle": s, "pos": tx_pos}
# Insert a (tx_hash, tx_pos, tx_height) entry into the 80-byte-record
# serialized history of `addr`, keeping records ordered by height, and record
# the txo -> address mapping.
# NOTE(review): numbering gaps -- the KeyError fallback creating an empty
# history, the `break` after insertion, and surrounding comments are not
# visible in this listing.
332 def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
# Serialized record padded to a fixed width with NUL bytes.
334 s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
337 serialized_hist = self.batch_list[addr]
# Scan records from newest to oldest to find the insertion point.
339 l = len(serialized_hist)/80
340 for i in range(l-1, -1, -1):
341 item = serialized_hist[80*i:80*(i+1)]
# Height is stored little-endian in bytes 36..38 of the record.
342 item_height = int(rev_hex(item[36:39].encode('hex')), 16)
343 if item_height <= tx_height:
344 serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
# No older record found: prepend.
347 serialized_hist = s + serialized_hist
349 self.batch_list[addr] = serialized_hist
# Store the txid:n -> address mapping used when spending this output.
352 txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
353 self.batch_txio[txo] = addr
# Deserialize every raw transaction of a bitcoind block.
# Returns (tx_hashes in block order, {tx_hash: parsed tx}).
# NOTE(review): numbering gaps -- the is_coinbase initialization/update, the
# try/except around parsing, and the txdict assignment are not visible in
# this listing.
360 def deserialize_block(self, block):
361 txlist = block.get('tx')
362 tx_hashes = [] # ordered txids
363 txdict = {} # deserialized tx
365 for raw_tx in txlist:
# txid = double-SHA256 of the raw serialization, hex-encoded reversed.
366 tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
367 vds = deserialize.BCDataStream()
368 vds.write(raw_tx.decode('hex'))
370 tx = deserialize.parse_Transaction(vds, is_coinbase)
372 print_log("ERROR: cannot parse", tx_hash)
374 tx_hashes.append(tx_hash)
377 return tx_hashes, txdict
# Apply (or, with revert=True, undo) a full block to storage, maintaining
# per-block undo info and invalidating caches for every touched address.
# NOTE(review): numbering gaps -- the undo_info initialization for the
# forward path, the revert branch structure, and the batch-write calls are
# not visible in this listing.
381 def import_block(self, block, block_hash, block_height, sync, revert=False):
383 touched_addr = set([])
385 # deserialize transactions
386 tx_hashes, txdict = self.deserialize_block(block)
# Reverting needs the undo info saved when the block was first imported.
390 undo_info = self.storage.get_undo_info(block_height)
395 for txid in tx_hashes: # must be ordered
398 undo = self.storage.import_transaction(txid, tx, block_height, touched_addr)
399 undo_info[txid] = undo
# Revert path: consume the stored undo entry for this tx.
401 undo = undo_info.pop(txid)
402 self.storage.revert_transaction(txid, tx, block_height, touched_addr, undo)
# All undo entries must have been consumed on a full revert.
405 assert undo_info == {}
409 self.storage.write_undo_info(block_height, self.bitcoind_height, undo_info)
# Persist the new chain tip (hash, height, db version).
412 self.storage.db_undo.put('height', repr( (block_hash, block_height, self.storage.db_version) ))
414 for addr in touched_addr:
415 self.invalidate_cache(addr)
417 self.storage.update_hashes()
def add_request(self, session, request):
    """Answer ``request`` from cache when possible; otherwise queue it for processing."""
    served_from_cache = self.process(session, request, cache_only=True) != -1
    if not served_from_cache:
        self.queue.put((session, request))
# Register `session` for notifications of the given subscription `method`.
# NOTE(review): numbering gaps -- the extraction of `address` from params and
# the append to an existing watcher list are not visible in this listing.
426 def do_subscribe(self, method, params, session):
427 with self.watch_lock:
428 if method == 'blockchain.numblocks.subscribe':
429 if session not in self.watch_blocks:
430 self.watch_blocks.append(session)
432 elif method == 'blockchain.headers.subscribe':
433 if session not in self.watch_headers:
434 self.watch_headers.append(session)
436 elif method == 'blockchain.address.subscribe':
438 l = self.watched_addresses.get(address)
# First watcher for this address: create the list.
440 self.watched_addresses[address] = [session]
441 elif session not in l:
# Remove `session` from the registry for the given subscription `method`;
# drop the per-address watcher list when it becomes empty.
# NOTE(review): numbering gaps -- the extraction of `addr`, the guarded
# l.remove(session) (whose failure triggers the "error rc!!" log), and the
# emptiness check before popping are not visible in this listing.
445 def do_unsubscribe(self, method, params, session):
446 with self.watch_lock:
447 if method == 'blockchain.numblocks.subscribe':
448 if session in self.watch_blocks:
449 self.watch_blocks.remove(session)
450 elif method == 'blockchain.headers.subscribe':
451 if session in self.watch_headers:
452 self.watch_headers.remove(session)
453 elif method == "blockchain.address.subscribe":
455 l = self.watched_addresses.get(addr)
461 print_log("error rc!!")
464 self.watched_addresses.pop(addr)
# Central request dispatcher: route an electrum-protocol request to the
# matching handler, then push either a result or an error back to `session`.
# With cache_only=True, a handler may return -1 to signal "not cached";
# process() then returns -1 instead of responding (see add_request).
# NOTE(review): numbering gaps throughout -- `result`/`error` initialization,
# the try: lines of each handler, parameter validation, and the `return -1`
# under the cache_only check are not visible in this listing.
467 def process(self, session, request, cache_only=False):
469 message_id = request['id']
470 method = request['method']
471 params = request.get('params', [])
475 if method == 'blockchain.numblocks.subscribe':
476 result = self.storage.height
478 elif method == 'blockchain.headers.subscribe':
481 elif method == 'blockchain.address.subscribe':
483 address = str(params[0])
484 result = self.get_status(address, cache_only)
485 except BaseException, e:
486 error = str(e) + ': ' + address
487 print_log("error:", error)
489 elif method == 'blockchain.address.get_history':
491 address = str(params[0])
492 result = self.get_history(address, cache_only)
493 except BaseException, e:
494 error = str(e) + ': ' + address
495 print_log("error:", error)
497 elif method == 'blockchain.address.get_mempool':
499 address = str(params[0])
500 result = self.get_unconfirmed_history(address, cache_only)
501 except BaseException, e:
502 error = str(e) + ': ' + address
503 print_log("error:", error)
505 elif method == 'blockchain.address.get_balance':
507 address = str(params[0])
508 confirmed = self.storage.get_balance(address)
509 unconfirmed = self.get_unconfirmed_value(address)
510 result = { 'confirmed':confirmed, 'unconfirmed':unconfirmed }
511 except BaseException, e:
512 error = str(e) + ': ' + address
513 print_log("error:", error)
515 elif method == 'blockchain.address.get_proof':
517 address = str(params[0])
518 result = self.storage.get_proof(address)
519 except BaseException, e:
520 error = str(e) + ': ' + address
521 print_log("error:", error)
523 elif method == 'blockchain.address.listunspent':
525 address = str(params[0])
526 result = self.storage.listunspent(address)
527 except BaseException, e:
528 error = str(e) + ': ' + address
529 print_log("error:", error)
531 elif method == 'blockchain.utxo.get_address':
533 txid = str(params[0])
# txid + 4-byte output index form the binary utxo key.
535 txi = (txid + int_to_hex(pos, 4)).decode('hex')
536 result = self.storage.get_address(txi)
537 except BaseException, e:
539 print_log("error:", error, params)
541 elif method == 'blockchain.block.get_header':
546 height = int(params[0])
547 result = self.get_header(height)
548 except BaseException, e:
549 error = str(e) + ': %d' % height
550 print_log("error:", error)
552 elif method == 'blockchain.block.get_chunk':
557 index = int(params[0])
558 result = self.get_chunk(index)
559 except BaseException, e:
560 error = str(e) + ': %d' % index
561 print_log("error:", error)
563 elif method == 'blockchain.transaction.broadcast':
565 txo = self.bitcoind('sendrawtransaction', params)
566 print_log("sent tx:", txo)
# Broadcast failures are reported as a normal result, not as an error.
568 except BaseException, e:
569 result = str(e) # do not send an error
570 print_log("error:", result, params)
572 elif method == 'blockchain.transaction.get_merkle':
578 tx_height = params[1]
579 result = self.get_merkle(tx_hash, tx_height)
580 except BaseException, e:
581 error = str(e) + ': ' + repr(params)
582 print_log("get_merkle error:", error)
584 elif method == 'blockchain.transaction.get':
587 result = self.bitcoind('getrawtransaction', [tx_hash, 0])
588 except BaseException, e:
589 error = str(e) + ': ' + repr(params)
590 print_log("tx get error:", error)
593 error = "unknown method:%s" % method
# -1 from a handler under cache_only means: defer to the worker queue.
595 if cache_only and result == -1:
599 self.push_response(session, {'id': message_id, 'error': error})
601 self.push_response(session, {'id': message_id, 'result': result})
# Fetch a block plus the raw hex of all its transactions, using one batched
# JSON-RPC request for the getrawtransaction calls.
# NOTE(review): numbering gaps -- the rawtxreq/rawtxdata initialization, the
# request-id bookkeeping, the loads() of the batched response, and the final
# return are not visible in this listing.
604 def getfullblock(self, block_hash):
605 block = self.bitcoind('getblock', [block_hash])
# Build one batched request entry per txid.
609 for txid in block['tx']:
611 "method": "getrawtransaction",
617 postdata = dumps(rawtxreq)
619 respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
621 print_log("bitcoind error (getfullblock)")
622 traceback.print_exc(file=sys.stdout)
# Any per-tx error aborts: raw transactions require bitcoind txindex=1.
628 if ir['error'] is not None:
630 print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
631 raise BaseException(ir['error'])
632 rawtxdata.append(ir['result'])
# Replace the txid list by the list of raw transactions.
633 block['tx'] = rawtxdata
# Main synchronization loop: import blocks from bitcoind until the local
# tip matches bitcoind's, handling reorgs by reverting the local tip block.
# NOTE(review): numbering gaps -- loop bookkeeping (mtime checkpoints,
# break on up_to_date, batch writes) is not visible in this listing.
636 def catch_up(self, sync=True):
# Used by the test_reorgs mode to check a revert restores the root hash.
638 prev_root_hash = None
639 while not self.shared.stopped():
644 info = self.bitcoind('getinfo')
645 self.bitcoind_height = info.get('blocks')
646 bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
647 if self.storage.last_hash == bitcoind_block_hash:
648 self.up_to_date = True
652 self.up_to_date = False
653 next_block_hash = self.bitcoind('getblockhash', [self.storage.height + 1])
654 next_block = self.getfullblock(next_block_hash)
657 # fixme: this is unsafe, if we revert when the undo info is not yet written
658 revert = (random.randint(1, 100) == 1) if self.test_reorgs else False
# Forward path: the fetched block extends our current tip.
660 if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:
662 prev_root_hash = self.storage.get_root_hash()
664 self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
665 self.storage.height = self.storage.height + 1
666 self.write_header(self.block2header(next_block), sync)
667 self.storage.last_hash = next_block_hash
# Periodic progress logging with profiling counters reset.
670 if self.storage.height % 1000 == 0 and not sync:
671 t_daemon = self.mtimes.get('daemon')
672 t_import = self.mtimes.get('import')
673 print_log("catch_up: block %d (%.3fs %.3fs)" % (self.storage.height, t_daemon, t_import), self.storage.get_root_hash().encode('hex'))
674 self.mtimes['daemon'] = 0
675 self.mtimes['import'] = 0
679 # revert current block
680 block = self.getfullblock(self.storage.last_hash)
681 print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
682 self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
686 self.storage.height -= 1
688 # read previous header from disk
689 self.header = self.read_header(self.storage.height)
690 self.storage.last_hash = self.hash_header(self.header)
# In test mode, a revert must restore the pre-import root hash exactly.
693 assert prev_root_hash == self.storage.get_root_hash()
694 prev_root_hash = None
# Publish the final tip header, annotated with the current utxo root.
697 self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
698 self.header['utxo_root'] = self.storage.get_root_hash().encode('hex')
700 if self.shared.stopped():
701 print_log( "closing database" )
# Rebuild the mempool view: fetch new mempool transactions, track per-address
# value deltas for their inputs and outputs, drop entries for transactions
# that left the mempool, rebuild mempool_hist, and invalidate caches for
# every touched address.
# NOTE(review): numbering gaps -- the new_tx accumulator, several guard
# conditions (skip-on-None, mpa delta updates, the postpone/return on a
# missing utxo), and parts of the input-value lookup are not visible here.
705 def memorypool_update(self):
706 mempool_hashes = set(self.bitcoind('getrawmempool'))
707 touched_addresses = set([])
709 # get new transactions
711 for tx_hash in mempool_hashes:
# Already known: nothing to do for this txid.
712 if tx_hash in self.mempool_hashes:
715 tx = self.get_mempool_transaction(tx_hash)
720 self.mempool_hashes.add(tx_hash)
722 # remove older entries from mempool_hashes
723 self.mempool_hashes = mempool_hashes
726 # check all tx outputs
727 for tx_hash, tx in new_tx.items():
728 mpa = self.mempool_addresses.get(tx_hash, {})
729 for x in tx.get('outputs'):
# Remember output values so later inputs spending them can be valued.
731 out_values.append( x['value'] )
733 addr = x.get('address')
739 touched_addresses.add(addr)
741 self.mempool_addresses[tx_hash] = mpa
742 self.mempool_values[tx_hash] = out_values
# check all tx inputs (second pass).
745 for tx_hash, tx in new_tx.items():
746 mpa = self.mempool_addresses.get(tx_hash, {})
747 for x in tx.get('inputs'):
748 # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
749 addr = x.get('address')
# Prefer a value recorded from another mempool tx's outputs...
753 v = self.mempool_values.get(x.get('prevout_hash'))
755 value = v[ x.get('prevout_n')]
# ...otherwise look the spent utxo up in confirmed storage.
757 txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
759 value = self.storage.get_utxo_value(addr,txi)
# The utxo may not be imported yet; retry on the next update.
761 print_log("utxo not in database; postponing mempool update")
767 touched_addresses.add(addr)
769 self.mempool_addresses[tx_hash] = mpa
772 # remove deprecated entries from mempool_addresses
773 for tx_hash, addresses in self.mempool_addresses.items():
774 if tx_hash not in self.mempool_hashes:
775 self.mempool_addresses.pop(tx_hash)
776 self.mempool_values.pop(tx_hash)
777 for addr in addresses:
778 touched_addresses.add(addr)
780 # rebuild mempool histories
781 new_mempool_hist = {}
782 for tx_hash, addresses in self.mempool_addresses.items():
783 for addr, delta in addresses.items():
784 h = new_mempool_hist.get(addr, [])
786 h.append((tx_hash, delta))
787 new_mempool_hist[addr] = h
# Swap in the rebuilt history atomically under the lock.
789 with self.mempool_lock:
790 self.mempool_hist = new_mempool_hist
792 # invalidate cache for touched addresses
793 for addr in touched_addresses:
794 self.invalidate_cache(addr)
# Drop the cached history for `address` and, if any sessions watch it,
# queue them for a status re-notification (handled in main_iteration).
# NOTE(review): numbering gaps -- the `if sessions:` guard before queuing is
# not visible in this listing.
797 def invalidate_cache(self, address):
798 with self.cache_lock:
799 if address in self.history_cache:
800 print_log("cache: invalidating", address)
801 self.history_cache.pop(address)
803 with self.watch_lock:
804 sessions = self.watched_addresses.get(address)
807 # TODO: update cache here. if new value equals cached value, do not send notification
808 self.address_queue.put((address,sessions))
813 print_log("Closing database...")
815 print_log("Database is closed")
# Periodic maintenance tick (re-armed every 10s): catch up with bitcoind,
# refresh the mempool, and push numblocks/headers/address notifications to
# subscribed sessions.
# NOTE(review): numbering gaps -- the catch_up() call, timing variables
# t1/t2, the response 'id': None fields, the Queue.Empty handling around the
# address_queue drain, and the final timer.start() are not visible; the
# method may also continue past the end of this listing.
818 def main_iteration(self):
819 if self.shared.stopped():
820 print_log("Stopping timer")
828 self.memorypool_update()
# Notify numblocks subscribers when the height changed.
830 if self.sent_height != self.storage.height:
831 self.sent_height = self.storage.height
832 for session in self.watch_blocks:
833 self.push_response(session, {
835 'method': 'blockchain.numblocks.subscribe',
836 'params': [self.storage.height],
# Notify headers subscribers when the tip header changed.
839 if self.sent_header != self.header:
840 print_log("blockchain: %d (%.3fs)" % (self.storage.height, t2 - t1))
841 self.sent_header = self.header
842 for session in self.watch_headers:
843 self.push_response(session, {
845 'method': 'blockchain.headers.subscribe',
846 'params': [self.header],
# Drain queued address notifications (non-blocking get).
851 addr, sessions = self.address_queue.get(False)
855 status = self.get_status(addr)
856 for session in sessions:
857 self.push_response(session, {
859 'method': 'blockchain.address.subscribe',
860 'params': [addr, status],
# Re-arm the 10-second maintenance timer.
864 self.timer = threading.Timer(10, self.main_iteration)