1 from json import dumps, loads
4 import ast, time, threading, hashlib
5 from Queue import Queue
6 import traceback, sys, os, random
9 from util import Hash, hash_encode, hash_decode, rev_hex, int_to_hex
10 from util import bc_address_to_hash_160, hash_160_to_bc_address, header_to_string, header_from_string
11 from processor import Processor, print_log
class BlockchainProcessor(Processor):
    """Indexes the bitcoind blockchain into LevelDB and serves the
    blockchain.* electrum RPC methods (history, status, headers,
    merkle proofs, broadcast), tracking the mempool and reorgs."""
    def __init__(self, config, shared):
        """Initialise the processor: open LevelDB, read the stored chain
        tip, build the flat headers file, then start the catch-up thread
        and the periodic main_iteration timer.

        NOTE(review): this view of the file is missing several original
        lines (try/except wrappers, some initialisers such as
        self.height); gaps are marked below.
        """
        Processor.__init__(self)
        self.up_to_date = False
        self.watched_addresses = []         # addresses with active subscriptions
        self.history_cache = {}             # addr -> history; guarded by cache_lock
        self.cache_lock = threading.Lock()
        self.headers_data = ''              # unflushed header bytes (see write_header)
        self.mempool_addresses = {}         # tx_hash -> addresses it touches
        self.mempool_hist = {}              # addr -> mempool txids
        self.mempool_hashes = []            # txids seen in the last mempool poll
        self.mempool_lock = threading.Lock()
        self.address_queue = Queue()        # addresses needing a status push
        self.dbpath = config.get('leveldb', 'path')
        self.dblock = threading.Lock()      # serialises heavy DB phases
        # [missing in this view: try: around the DB open]
        self.db = leveldb.LevelDB(self.dbpath)
        traceback.print_exc(file=sys.stdout)  # originally inside the except branch
        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind','user'),
            config.get('bitcoind','password'),
            config.get('bitcoind','host'),
            config.get('bitcoind','port'))
        self.sent_header = None
        # [missing in this view: try: around reading the stored tip]
        hist = self.deserialize(self.db.Get('height'))
        self.last_hash, self.height, _ = hist[0]
        print_log( "hist", hist )
        #traceback.print_exc(file=sys.stdout)
        print_log('initializing database')  # originally the fresh-DB except branch
        # genesis block hash — presumably the fresh-DB starting point; TODO confirm
        self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
        self.init_headers(self.height)
        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        # [missing in this view: try: and the loop body (sleep)]
        while not shared.stopped() and not self.up_to_date:
        # originally inside except KeyboardInterrupt:
        print "keyboard interrupt: stopping threads"
        print_log( "blockchain is up to date." )
        threading.Timer(10, self.main_iteration).start()
    def bitcoind(self, method, params=[]):
        """Synchronous JSON-RPC call to bitcoind.

        Returns the 'result' field of the response, or raises
        BaseException carrying the RPC 'error' field.

        NOTE(review): `params=[]` is a mutable default argument; it is
        not mutated in the visible code, so it appears harmless, but
        worth confirming against the full file.
        NOTE(review): this view is missing the try: around the HTTP call
        and the `r = loads(respdata)` line.
        """
        postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'})
        respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
        traceback.print_exc(file=sys.stdout)  # originally inside the except branch
        if r['error'] != None:
            raise BaseException(r['error'])
        return r.get('result')
    def serialize(self, h):
        """Pack a history list [(txid_hex, txpos, height), ...] into the
        40-bytes-per-record binary form stored in LevelDB.

        NOTE(review): the accumulator initialisation (`s = ''`) is
        missing from this view of the file.
        """
        for txid, txpos, height in h:
            # 32-byte txid + 4-byte LE position + 4-byte LE height (all hex here)
            s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4)
        return s.decode('hex')
    def deserialize(self, s):
        """Inverse of serialize(): unpack consecutive 40-byte records into
        a list of (txid_hex, txpos, height) tuples.

        NOTE(review): the list initialisation, the loop over `s` in
        40-byte steps and the final return are missing from this view;
        only the per-record decoding is visible.
        """
        txid = s[0:32].encode('hex')
        txpos = int( rev_hex( s[32:36].encode('hex') ), 16 )
        height = int( rev_hex( s[36:40].encode('hex') ), 16 )
        h.append( ( txid, txpos, height ) )
