Filter only blocks for the chain we want.
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13 class AbeStore(Datastore_class):
14
15     def __init__(self, config):
16         conf = DataStore.CONFIG_DEFAULTS
17         args, argv = readconf.parse_argv( [], conf)
18         args.dbtype = config.get('database','type')
19         if args.dbtype == 'sqlite3':
20             args.connect_args = { 'database' : config.get('database','database') }
21         elif args.dbtype == 'MySQLdb':
22             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
23         elif args.dbtype == 'psycopg2':
24             args.connect_args = { 'database' : config.get('database','database') }
25
26         coin = config.get('server', 'coin')
27         self.addrtype = 0
28         if coin == 'litecoin':
29             print 'Litecoin settings:'
30             datadir = config.get('server','datadir')
31             print '  datadir = ' + datadir
32             args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
33             print '  addrtype = 48'
34             self.addrtype = 48
35
36         Datastore_class.__init__(self,args)
37
38         self.chain_id = self.datadirs[0]["chain_id"];
39         print 'Coin chain_id = %d' % self.chain_id
40
41         self.sql_limit = int( config.get('database','limit') )
42
43         self.tx_cache = {}
44         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
45
46         self.address_queue = Queue()
47
48         self.dblock = thread.allocate_lock()
49         self.last_tx_id = 0
50
51     
52     def import_tx(self, tx, is_coinbase):
53         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
54         self.last_tx_id = tx_id
55         return tx_id
56         
57
58
59
60     def import_block(self, b, chain_ids=frozenset()):
61         #print "import block"
62         block_id = super(AbeStore, self).import_block(b, chain_ids)
63         for pos in xrange(len(b['transactions'])):
64             tx = b['transactions'][pos]
65             if 'hash' not in tx:
66                 tx['hash'] = util.double_sha256(tx['tx'])
67             tx_id = self.tx_find_id_and_value(tx)
68             if tx_id:
69                 self.update_tx_cache(tx_id)
70             else:
71                 print "error: import_block: no tx_id"
72         return block_id
73
74
75     def update_tx_cache(self, txid):
76         inrows = self.get_tx_inputs(txid, False)
77         for row in inrows:
78             _hash = self.binout(row[6])
79             if not _hash:
80                 #print "WARNING: missing tx_in for tx", txid
81                 continue
82
83             address = hash_to_address(chr(self.addrtype), _hash)
84             if self.tx_cache.has_key(address):
85                 print "cache: invalidating", address
86                 self.tx_cache.pop(address)
87             self.address_queue.put(address)
88
89         outrows = self.get_tx_outputs(txid, False)
90         for row in outrows:
91             _hash = self.binout(row[6])
92             if not _hash:
93                 #print "WARNING: missing tx_out for tx", txid
94                 continue
95
96             address = hash_to_address(chr(self.addrtype), _hash)
97             if self.tx_cache.has_key(address):
98                 print "cache: invalidating", address
99                 self.tx_cache.pop(address)
100             self.address_queue.put(address)
101
102     def safe_sql(self,sql, params=(), lock=True):
103
104         error = False
105         try:
106             if lock: self.dblock.acquire()
107             ret = self.selectall(sql,params)
108         except:
109             error = True
110             traceback.print_exc(file=sys.stdout)
111         finally:
112             if lock: self.dblock.release()
113
114         if error: 
115             raise BaseException('sql error')
116
117         return ret
118             
119
120     def get_tx_outputs(self, tx_id, lock=True):
121         return self.safe_sql("""SELECT
122                 txout.txout_pos,
123                 txout.txout_scriptPubKey,
124                 txout.txout_value,
125                 nexttx.tx_hash,
126                 nexttx.tx_id,
127                 txin.txin_pos,
128                 pubkey.pubkey_hash
129               FROM txout
130               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
131               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
132               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
133              WHERE txout.tx_id = %d 
134              ORDER BY txout.txout_pos
135         """%(tx_id), (), lock)
136
137     def get_tx_inputs(self, tx_id, lock=True):
138         return self.safe_sql(""" SELECT
139                 txin.txin_pos,
140                 txin.txin_scriptSig,
141                 txout.txout_value,
142                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
143                 prevtx.tx_id,
144                 COALESCE(txout.txout_pos, u.txout_pos),
145                 pubkey.pubkey_hash
146               FROM txin
147               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
148               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
149               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
150               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
151              WHERE txin.tx_id = %d
152              ORDER BY txin.txin_pos
153              """%(tx_id,), (), lock)
154
155
156     def get_address_out_rows(self, dbhash):
157         out = self.safe_sql(""" SELECT
158                 b.block_nTime,
159                 cc.chain_id,
160                 b.block_height,
161                 1,
162                 b.block_hash,
163                 tx.tx_hash,
164                 tx.tx_id,
165                 txin.txin_pos,
166                 -prevout.txout_value
167               FROM chain_candidate cc
168               JOIN block b ON (b.block_id = cc.block_id)
169               JOIN block_tx ON (block_tx.block_id = b.block_id)
170               JOIN tx ON (tx.tx_id = block_tx.tx_id)
171               JOIN txin ON (txin.tx_id = tx.tx_id)
172               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
173               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
174              WHERE pubkey.pubkey_hash = ?
175                AND cc.chain_id = ?
176                AND cc.in_longest = 1
177              LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
178
179         if len(out)==self.sql_limit: 
180             raise BaseException('limit reached')
181         return out
182
183     def get_address_out_rows_memorypool(self, dbhash):
184         out = self.safe_sql(""" SELECT
185                 1,
186                 tx.tx_hash,
187                 tx.tx_id,
188                 txin.txin_pos,
189                 -prevout.txout_value
190               FROM tx 
191               JOIN txin ON (txin.tx_id = tx.tx_id)
192               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
193               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
194              WHERE pubkey.pubkey_hash = ?
195              LIMIT ? """, (dbhash,self.sql_limit))
196
197         if len(out)==self.sql_limit: 
198             raise BaseException('limit reached')
199         return out
200
201     def get_address_in_rows(self, dbhash):
202         out = self.safe_sql(""" SELECT
203                 b.block_nTime,
204                 cc.chain_id,
205                 b.block_height,
206                 0,
207                 b.block_hash,
208                 tx.tx_hash,
209                 tx.tx_id,
210                 txout.txout_pos,
211                 txout.txout_value
212               FROM chain_candidate cc
213               JOIN block b ON (b.block_id = cc.block_id)
214               JOIN block_tx ON (block_tx.block_id = b.block_id)
215               JOIN tx ON (tx.tx_id = block_tx.tx_id)
216               JOIN txout ON (txout.tx_id = tx.tx_id)
217               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
218              WHERE pubkey.pubkey_hash = ?
219                AND cc.chain_id = ?
220                AND cc.in_longest = 1
221                LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
222
223         if len(out)==self.sql_limit: 
224             raise BaseException('limit reached')
225         return out
226
227     def get_address_in_rows_memorypool(self, dbhash):
228         out = self.safe_sql( """ SELECT
229                 0,
230                 tx.tx_hash,
231                 tx.tx_id,
232                 txout.txout_pos,
233                 txout.txout_value
234               FROM tx
235               JOIN txout ON (txout.tx_id = tx.tx_id)
236               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
237              WHERE pubkey.pubkey_hash = ?
238              LIMIT ? """, (dbhash,self.sql_limit))
239
240         if len(out)==self.sql_limit: 
241             raise BaseException('limit reached')
242         return out
243
244     def get_history(self, addr):
245
246         cached_version = self.tx_cache.get( addr )
247         if cached_version is not None:
248             return cached_version
249
250         version, binaddr = decode_check_address(addr)
251         if binaddr is None:
252             return None
253
254         dbhash = self.binin(binaddr)
255         rows = []
256         rows += self.get_address_out_rows( dbhash )
257         rows += self.get_address_in_rows( dbhash )
258
259         txpoints = []
260         known_tx = []
261
262         for row in rows:
263             try:
264                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
265             except:
266                 print "cannot unpack row", row
267                 break
268             tx_hash = self.hashout_hex(tx_hash)
269             txpoint = {
270                     "timestamp":    int(nTime),
271                     "height":   int(height),
272                     "is_input":    int(is_in),
273                     "block_hash": self.hashout_hex(blk_hash),
274                     "tx_hash":  tx_hash,
275                     "tx_id":    int(tx_id),
276                     "index":      int(pos),
277                     "value":    int(value),
278                     }
279
280             txpoints.append(txpoint)
281             known_tx.append(self.hashout_hex(tx_hash))
282
283
284         # todo: sort them really...
285         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
286
287         # read memory pool
288         rows = []
289         rows += self.get_address_in_rows_memorypool( dbhash )
290         rows += self.get_address_out_rows_memorypool( dbhash )
291         address_has_mempool = False
292
293         for row in rows:
294             is_in, tx_hash, tx_id, pos, value = row
295             tx_hash = self.hashout_hex(tx_hash)
296             if tx_hash in known_tx:
297                 continue
298
299             # discard transactions that are too old
300             if self.last_tx_id - tx_id > 50000:
301                 print "discarding tx id", tx_id
302                 continue
303
304             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
305             address_has_mempool = True
306
307             #print "mempool", tx_hash
308             txpoint = {
309                     "timestamp":    0,
310                     "height":   0,
311                     "is_input":    int(is_in),
312                     "block_hash": 'mempool', 
313                     "tx_hash":  tx_hash,
314                     "tx_id":    int(tx_id),
315                     "index":      int(pos),
316                     "value":    int(value),
317                     }
318             txpoints.append(txpoint)
319
320
321         for txpoint in txpoints:
322             tx_id = txpoint['tx_id']
323             
324             txinputs = []
325             inrows = self.get_tx_inputs(tx_id)
326             for row in inrows:
327                 _hash = self.binout(row[6])
328                 if not _hash:
329                     #print "WARNING: missing tx_in for tx", tx_id, addr
330                     continue
331                 address = hash_to_address(chr(self.addrtype), _hash)
332                 txinputs.append(address)
333             txpoint['inputs'] = txinputs
334             txoutputs = []
335             outrows = self.get_tx_outputs(tx_id)
336             for row in outrows:
337                 _hash = self.binout(row[6])
338                 if not _hash:
339                     #print "WARNING: missing tx_out for tx", tx_id, addr
340                     continue
341                 address = hash_to_address(chr(self.addrtype), _hash)
342                 txoutputs.append(address)
343             txpoint['outputs'] = txoutputs
344
345             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
346             if not txpoint['is_input']:
347                 # detect if already redeemed...
348                 for row in outrows:
349                     if row[6] == dbhash: break
350                 else:
351                     raise
352                 #row = self.get_tx_output(tx_id,dbhash)
353                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
354                 # if not redeemed, we add the script
355                 if row:
356                     if not row[4]: txpoint['raw_output_script'] = row[1]
357
358             txpoint.pop('tx_id')
359
360         # cache result
361         # do not cache mempool results because statuses are ambiguous
362         if not address_has_mempool:
363             self.tx_cache[addr] = txpoints
364         
365         return txpoints
366
367
368     def get_status(self,addr):
369         # get address status, i.e. the last block for that address.
370         tx_points = self.get_history(addr)
371         if not tx_points:
372             status = None
373         else:
374             lastpoint = tx_points[-1]
375             status = lastpoint['block_hash']
376             # this is a temporary hack; move it up once old clients have disappeared
377             if status == 'mempool': # and session['version'] != "old":
378                 status = status + ':%d'% len(tx_points)
379         return status
380
381
382
383     def memorypool_update(store):
384
385         ds = BCDataStream.BCDataStream()
386         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
387
388         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
389         r = loads(respdata)
390         if r['error'] != None:
391             return
392
393         v = r['result'].get('transactions')
394         for hextx in v:
395             ds.clear()
396             ds.write(hextx.decode('hex'))
397             tx = deserialize.parse_Transaction(ds)
398             tx['hash'] = util.double_sha256(tx['tx'])
399             tx_hash = store.hashin(tx['hash'])
400
401             if store.tx_find_id_and_value(tx):
402                 pass
403             else:
404                 tx_id = store.import_tx(tx, False)
405                 store.update_tx_cache(tx_id)
406                 #print tx_hash
407     
408         store.commit()
409
410
411     def send_tx(self,tx):
412         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
413         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
414         r = loads(respdata)
415         if r['error'] != None:
416             msg = r['error'].get('message')
417             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
418         else:
419             out = r['result']
420         return out
421
422
423     def main_iteration(store):
424         with store.dblock:
425             store.catch_up()
426             store.memorypool_update()
427             block_number = store.get_block_number(1)
428             return block_number
429
430
431
432
433     def catch_up(store):
434         # if there is an exception, do rollback and then re-raise the exception
435         for dircfg in store.datadirs:
436             try:
437                 store.catch_up_dir(dircfg)
438             except Exception, e:
439                 store.log.exception("Failed to catch up %s", dircfg)
440                 store.rollback()
441                 raise e
442
443
444
445
446 from processor import Processor
447
448 class BlockchainProcessor(Processor):
449
450     def __init__(self, config):
451         Processor.__init__(self)
452         self.store = AbeStore(config)
453         self.block_number = -1
454         self.watched_addresses = []
455
456         # catch_up first
457         n = self.store.main_iteration()
458         print "blockchain: %d blocks"%n
459
460         threading.Timer(10, self.run_store_iteration).start()
461
462     def process(self, request):
463         #print "abe process", request
464
465         message_id = request['id']
466         method = request['method']
467         params = request.get('params',[])
468         result = None
469         error = None
470
471         if method == 'blockchain.numblocks.subscribe':
472             result = self.block_number
473
474         elif method == 'blockchain.address.subscribe':
475             try:
476                 address = params[0]
477                 result = self.store.get_status(address)
478                 self.watch_address(address)
479             except BaseException, e:
480                 error = str(e) + ': ' + address
481                 print "error:", error
482
483         elif method == 'blockchain.address.get_history':
484             try:
485                 address = params[0]
486                 result = self.store.get_history( address ) 
487             except BaseException, e:
488                 error = str(e) + ': ' + address
489                 print "error:", error
490
491         elif method == 'blockchain.transaction.broadcast':
492             txo = self.store.send_tx(params[0])
493             print "sent tx:", txo
494             result = txo 
495
496         else:
497             error = "unknown method:%s"%method
498
499
500         if error:
501             response = { 'id':message_id, 'error':error }
502             self.push_response(response)
503         elif result != '':
504             response = { 'id':message_id, 'result':result }
505             self.push_response(response)
506
507
508     def watch_address(self, addr):
509         if addr not in self.watched_addresses:
510             self.watched_addresses.append(addr)
511
512
513     def run_store_iteration(self):
514         
515         try:
516             block_number = self.store.main_iteration()
517         except:
518             traceback.print_exc(file=sys.stdout)
519             print "terminating"
520             self.shared.stop()
521
522         if self.shared.stopped(): 
523             print "exit timer"
524             return
525
526         if self.block_number != block_number:
527             self.block_number = block_number
528             print "block number:", self.block_number
529             self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
530
531         while True:
532             try:
533                 addr = self.store.address_queue.get(False)
534             except:
535                 break
536             if addr in self.watched_addresses:
537                 status = self.store.get_status( addr )
538                 self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
539
540         threading.Timer(10, self.run_store_iteration).start()
541
542