1 from json import dumps, loads
4 import ast, time, threading, hashlib
5 from Queue import Queue
6 import traceback, sys, os
def Hash(x):
    """Bitcoin's double SHA-256; returns the raw 32-byte digest."""
    return hashlib.sha256(hashlib.sha256(x).digest()).digest()

def hash_encode(x):
    """Serialize a raw hash to the big-endian hex form used by bitcoind RPC."""
    return x[::-1].encode('hex')

def hash_decode(x):
    """Parse a big-endian hex hash back into raw little-endian bytes."""
    return x.decode('hex')[::-1]
def rev_hex(s):
    """Reverse the byte order of a hex string (endianness swap).

    NOTE(review): the `def` line was missing from this copy of the file;
    restored so the return statement is attached to a function again
    (callers such as hash_header/int_to_hex invoke rev_hex).  Python 2
    only (`str.decode('hex')`).
    """
    return s.decode('hex')[::-1].encode('hex')
def int_to_hex(i, length=1):
    """Encode integer `i` as little-endian hex, zero-padded to `length` bytes.

    NOTE(review): the return statement was missing from this copy; restored
    as `return rev_hex(s)`, matching how callers use the result
    (little-endian serialization of version/timestamp/bits/nonce fields).
    """
    s = hex(i)[2:].rstrip('L')          # strip Python 2 long suffix
    s = "0"*(2*length - len(s)) + s     # left-pad to the requested width
    return rev_hex(s)
def header_to_string(res):
    """Serialize a header dict into the 160-hex-char form of an 80-byte
    Bitcoin block header (version, prev hash, merkle root, time, bits,
    nonce — each little-endian).

    NOTE(review): this copy was missing the `+ rev_hex(pbh)` term and the
    final `return s`; both restored.  Without the prev-hash term the header
    would be 72 bytes, contradicting the fixed 80-byte records used by
    read_header/read_chunk.
    """
    pbh = res.get('prev_block_hash')
    if pbh is None: pbh = '0'*64        # genesis block has no previous hash
    s = int_to_hex(res.get('version'), 4) \
        + rev_hex(pbh) \
        + rev_hex(res.get('merkle_root')) \
        + int_to_hex(int(res.get('timestamp')), 4) \
        + int_to_hex(int(res.get('bits')), 4) \
        + int_to_hex(int(res.get('nonce')), 4)
    return s
def header_from_string( s):
    """Deserialize an 80-byte header string into a header dict (inverse of
    header_to_string; block_height is not stored in the raw header).

    NOTE(review): the dict initializer and the final `return h` were
    missing from this copy; restored.
    """
    # The original parsed little-endian ints with eval('0x' + ...);
    # int(..., 16) is equivalent and avoids eval entirely.
    hex_to_int = lambda s: int(s[::-1].encode('hex'), 16)
    h = {}
    h['version'] = hex_to_int(s[0:4])
    h['prev_block_hash'] = hash_encode(s[4:36])
    h['merkle_root'] = hash_encode(s[36:68])
    h['timestamp'] = hex_to_int(s[68:72])
    h['bits'] = hex_to_int(s[72:76])
    h['nonce'] = hex_to_int(s[76:80])
    return h