114 def block2header(self, b):
115 return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'),
116 "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":int(b.get('bits'),16), "nonce":b.get('nonce')}
119 def get_header(self, height):
120 block_hash = self.bitcoind('getblockhash', [height])
121 b = self.bitcoind('getblock', [block_hash])
122 return self.block2header(b)
    def init_headers(self, db_height):
        """Create or extend the flat headers file (80 bytes per header)
        until its height matches the database height *db_height*.

        NOTE(review): the fresh-file else-branch header, the `height = 0`
        initialiser, the try: wrapper, the height increment inside the
        loop and the final flush are missing from this view.
        """
        self.chunk_cache = {}  # chunk index -> hex chunk of 2016 headers
        self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')
        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80 - 1  # the current height
            prev_hash = self.hash_header(self.read_header(height))
        # originally the else branch: create/truncate an empty headers file
        open(self.headers_filename,'wb').close()
        if height < db_height:
            print_log( "catching up missing headers:", height, db_height)
        while height < db_height:
            # [missing in this view: height increment / try: wrapper]
            header = self.get_header(height)
            # each header must chain onto the previous one
            assert prev_hash == header.get('prev_block_hash')
            self.write_header(header, sync=False)
            prev_hash = self.hash_header(header)
            if height%1000==0: print_log("headers file:",height)
        except KeyboardInterrupt:
159 def hash_header(self, header):
160 return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
    def read_header(self, block_height):
        """Read the 80-byte header at *block_height* from the headers
        file and return it as a header dict (implicitly None when the
        file does not exist).

        NOTE(review): the `h = f.read(80)`, `f.close()` and the return
        statement are missing from this view of the file.
        """
        if os.path.exists(self.headers_filename):
            f = open(self.headers_filename,'rb')
            f.seek(block_height*80)  # fixed-size records: direct seek
            h = header_from_string(h)
174 def read_chunk(self, index):
175 f = open(self.headers_filename,'rb')
176 f.seek(index*2016*80)
177 chunk = f.read(2016*80)
179 return chunk.encode('hex')
    def write_header(self, header, sync=True):
        """Append *header* to the in-memory buffer, flushing to disk when
        *sync* is set or the buffer grows large, and drop any cached
        chunk that this header falls into.

        NOTE(review): the flush_headers() call after the size check is
        missing from this view of the file.
        """
        if not self.headers_data:
            # remember where this run of buffered headers starts in the file
            self.headers_offset = header.get('block_height')
        self.headers_data += header_to_string(header).decode('hex')
        if sync or len(self.headers_data) > 40*100:
            # [missing in this view: self.flush_headers()]
        with self.cache_lock:
            # a header changed -> its 2016-header chunk is stale
            chunk_index = header.get('block_height')/2016
            if self.chunk_cache.get(chunk_index):
                self.chunk_cache.pop(chunk_index)
