1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
# Byte-order helpers. Abe stores hashes little-endian while the Electrum
# protocol exchanges them as big-endian hex, so both conversions reverse
# the byte string.  (Python 2 'hex' codec on byte strings.)
def encode(raw):
    """Serialize a binary hash to byte-reversed hex."""
    return raw[::-1].encode('hex')

def decode(hex_str):
    """Parse byte-reversed hex back into a binary hash."""
    return hex_str.decode('hex')[::-1]

def Hash(data):
    """Double SHA-256 as used for Bitcoin block/tx hashes."""
    return hashlib.sha256(hashlib.sha256(data).digest()).digest()
# Body of a hex byte-order-reversal helper: hex -> bytes, reverse, -> hex.
# NOTE(review): the enclosing `def` line (original line 18) is not visible
# in this listing — verify the signature against the full file.
19 return s.decode('hex')[::-1].encode('hex')
# Format integer `i` as a zero-padded hex string of 2*length digits.
# rstrip('L') removes the Python 2 long-literal suffix from hex().
# NOTE(review): the return statement (original line 24) is not visible in
# this listing; presumably the padded string is byte-reversed before return.
21 def int_to_hex(i, length=1):
22 s = hex(i)[2:].rstrip('L')
23 s = "0"*(2*length - len(s)) + s
# Serialize a block-header dict (version, prev_block_hash, merkle_root,
# timestamp, bits, nonce) into its hex wire representation; hashes pass
# through rev_hex, integers through 4-byte int_to_hex.
# NOTE(review): the `return s` line is not visible in this listing.
26 def header_to_string(res):
27 s = int_to_hex(res.get('version'),4) \
28 + rev_hex(res.get('prev_block_hash')) \
29 + rev_hex(res.get('merkle_root')) \
30 + int_to_hex(int(res.get('timestamp')),4) \
31 + int_to_hex(int(res.get('bits')),4) \
32 + int_to_hex(int(res.get('nonce')),4)
# Abe-backed blockchain store used by the Electrum server processor.
36 class AbeStore(Datastore_class):
# Build DataStore args from the server config file: per-backend connection
# parameters, optional Litecoin chain setup, then initialize the parent store.
38 def __init__(self, config):
39 conf = DataStore.CONFIG_DEFAULTS
40 args, argv = readconf.parse_argv( [], conf)
41 args.dbtype = config.get('database','type')
# Connection-argument keys differ per DB driver.
42 if args.dbtype == 'sqlite3':
43 args.connect_args = { 'database' : config.get('database','database') }
44 elif args.dbtype == 'MySQLdb':
45 args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
46 elif args.dbtype == 'psycopg2':
47 args.connect_args = { 'database' : config.get('database','database') }
49 coin = config.get('server', 'coin')
51 if coin == 'litecoin':
52 print 'Litecoin settings:'
53 datadir = config.get('server','datadir')
54 print ' datadir = ' + datadir
# NOTE(review): in a Python 2 byte string "\u0030" is the literal six
# characters, not chr(48); the print below suggests address_version 48
# ("\x30") was intended — verify.
55 args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
56 print ' addrtype = 48'
59 Datastore_class.__init__(self,args)
61 # Use 1 (Bitcoin) if chain_id is not sent
62 self.chain_id = self.datadirs[0]["chain_id"] or 1
63 print 'Coin chain_id = %d' % self.chain_id
# Cap on rows returned per SQL query; queries raise when the cap is hit.
65 self.sql_limit = int( config.get('database','limit') )
# JSON-RPC endpoint of the local bitcoind, with basic-auth credentials.
68 self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
# Addresses whose cached history was invalidated; drained by the processor.
72 self.address_queue = Queue()
# Serializes SQL access from multiple threads (see safe_sql).
74 self.lock = threading.Lock()
# Mempool tx hashes already imported, to avoid re-importing each poll.
76 self.known_mempool_hashes = []
# Import a transaction via the parent store and remember the id assigned,
# used later to age out old mempool transactions in get_history.
# NOTE(review): the return line (original line 83+) is not visible here.
80 def import_tx(self, tx, is_coinbase):
81 tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
82 self.last_tx_id = tx_id
# Import a block via the parent store, then invalidate the history cache
# for every transaction it contains.
# NOTE(review): the loop/else control lines (original 93, 96, 98) are not
# visible in this listing.
88 def import_block(self, b, chain_ids=frozenset()):
90 block_id = super(AbeStore, self).import_block(b, chain_ids)
91 for pos in xrange(len(b['transactions'])):
92 tx = b['transactions'][pos]
# Compute the tx hash from its serialized form when absent.
94 tx['hash'] = util.double_sha256(tx['tx'])
95 tx_id = self.tx_find_id_and_value(tx)
97 self.update_tx_cache(tx_id)
99 print "error: import_block: no tx_id"
# Invalidate cached history for every address touched by transaction `txid`
# (both input and output sides) and queue each address so subscribers get
# notified.  NOTE(review): the `for row in ...` lines (original 105, 118)
# are not visible in this listing.
103 def update_tx_cache(self, txid):
104 inrows = self.get_tx_inputs(txid, False)
# row[6] appears to be the pubkey hash column — verify against the SELECT.
106 _hash = self.binout(row[6])
108 #print "WARNING: missing tx_in for tx", txid
111 address = hash_to_address(chr(self.addrtype), _hash)
112 if self.tx_cache.has_key(address):
113 print "cache: invalidating", address
114 self.tx_cache.pop(address)
115 self.address_queue.put(address)
117 outrows = self.get_tx_outputs(txid, False)
119 _hash = self.binout(row[6])
121 #print "WARNING: missing tx_out for tx", txid
124 address = hash_to_address(chr(self.addrtype), _hash)
125 if self.tx_cache.has_key(address):
126 print "cache: invalidating", address
127 self.tx_cache.pop(address)
128 self.address_queue.put(address)
# Run a SELECT under the store lock; on failure print the traceback and
# raise.  `lock=False` lets callers who already hold the lock re-enter.
# NOTE(review): the try/except scaffolding lines are not visible here.
130 def safe_sql(self,sql, params=(), lock=True):
134 if lock: self.lock.acquire()
135 ret = self.selectall(sql,params)
138 traceback.print_exc(file=sys.stdout)
140 if lock: self.lock.release()
143 raise BaseException('sql error')
# Fetch outputs of tx `tx_id`, left-joined with the spending txin (if any)
# so callers can tell whether each output is already redeemed.
# NOTE(review): tx_id is %-interpolated into the SQL — safe only because
# it is an integer from our own DB; parameterizing would be cleaner.
148 def get_tx_outputs(self, tx_id, lock=True):
149 return self.safe_sql("""SELECT
151 txout.txout_scriptPubKey,
158 LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
159 LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
160 LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
161 WHERE txout.tx_id = %d
162 ORDER BY txout.txout_pos
163 """%(tx_id), (), lock)
# Fetch inputs of tx `tx_id`; COALESCE falls back to unlinked_txin rows for
# inputs whose previous output is not yet linked in the database.
165 def get_tx_inputs(self, tx_id, lock=True):
166 return self.safe_sql(""" SELECT
170 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
172 COALESCE(txout.txout_pos, u.txout_pos),
175 LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
176 LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
177 LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
178 LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
179 WHERE txin.tx_id = %d
180 ORDER BY txin.txin_pos
181 """%(tx_id,), (), lock)
# Confirmed transactions that SPEND outputs paying to `dbhash` (the binary
# pubkey hash), restricted to the longest chain for self.chain_id.
# Raises when self.sql_limit rows come back, to avoid truncated histories.
# NOTE(review): the `return out` line is not visible in this listing.
184 def get_address_out_rows(self, dbhash):
185 out = self.safe_sql(""" SELECT
195 FROM chain_candidate cc
196 JOIN block b ON (b.block_id = cc.block_id)
197 JOIN block_tx ON (block_tx.block_id = b.block_id)
198 JOIN tx ON (tx.tx_id = block_tx.tx_id)
199 JOIN txin ON (txin.tx_id = tx.tx_id)
200 JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
201 JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
202 WHERE pubkey.pubkey_hash = ?
204 AND cc.in_longest = 1
205 LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
207 if len(out)==self.sql_limit:
208 raise BaseException('limit reached')
# Unconfirmed (mempool) transactions spending outputs of `dbhash`; same
# limit-reached guard as the confirmed variant.
211 def get_address_out_rows_memorypool(self, dbhash):
212 out = self.safe_sql(""" SELECT
219 JOIN txin ON (txin.tx_id = tx.tx_id)
220 JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
221 JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
222 WHERE pubkey.pubkey_hash = ?
223 LIMIT ? """, (dbhash,self.sql_limit))
225 if len(out)==self.sql_limit:
226 raise BaseException('limit reached')
# Confirmed transactions with outputs PAYING `dbhash`, longest chain only;
# raises on limit reached.  NOTE(review): `return out` not visible here.
229 def get_address_in_rows(self, dbhash):
230 out = self.safe_sql(""" SELECT
240 FROM chain_candidate cc
241 JOIN block b ON (b.block_id = cc.block_id)
242 JOIN block_tx ON (block_tx.block_id = b.block_id)
243 JOIN tx ON (tx.tx_id = block_tx.tx_id)
244 JOIN txout ON (txout.tx_id = tx.tx_id)
245 JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
246 WHERE pubkey.pubkey_hash = ?
248 AND cc.in_longest = 1
249 LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
251 if len(out)==self.sql_limit:
252 raise BaseException('limit reached')
# Unconfirmed (mempool) transactions paying `dbhash`; limit guard as above.
255 def get_address_in_rows_memorypool(self, dbhash):
256 out = self.safe_sql( """ SELECT
263 JOIN txout ON (txout.tx_id = tx.tx_id)
264 JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
265 WHERE pubkey.pubkey_hash = ?
266 LIMIT ? """, (dbhash,self.sql_limit))
268 if len(out)==self.sql_limit:
269 raise BaseException('limit reached')
# Build the full history of `addr`: confirmed rows (sorted by timestamp),
# then mempool rows, then per-transaction input/output address lists.
# Results are cached per address unless mempool entries were involved.
# NOTE(review): this listing omits many control-flow lines (try/except,
# for-loops, returns); comments below describe only what is visible.
272 def get_history(self, addr):
# Serve from cache when the address history was computed before.
275 cached_version = self.tx_cache.get( addr )
276 if cached_version is not None:
277 return cached_version
279 version, binaddr = decode_check_address(addr)
283 dbhash = self.binin(binaddr)
# Confirmed spends and receipts for this address.
285 rows += self.get_address_out_rows( dbhash )
286 rows += self.get_address_in_rows( dbhash )
293 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
295 print "cannot unpack row", row
297 tx_hash = self.hashout_hex(tx_hash)
299 "timestamp": int(nTime),
300 "height": int(height),
301 "is_input": int(is_in),
302 "block_hash": self.hashout_hex(blk_hash),
309 txpoints.append(txpoint)
310 known_tx.append(self.hashout_hex(tx_hash))
313 # todo: sort them really...
314 txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
# Append mempool rows, skipping tx already seen in the confirmed set.
318 rows += self.get_address_in_rows_memorypool( dbhash )
319 rows += self.get_address_out_rows_memorypool( dbhash )
320 address_has_mempool = False
323 is_in, tx_hash, tx_id, pos, value = row
324 tx_hash = self.hashout_hex(tx_hash)
325 if tx_hash in known_tx:
328 # discard transactions that are too old
329 if self.last_tx_id - tx_id > 50000:
330 print "discarding tx id", tx_id
333 # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
334 address_has_mempool = True
336 #print "mempool", tx_hash
340 "is_input": int(is_in),
341 "block_hash": 'mempool',
347 txpoints.append(txpoint)
# Resolve input and output addresses for every collected txpoint.
350 for txpoint in txpoints:
351 tx_id = txpoint['tx_id']
354 inrows = self.get_tx_inputs(tx_id)
356 _hash = self.binout(row[6])
358 #print "WARNING: missing tx_in for tx", tx_id, addr
360 address = hash_to_address(chr(self.addrtype), _hash)
361 txinputs.append(address)
362 txpoint['inputs'] = txinputs
364 outrows = self.get_tx_outputs(tx_id)
366 _hash = self.binout(row[6])
368 #print "WARNING: missing tx_out for tx", tx_id, addr
370 address = hash_to_address(chr(self.addrtype), _hash)
371 txoutputs.append(address)
372 txpoint['outputs'] = txoutputs
374 # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
375 if not txpoint['is_input']:
376 # detect if already redeemed...
378 if row[6] == dbhash: break
381 #row = self.get_tx_output(tx_id,dbhash)
382 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
383 # if not redeemed, we add the script
# row[4] appears to be the redeeming tx hash; absent means unspent.
385 if not row[4]: txpoint['raw_output_script'] = row[1]
390 # do not cache mempool results because statuses are ambiguous
391 if not address_has_mempool:
393 self.tx_cache[addr] = txpoints
# Condensed history: (tx_hash, height) pairs only, with duplicates removed
# while preserving order.  NOTE(review): the out2 initialization and return
# lines are not visible in this listing.
397 def get_history2(self, addr):
398 h = self.get_history(addr)
399 out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, h)
402 if item not in out2: out2.append(item)
406 def get_status(self,addr):
407 # get address status, i.e. the last block for that address.
408 tx_points = self.get_history(addr)
412 lastpoint = tx_points[-1]
413 status = lastpoint['block_hash']
414 # this is a temporary hack; move it up once old clients have disappeared
# Mempool statuses are ambiguous, so disambiguate by history length.
415 if status == 'mempool': # and session['version'] != "old":
416 status = status + ':%d'% len(tx_points)
# Newer status format: SHA-256 over the concatenated "tx_hash:height:"
# history string, hex-encoded.  NOTE(review): the loop header and the
# empty-history case are not visible in this listing.
419 def get_status2(self,addr):
421 tx_points = self.get_history2(addr)
427 status += tx.get('tx_hash') + ':%d:' % tx.get('height')
428 return hashlib.sha256( status ).digest().encode('hex')
# Fetch the longest-chain header at `block_height` and return it as a dict
# of header fields; raises when no such block exists.
# NOTE(review): the `row = out[0]` unpack line and the return statement
# are not visible in this listing.
431 def get_block_header(self, block_height):
432 out = self.safe_sql("""
436 block_hashMerkleRoot,
444 WHERE block_height = %d AND in_longest = 1"""%block_height)
446 if not out: raise BaseException("block not found")
448 (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height,prev_block_hash, block_id) \
449 = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
451 out = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash,
452 "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
# Return 2016 consecutive serialized headers (one difficulty-retarget
# period) starting at index*2016, as one hex string; cached per index.
# NOTE(review): the cache-hit return and the row loop header are not
# visible in this listing.
456 def get_chunk(self, index):
458 msg = self.chunk_cache.get(index)
465 block_hashMerkleRoot,
473 WHERE block_height >= %d AND block_height< %d AND in_longest = 1"""%(index*2016, (index+1)*2016)
475 out = self.safe_sql(sql)
478 (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \
479 = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
480 h = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash,
481 "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
# The genesis block has no previous hash; use 64 zero nibbles.
483 if h.get('block_height')==0: h['prev_block_hash'] = "0"*64
484 msg += header_to_string(h)
486 #print "hash", encode(Hash(msg.decode('hex')))
487 #if h.get('block_height')==1:break
490 self.chunk_cache[index] = msg
491 print "get_chunk", index, len(msg)
# Fetch the raw transaction hex from bitcoind over JSON-RPC; raises on an
# RPC error.  NOTE(review): the loads() of respdata and the return line
# are not visible in this listing.
496 def get_raw_tx(self, tx_hash, height):
497 postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id':'jsonrpc'})
498 respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
500 if r['error'] != None:
501 raise BaseException(r['error'])
503 hextx = r.get('result')
# Compute the merkle branch proving `tx_hash` is in its (longest-chain)
# block: locate the block, list its tx hashes in order, then fold the tree
# level by level, recording the sibling hash of the target at each level.
# NOTE(review): several loop/scaffolding lines are not visible here.
507 def get_tx_merkle(self, tx_hash):
# Find the block containing this tx on the longest chain.
509 out = self.safe_sql("""
510 SELECT block_tx.block_id FROM tx
511 JOIN block_tx on tx.tx_id = block_tx.tx_id
512 JOIN chain_summary on chain_summary.block_id = block_tx.block_id
513 WHERE tx_hash='%s' AND in_longest = 1"""%tx_hash)
515 if not out: raise BaseException("not in a block")
516 block_id = int(out[0][0])
519 out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1"%block_id)
521 if not out: raise BaseException("block not found")
522 block_height = int(out[0][0])
527 # list all tx in block
528 for row in self.safe_sql("""
529 SELECT DISTINCT tx_id, tx_pos, tx_hash
532 ORDER BY tx_pos""", (block_id,)):
533 _id, _pos, _hash = row
535 if _hash == tx_hash: tx_pos = int(_pos)
538 # TODO: do not compute this on client request, better store the hash tree of each block in a database...
# Work on binary (byte-reversed) hashes for the tree computation.
540 merkle = map(decode, merkle)
541 target_hash = decode(tx_hash)
544 while len(merkle) != 1:
# Odd level: duplicate the last hash, per the Bitcoin merkle rule.
545 if len(merkle)%2: merkle.append( merkle[-1] )
548 new_hash = Hash( merkle[0] + merkle[1] )
# Record the sibling whenever the target participates in this pair.
549 if merkle[0] == target_hash:
550 s.append( encode(merkle[1]))
551 target_hash = new_hash
552 elif merkle[1] == target_hash:
553 s.append( encode(merkle[0]))
554 target_hash = new_hash
560 return {"block_height":block_height, "merkle":s, "pos":tx_pos}
# Poll bitcoind's mempool and import any transaction not seen before,
# invalidating affected history caches.  `store` plays the role of `self`.
# NOTE(review): the loads() of each respdata and the ds.clear()/skip lines
# are not visible in this listing.
565 def memorypool_update(store):
567 ds = BCDataStream.BCDataStream()
568 postdata = dumps({"method": 'getrawmempool', 'params': [], 'id':'jsonrpc'})
569 respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
571 if r['error'] != None:
575 mempool_hashes = r.get('result')
576 for tx_hash in mempool_hashes:
# Skip hashes imported in a previous poll.
578 if tx_hash in store.known_mempool_hashes: continue
579 store.known_mempool_hashes.append(tx_hash)
581 postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id':'jsonrpc'})
582 respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
584 if r['error'] != None:
586 hextx = r.get('result')
588 ds.write(hextx.decode('hex'))
589 tx = deserialize.parse_Transaction(ds)
590 tx['hash'] = util.double_sha256(tx['tx'])
# Already present in the database: nothing to import.
592 if store.tx_find_id_and_value(tx):
595 tx_id = store.import_tx(tx, False)
596 store.update_tx_cache(tx_id)
# Replace, not extend: hashes evicted from the mempool are dropped.
600 store.known_mempool_hashes = mempool_hashes
# Broadcast a raw transaction via bitcoind; on rejection build an error
# string echoing the rejected hex.  NOTE(review): the loads() of respdata
# and the success/return lines are not visible in this listing.
603 def send_tx(self,tx):
604 postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id':'jsonrpc'})
605 respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
607 if r['error'] != None:
608 msg = r['error'].get('message')
609 out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
# One store iteration: refresh the mempool, drop the chunk cache entry for
# the chain tip, and return the current best header.
# NOTE(review): lines 631-635 reference `store` while the method parameter
# is `self` — they may belong to a separate catch-up function whose `def`
# line is not visible in this listing; verify against the full file.
615 def main_iteration(self):
618 self.memorypool_update()
619 height = self.get_block_number( self.chain_id )
# Invalidate the cached chunk containing the tip, since it may grow.
620 try: self.chunk_cache.pop(height/2016)
623 block_header = self.get_block_header( height )
630 # if there is an exception, do rollback and then re-raise the exception
631 for dircfg in store.datadirs:
633 store.catch_up_dir(dircfg)
635 store.log.exception("Failed to catch up %s", dircfg)
642 from processor import Processor
# Request processor bridging the Electrum protocol to the Abe store.
644 class BlockchainProcessor(Processor):
# Create the store, run one iteration to learn the chain tip, then start
# the periodic store-iteration timer (rescheduled in run_store_iteration).
646 def __init__(self, config):
647 Processor.__init__(self)
648 self.store = AbeStore(config)
# Addresses with active subscriptions, notified on history changes.
649 self.watched_addresses = []
652 self.block_header = self.store.main_iteration()
653 self.block_number = self.block_header.get('block_height')
654 print "blockchain: %d blocks"%self.block_number
656 threading.Timer(10, self.run_store_iteration).start()
# Dispatch one Electrum JSON request by method name, push back either a
# result or an error response.  NOTE(review): the try lines and the
# params[...] extraction lines for address/height/index/tx_hash are not
# visible in this listing.
658 def process(self, request):
659 #print "abe process", request
661 message_id = request['id']
662 method = request['method']
663 params = request.get('params',[])
# Chain-tip subscriptions answer from cached state, no store query.
667 if method == 'blockchain.numblocks.subscribe':
668 result = self.block_number
670 elif method == 'blockchain.headers.subscribe':
671 result = self.block_header
# Address subscriptions also register the address for notifications.
673 elif method == 'blockchain.address.subscribe':
676 result = self.store.get_status(address)
677 self.watch_address(address)
678 except BaseException, e:
679 error = str(e) + ': ' + address
680 print "error:", error
682 elif method == 'blockchain.address.subscribe2':
685 result = self.store.get_status2(address)
686 self.watch_address(address)
687 except BaseException, e:
688 error = str(e) + ': ' + address
689 print "error:", error
691 elif method == 'blockchain.address.get_history':
694 result = self.store.get_history( address )
695 except BaseException, e:
696 error = str(e) + ': ' + address
697 print "error:", error
699 elif method == 'blockchain.address.get_history2':
702 result = self.store.get_history2( address )
703 except BaseException, e:
704 error = str(e) + ': ' + address
705 print "error:", error
707 elif method == 'blockchain.block.get_header':
710 result = self.store.get_block_header( height )
711 except BaseException, e:
712 error = str(e) + ': %d'% height
713 print "error:", error
715 elif method == 'blockchain.block.get_chunk':
718 result = self.store.get_chunk( index )
719 except BaseException, e:
720 error = str(e) + ': %d'% index
721 print "error:", error
# Broadcast has no error wrapping here; send_tx reports its own result.
723 elif method == 'blockchain.transaction.broadcast':
724 txo = self.store.send_tx(params[0])
725 print "sent tx:", txo
728 elif method == 'blockchain.transaction.get_merkle':
731 result = self.store.get_tx_merkle(tx_hash )
732 except BaseException, e:
733 error = str(e) + ': ' + tx_hash
734 print "error:", error
736 elif method == 'blockchain.transaction.get':
740 result = self.store.get_raw_tx(tx_hash, height )
741 except BaseException, e:
742 error = str(e) + ': ' + tx_hash
743 print "error:", error
746 error = "unknown method:%s"%method
# Exactly one of error/result is pushed back with the request id.
750 response = { 'id':message_id, 'error':error }
751 self.push_response(response)
753 response = { 'id':message_id, 'result':result }
754 self.push_response(response)
def watch_address(self, addr):
    """Register `addr` for change notifications, ignoring duplicates."""
    already_watched = addr in self.watched_addresses
    if not already_watched:
        self.watched_addresses.append(addr)
# Periodic (10s) store poll: run an iteration, notify numblocks/headers
# subscribers when the tip changed, then drain the address queue and
# notify watchers of affected addresses; finally reschedule itself.
# NOTE(review): the try/except scaffolding and the queue-drain loop header
# are not visible in this listing.
762 def run_store_iteration(self):
766 block_header = self.store.main_iteration()
767 t2 = time.time() - t1
769 traceback.print_exc(file=sys.stdout)
# Stop rescheduling once shutdown is signalled.
773 if self.shared.stopped():
777 if self.block_number != block_header.get('block_height'):
778 self.block_number = block_header.get('block_height')
779 print "block number: %d (%.3f seconds)"%(self.block_number, t2)
780 self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
782 if self.block_header != block_header:
783 self.block_header = block_header
784 self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.block_header] })
# Non-blocking get: drain invalidated addresses queued by the store.
789 addr = self.store.address_queue.get(False)
# Only subscribed addresses trigger a push in both status formats.
792 if addr in self.watched_addresses:
793 status = self.store.get_status( addr )
794 status2 = self.store.get_status2( addr )
795 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
796 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status2] })
798 threading.Timer(10, self.run_store_iteration).start()