50 from processor import Processor, print_log
52 class BlockchainProcessor(Processor):
    def __init__(self, config, shared):
        """Set up caches, open the LevelDB store, bootstrap chain state and
        start the catch-up thread.

        NOTE(review): several lines of this method (try/except wrappers,
        some attribute assignments, the wait-loop body) appear to be missing
        from this copy of the file; the gaps are flagged inline rather than
        guessed at.
        """
        Processor.__init__(self)
        self.up_to_date = False          # set True once we match bitcoind's tip
        self.watched_addresses = []      # addresses with active subscriptions
        self.history_cache = {}          # addr -> computed history
        self.cache_lock = threading.Lock()
        self.headers_data = ''           # unflushed raw header bytes buffer
        self.mempool_hist = {}           # addr -> list of unconfirmed txids
        self.known_mempool_hashes = []
        self.address_queue = Queue()
        self.dbpath = config.get('leveldb', 'path')
        self.dblock = threading.Lock()
        # NOTE(review): presumably wrapped in try/except in the original —
        # the traceback print below only makes sense inside an except clause.
        self.db = leveldb.LevelDB(self.dbpath)
        traceback.print_exc(file=sys.stdout)
        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind','user'),
            config.get('bitcoind','password'),
            config.get('bitcoind','host'),
            config.get('bitcoind','port'))
        self.sent_header = None
        # Bootstrap chain state: DB key '0' stores the tip (hash, height, _);
        # the genesis fallback below presumably sits in an except branch.
        hist = self.deserialize(self.db.Get('0'))
        hh, self.height, _ = hist[0]
        self.block_hashes = [hh]
        print_log( "hist", hist )
        #traceback.print_exc(file=sys.stdout)
        print_log('initializing database')
        self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
        self.init_headers(self.height)
        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        while not shared.stopped() and not self.up_to_date:
            # NOTE(review): loop body truncated; the print below presumably
            # belongs to a KeyboardInterrupt handler around a sleep.
            print "keyboard interrupt: stopping threads"
        print "blockchain is up to date."
        threading.Timer(10, self.main_iteration).start()