195 def pop_header(self):
196 # we need to do this only if we have not flushed
197 if self.headers_data:
198 self.headers_data = self.headers_data[:-40]
200 def flush_headers(self):
201 if not self.headers_data: return
202 f = open(self.headers_filename,'rb+')
203 f.seek(self.headers_offset*80)
204 f.write(self.headers_data)
206 self.headers_data = ''
    def get_chunk(self, i):
        """Return hex chunk *i* of the headers file, caching it in memory.

        NOTE(review): the cache-miss condition guarding the disk read and
        the final return are missing from this view of the file.
        """
        # store them on disk; store the current chunk in memory
        with self.cache_lock:
            chunk = self.chunk_cache.get(i)
            # [missing in this view: the `if not chunk:` guard]
            chunk = self.read_chunk(i)
            self.chunk_cache[i] = chunk
    def get_transaction(self, txid, block_height=-1, is_coinbase = False):
        """Fetch raw transaction *txid* from bitcoind and parse it into a
        dict via the deserialize module.

        NOTE(review): the `return out` line is missing from this view of
        the file.
        """
        raw_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
        vds = deserialize.BCDataStream()
        vds.write(raw_tx.decode('hex'))
        out = deserialize.parse_Transaction(vds, is_coinbase)
    def get_history(self, addr, cache_only=False):
        """Return the confirmed + mempool history of *addr* as a list of
        {'tx_hash', 'height'} dicts (mempool entries have height 0),
        ['*'] for a known-but-empty address, or -1 on a cache_only miss.

        NOTE(review): the dblock acquisition, the try/except around the
        DB read, the `is_known` assignment and the final return are
        missing from this view of the file.
        """
        with self.cache_lock: hist = self.history_cache.get( addr )
        if hist is not None: return hist
        if cache_only: return -1
        hash_160 = bc_address_to_hash_160(addr)
        hist = self.deserialize(self.db.Get(hash_160))
        # should not be necessary
        hist.sort( key=lambda tup: tup[1])
        # check uniqueness too...
        # append unconfirmed (mempool) transactions with height 0
        with self.mempool_lock:
            for txid in self.mempool_hist.get(addr,[]):
                hist.append((txid, 0, 0))
        hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known: hist = ['*']
        with self.cache_lock: self.history_cache[addr] = hist
    def get_status(self, addr, cache_only=False):
        """Return the electrum status of *addr*: sha256 hex over the
        concatenated 'txid:height:' history, '*' for known-but-empty,
        None for an unused address, or -1 on a cache_only miss.

        NOTE(review): the `status = ''` initialiser and the loop header
        over tx_points are missing from this view of the file.
        """
        tx_points = self.get_history(addr, cache_only)
        if cache_only and tx_points == -1: return -1
        if not tx_points: return None
        if tx_points == ['*']: return '*'
        status += tx.get('tx_hash') + ':%d:' % tx.get('height')
        return hashlib.sha256( status ).digest().encode('hex')
    def get_merkle(self, tx_hash, height):
        """Build the merkle branch proving *tx_hash* is in the block at
        *height*; returns {'block_height', 'merkle', 'pos'}.

        NOTE(review): the accumulator initialisation (s = []), the inner
        pairing loop and the step that halves `merkle` each round are
        missing from this view of the file.
        """
        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)
        # work on binary little-endian hashes
        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        while len(merkle) != 1:
            # odd count: bitcoin duplicates the last hash
            if len(merkle)%2: merkle.append( merkle[-1] )
            new_hash = Hash( merkle[0] + merkle[1] )
            if merkle[0] == target_hash:
                # sibling goes into the branch; follow the parent up
                s.append( hash_encode( merkle[1]))
                target_hash = new_hash
            elif merkle[1] == target_hash:
                s.append( hash_encode( merkle[0]))
                target_hash = new_hash
        return {"block_height":height, "merkle":s, "pos":tx_pos}
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert one 40-byte history record for *addr* into batch_list,
        keeping records ordered by height, and record the txo -> addr
        mapping in batch_txio for later spend resolution.

        NOTE(review): the insertion `break`, the for/else prepend
        structure and any try/except around the batch_list lookup are
        missing from this view of the file.
        """
        s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/40  # record count: fixed 40-byte records
        # scan from the tail: new records usually belong near the tip
        for i in range(l-1, -1, -1):
            item = serialized_hist[40*i:40*(i+1)]
            item_height = int( rev_hex( item[36:40].encode('hex') ), 16 )
            if item_height < tx_height:
                # insert right after the first lower-height record
                serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):]
        # originally the fallback branch: prepend when no lower record found
        serialized_hist = s + serialized_hist
        self.batch_list[addr] = serialized_hist
        # store in address_to_keys so that spends can resolve the address
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
    def remove_from_history(self, addr, tx_hash, tx_pos):
        """Remove the record for spent output (tx_hash, tx_pos) from the
        batched history; when *addr* is None it is resolved through
        batch_txio.  Apparently returns (height, addr) of the removed
        record — the return is missing from this view.

        NOTE(review): the `if addr is None:` guard with its try/except,
        the loop header over records and the found/break bookkeeping are
        missing from this view of the file.
        """
        txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        addr = self.batch_txio[txi]
        # originally the except branch: unknown prevout is fatal
        raise BaseException(tx_hash, tx_pos)
        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/40  # record count
        # [missing in this view: for i in range(l): loop header]
        item = serialized_hist[40*i:40*(i+1)]
        if item[0:36] == txi:
            height = int( rev_hex( item[36:40].encode('hex') ), 16 )
            # splice the 40-byte record out
            serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
        # originally the not-found error path
        hist = self.deserialize(serialized_hist)
        raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
        self.batch_list[addr] = serialized_hist
        # [missing in this view: return height, addr]
    def deserialize_block(self, block):
        """Parse every raw transaction of *block* and return
        (tx_hashes, txdict): the ordered txid list and a txid -> parsed
        tx mapping.

        NOTE(review): the `is_coinbase` assignment/update and the
        `txdict[tx_hash] = tx` line are missing from this view of the
        file.
        """
        txlist = block.get('tx')
        tx_hashes = [] # ordered txids
        txdict = {} # deserialized tx
        for raw_tx in txlist:
            # txid = double-SHA256 of the raw tx, hex, byte-reversed
            tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
            tx_hashes.append(tx_hash)
            vds = deserialize.BCDataStream()
            vds.write(raw_tx.decode('hex'))
            tx = deserialize.parse_Transaction(vds, is_coinbase)
        return tx_hashes, txdict
    def get_undo_info(self, height):
        """Load the undo record for *height* from the DB (keyed modulo
        100, so only the last 100 blocks are recoverable — matching
        write_undo_info).

        NOTE(review): the line that deserializes and returns `s` is
        missing from this view of the file.
        """
        s = self.db.Get("undo%d"%(height%100))
371 def write_undo_info(self, batch, height, undo_info):
372 if self.is_test or height > self.bitcoind_height - 100:
373 batch.Put("undo%d"%(height%100), repr(undo_info))
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply one block's transactions to the LevelDB index (or undo
        them with revert=True), then write everything atomically as a
        single LevelDB batch and invalidate affected history caches.

        NOTE(review): this view of the file is missing many lines —
        timing variables (t0..t3), list initialisers, try/except around
        DB reads, the if/else headers separating the apply and revert
        paths, and the max_len bookkeeping.  Gaps are marked below.
        """
        self.batch_list = {}  # address -> history
        self.batch_txio = {}  # transaction i/o -> address
        # deserialize transactions
        tx_hashes, txdict = self.deserialize_block(block)
        # [missing: block_inputs / block_outputs / addr_to_read initialisers]
        # read addresses of tx inputs
        for tx in txdict.values():
            for x in tx.get('inputs'):
                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                block_inputs.append(txi)
        for txi in block_inputs:
            # [missing: try: around the DB read]
            addr = self.db.Get(txi)
            # the input could come from the same block
            self.batch_txio[txi] = addr
            addr_to_read.append(addr)
        for txid, tx in txdict.items():
            for x in tx.get('outputs'):
                txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
                block_outputs.append(txo)
        # read histories of addresses
        for txid, tx in txdict.items():
            for x in tx.get('outputs'):
                hash_160 = bc_address_to_hash_160(x.get('address'))
                addr_to_read.append(hash_160)
        for addr in addr_to_read:
            # [missing: try:]
            self.batch_list[addr] = self.db.Get(addr)
            # originally the except branch: previously unseen address
            self.batch_list[addr] = ''
        # [missing: `if revert:` guard around loading the undo info]
        undo_info = self.get_undo_info(block_height)
        # print "undo", block_height, undo_info
        # process transactions; reversed order when undoing a block
        if revert: tx_hashes = tx_hashes[::-1]
        for txid in tx_hashes: # must be ordered
            # [missing: tx = txdict[txid]; `if not revert:` header; undo = []]
            for x in tx.get('inputs'):
                prevout_height, prevout_addr = self.remove_from_history( None, x.get('prevout_hash'), x.get('prevout_n'))
                undo.append( (prevout_height, prevout_addr) )
            undo_info[txid] = undo
            for x in tx.get('outputs'):
                hash_160 = bc_address_to_hash_160(x.get('address'))
                self.add_to_history( hash_160, txid, x.get('index'), block_height)
            # originally the else (revert) path starts here
            for x in tx.get('outputs'):
                hash_160 = bc_address_to_hash_160(x.get('address'))
                self.remove_from_history( hash_160, txid, x.get('index'))
            # [missing: undo lookup setup and the `i` counter initialiser]
            for x in tx.get('inputs'):
                prevout_height, prevout_addr = undo_info.get(txid)[i]
                # [missing: i increment]
                # read the history into batch list
                if self.batch_list.get(prevout_addr) is None:
                    self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
                # re-add them to the history
                self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
                # print_log( "new hist for", hash_160_to_bc_address(prevout_addr), self.deserialize(self.batch_list[prevout_addr]) )
        # assemble the atomic write
        batch = leveldb.WriteBatch()
        for addr, serialized_hist in self.batch_list.items():
            batch.Put(addr, serialized_hist)
            l = len(serialized_hist)
            # [missing: max_len / max_addr bookkeeping]
        # [missing: `if not revert:` header]
        # add new created outputs
        for txio, addr in self.batch_txio.items():
            batch.Put(txio, addr)
        # delete spent inputs
        for txi in block_inputs:
            # [missing: batch.Delete(txi)]
        self.write_undo_info(batch, block_height, undo_info)
        # originally the else (revert) path:
        # restore spent inputs
        for txio, addr in self.batch_txio.items():
            batch.Put(txio, addr)
        # delete spent outputs
        for txo in block_outputs:
            # [missing: batch.Delete(txo)]
        # record the new tip as a one-record history under the 'height' key
        batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) )
        # write the batch atomically; sync controls durability
        self.db.Write(batch, sync = sync)
        if t3 - t0 > 10 and not sync:
            print_log("block", block_height,
                      "parse:%0.2f "%(t00 - t0),
                      "read:%0.2f "%(t1 - t00),
                      "proc:%.2f "%(t2-t1),
                      "write:%.2f "%(t3-t2),
                      "max:", max_len, hash_160_to_bc_address(max_addr))
        # drop cached histories for every address this block touched
        for h160 in self.batch_list.keys():
            addr = hash_160_to_bc_address(h160)
            self.invalidate_cache(addr)
