1971a6c8eb152c42ff6727205d5fe4442e39dd33
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13
14 class AbeStore(Datastore_class):
15
16     def __init__(self, config):
17         conf = DataStore.CONFIG_DEFAULTS
18         args, argv = readconf.parse_argv( [], conf)
19         args.dbtype = config.get('database','type')
20         if args.dbtype == 'sqlite3':
21             args.connect_args = { 'database' : config.get('database','database') }
22         elif args.dbtype == 'MySQLdb':
23             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
24         elif args.dbtype == 'psycopg2':
25             args.connect_args = { 'database' : config.get('database','database') }
26
27         Datastore_class.__init__(self,args)
28
29         self.sql_limit = int( config.get('database','limit') )
30
31         self.tx_cache = {}
32         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
33
34         self.address_queue = Queue()
35
36         self.dblock = thread.allocate_lock()
37
38
39
40     def import_block(self, b, chain_ids=frozenset()):
41         #print "import block"
42         block_id = super(AbeStore, self).import_block(b, chain_ids)
43         for pos in xrange(len(b['transactions'])):
44             tx = b['transactions'][pos]
45             if 'hash' not in tx:
46                 tx['hash'] = util.double_sha256(tx['tx'])
47             tx_id = self.tx_find_id_and_value(tx)
48             if tx_id:
49                 self.update_tx_cache(tx_id)
50             else:
51                 print "error: import_block: no tx_id"
52         return block_id
53
54
55     def update_tx_cache(self, txid):
56         inrows = self.get_tx_inputs(txid, False)
57         for row in inrows:
58             _hash = self.binout(row[6])
59             if not _hash:
60                 #print "WARNING: missing tx_in for tx", txid
61                 continue
62
63             address = hash_to_address(chr(0), _hash)
64             if self.tx_cache.has_key(address):
65                 print "cache: invalidating", address
66                 self.tx_cache.pop(address)
67             self.address_queue.put(address)
68
69         outrows = self.get_tx_outputs(txid, False)
70         for row in outrows:
71             _hash = self.binout(row[6])
72             if not _hash:
73                 #print "WARNING: missing tx_out for tx", txid
74                 continue
75
76             address = hash_to_address(chr(0), _hash)
77             if self.tx_cache.has_key(address):
78                 print "cache: invalidating", address
79                 self.tx_cache.pop(address)
80             self.address_queue.put(address)
81
82     def safe_sql(self,sql, params=(), lock=True):
83
84         error = False
85         try:
86             if lock: self.dblock.acquire()
87             ret = self.selectall(sql,params)
88         except:
89             error = True
90             traceback.print_exc(file=sys.stdout)
91         finally:
92             if lock: self.dblock.release()
93
94         if error: 
95             raise BaseException('sql error')
96
97         return ret
98             
99
100     def get_tx_outputs(self, tx_id, lock=True):
101         return self.safe_sql("""SELECT
102                 txout.txout_pos,
103                 txout.txout_scriptPubKey,
104                 txout.txout_value,
105                 nexttx.tx_hash,
106                 nexttx.tx_id,
107                 txin.txin_pos,
108                 pubkey.pubkey_hash
109               FROM txout
110               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
111               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
112               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
113              WHERE txout.tx_id = %d 
114              ORDER BY txout.txout_pos
115         """%(tx_id), (), lock)
116
117     def get_tx_inputs(self, tx_id, lock=True):
118         return self.safe_sql(""" SELECT
119                 txin.txin_pos,
120                 txin.txin_scriptSig,
121                 txout.txout_value,
122                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
123                 prevtx.tx_id,
124                 COALESCE(txout.txout_pos, u.txout_pos),
125                 pubkey.pubkey_hash
126               FROM txin
127               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
128               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
129               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
130               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
131              WHERE txin.tx_id = %d
132              ORDER BY txin.txin_pos
133              """%(tx_id,), (), lock)
134
135
136     def get_address_out_rows(self, dbhash):
137         out = self.safe_sql(""" SELECT
138                 b.block_nTime,
139                 cc.chain_id,
140                 b.block_height,
141                 1,
142                 b.block_hash,
143                 tx.tx_hash,
144                 tx.tx_id,
145                 txin.txin_pos,
146                 -prevout.txout_value
147               FROM chain_candidate cc
148               JOIN block b ON (b.block_id = cc.block_id)
149               JOIN block_tx ON (block_tx.block_id = b.block_id)
150               JOIN tx ON (tx.tx_id = block_tx.tx_id)
151               JOIN txin ON (txin.tx_id = tx.tx_id)
152               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
153               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
154              WHERE pubkey.pubkey_hash = ?
155                AND cc.in_longest = 1
156              LIMIT ? """, (dbhash,self.sql_limit))
157
158         if len(out)==self.sql_limit: 
159             raise BaseException('limit reached')
160         return out
161
162     def get_address_out_rows_memorypool(self, dbhash):
163         out = self.safe_sql(""" SELECT
164                 1,
165                 tx.tx_hash,
166                 tx.tx_id,
167                 txin.txin_pos,
168                 -prevout.txout_value
169               FROM tx 
170               JOIN txin ON (txin.tx_id = tx.tx_id)
171               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
172               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
173              WHERE pubkey.pubkey_hash = ?
174              LIMIT ? """, (dbhash,self.sql_limit))
175
176         if len(out)==self.sql_limit: 
177             raise BaseException('limit reached')
178         return out
179
180     def get_address_in_rows(self, dbhash):
181         out = self.safe_sql(""" SELECT
182                 b.block_nTime,
183                 cc.chain_id,
184                 b.block_height,
185                 0,
186                 b.block_hash,
187                 tx.tx_hash,
188                 tx.tx_id,
189                 txout.txout_pos,
190                 txout.txout_value
191               FROM chain_candidate cc
192               JOIN block b ON (b.block_id = cc.block_id)
193               JOIN block_tx ON (block_tx.block_id = b.block_id)
194               JOIN tx ON (tx.tx_id = block_tx.tx_id)
195               JOIN txout ON (txout.tx_id = tx.tx_id)
196               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
197              WHERE pubkey.pubkey_hash = ?
198                AND cc.in_longest = 1
199                LIMIT ? """, (dbhash,self.sql_limit))
200
201         if len(out)==self.sql_limit: 
202             raise BaseException('limit reached')
203         return out
204
205     def get_address_in_rows_memorypool(self, dbhash):
206         out = self.safe_sql( """ SELECT
207                 0,
208                 tx.tx_hash,
209                 tx.tx_id,
210                 txout.txout_pos,
211                 txout.txout_value
212               FROM tx
213               JOIN txout ON (txout.tx_id = tx.tx_id)
214               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
215              WHERE pubkey.pubkey_hash = ?
216              LIMIT ? """, (dbhash,self.sql_limit))
217
218         if len(out)==self.sql_limit: 
219             raise BaseException('limit reached')
220         return out
221
222     def get_history(self, addr):
223
224         cached_version = self.tx_cache.get( addr )
225         if cached_version is not None:
226             return cached_version
227
228         version, binaddr = decode_check_address(addr)
229         if binaddr is None:
230             return None
231
232         dbhash = self.binin(binaddr)
233         rows = []
234         rows += self.get_address_out_rows( dbhash )
235         rows += self.get_address_in_rows( dbhash )
236
237         txpoints = []
238         known_tx = []
239
240         for row in rows:
241             try:
242                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
243             except:
244                 print "cannot unpack row", row
245                 break
246             tx_hash = self.hashout_hex(tx_hash)
247             txpoint = {
248                     "timestamp":    int(nTime),
249                     "height":   int(height),
250                     "is_input":    int(is_in),
251                     "block_hash": self.hashout_hex(blk_hash),
252                     "tx_hash":  tx_hash,
253                     "tx_id":    int(tx_id),
254                     "index":      int(pos),
255                     "value":    int(value),
256                     }
257
258             txpoints.append(txpoint)
259             known_tx.append(self.hashout_hex(tx_hash))
260
261
262         # todo: sort them really...
263         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
264
265         # read memory pool
266         rows = []
267         rows += self.get_address_in_rows_memorypool( dbhash )
268         rows += self.get_address_out_rows_memorypool( dbhash )
269         address_has_mempool = False
270
271         current_id = self.safe_sql("""SELECT last_value FROM tx_seq""")
272         current_id = current_id[0][0]
273
274         for row in rows:
275             is_in, tx_hash, tx_id, pos, value = row
276             tx_hash = self.hashout_hex(tx_hash)
277             if tx_hash in known_tx:
278                 continue
279
280             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
281             address_has_mempool = True
282
283             # fixme: we need to detect transactions that became invalid
284             if current_id - tx_id > 10000:
285                 continue
286
287
288             #print "mempool", tx_hash
289             txpoint = {
290                     "timestamp":    0,
291                     "height":   0,
292                     "is_input":    int(is_in),
293                     "block_hash": 'mempool', 
294                     "tx_hash":  tx_hash,
295                     "tx_id":    int(tx_id),
296                     "index":      int(pos),
297                     "value":    int(value),
298                     }
299             txpoints.append(txpoint)
300
301
302         for txpoint in txpoints:
303             tx_id = txpoint['tx_id']
304             
305             txinputs = []
306             inrows = self.get_tx_inputs(tx_id)
307             for row in inrows:
308                 _hash = self.binout(row[6])
309                 if not _hash:
310                     #print "WARNING: missing tx_in for tx", tx_id, addr
311                     continue
312                 address = hash_to_address(chr(0), _hash)
313                 txinputs.append(address)
314             txpoint['inputs'] = txinputs
315             txoutputs = []
316             outrows = self.get_tx_outputs(tx_id)
317             for row in outrows:
318                 _hash = self.binout(row[6])
319                 if not _hash:
320                     #print "WARNING: missing tx_out for tx", tx_id, addr
321                     continue
322                 address = hash_to_address(chr(0), _hash)
323                 txoutputs.append(address)
324             txpoint['outputs'] = txoutputs
325
326             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
327             if not txpoint['is_input']:
328                 # detect if already redeemed...
329                 for row in outrows:
330                     if row[6] == dbhash: break
331                 else:
332                     raise
333                 #row = self.get_tx_output(tx_id,dbhash)
334                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
335                 # if not redeemed, we add the script
336                 if row:
337                     if not row[4]: txpoint['raw_output_script'] = row[1]
338
339         # cache result
340         # do not cache mempool results because statuses are ambiguous
341         if not address_has_mempool:
342             self.tx_cache[addr] = txpoints
343         
344         return txpoints
345
346
347     def get_status(self,addr):
348         # get address status, i.e. the last block for that address.
349         tx_points = self.get_history(addr)
350         if not tx_points:
351             status = None
352         else:
353             lastpoint = tx_points[-1]
354             status = lastpoint['block_hash']
355             # this is a temporary hack; move it up once old clients have disappeared
356             if status == 'mempool': # and session['version'] != "old":
357                 status = status + ':%d'% len(tx_points)
358         return status
359
360
361
362     def memorypool_update(store):
363
364         ds = BCDataStream.BCDataStream()
365         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
366
367         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
368         r = loads(respdata)
369         if r['error'] != None:
370             return
371
372         v = r['result'].get('transactions')
373         for hextx in v:
374             ds.clear()
375             ds.write(hextx.decode('hex'))
376             tx = deserialize.parse_Transaction(ds)
377             tx['hash'] = util.double_sha256(tx['tx'])
378             tx_hash = store.hashin(tx['hash'])
379
380             if store.tx_find_id_and_value(tx):
381                 pass
382             else:
383                 tx_id = store.import_tx(tx, False)
384                 store.update_tx_cache(tx_id)
385                 #print tx_hash
386     
387         store.commit()
388
389
390     def send_tx(self,tx):
391         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
392         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
393         r = loads(respdata)
394         if r['error'] != None:
395             msg = r['error'].get('message')
396             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
397         else:
398             out = r['result']
399         return out
400
401
402     def main_iteration(store):
403         try:
404             store.dblock.acquire()
405             store.catch_up()
406             store.memorypool_update()
407             block_number = store.get_block_number(1)
408
409         except IOError:
410             print "IOError: cannot reach bitcoind"
411             block_number = 0
412         except:
413             traceback.print_exc(file=sys.stdout)
414             block_number = 0
415         finally:
416             store.dblock.release()
417
418         return block_number
419
420
421     def catch_up(store):
422         # if there is an exception, do rollback and then re-raise the exception
423         for dircfg in store.datadirs:
424             try:
425                 store.catch_up_dir(dircfg)
426             except Exception, e:
427                 store.log.exception("Failed to catch up %s", dircfg)
428                 store.rollback()
429                 raise e
430
431
432
433
434 from processor import Processor
435
436 class BlockchainProcessor(Processor):
437
438     def __init__(self, config):
439         Processor.__init__(self)
440         self.store = AbeStore(config)
441         self.block_number = -1
442         self.watched_addresses = []
443         threading.Timer(10, self.run_store_iteration).start()
444
445     def process(self, request):
446         #print "abe process", request
447
448         message_id = request['id']
449         method = request['method']
450         params = request.get('params',[])
451         result = None
452         error = None
453
454         if method == 'blockchain.numblocks.subscribe':
455             result = self.block_number
456
457         elif method == 'blockchain.address.subscribe':
458             try:
459                 address = params[0]
460                 result = self.store.get_status(address)
461                 self.watch_address(address)
462             except BaseException, e:
463                 error = str(e) + ': ' + address
464                 print "error:", error
465
466         elif method == 'blockchain.address.get_history':
467             try:
468                 address = params[0]
469                 result = self.store.get_history( address ) 
470             except BaseException, e:
471                 error = str(e) + ': ' + address
472                 print "error:", error
473
474         elif method == 'blockchain.transaction.broadcast':
475             txo = self.store.send_tx(params[0])
476             print "sent tx:", txo
477             result = txo 
478
479         else:
480             error = "unknown method:%s"%method
481
482
483         if error:
484             response = { 'id':message_id, 'error':error }
485             self.push_response(response)
486         elif result != '':
487             response = { 'id':message_id, 'result':result }
488             self.push_response(response)
489
490
491     def watch_address(self, addr):
492         if addr not in self.watched_addresses:
493             self.watched_addresses.append(addr)
494
495
496     def run_store_iteration(self):
497         if self.shared.stopped(): 
498             print "exit timer"
499             return
500         
501         block_number = self.store.main_iteration()
502         if self.block_number != block_number:
503             self.block_number = block_number
504             print "block number:", self.block_number
505             self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
506
507         while True:
508             try:
509                 addr = self.store.address_queue.get(False)
510             except:
511                 break
512             if addr in self.watched_addresses:
513                 status = self.store.get_status( addr )
514                 self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
515
516         threading.Timer(10, self.run_store_iteration).start()
517
518