117 def bitcoind(self, method, params=[]):
118 postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'})
119 respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
121 if r['error'] != None:
122 raise BaseException(r['error'])
123 return r.get('result')
126 def serialize(self, h):
128 for txid, txpos, height in h:
129 s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4)
130 return s.decode('hex')
133 def deserialize(self, s):
136 txid = s[0:32].encode('hex')
137 txpos = int( rev_hex( s[32:36].encode('hex') ), 16 )
138 height = int( rev_hex( s[36:40].encode('hex') ), 16 )
139 h.append( ( txid, txpos, height ) )
144 def block2header(self, b):
145 return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'),
146 "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":int(b.get('bits'),16), "nonce":b.get('nonce')}
149 def get_header(self, height):
150 block_hash = self.bitcoind('getblockhash', [height])
151 b = self.bitcoind('getblock', [block_hash])
152 return self.block2header(b)
    def init_headers(self, db_height):
        """Create or extend the flat headers file (80 bytes per header) so it
        matches the database height `db_height`.

        NOTE(review): several lines are missing from this copy (the `else:`
        branch creating a fresh file, the `try:` matching the
        KeyboardInterrupt handler, the handler body, and any final flush);
        the layout below is approximate and flagged inline.
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80
            # re-hash the tail header so the chain can be validated while
            # appending (presumably guarded by try/except in the original)
            prev_header = self.read_header(height -1)
            prev_hash = self.hash_header(prev_header)
            # NOTE(review): the truncate below almost certainly belongs in a
            # missing `else:` branch (create an empty file when absent) —
            # as laid out here it would wipe an existing headers file.
            open(self.headers_filename,'wb').close()

        if height != db_height:
            print_log( "catching up missing headers:", height, db_height)

        # presumably wrapped in `try:` (see the dangling except below)
        for i in range(height, db_height):
            header = self.get_header(i)
            assert prev_hash == header.get('prev_block_hash')
            self.write_header(header, sync=False)
            prev_hash = self.hash_header(header)
            if i%1000==0: print_log("headers file:",i)
        except KeyboardInterrupt:
            # NOTE(review): handler body missing from this copy
            # (presumably flush_headers + exit)
188 def hash_header(self, header):
189 return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
192 def read_header(self, block_height):
193 if os.path.exists(self.headers_filename):
194 f = open(self.headers_filename,'rb')
195 f.seek(block_height*80)
199 h = header_from_string(h)
203 def read_chunk(self, index):
204 f = open(self.headers_filename,'rb')
205 f.seek(index*2016*80)
206 chunk = f.read(2016*80)
208 return chunk.encode('hex')
211 def write_header(self, header, sync=True):
212 if not self.headers_data:
213 self.headers_offset = header.get('block_height')
214 self.headers_data += header_to_string(header).decode('hex')
215 if sync or len(self.headers_data) > 40*100:
218 def pop_header(self):
219 # we need to do this only if we have not flushed
220 if self.headers_data:
221 self.headers_data = self.headers_data[:-40]
223 def flush_headers(self):
224 if not self.headers_data: return
225 f = open(self.headers_filename,'rb+')
226 f.seek(self.headers_offset*80)
227 f.write(self.headers_data)
229 self.headers_data = ''
232 def get_chunk(self, i):
233 # store them on disk; store the current chunk in memory
234 chunk = self.chunk_cache.get(i)
236 chunk = self.read_chunk(i)
237 self.chunk_cache[i] = chunk
241 def get_transaction(self, txid, block_height=-1, is_coinbase = False):
242 raw_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
243 vds = deserialize.BCDataStream()
244 vds.write(raw_tx.decode('hex'))
245 out = deserialize.parse_Transaction(vds, is_coinbase)
249 def get_history(self, addr, cache_only=False):
250 with self.cache_lock: hist = self.history_cache.get( addr )
251 if hist is not None: return hist
252 if cache_only: return -1
256 hist = self.deserialize(self.db.Get(addr))
262 # should not be necessary
263 hist.sort( key=lambda tup: tup[1])
264 # check uniqueness too...
267 for txid in self.mempool_hist.get(addr,[]):
268 hist.append((txid, 0))
270 hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
271 # add something to distinguish between unused and empty addresses
272 if hist == [] and is_known: hist = ['*']
274 with self.cache_lock: self.history_cache[addr] = hist
278 def get_status(self, addr, cache_only=False):
279 tx_points = self.get_history(addr, cache_only)
280 if cache_only and tx_points == -1: return -1
282 if not tx_points: return None
283 if tx_points == ['*']: return '*'
286 status += tx.get('tx_hash') + ':%d:' % tx.get('height')
287 return hashlib.sha256( status ).digest().encode('hex')
290 def get_merkle(self, tx_hash, height):
292 block_hash = self.bitcoind('getblockhash', [height])
293 b = self.bitcoind('getblock', [block_hash])
294 tx_list = b.get('tx')
295 tx_pos = tx_list.index(tx_hash)
297 merkle = map(hash_decode, tx_list)
298 target_hash = hash_decode(tx_hash)
300 while len(merkle) != 1:
301 if len(merkle)%2: merkle.append( merkle[-1] )
304 new_hash = Hash( merkle[0] + merkle[1] )
305 if merkle[0] == target_hash:
306 s.append( hash_encode( merkle[1]))
307 target_hash = new_hash
308 elif merkle[1] == target_hash:
309 s.append( hash_encode( merkle[0]))
310 target_hash = new_hash
315 return {"block_height":height, "merkle":s, "pos":tx_pos}
318 def add_to_batch(self, addr, tx_hash, tx_pos, tx_height):
320 # we do it chronologically, so nothing wrong can happen...
321 s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
322 self.batch_list[addr] += s
325 txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
326 self.batch_txio[txo] = addr
    def remove_from_batch(self, tx_hash, tx_pos):
        """Delete the spent outpoint (tx_hash, tx_pos) from its owning
        address's pending history in batch_list.

        NOTE(review): this copy is missing the try/except around the
        batch_txio lookup, the `for i in range(l):` loop header and the
        break/else tail; the layout below is therefore approximate and
        flagged inline.
        """
        txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        # presumably inside try/except KeyError — the warning below belongs
        # to the except branch (outpoint not indexed -> warn and skip)
        addr = self.batch_txio[txi]
        #raise BaseException(tx_hash, tx_pos)
        print "WARNING: cannot find address for", (tx_hash, tx_pos)

        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/40   # 40 raw bytes per history record
        # presumably `for i in range(l):` here — match the first 36 bytes
        # (txid + output index) and splice the 40-byte record out
        if serialized_hist[40*i:40*i+36] == txi:
            serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
        # presumably a for/else tail: record not found -> hard error
        raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
        self.batch_list[addr] = serialized_hist
351 def deserialize_block(self, block):
352 txlist = block.get('tx')
353 tx_hashes = [] # ordered txids
354 txdict = {} # deserialized tx
356 for raw_tx in txlist:
357 tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
358 tx_hashes.append(tx_hash)
359 vds = deserialize.BCDataStream()
360 vds.write(raw_tx.decode('hex'))
361 tx = deserialize.parse_Transaction(vds, is_coinbase)
364 return tx_hashes, txdict
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply (or, with revert=True, undo) one block's effect on the
        address-history LevelDB.

        NOTE(review): large parts of this method are missing from this copy
        — the timing variables (t0..t3), the `inputs_to_read`/`addr_to_read`
        initializers, try/except around DB reads, the `if not revert:` /
        `else:` selector in the processing loop, max_len/max_addr tracking,
        and the batch deletes.  Gaps are flagged inline; layout is
        approximate.
        """
        self.batch_list = {}  # address -> history
        self.batch_txio = {}  # transaction i/o -> address

        # deserialize transactions
        tx_hashes, txdict = self.deserialize_block(block)

        # read addresses of tx inputs
        # NOTE(review): `inputs_to_read = []` / `addr_to_read = []`
        # initializers are missing from this copy
        for tx in txdict.values():
            for x in tx.get('inputs'):
                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                inputs_to_read.append(txi)

        inputs_to_read.sort()
        for txi in inputs_to_read:
            # presumably try/except here: the input could come from the same block
            addr = self.db.Get(txi)
            # the input could come from the same block
            self.batch_txio[txi] = addr
            addr_to_read.append(addr)

        # read histories of addresses
        for txid, tx in txdict.items():
            for x in tx.get('outputs'):
                addr_to_read.append(x.get('address'))

        for addr in addr_to_read:
            # presumably try (existing address) / except KeyError (new
            # address gets an empty history) around the two lines below
            self.batch_list[addr] = self.db.Get(addr)
            self.batch_list[addr] = ''

        # process transactions in order; forward path removes spent inputs
        # and adds outputs, revert path does the mirror image
        for txid in tx_hashes: # must be ordered
            # NOTE(review): `tx = txdict[txid]` and the `if not revert:` /
            # `else:` selector lines are missing from this copy
            for x in tx.get('inputs'):
                self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
            for x in tx.get('outputs'):
                self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
            # revert branch (presumably under `else:`): outputs removed,
            # inputs re-added
            for x in tx.get('outputs'):
                self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
            for x in tx.get('inputs'):
                self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)

        # write the accumulated changes atomically
        batch = leveldb.WriteBatch()
        for addr, serialized_hist in self.batch_list.items():
            batch.Put(addr, serialized_hist)
            l = len(serialized_hist)
            # NOTE(review): max_len/max_addr tracking lines missing here
        for txio, addr in self.batch_txio.items():
            batch.Put(txio, addr)
        # delete spent inputs
        for txi in inputs_to_read:
            # NOTE(review): `batch.Delete(txi)` was presumably the loop body
            # (missing); the tip-record Put below almost certainly belongs
            # *after* the loop — indentation forced by the missing line.
            batch.Put('0', self.serialize( [(block_hash, block_height, 0)] ) )

        self.db.Write(batch, sync = sync)

        # timing variables t0/t00/t1/t2/t3 are assigned on lines missing
        # from this copy
        print_log("block", block_height,
                  "parse:%0.2f "%(t00 - t0),
                  "read:%0.2f "%(t1 - t00),
                  "proc:%.2f "%(t2-t1),
                  "write:%.2f "%(t3-t2),
                  "max:", max_len, max_addr)

        # invalidate cached histories for every touched address
        for addr in self.batch_list.keys(): self.update_history_cache(addr)
