'id' is mandatory field in JSON-RPC notification, even when it's 'null'.
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13 class AbeStore(Datastore_class):
14
15     def __init__(self, config):
16         conf = DataStore.CONFIG_DEFAULTS
17         args, argv = readconf.parse_argv( [], conf)
18         args.dbtype = config.get('database','type')
19         if args.dbtype == 'sqlite3':
20             args.connect_args = { 'database' : config.get('database','database') }
21         elif args.dbtype == 'MySQLdb':
22             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
23         elif args.dbtype == 'psycopg2':
24             args.connect_args = { 'database' : config.get('database','database') }
25
26         coin = config.get('server', 'coin')
27         self.addrtype = 0
28         if coin == 'litecoin':
29             print 'Litecoin settings:'
30             datadir = config.get('server','datadir')
31             print '  datadir = ' + datadir
32             args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
33             print '  addrtype = 48'
34             self.addrtype = 48
35
36         Datastore_class.__init__(self,args)
37
38         # Use 1 (Bitcoin) if chain_id is not sent
39         self.chain_id = self.datadirs[0]["chain_id"] or 1
40         print 'Coin chain_id = %d' % self.chain_id
41
42         self.sql_limit = int( config.get('database','limit') )
43
44         self.tx_cache = {}
45         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
46
47         self.address_queue = Queue()
48
49         self.dblock = thread.allocate_lock()
50         self.last_tx_id = 0
51
52     
53     def import_tx(self, tx, is_coinbase):
54         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
55         self.last_tx_id = tx_id
56         return tx_id
57         
58
59
60
61     def import_block(self, b, chain_ids=frozenset()):
62         #print "import block"
63         block_id = super(AbeStore, self).import_block(b, chain_ids)
64         for pos in xrange(len(b['transactions'])):
65             tx = b['transactions'][pos]
66             if 'hash' not in tx:
67                 tx['hash'] = util.double_sha256(tx['tx'])
68             tx_id = self.tx_find_id_and_value(tx)
69             if tx_id:
70                 self.update_tx_cache(tx_id)
71             else:
72                 print "error: import_block: no tx_id"
73         return block_id
74
75
76     def update_tx_cache(self, txid):
77         inrows = self.get_tx_inputs(txid, False)
78         for row in inrows:
79             _hash = self.binout(row[6])
80             if not _hash:
81                 #print "WARNING: missing tx_in for tx", txid
82                 continue
83
84             address = hash_to_address(chr(self.addrtype), _hash)
85             if self.tx_cache.has_key(address):
86                 print "cache: invalidating", address
87                 self.tx_cache.pop(address)
88             self.address_queue.put(address)
89
90         outrows = self.get_tx_outputs(txid, False)
91         for row in outrows:
92             _hash = self.binout(row[6])
93             if not _hash:
94                 #print "WARNING: missing tx_out for tx", txid
95                 continue
96
97             address = hash_to_address(chr(self.addrtype), _hash)
98             if self.tx_cache.has_key(address):
99                 print "cache: invalidating", address
100                 self.tx_cache.pop(address)
101             self.address_queue.put(address)
102
103     def safe_sql(self,sql, params=(), lock=True):
104
105         error = False
106         try:
107             if lock: self.dblock.acquire()
108             ret = self.selectall(sql,params)
109         except:
110             error = True
111             traceback.print_exc(file=sys.stdout)
112         finally:
113             if lock: self.dblock.release()
114
115         if error: 
116             raise BaseException('sql error')
117
118         return ret
119             
120
121     def get_tx_outputs(self, tx_id, lock=True):
122         return self.safe_sql("""SELECT
123                 txout.txout_pos,
124                 txout.txout_scriptPubKey,
125                 txout.txout_value,
126                 nexttx.tx_hash,
127                 nexttx.tx_id,
128                 txin.txin_pos,
129                 pubkey.pubkey_hash
130               FROM txout
131               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
132               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
133               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
134              WHERE txout.tx_id = %d 
135              ORDER BY txout.txout_pos
136         """%(tx_id), (), lock)
137
138     def get_tx_inputs(self, tx_id, lock=True):
139         return self.safe_sql(""" SELECT
140                 txin.txin_pos,
141                 txin.txin_scriptSig,
142                 txout.txout_value,
143                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
144                 prevtx.tx_id,
145                 COALESCE(txout.txout_pos, u.txout_pos),
146                 pubkey.pubkey_hash
147               FROM txin
148               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
149               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
150               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
151               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
152              WHERE txin.tx_id = %d
153              ORDER BY txin.txin_pos
154              """%(tx_id,), (), lock)
155
156
157     def get_address_out_rows(self, dbhash):
158         out = self.safe_sql(""" SELECT
159                 b.block_nTime,
160                 cc.chain_id,
161                 b.block_height,
162                 1,
163                 b.block_hash,
164                 tx.tx_hash,
165                 tx.tx_id,
166                 txin.txin_pos,
167                 -prevout.txout_value
168               FROM chain_candidate cc
169               JOIN block b ON (b.block_id = cc.block_id)
170               JOIN block_tx ON (block_tx.block_id = b.block_id)
171               JOIN tx ON (tx.tx_id = block_tx.tx_id)
172               JOIN txin ON (txin.tx_id = tx.tx_id)
173               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
174               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
175              WHERE pubkey.pubkey_hash = ?
176                AND cc.chain_id = ?
177                AND cc.in_longest = 1
178              LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
179
180         if len(out)==self.sql_limit: 
181             raise BaseException('limit reached')
182         return out
183
184     def get_address_out_rows_memorypool(self, dbhash):
185         out = self.safe_sql(""" SELECT
186                 1,
187                 tx.tx_hash,
188                 tx.tx_id,
189                 txin.txin_pos,
190                 -prevout.txout_value
191               FROM tx 
192               JOIN txin ON (txin.tx_id = tx.tx_id)
193               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
194               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
195              WHERE pubkey.pubkey_hash = ?
196              LIMIT ? """, (dbhash,self.sql_limit))
197
198         if len(out)==self.sql_limit: 
199             raise BaseException('limit reached')
200         return out
201
202     def get_address_in_rows(self, dbhash):
203         out = self.safe_sql(""" SELECT
204                 b.block_nTime,
205                 cc.chain_id,
206                 b.block_height,
207                 0,
208                 b.block_hash,
209                 tx.tx_hash,
210                 tx.tx_id,
211                 txout.txout_pos,
212                 txout.txout_value
213               FROM chain_candidate cc
214               JOIN block b ON (b.block_id = cc.block_id)
215               JOIN block_tx ON (block_tx.block_id = b.block_id)
216               JOIN tx ON (tx.tx_id = block_tx.tx_id)
217               JOIN txout ON (txout.tx_id = tx.tx_id)
218               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
219              WHERE pubkey.pubkey_hash = ?
220                AND cc.chain_id = ?
221                AND cc.in_longest = 1
222                LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
223
224         if len(out)==self.sql_limit: 
225             raise BaseException('limit reached')
226         return out
227
228     def get_address_in_rows_memorypool(self, dbhash):
229         out = self.safe_sql( """ SELECT
230                 0,
231                 tx.tx_hash,
232                 tx.tx_id,
233                 txout.txout_pos,
234                 txout.txout_value
235               FROM tx
236               JOIN txout ON (txout.tx_id = tx.tx_id)
237               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
238              WHERE pubkey.pubkey_hash = ?
239              LIMIT ? """, (dbhash,self.sql_limit))
240
241         if len(out)==self.sql_limit: 
242             raise BaseException('limit reached')
243         return out
244
245     def get_history(self, addr):
246
247         cached_version = self.tx_cache.get( addr )
248         if cached_version is not None:
249             return cached_version
250
251         version, binaddr = decode_check_address(addr)
252         if binaddr is None:
253             return None
254
255         dbhash = self.binin(binaddr)
256         rows = []
257         rows += self.get_address_out_rows( dbhash )
258         rows += self.get_address_in_rows( dbhash )
259
260         txpoints = []
261         known_tx = []
262
263         for row in rows:
264             try:
265                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
266             except:
267                 print "cannot unpack row", row
268                 break
269             tx_hash = self.hashout_hex(tx_hash)
270             txpoint = {
271                     "timestamp":    int(nTime),
272                     "height":   int(height),
273                     "is_input":    int(is_in),
274                     "block_hash": self.hashout_hex(blk_hash),
275                     "tx_hash":  tx_hash,
276                     "tx_id":    int(tx_id),
277                     "index":      int(pos),
278                     "value":    int(value),
279                     }
280
281             txpoints.append(txpoint)
282             known_tx.append(self.hashout_hex(tx_hash))
283
284
285         # todo: sort them really...
286         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
287
288         # read memory pool
289         rows = []
290         rows += self.get_address_in_rows_memorypool( dbhash )
291         rows += self.get_address_out_rows_memorypool( dbhash )
292         address_has_mempool = False
293
294         for row in rows:
295             is_in, tx_hash, tx_id, pos, value = row
296             tx_hash = self.hashout_hex(tx_hash)
297             if tx_hash in known_tx:
298                 continue
299
300             # discard transactions that are too old
301             if self.last_tx_id - tx_id > 50000:
302                 print "discarding tx id", tx_id
303                 continue
304
305             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
306             address_has_mempool = True
307
308             #print "mempool", tx_hash
309             txpoint = {
310                     "timestamp":    0,
311                     "height":   0,
312                     "is_input":    int(is_in),
313                     "block_hash": 'mempool', 
314                     "tx_hash":  tx_hash,
315                     "tx_id":    int(tx_id),
316                     "index":      int(pos),
317                     "value":    int(value),
318                     }
319             txpoints.append(txpoint)
320
321
322         for txpoint in txpoints:
323             tx_id = txpoint['tx_id']
324             
325             txinputs = []
326             inrows = self.get_tx_inputs(tx_id)
327             for row in inrows:
328                 _hash = self.binout(row[6])
329                 if not _hash:
330                     #print "WARNING: missing tx_in for tx", tx_id, addr
331                     continue
332                 address = hash_to_address(chr(self.addrtype), _hash)
333                 txinputs.append(address)
334             txpoint['inputs'] = txinputs
335             txoutputs = []
336             outrows = self.get_tx_outputs(tx_id)
337             for row in outrows:
338                 _hash = self.binout(row[6])
339                 if not _hash:
340                     #print "WARNING: missing tx_out for tx", tx_id, addr
341                     continue
342                 address = hash_to_address(chr(self.addrtype), _hash)
343                 txoutputs.append(address)
344             txpoint['outputs'] = txoutputs
345
346             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
347             if not txpoint['is_input']:
348                 # detect if already redeemed...
349                 for row in outrows:
350                     if row[6] == dbhash: break
351                 else:
352                     raise
353                 #row = self.get_tx_output(tx_id,dbhash)
354                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
355                 # if not redeemed, we add the script
356                 if row:
357                     if not row[4]: txpoint['raw_output_script'] = row[1]
358
359             txpoint.pop('tx_id')
360
361         # cache result
362         # do not cache mempool results because statuses are ambiguous
363         if not address_has_mempool:
364             self.tx_cache[addr] = txpoints
365         
366         return txpoints
367
368
369     def get_status(self,addr):
370         # get address status, i.e. the last block for that address.
371         tx_points = self.get_history(addr)
372         if not tx_points:
373             status = None
374         else:
375             lastpoint = tx_points[-1]
376             status = lastpoint['block_hash']
377             # this is a temporary hack; move it up once old clients have disappeared
378             if status == 'mempool': # and session['version'] != "old":
379                 status = status + ':%d'% len(tx_points)
380         return status
381
382
383
384     def memorypool_update(store):
385
386         ds = BCDataStream.BCDataStream()
387         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
388
389         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
390         r = loads(respdata)
391         if r['error'] != None:
392             return
393
394         v = r['result'].get('transactions')
395         for hextx in v:
396             ds.clear()
397             ds.write(hextx.decode('hex'))
398             tx = deserialize.parse_Transaction(ds)
399             tx['hash'] = util.double_sha256(tx['tx'])
400             tx_hash = store.hashin(tx['hash'])
401
402             if store.tx_find_id_and_value(tx):
403                 pass
404             else:
405                 tx_id = store.import_tx(tx, False)
406                 store.update_tx_cache(tx_id)
407                 #print tx_hash
408     
409         store.commit()
410
411
412     def send_tx(self,tx):
413         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
414         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
415         r = loads(respdata)
416         if r['error'] != None:
417             msg = r['error'].get('message')
418             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
419         else:
420             out = r['result']
421         return out
422
423
424     def main_iteration(store):
425         with store.dblock:
426             store.catch_up()
427             store.memorypool_update()
428             block_number = store.get_block_number(store.chain_id)
429             return block_number
430
431
432
433
434     def catch_up(store):
435         # if there is an exception, do rollback and then re-raise the exception
436         for dircfg in store.datadirs:
437             try:
438                 store.catch_up_dir(dircfg)
439             except Exception, e:
440                 store.log.exception("Failed to catch up %s", dircfg)
441                 store.rollback()
442                 raise e
443
444
445
446
447 from processor import Processor
448
449 class BlockchainProcessor(Processor):
450
451     def __init__(self, config):
452         Processor.__init__(self)
453         self.store = AbeStore(config)
454         self.block_number = -1
455         self.watched_addresses = []
456
457         # catch_up first
458         n = self.store.main_iteration()
459         print "blockchain: %d blocks"%n
460
461         threading.Timer(10, self.run_store_iteration).start()
462
463     def process(self, request):
464         #print "abe process", request
465
466         message_id = request['id']
467         method = request['method']
468         params = request.get('params',[])
469         result = None
470         error = None
471
472         if method == 'blockchain.numblocks.subscribe':
473             result = self.block_number
474
475         elif method == 'blockchain.address.subscribe':
476             try:
477                 address = params[0]
478                 result = self.store.get_status(address)
479                 self.watch_address(address)
480             except BaseException, e:
481                 error = str(e) + ': ' + address
482                 print "error:", error
483
484         elif method == 'blockchain.address.get_history':
485             try:
486                 address = params[0]
487                 result = self.store.get_history( address ) 
488             except BaseException, e:
489                 error = str(e) + ': ' + address
490                 print "error:", error
491
492         elif method == 'blockchain.transaction.broadcast':
493             txo = self.store.send_tx(params[0])
494             print "sent tx:", txo
495             result = txo 
496
497         else:
498             error = "unknown method:%s"%method
499
500
501         if error:
502             response = { 'id':message_id, 'error':error }
503             self.push_response(response)
504         elif result != '':
505             response = { 'id':message_id, 'result':result }
506             self.push_response(response)
507
508
509     def watch_address(self, addr):
510         if addr not in self.watched_addresses:
511             self.watched_addresses.append(addr)
512
513
514     def run_store_iteration(self):
515         
516         try:
517             block_number = self.store.main_iteration()
518         except:
519             traceback.print_exc(file=sys.stdout)
520             print "terminating"
521             self.shared.stop()
522
523         if self.shared.stopped(): 
524             print "exit timer"
525             return
526
527         if self.block_number != block_number:
528             self.block_number = block_number
529             print "block number:", self.block_number
530             self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
531
532         while True:
533             try:
534                 addr = self.store.address_queue.get(False)
535             except:
536                 break
537             if addr in self.watched_addresses:
538                 status = self.store.get_status( addr )
539                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
540
541         threading.Timer(10, self.run_store_iteration).start()
542
543