522 def add_request(self, request):
523 # see if we can get if from cache. if not, add to queue
524 if self.process( request, cache_only = True) == -1:
525 self.queue.put(request)
    def process(self, request, cache_only = False):
        """Dispatch one electrum JSON-RPC *request* and push the response.

        With cache_only=True, returns -1 instead of answering when the
        result is not cached (used by add_request to decide queueing).

        NOTE(review): this view of the file is missing many lines —
        result/error initialisers, try: headers, params extraction,
        several branch headers and blank separators.  Gaps are marked
        below; the visible `except` clauses belonged to the missing
        try: blocks.
        """
        #print "abe process", request
        message_id = request['id']
        method = request['method']
        params = request.get('params',[])
        # [missing: result = None / error = None initialisers]
        if method == 'blockchain.numblocks.subscribe':
            # [missing: result = self.height]
        elif method == 'blockchain.headers.subscribe':
            # [missing: result = self.header]
        elif method == 'blockchain.address.subscribe':
            # [missing: try: / address = params[0]]
            result = self.get_status(address, cache_only)
            self.watch_address(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log( "error:", error )
        elif method == 'blockchain.address.unsubscribe':
            # [missing: try: / password, address = params]
            if password == self.config.get('server','password'):
                self.watched_addresses.remove(address)
                print_log('unsubscribed', address)
                # [missing: result assignment and the else: header]
                print_log('incorrect password')
                result = "authentication error"
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log( "error:", error )
        elif method == 'blockchain.address.get_history':
            # [missing: try: / address = params[0]]
            result = self.get_history( address, cache_only )
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log( "error:", error )
        elif method == 'blockchain.block.get_header':
            # [missing: cache_only guard / try: / height = params[0]]
            result = self.get_header( height )
            except BaseException, e:
                error = str(e) + ': %d'% height
                print_log( "error:", error )
        elif method == 'blockchain.block.get_chunk':
            # [missing: cache_only guard / try: / index = params[0]]
            result = self.get_chunk( index )
            except BaseException, e:
                error = str(e) + ': %d'% index
                print_log( "error:", error)
        elif method == 'blockchain.transaction.broadcast':
            # [missing: try:]
            txo = self.bitcoind('sendrawtransaction', params)
            print_log( "sent tx:", txo )
            # [missing: result assignment]
            except BaseException, e:
                result = str(e) # do not send an error
                print_log( "error:", str(e), params )
        elif method == 'blockchain.transaction.get_merkle':
            # [missing: cache_only guard / try: / tx_hash = params[0]]
            tx_height = params[1]
            result = self.get_merkle(tx_hash, tx_height)
            except BaseException, e:
                error = str(e) + ': ' + tx_hash
                print_log( "error:", error )
        elif method == 'blockchain.transaction.get':
            # [missing: try: / tx_hash, height extraction]
            result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] )
            except BaseException, e:
                error = str(e) + ': ' + tx_hash
                print_log( "error:", error )
        # [missing: else: header]
            error = "unknown method:%s"%method
        if cache_only and result == -1: return -1
        # [missing: `if error:` / `else:` headers around the two pushes]
        response = { 'id':message_id, 'error':error }
        self.push_response(response)
        response = { 'id':message_id, 'result':result }
        self.push_response(response)