461 def add_request(self, request):
462 # see if we can get if from cache. if not, add to queue
463 if self.process( request, cache_only = True) == -1:
464 self.queue.put(request)
    def process(self, request, cache_only = False):
        """Dispatch a single electrum-protocol JSON-RPC request and push the
        response (result or error) via push_response.

        Returns -1 when cache_only is set and the answer is not cached yet.

        NOTE(review): this copy of the file is missing several lines of this
        method — the `result = None` / `error = None` initializers, the
        `address = params[0]` / `height = params[0]` / `tx_hash = params[0]`
        extraction lines, the `try:` headers of most branches, and the
        trailing `else:` selectors.  The dangling `except` clauses below
        are artifacts of those gaps, not of the original logic.
        """
        #print "abe process", request

        message_id = request['id']
        method = request['method']
        params = request.get('params',[])

        if method == 'blockchain.numblocks.subscribe':
            # branch body missing from this copy (presumably result = self.height)
        elif method == 'blockchain.headers.subscribe':
            # branch body missing from this copy (presumably result = self.header)
        elif method == 'blockchain.address.subscribe':
            # presumably: address = params[0]; try:
            result = self.get_status(address, cache_only)
            self.watch_address(address)
        except BaseException, e:
            error = str(e) + ': ' + address
            print_log( "error:", error )

        elif method == 'blockchain.address.subscribe2':
            # presumably: address = params[0]; try:
            result = self.get_status(address, cache_only)
            self.watch_address(address)
        except BaseException, e:
            error = str(e) + ': ' + address
            print_log( "error:", error )

        elif method == 'blockchain.address.get_history2':
            # presumably: address = params[0]; try:
            result = self.get_history( address, cache_only )
        except BaseException, e:
            error = str(e) + ': ' + address
            print_log( "error:", error )

        elif method == 'blockchain.block.get_header':
            # presumably: if not cache_only: height = params[0]; try:
            result = self.get_header( height )
        except BaseException, e:
            error = str(e) + ': %d'% height
            print_log( "error:", error )

        elif method == 'blockchain.block.get_chunk':
            # presumably: if not cache_only: index = params[0]; try:
            result = self.get_chunk( index )
        except BaseException, e:
            error = str(e) + ': %d'% index
            print_log( "error:", error)

        elif method == 'blockchain.transaction.broadcast':
            txo = self.bitcoind('sendrawtransaction', params[0])
            print_log( "sent tx:", txo )
            # presumably: result = txo (missing from this copy)

        elif method == 'blockchain.transaction.get_merkle':
            # presumably: if not cache_only: try: tx_hash = params[0]
            tx_height = params[1]
            result = self.get_merkle(tx_hash, tx_height)
        except BaseException, e:
            error = str(e) + ': ' + tx_hash
            print_log( "error:", error )

        elif method == 'blockchain.transaction.get':
            # presumably: try: tx_hash = params[0]; height = params[1]
            result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] )
        except BaseException, e:
            error = str(e) + ': ' + tx_hash
            print_log( "error:", error )

        # presumably under a final `else:` branch
        error = "unknown method:%s"%method

        if cache_only and result == -1: return -1

        # presumably `if error:` / `else:` selects which response is pushed
        response = { 'id':message_id, 'error':error }
        self.push_response(response)
        response = { 'id':message_id, 'result':result }
        self.push_response(response)
