48c01de63dbc55d58858343a7ed1e53c3936fc2b
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13
14 class AbeStore(Datastore_class):
15
16     addrtype = 48
17
18     def __init__(self, config):
19         conf = DataStore.CONFIG_DEFAULTS
20         args, argv = readconf.parse_argv( [], conf)
21         args.dbtype = config.get('database','type')
22         if args.dbtype == 'sqlite3':
23             args.connect_args = { 'database' : config.get('database','database') }
24         elif args.dbtype == 'MySQLdb':
25             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
26         elif args.dbtype == 'psycopg2':
27             args.connect_args = { 'database' : config.get('database','database') }
28
29         Datastore_class.__init__(self,args)
30
31         self.sql_limit = int( config.get('database','limit') )
32
33         self.tx_cache = {}
34         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
35
36         self.address_queue = Queue()
37
38         self.dblock = thread.allocate_lock()
39         self.last_tx_id = 0
40
41     
42     def import_tx(self, tx, is_coinbase):
43         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
44         self.last_tx_id = tx_id
45         return tx_id
46         
47
48
49
50     def import_block(self, b, chain_ids=frozenset()):
51         #print "import block"
52         block_id = super(AbeStore, self).import_block(b, chain_ids)
53         for pos in xrange(len(b['transactions'])):
54             tx = b['transactions'][pos]
55             if 'hash' not in tx:
56                 tx['hash'] = util.double_sha256(tx['tx'])
57             tx_id = self.tx_find_id_and_value(tx)
58             if tx_id:
59                 self.update_tx_cache(tx_id)
60             else:
61                 print "error: import_block: no tx_id"
62         return block_id
63
64
65     def update_tx_cache(self, txid):
66         inrows = self.get_tx_inputs(txid, False)
67         for row in inrows:
68             _hash = self.binout(row[6])
69             if not _hash:
70                 #print "WARNING: missing tx_in for tx", txid
71                 continue
72
73             address = hash_to_address(chr(addrtype), _hash)
74             if self.tx_cache.has_key(address):
75                 print "cache: invalidating", address
76                 self.tx_cache.pop(address)
77             self.address_queue.put(address)
78
79         outrows = self.get_tx_outputs(txid, False)
80         for row in outrows:
81             _hash = self.binout(row[6])
82             if not _hash:
83                 #print "WARNING: missing tx_out for tx", txid
84                 continue
85
86             address = hash_to_address(chr(addrtype), _hash)
87             if self.tx_cache.has_key(address):
88                 print "cache: invalidating", address
89                 self.tx_cache.pop(address)
90             self.address_queue.put(address)
91
92     def safe_sql(self,sql, params=(), lock=True):
93
94         error = False
95         try:
96             if lock: self.dblock.acquire()
97             ret = self.selectall(sql,params)
98         except:
99             error = True
100             traceback.print_exc(file=sys.stdout)
101         finally:
102             if lock: self.dblock.release()
103
104         if error: 
105             raise BaseException('sql error')
106
107         return ret
108             
109
110     def get_tx_outputs(self, tx_id, lock=True):
111         return self.safe_sql("""SELECT
112                 txout.txout_pos,
113                 txout.txout_scriptPubKey,
114                 txout.txout_value,
115                 nexttx.tx_hash,
116                 nexttx.tx_id,
117                 txin.txin_pos,
118                 pubkey.pubkey_hash
119               FROM txout
120               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
121               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
122               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
123              WHERE txout.tx_id = %d 
124              ORDER BY txout.txout_pos
125         """%(tx_id), (), lock)
126
127     def get_tx_inputs(self, tx_id, lock=True):
128         return self.safe_sql(""" SELECT
129                 txin.txin_pos,
130                 txin.txin_scriptSig,
131                 txout.txout_value,
132                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
133                 prevtx.tx_id,
134                 COALESCE(txout.txout_pos, u.txout_pos),
135                 pubkey.pubkey_hash
136               FROM txin
137               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
138               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
139               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
140               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
141              WHERE txin.tx_id = %d
142              ORDER BY txin.txin_pos
143              """%(tx_id,), (), lock)
144
145
146     def get_address_out_rows(self, dbhash):
147         out = self.safe_sql(""" SELECT
148                 b.block_nTime,
149                 cc.chain_id,
150                 b.block_height,
151                 1,
152                 b.block_hash,
153                 tx.tx_hash,
154                 tx.tx_id,
155                 txin.txin_pos,
156                 -prevout.txout_value
157               FROM chain_candidate cc
158               JOIN block b ON (b.block_id = cc.block_id)
159               JOIN block_tx ON (block_tx.block_id = b.block_id)
160               JOIN tx ON (tx.tx_id = block_tx.tx_id)
161               JOIN txin ON (txin.tx_id = tx.tx_id)
162               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
163               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
164              WHERE pubkey.pubkey_hash = ?
165                AND cc.in_longest = 1
166              LIMIT ? """, (dbhash,self.sql_limit))
167
168         if len(out)==self.sql_limit: 
169             raise BaseException('limit reached')
170         return out
171
172     def get_address_out_rows_memorypool(self, dbhash):
173         out = self.safe_sql(""" SELECT
174                 1,
175                 tx.tx_hash,
176                 tx.tx_id,
177                 txin.txin_pos,
178                 -prevout.txout_value
179               FROM tx 
180               JOIN txin ON (txin.tx_id = tx.tx_id)
181               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
182               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
183              WHERE pubkey.pubkey_hash = ?
184              LIMIT ? """, (dbhash,self.sql_limit))
185
186         if len(out)==self.sql_limit: 
187             raise BaseException('limit reached')
188         return out
189
190     def get_address_in_rows(self, dbhash):
191         out = self.safe_sql(""" SELECT
192                 b.block_nTime,
193                 cc.chain_id,
194                 b.block_height,
195                 0,
196                 b.block_hash,
197                 tx.tx_hash,
198                 tx.tx_id,
199                 txout.txout_pos,
200                 txout.txout_value
201               FROM chain_candidate cc
202               JOIN block b ON (b.block_id = cc.block_id)
203               JOIN block_tx ON (block_tx.block_id = b.block_id)
204               JOIN tx ON (tx.tx_id = block_tx.tx_id)
205               JOIN txout ON (txout.tx_id = tx.tx_id)
206               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
207              WHERE pubkey.pubkey_hash = ?
208                AND cc.in_longest = 1
209                LIMIT ? """, (dbhash,self.sql_limit))
210
211         if len(out)==self.sql_limit: 
212             raise BaseException('limit reached')
213         return out
214
215     def get_address_in_rows_memorypool(self, dbhash):
216         out = self.safe_sql( """ SELECT
217                 0,
218                 tx.tx_hash,
219                 tx.tx_id,
220                 txout.txout_pos,
221                 txout.txout_value
222               FROM tx
223               JOIN txout ON (txout.tx_id = tx.tx_id)
224               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
225              WHERE pubkey.pubkey_hash = ?
226              LIMIT ? """, (dbhash,self.sql_limit))
227
228         if len(out)==self.sql_limit: 
229             raise BaseException('limit reached')
230         return out
231
232     def get_history(self, addr):
233
234         cached_version = self.tx_cache.get( addr )
235         if cached_version is not None:
236             return cached_version
237
238         version, binaddr = decode_check_address(addr)
239         if binaddr is None:
240             return None
241
242         dbhash = self.binin(binaddr)
243         rows = []
244         rows += self.get_address_out_rows( dbhash )
245         rows += self.get_address_in_rows( dbhash )
246
247         txpoints = []
248         known_tx = []
249
250         for row in rows:
251             try:
252                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
253             except:
254                 print "cannot unpack row", row
255                 break
256             tx_hash = self.hashout_hex(tx_hash)
257             txpoint = {
258                     "timestamp":    int(nTime),
259                     "height":   int(height),
260                     "is_input":    int(is_in),
261                     "block_hash": self.hashout_hex(blk_hash),
262                     "tx_hash":  tx_hash,
263                     "tx_id":    int(tx_id),
264                     "index":      int(pos),
265                     "value":    int(value),
266                     }
267
268             txpoints.append(txpoint)
269             known_tx.append(self.hashout_hex(tx_hash))
270
271
272         # todo: sort them really...
273         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
274
275         # read memory pool
276         rows = []
277         rows += self.get_address_in_rows_memorypool( dbhash )
278         rows += self.get_address_out_rows_memorypool( dbhash )
279         address_has_mempool = False
280
281         for row in rows:
282             is_in, tx_hash, tx_id, pos, value = row
283             tx_hash = self.hashout_hex(tx_hash)
284             if tx_hash in known_tx:
285                 continue
286
287             # discard transactions that are too old
288             if self.last_tx_id - tx_id > 50000:
289                 print "discarding tx id", tx_id
290                 continue
291
292             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
293             address_has_mempool = True
294
295             #print "mempool", tx_hash
296             txpoint = {
297                     "timestamp":    0,
298                     "height":   0,
299                     "is_input":    int(is_in),
300                     "block_hash": 'mempool', 
301                     "tx_hash":  tx_hash,
302                     "tx_id":    int(tx_id),
303                     "index":      int(pos),
304                     "value":    int(value),
305                     }
306             txpoints.append(txpoint)
307
308
309         for txpoint in txpoints:
310             tx_id = txpoint['tx_id']
311             
312             txinputs = []
313             inrows = self.get_tx_inputs(tx_id)
314             for row in inrows:
315                 _hash = self.binout(row[6])
316                 if not _hash:
317                     #print "WARNING: missing tx_in for tx", tx_id, addr
318                     continue
319                 address = hash_to_address(chr(addrtype), _hash)
320                 txinputs.append(address)
321             txpoint['inputs'] = txinputs
322             txoutputs = []
323             outrows = self.get_tx_outputs(tx_id)
324             for row in outrows:
325                 _hash = self.binout(row[6])
326                 if not _hash:
327                     #print "WARNING: missing tx_out for tx", tx_id, addr
328                     continue
329                 address = hash_to_address(chr(addrtype), _hash)
330                 txoutputs.append(address)
331             txpoint['outputs'] = txoutputs
332
333             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
334             if not txpoint['is_input']:
335                 # detect if already redeemed...
336                 for row in outrows:
337                     if row[6] == dbhash: break
338                 else:
339                     raise
340                 #row = self.get_tx_output(tx_id,dbhash)
341                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
342                 # if not redeemed, we add the script
343                 if row:
344                     if not row[4]: txpoint['raw_output_script'] = row[1]
345
346             txpoint.pop('tx_id')
347
348         # cache result
349         # do not cache mempool results because statuses are ambiguous
350         if not address_has_mempool:
351             self.tx_cache[addr] = txpoints
352         
353         return txpoints
354
355
356     def get_status(self,addr):
357         # get address status, i.e. the last block for that address.
358         tx_points = self.get_history(addr)
359         if not tx_points:
360             status = None
361         else:
362             lastpoint = tx_points[-1]
363             status = lastpoint['block_hash']
364             # this is a temporary hack; move it up once old clients have disappeared
365             if status == 'mempool': # and session['version'] != "old":
366                 status = status + ':%d'% len(tx_points)
367         return status
368
369
370
371     def memorypool_update(store):
372
373         ds = BCDataStream.BCDataStream()
374         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
375
376         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
377         r = loads(respdata)
378         if r['error'] != None:
379             return
380
381         v = r['result'].get('transactions')
382         for hextx in v:
383             ds.clear()
384             ds.write(hextx.decode('hex'))
385             tx = deserialize.parse_Transaction(ds)
386             tx['hash'] = util.double_sha256(tx['tx'])
387             tx_hash = store.hashin(tx['hash'])
388
389             if store.tx_find_id_and_value(tx):
390                 pass
391             else:
392                 tx_id = store.import_tx(tx, False)
393                 store.update_tx_cache(tx_id)
394                 #print tx_hash
395     
396         store.commit()
397
398
399     def send_tx(self,tx):
400         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
401         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
402         r = loads(respdata)
403         if r['error'] != None:
404             msg = r['error'].get('message')
405             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
406         else:
407             out = r['result']
408         return out
409
410
411     def main_iteration(store):
412         with store.dblock:
413             store.catch_up()
414             store.memorypool_update()
415             block_number = store.get_block_number(1)
416             return block_number
417
418
419
420
421     def catch_up(store):
422         # if there is an exception, do rollback and then re-raise the exception
423         for dircfg in store.datadirs:
424             try:
425                 store.catch_up_dir(dircfg)
426             except Exception, e:
427                 store.log.exception("Failed to catch up %s", dircfg)
428                 store.rollback()
429                 raise e
430
431
432
433
434 from processor import Processor
435
436 class BlockchainProcessor(Processor):
437
438     def __init__(self, config):
439         Processor.__init__(self)
440         self.store = AbeStore(config)
441         self.block_number = -1
442         self.watched_addresses = []
443
444         # catch_up first
445         n = self.store.main_iteration()
446         print "blockchain: %d blocks"%n
447
448         threading.Timer(10, self.run_store_iteration).start()
449
450     def process(self, request):
451         #print "abe process", request
452
453         message_id = request['id']
454         method = request['method']
455         params = request.get('params',[])
456         result = None
457         error = None
458
459         if method == 'blockchain.numblocks.subscribe':
460             result = self.block_number
461
462         elif method == 'blockchain.address.subscribe':
463             try:
464                 address = params[0]
465                 result = self.store.get_status(address)
466                 self.watch_address(address)
467             except BaseException, e:
468                 error = str(e) + ': ' + address
469                 print "error:", error
470
471         elif method == 'blockchain.address.get_history':
472             try:
473                 address = params[0]
474                 result = self.store.get_history( address ) 
475             except BaseException, e:
476                 error = str(e) + ': ' + address
477                 print "error:", error
478
479         elif method == 'blockchain.transaction.broadcast':
480             txo = self.store.send_tx(params[0])
481             print "sent tx:", txo
482             result = txo 
483
484         else:
485             error = "unknown method:%s"%method
486
487
488         if error:
489             response = { 'id':message_id, 'error':error }
490             self.push_response(response)
491         elif result != '':
492             response = { 'id':message_id, 'result':result }
493             self.push_response(response)
494
495
496     def watch_address(self, addr):
497         if addr not in self.watched_addresses:
498             self.watched_addresses.append(addr)
499
500
501     def run_store_iteration(self):
502         
503         try:
504             block_number = self.store.main_iteration()
505         except:
506             traceback.print_exc(file=sys.stdout)
507             print "terminating"
508             self.shared.stop()
509
510         if self.shared.stopped(): 
511             print "exit timer"
512             return
513
514         if self.block_number != block_number:
515             self.block_number = block_number
516             print "block number:", self.block_number
517             self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
518
519         while True:
520             try:
521                 addr = self.store.address_queue.get(False)
522             except:
523                 break
524             if addr in self.watched_addresses:
525                 status = self.store.get_status( addr )
526                 self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
527
528         threading.Timer(10, self.run_store_iteration).start()
529
530