641 def watch_address(self, addr):
642 if addr not in self.watched_addresses:
643 self.watched_addresses.append(addr)
    def catch_up(self, sync = True):
        """Import blocks from bitcoind until our tip equals bitcoind's,
        undoing blocks (revert) when a reorg is detected.

        NOTE(review): timing variables (t1/t2), the loop break once up to
        date, the else: header of the reorg branch and the
        pop_header/flush bookkeeping are missing from this view.
        """
        while not self.shared.stopped():
            # are we done yet?
            info = self.bitcoind('getinfo')
            self.bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
            if self.last_hash == bitcoind_block_hash:
                self.up_to_date = True
                # [missing in this view: break]
            # not caught up yet: fetch the next block after our tip
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.height+1])
            next_block = self.bitcoind('getblock', [next_block_hash, 1])
            # fixme: this is unsafe, if we revert when the undo info is not yet written
            revert = (random.randint(1, 100)==1) if self.is_test else False
            if (next_block.get('previousblockhash') == self.last_hash) and not revert:
                self.import_block(next_block, next_block_hash, self.height+1, sync)
                self.height = self.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.last_hash = next_block_hash
                if (self.height)%100 == 0 and not sync:
                    print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
            # originally the else (reorg) branch:
                # revert current block
                block = self.bitcoind('getblock', [self.last_hash, 1])
                print_log( "blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash )
                self.import_block(block, self.last_hash, self.height, sync, revert=True)
                # [missing in this view: pop_header / flush bookkeeping]
                self.height = self.height -1
                # read previous header from disk
                self.header = self.read_header(self.height)
                self.last_hash = self.hash_header(self.header)
        # refresh our cached tip header once fully caught up
        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
    def memorypool_update(self):
        """Poll bitcoind's mempool, rebuild the per-address mempool
        history and invalidate the status cache of changed addresses.

        NOTE(review): several lines are missing from this view — the
        guard after get_transaction, try/except around the DB read, the
        append-if-absent lines and some branch headers; gaps are marked
        below.
        """
        mempool_hashes = self.bitcoind('getrawmempool')
        for tx_hash in mempool_hashes:
            # only parse transactions we have not seen before
            if tx_hash in self.mempool_hashes: continue
            tx = self.get_transaction(tx_hash)
            # inputs: resolve each spent prevout to its address via the DB
            for x in tx.get('inputs'):
                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                # [missing: try: — the prevout may be unknown to us]
                h160 = self.db.Get(txi)
                addr = hash_160_to_bc_address(h160)
                # [missing: except: continue]
                l = self.mempool_addresses.get(tx_hash, [])
                # [missing: the append-if-absent of addr into l]
                self.mempool_addresses[tx_hash] = l
            # outputs: addresses come directly from the parsed tx
            for x in tx.get('outputs'):
                addr = x.get('address')
                l = self.mempool_addresses.get(tx_hash, [])
                # [missing: the append-if-absent of addr into l]
                self.mempool_addresses[tx_hash] = l
            self.mempool_hashes.append(tx_hash)
        # remove older entries from mempool_hashes
        self.mempool_hashes = mempool_hashes
        # remove deprecated entries from mempool_addresses
        for tx_hash, addresses in self.mempool_addresses.items():
            if tx_hash not in self.mempool_hashes:
                self.mempool_addresses.pop(tx_hash)
        # rebuild the addr -> txids mapping from scratch
        new_mempool_hist = {}
        for tx_hash, addresses in self.mempool_addresses.items():
            for addr in addresses:
                h = new_mempool_hist.get(addr, [])
                # [missing: the append-if-absent of tx_hash into h]
                new_mempool_hist[addr] = h
        # invalidate caches for addresses whose mempool history changed
        for addr in new_mempool_hist.keys():
            if addr in self.mempool_hist.keys():
                if self.mempool_hist[addr] != new_mempool_hist[addr]:
                    self.invalidate_cache(addr)
            # [missing: else: — newly-appearing address]
            self.invalidate_cache(addr)
        with self.mempool_lock:
            self.mempool_hist = new_mempool_hist
761 def invalidate_cache(self, address):
762 with self.cache_lock:
763 if self.history_cache.has_key(address):
764 print_log( "cache: invalidating", address )
765 self.history_cache.pop(address)
767 if address in self.watched_addresses:
768 self.address_queue.put(address)
    def main_iteration(self):
        """Periodic tick: catch up / refresh mempool, push height and
        header notifications, drain the address queue, then reschedule
        itself in 10 seconds (unless shutting down).

        NOTE(review): timing variables (t1/t2), the early return, the
        catch_up call under dblock and the queue-drain loop/try headers
        are missing from this view of the file.
        """
        if self.shared.stopped():
            print_log( "blockchain processor terminating" )
            # [missing in this view: return]
        # [missing: with self.dblock: timing + self.catch_up()]
        self.memorypool_update()
        # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)
        if self.sent_height != self.height:
            self.sent_height = self.height
            self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.height] })
        if self.sent_header != self.header:
            print_log( "blockchain: %d (%.3fs)"%( self.height, t2 - t1 ) )
            self.sent_header = self.header
            self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.header] })
        # [missing: while True: / try: around the non-blocking get]
        addr = self.address_queue.get(False)
        # [missing: except (empty queue): break]
        if addr in self.watched_addresses:
            status = self.get_status( addr )
            self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
        if not self.shared.stopped():
            threading.Timer(10, self.main_iteration).start()
        # [missing: else: header]
        print_log( "blockchain processor terminating" )