570 def watch_address(self, addr):
571 if addr not in self.watched_addresses:
572 self.watched_addresses.append(addr)
577 return self.block_hashes[-1]
    def catch_up(self, sync = True):
        """Main sync loop: import blocks from bitcoind until our tip matches
        bitcoind's, reverting our tip block on reorgs.

        NOTE(review): this copy is missing the timing variables (t1/t2), a
        loop-exit `break` after up_to_date, the `else:` selector of the
        reorg branch, and presumably a pop_header() call in the revert
        path; layout below is approximate and flagged inline.
        """
        while not self.shared.stopped():

            # are we up to date with bitcoind's tip?
            info = self.bitcoind('getinfo')
            bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
            if self.last_hash() == bitcoind_block_hash:
                self.up_to_date = True
                # presumably `break` here — missing from this copy

            # fetch the next block after our tip
            self.up_to_date = False
            block_hash = self.bitcoind('getblockhash', [self.height+1])
            block = self.bitcoind('getblock', [block_hash, 1])

            if block.get('previousblockhash') == self.last_hash():
                # the new block extends our tip: import it
                self.import_block(block, block_hash, self.height+1, sync)
                self.height = self.height + 1
                self.write_header(self.block2header(block), sync)
                self.block_hashes.append(block_hash)
                self.block_hashes = self.block_hashes[-10:]  # short reorg window
                if (self.height+1)%100 == 0 and not sync:
                    # t1/t2 timing lines missing from this copy
                    print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )

            # NOTE(review): the revert path below was presumably an `else:`
            # branch — the selector line is missing from this copy.
            # revert current block
            print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
            block_hash = self.last_hash()
            block = self.bitcoind('getblock', [block_hash, 1])
            self.height = self.height -1
            # presumably pop_header() here — missing from this copy
            self.block_hashes.remove(block_hash)
            self.import_block(block, self.last_hash(), self.height, revert=True)

        # refresh the cached header for the (possibly new) tip
        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))
    def memorypool_update(self):
        """Refresh mempool_hist from bitcoind's raw mempool and invalidate
        history-cache entries for every affected address.

        NOTE(review): a few lines are missing from this copy (presumably a
        guard for a failed get_transaction, and possibly a cleanup pass for
        transactions that left the mempool); flagged inline.
        """
        mempool_hashes = self.bitcoind('getrawmempool')

        for tx_hash in mempool_hashes:
            if tx_hash in self.known_mempool_hashes: continue
            self.known_mempool_hashes.append(tx_hash)

            tx = self.get_transaction(tx_hash)
            # presumably `if not tx: continue` here — missing from this copy
            for x in tx.get('inputs') + tx.get('outputs'):
                addr = x.get('address')
                hist = self.mempool_hist.get(addr, [])
                if tx_hash not in hist:
                    hist.append( tx_hash )
                    self.mempool_hist[addr] = hist
                    self.update_history_cache(addr)

        self.known_mempool_hashes = mempool_hashes
