add two new RPCs: transaction.get_merkle and block.get_header
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13 class AbeStore(Datastore_class):
14
15     def __init__(self, config):
16         conf = DataStore.CONFIG_DEFAULTS
17         args, argv = readconf.parse_argv( [], conf)
18         args.dbtype = config.get('database','type')
19         if args.dbtype == 'sqlite3':
20             args.connect_args = { 'database' : config.get('database','database') }
21         elif args.dbtype == 'MySQLdb':
22             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
23         elif args.dbtype == 'psycopg2':
24             args.connect_args = { 'database' : config.get('database','database') }
25
26         coin = config.get('server', 'coin')
27         self.addrtype = 0
28         if coin == 'litecoin':
29             print 'Litecoin settings:'
30             datadir = config.get('server','datadir')
31             print '  datadir = ' + datadir
32             args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
33             print '  addrtype = 48'
34             self.addrtype = 48
35
36         Datastore_class.__init__(self,args)
37
38         # Use 1 (Bitcoin) if chain_id is not sent
39         self.chain_id = self.datadirs[0]["chain_id"] or 1
40         print 'Coin chain_id = %d' % self.chain_id
41
42         self.sql_limit = int( config.get('database','limit') )
43
44         self.tx_cache = {}
45         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
46
47         self.address_queue = Queue()
48
49         self.dblock = thread.allocate_lock()
50         self.last_tx_id = 0
51
52     
53     def import_tx(self, tx, is_coinbase):
54         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
55         self.last_tx_id = tx_id
56         return tx_id
57         
58
59
60
61     def import_block(self, b, chain_ids=frozenset()):
62         #print "import block"
63         block_id = super(AbeStore, self).import_block(b, chain_ids)
64         for pos in xrange(len(b['transactions'])):
65             tx = b['transactions'][pos]
66             if 'hash' not in tx:
67                 tx['hash'] = util.double_sha256(tx['tx'])
68             tx_id = self.tx_find_id_and_value(tx)
69             if tx_id:
70                 self.update_tx_cache(tx_id)
71             else:
72                 print "error: import_block: no tx_id"
73         return block_id
74
75
76     def update_tx_cache(self, txid):
77         inrows = self.get_tx_inputs(txid, False)
78         for row in inrows:
79             _hash = self.binout(row[6])
80             if not _hash:
81                 #print "WARNING: missing tx_in for tx", txid
82                 continue
83
84             address = hash_to_address(chr(self.addrtype), _hash)
85             if self.tx_cache.has_key(address):
86                 print "cache: invalidating", address
87                 self.tx_cache.pop(address)
88             self.address_queue.put(address)
89
90         outrows = self.get_tx_outputs(txid, False)
91         for row in outrows:
92             _hash = self.binout(row[6])
93             if not _hash:
94                 #print "WARNING: missing tx_out for tx", txid
95                 continue
96
97             address = hash_to_address(chr(self.addrtype), _hash)
98             if self.tx_cache.has_key(address):
99                 print "cache: invalidating", address
100                 self.tx_cache.pop(address)
101             self.address_queue.put(address)
102
103     def safe_sql(self,sql, params=(), lock=True):
104
105         error = False
106         try:
107             if lock: self.dblock.acquire()
108             ret = self.selectall(sql,params)
109         except:
110             error = True
111             traceback.print_exc(file=sys.stdout)
112         finally:
113             if lock: self.dblock.release()
114
115         if error: 
116             raise BaseException('sql error')
117
118         return ret
119             
120
121     def get_tx_outputs(self, tx_id, lock=True):
122         return self.safe_sql("""SELECT
123                 txout.txout_pos,
124                 txout.txout_scriptPubKey,
125                 txout.txout_value,
126                 nexttx.tx_hash,
127                 nexttx.tx_id,
128                 txin.txin_pos,
129                 pubkey.pubkey_hash
130               FROM txout
131               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
132               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
133               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
134              WHERE txout.tx_id = %d 
135              ORDER BY txout.txout_pos
136         """%(tx_id), (), lock)
137
138     def get_tx_inputs(self, tx_id, lock=True):
139         return self.safe_sql(""" SELECT
140                 txin.txin_pos,
141                 txin.txin_scriptSig,
142                 txout.txout_value,
143                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
144                 prevtx.tx_id,
145                 COALESCE(txout.txout_pos, u.txout_pos),
146                 pubkey.pubkey_hash
147               FROM txin
148               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
149               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
150               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
151               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
152              WHERE txin.tx_id = %d
153              ORDER BY txin.txin_pos
154              """%(tx_id,), (), lock)
155
156
157     def get_address_out_rows(self, dbhash):
158         out = self.safe_sql(""" SELECT
159                 b.block_nTime,
160                 cc.chain_id,
161                 b.block_height,
162                 1,
163                 b.block_hash,
164                 tx.tx_hash,
165                 tx.tx_id,
166                 txin.txin_pos,
167                 -prevout.txout_value
168               FROM chain_candidate cc
169               JOIN block b ON (b.block_id = cc.block_id)
170               JOIN block_tx ON (block_tx.block_id = b.block_id)
171               JOIN tx ON (tx.tx_id = block_tx.tx_id)
172               JOIN txin ON (txin.tx_id = tx.tx_id)
173               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
174               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
175              WHERE pubkey.pubkey_hash = ?
176                AND cc.chain_id = ?
177                AND cc.in_longest = 1
178              LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
179
180         if len(out)==self.sql_limit: 
181             raise BaseException('limit reached')
182         return out
183
184     def get_address_out_rows_memorypool(self, dbhash):
185         out = self.safe_sql(""" SELECT
186                 1,
187                 tx.tx_hash,
188                 tx.tx_id,
189                 txin.txin_pos,
190                 -prevout.txout_value
191               FROM tx 
192               JOIN txin ON (txin.tx_id = tx.tx_id)
193               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
194               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
195              WHERE pubkey.pubkey_hash = ?
196              LIMIT ? """, (dbhash,self.sql_limit))
197
198         if len(out)==self.sql_limit: 
199             raise BaseException('limit reached')
200         return out
201
202     def get_address_in_rows(self, dbhash):
203         out = self.safe_sql(""" SELECT
204                 b.block_nTime,
205                 cc.chain_id,
206                 b.block_height,
207                 0,
208                 b.block_hash,
209                 tx.tx_hash,
210                 tx.tx_id,
211                 txout.txout_pos,
212                 txout.txout_value
213               FROM chain_candidate cc
214               JOIN block b ON (b.block_id = cc.block_id)
215               JOIN block_tx ON (block_tx.block_id = b.block_id)
216               JOIN tx ON (tx.tx_id = block_tx.tx_id)
217               JOIN txout ON (txout.tx_id = tx.tx_id)
218               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
219              WHERE pubkey.pubkey_hash = ?
220                AND cc.chain_id = ?
221                AND cc.in_longest = 1
222                LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
223
224         if len(out)==self.sql_limit: 
225             raise BaseException('limit reached')
226         return out
227
228     def get_address_in_rows_memorypool(self, dbhash):
229         out = self.safe_sql( """ SELECT
230                 0,
231                 tx.tx_hash,
232                 tx.tx_id,
233                 txout.txout_pos,
234                 txout.txout_value
235               FROM tx
236               JOIN txout ON (txout.tx_id = tx.tx_id)
237               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
238              WHERE pubkey.pubkey_hash = ?
239              LIMIT ? """, (dbhash,self.sql_limit))
240
241         if len(out)==self.sql_limit: 
242             raise BaseException('limit reached')
243         return out
244
245     def get_history(self, addr):
246
247         cached_version = self.tx_cache.get( addr )
248         if cached_version is not None:
249             return cached_version
250
251         version, binaddr = decode_check_address(addr)
252         if binaddr is None:
253             return None
254
255         dbhash = self.binin(binaddr)
256         rows = []
257         rows += self.get_address_out_rows( dbhash )
258         rows += self.get_address_in_rows( dbhash )
259
260         txpoints = []
261         known_tx = []
262
263         for row in rows:
264             try:
265                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
266             except:
267                 print "cannot unpack row", row
268                 break
269             tx_hash = self.hashout_hex(tx_hash)
270             txpoint = {
271                     "timestamp":    int(nTime),
272                     "height":   int(height),
273                     "is_input":    int(is_in),
274                     "block_hash": self.hashout_hex(blk_hash),
275                     "tx_hash":  tx_hash,
276                     "tx_id":    int(tx_id),
277                     "index":      int(pos),
278                     "value":    int(value),
279                     }
280
281             txpoints.append(txpoint)
282             known_tx.append(self.hashout_hex(tx_hash))
283
284
285         # todo: sort them really...
286         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
287
288         # read memory pool
289         rows = []
290         rows += self.get_address_in_rows_memorypool( dbhash )
291         rows += self.get_address_out_rows_memorypool( dbhash )
292         address_has_mempool = False
293
294         for row in rows:
295             is_in, tx_hash, tx_id, pos, value = row
296             tx_hash = self.hashout_hex(tx_hash)
297             if tx_hash in known_tx:
298                 continue
299
300             # discard transactions that are too old
301             if self.last_tx_id - tx_id > 50000:
302                 print "discarding tx id", tx_id
303                 continue
304
305             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
306             address_has_mempool = True
307
308             #print "mempool", tx_hash
309             txpoint = {
310                     "timestamp":    0,
311                     "height":   0,
312                     "is_input":    int(is_in),
313                     "block_hash": 'mempool', 
314                     "tx_hash":  tx_hash,
315                     "tx_id":    int(tx_id),
316                     "index":      int(pos),
317                     "value":    int(value),
318                     }
319             txpoints.append(txpoint)
320
321
322         for txpoint in txpoints:
323             tx_id = txpoint['tx_id']
324             
325             txinputs = []
326             inrows = self.get_tx_inputs(tx_id)
327             for row in inrows:
328                 _hash = self.binout(row[6])
329                 if not _hash:
330                     #print "WARNING: missing tx_in for tx", tx_id, addr
331                     continue
332                 address = hash_to_address(chr(self.addrtype), _hash)
333                 txinputs.append(address)
334             txpoint['inputs'] = txinputs
335             txoutputs = []
336             outrows = self.get_tx_outputs(tx_id)
337             for row in outrows:
338                 _hash = self.binout(row[6])
339                 if not _hash:
340                     #print "WARNING: missing tx_out for tx", tx_id, addr
341                     continue
342                 address = hash_to_address(chr(self.addrtype), _hash)
343                 txoutputs.append(address)
344             txpoint['outputs'] = txoutputs
345
346             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
347             if not txpoint['is_input']:
348                 # detect if already redeemed...
349                 for row in outrows:
350                     if row[6] == dbhash: break
351                 else:
352                     raise
353                 #row = self.get_tx_output(tx_id,dbhash)
354                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
355                 # if not redeemed, we add the script
356                 if row:
357                     if not row[4]: txpoint['raw_output_script'] = row[1]
358
359             txpoint.pop('tx_id')
360
361         # cache result
362         # do not cache mempool results because statuses are ambiguous
363         if not address_has_mempool:
364             self.tx_cache[addr] = txpoints
365         
366         return txpoints
367
368
369     def get_status(self,addr):
370         # get address status, i.e. the last block for that address.
371         tx_points = self.get_history(addr)
372         if not tx_points:
373             status = None
374         else:
375             lastpoint = tx_points[-1]
376             status = lastpoint['block_hash']
377             # this is a temporary hack; move it up once old clients have disappeared
378             if status == 'mempool': # and session['version'] != "old":
379                 status = status + ':%d'% len(tx_points)
380         return status
381
382
383     def get_block_header(self, block_height):
384         out = self.safe_sql("""
385             SELECT
386                 block_hash,
387                 block_version,
388                 block_hashMerkleRoot,
389                 block_nTime,
390                 block_nBits,
391                 block_nNonce,
392                 block_height,
393                 prev_block_hash,
394                 block_id
395               FROM chain_summary
396              WHERE block_height = %d AND in_longest = 1"""%block_height)
397
398         if not out: raise BaseException("block not found")
399         row = out[0]
400         (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height,prev_block_hash, block_id) \
401             = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
402
403         out = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
404                 "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
405         return out
406         
407
408     def get_tx_merkle(self, tx_hash):
409
410         out = self.safe_sql("""
411              SELECT block_tx.block_id FROM tx 
412              JOIN block_tx on tx.tx_id = block_tx.tx_id 
413              JOIN chain_summary on chain_summary.block_id = block_tx.block_id
414              WHERE tx_hash='%s' AND in_longest = 1"""%tx_hash)
415         block_id = out[0]
416
417         # get the block header
418         out = self.safe_sql("""
419             SELECT
420                 block_hash,
421                 block_version,
422                 block_hashMerkleRoot,
423                 block_nTime,
424                 block_nBits,
425                 block_nNonce,
426                 block_height,
427                 prev_block_hash,
428                 block_height
429               FROM chain_summary
430              WHERE block_id = %d AND in_longest = 1"""%block_id)
431
432         if not out: raise BaseException("block not found")
433         row = out[0]
434         (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \
435             = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
436
437         merkle = []
438         # list all tx inputs in block
439         for row in self.safe_sql("""
440             SELECT DISTINCT tx_id, tx_pos, tx_hash
441               FROM txin_detail
442              WHERE block_id = ?
443              ORDER BY tx_pos""", (block_id,)):
444             tx_id, tx_pos, tx_hash = row
445             merkle.append(tx_hash)
446
447         out = {"block_height":block_height, "version":block_version, "prev_block":prev_block_hash, 
448                 "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce, "merkle":merkle}
449         return out
450
451
452
453
454     def memorypool_update(store):
455
456         ds = BCDataStream.BCDataStream()
457         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
458
459         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
460         r = loads(respdata)
461         if r['error'] != None:
462             return
463
464         v = r['result'].get('transactions')
465         for hextx in v:
466             ds.clear()
467             ds.write(hextx.decode('hex'))
468             tx = deserialize.parse_Transaction(ds)
469             tx['hash'] = util.double_sha256(tx['tx'])
470             tx_hash = store.hashin(tx['hash'])
471
472             if store.tx_find_id_and_value(tx):
473                 pass
474             else:
475                 tx_id = store.import_tx(tx, False)
476                 store.update_tx_cache(tx_id)
477                 #print tx_hash
478     
479         store.commit()
480
481
482     def send_tx(self,tx):
483         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
484         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
485         r = loads(respdata)
486         if r['error'] != None:
487             msg = r['error'].get('message')
488             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
489         else:
490             out = r['result']
491         return out
492
493
494     def main_iteration(store):
495         with store.dblock:
496             store.catch_up()
497             store.memorypool_update()
498             block_number = store.get_block_number(store.chain_id)
499             return block_number
500
501
502
503
504     def catch_up(store):
505         # if there is an exception, do rollback and then re-raise the exception
506         for dircfg in store.datadirs:
507             try:
508                 store.catch_up_dir(dircfg)
509             except Exception, e:
510                 store.log.exception("Failed to catch up %s", dircfg)
511                 store.rollback()
512                 raise e
513
514
515
516
517 from processor import Processor
518
519 class BlockchainProcessor(Processor):
520
521     def __init__(self, config):
522         Processor.__init__(self)
523         self.store = AbeStore(config)
524         self.block_number = -1
525         self.watched_addresses = []
526
527         # catch_up first
528         n = self.store.main_iteration()
529         print "blockchain: %d blocks"%n
530
531         threading.Timer(10, self.run_store_iteration).start()
532
533     def process(self, request):
534         #print "abe process", request
535
536         message_id = request['id']
537         method = request['method']
538         params = request.get('params',[])
539         result = None
540         error = None
541
542         if method == 'blockchain.numblocks.subscribe':
543             result = self.block_number
544
545         elif method == 'blockchain.address.subscribe':
546             try:
547                 address = params[0]
548                 result = self.store.get_status(address)
549                 self.watch_address(address)
550             except BaseException, e:
551                 error = str(e) + ': ' + address
552                 print "error:", error
553
554         elif method == 'blockchain.address.get_history':
555             try:
556                 address = params[0]
557                 result = self.store.get_history( address ) 
558             except BaseException, e:
559                 error = str(e) + ': ' + address
560                 print "error:", error
561
562         elif method == 'blockchain.block.get_header':
563             try:
564                 height = params[0]
565                 result = self.store.get_block_header( height ) 
566             except BaseException, e:
567                 error = str(e) + ': %d'% height
568                 print "error:", error
569
570         elif method == 'blockchain.transaction.broadcast':
571             txo = self.store.send_tx(params[0])
572             print "sent tx:", txo
573             result = txo 
574
575         elif method == 'blockchain.transaction.get_merkle':
576             try:
577                 tx_hash = params[0]
578                 result = self.store.get_tx_merkle(tx_hash ) 
579             except BaseException, e:
580                 error = str(e) + ': ' + tx_hash
581                 print "error:", error
582
583         else:
584             error = "unknown method:%s"%method
585
586
587         if error:
588             response = { 'id':message_id, 'error':error }
589             self.push_response(response)
590         elif result != '':
591             response = { 'id':message_id, 'result':result }
592             self.push_response(response)
593
594
595     def watch_address(self, addr):
596         if addr not in self.watched_addresses:
597             self.watched_addresses.append(addr)
598
599
600     def run_store_iteration(self):
601         
602         try:
603             block_number = self.store.main_iteration()
604         except:
605             traceback.print_exc(file=sys.stdout)
606             print "terminating"
607             self.shared.stop()
608
609         if self.shared.stopped(): 
610             print "exit timer"
611             return
612
613         if self.block_number != block_number:
614             self.block_number = block_number
615             print "block number:", self.block_number
616             self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
617
618         while True:
619             try:
620                 addr = self.store.address_queue.get(False)
621             except:
622                 break
623             if addr in self.watched_addresses:
624                 status = self.store.get_status( addr )
625                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
626
627         threading.Timer(10, self.run_store_iteration).start()
628
629