652 def update_history_cache(self, address):
653 with self.cache_lock:
654 if self.history_cache.has_key(address):
655 print_log( "cache: invalidating", address )
656 self.history_cache.pop(address)
    def main_iteration(self):
        """Periodic driver: catch up with bitcoind, refresh the mempool, push
        numblocks/headers/address notifications, then reschedule itself
        every 10 seconds until shared.stopped().

        NOTE(review): this copy is missing the timing lines, the catch_up()
        call, the queue-drain loop with its Queue.Empty handling, and the
        final `else:`; layout below is approximate and flagged inline.
        """
        if self.shared.stopped():
            print_log( "blockchain processor terminating" )
            # presumably `return` here — missing from this copy

        # presumably: t1 = time.time(); self.catch_up(); t2 = time.time()
        self.memorypool_update()

        if self.sent_height != self.height:
            self.sent_height = self.height
            self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.height] })

        if self.sent_header != self.header:
            # t1/t2 come from timing lines missing from this copy
            print_log( "blockchain: %d (%.3fs)"%( self.height, t2 - t1 ) )
            self.sent_header = self.header
            self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.header] })

        # presumably a `while True:` drain with `except Queue.Empty: break`
        # around the non-blocking get below
        addr = self.address_queue.get(False)
        if addr in self.watched_addresses:
            status = self.get_status( addr )
            self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })

        if not self.shared.stopped():
            threading.Timer(10, self.main_iteration).start()
        # presumably in an `else:` branch — missing from this copy
        print_log( "blockchain processor terminating" )