Generalize the way to setup the server to use Litecoin.
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13 class AbeStore(Datastore_class):
14
15     def __init__(self, config):
16         conf = DataStore.CONFIG_DEFAULTS
17         args, argv = readconf.parse_argv( [], conf)
18         args.dbtype = config.get('database','type')
19         if args.dbtype == 'sqlite3':
20             args.connect_args = { 'database' : config.get('database','database') }
21         elif args.dbtype == 'MySQLdb':
22             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
23         elif args.dbtype == 'psycopg2':
24             args.connect_args = { 'database' : config.get('database','database') }
25
26         coin = config.get('server', 'coin')
27         self.addrtype = 0
28         if coin == 'litecoin':
29             print 'Litecoin settings:'
30             datadir = config.get('server','datadir')
31             print '  datadir = ' + datadir
32             args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
33             print '  addrtype = 48'
34             self.addrtype = 48
35
36         Datastore_class.__init__(self,args)
37
38         self.sql_limit = int( config.get('database','limit') )
39
40         self.tx_cache = {}
41         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
42
43         self.address_queue = Queue()
44
45         self.dblock = thread.allocate_lock()
46         self.last_tx_id = 0
47
48     
49     def import_tx(self, tx, is_coinbase):
50         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
51         self.last_tx_id = tx_id
52         return tx_id
53         
54
55
56
57     def import_block(self, b, chain_ids=frozenset()):
58         #print "import block"
59         block_id = super(AbeStore, self).import_block(b, chain_ids)
60         for pos in xrange(len(b['transactions'])):
61             tx = b['transactions'][pos]
62             if 'hash' not in tx:
63                 tx['hash'] = util.double_sha256(tx['tx'])
64             tx_id = self.tx_find_id_and_value(tx)
65             if tx_id:
66                 self.update_tx_cache(tx_id)
67             else:
68                 print "error: import_block: no tx_id"
69         return block_id
70
71
72     def update_tx_cache(self, txid):
73         inrows = self.get_tx_inputs(txid, False)
74         for row in inrows:
75             _hash = self.binout(row[6])
76             if not _hash:
77                 #print "WARNING: missing tx_in for tx", txid
78                 continue
79
80             address = hash_to_address(chr(self.addrtype), _hash)
81             if self.tx_cache.has_key(address):
82                 print "cache: invalidating", address
83                 self.tx_cache.pop(address)
84             self.address_queue.put(address)
85
86         outrows = self.get_tx_outputs(txid, False)
87         for row in outrows:
88             _hash = self.binout(row[6])
89             if not _hash:
90                 #print "WARNING: missing tx_out for tx", txid
91                 continue
92
93             address = hash_to_address(chr(self.addrtype), _hash)
94             if self.tx_cache.has_key(address):
95                 print "cache: invalidating", address
96                 self.tx_cache.pop(address)
97             self.address_queue.put(address)
98
99     def safe_sql(self,sql, params=(), lock=True):
100
101         error = False
102         try:
103             if lock: self.dblock.acquire()
104             ret = self.selectall(sql,params)
105         except:
106             error = True
107             traceback.print_exc(file=sys.stdout)
108         finally:
109             if lock: self.dblock.release()
110
111         if error: 
112             raise BaseException('sql error')
113
114         return ret
115             
116
117     def get_tx_outputs(self, tx_id, lock=True):
118         return self.safe_sql("""SELECT
119                 txout.txout_pos,
120                 txout.txout_scriptPubKey,
121                 txout.txout_value,
122                 nexttx.tx_hash,
123                 nexttx.tx_id,
124                 txin.txin_pos,
125                 pubkey.pubkey_hash
126               FROM txout
127               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
128               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
129               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
130              WHERE txout.tx_id = %d 
131              ORDER BY txout.txout_pos
132         """%(tx_id), (), lock)
133
134     def get_tx_inputs(self, tx_id, lock=True):
135         return self.safe_sql(""" SELECT
136                 txin.txin_pos,
137                 txin.txin_scriptSig,
138                 txout.txout_value,
139                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
140                 prevtx.tx_id,
141                 COALESCE(txout.txout_pos, u.txout_pos),
142                 pubkey.pubkey_hash
143               FROM txin
144               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
145               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
146               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
147               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
148              WHERE txin.tx_id = %d
149              ORDER BY txin.txin_pos
150              """%(tx_id,), (), lock)
151
152
153     def get_address_out_rows(self, dbhash):
154         out = self.safe_sql(""" SELECT
155                 b.block_nTime,
156                 cc.chain_id,
157                 b.block_height,
158                 1,
159                 b.block_hash,
160                 tx.tx_hash,
161                 tx.tx_id,
162                 txin.txin_pos,
163                 -prevout.txout_value
164               FROM chain_candidate cc
165               JOIN block b ON (b.block_id = cc.block_id)
166               JOIN block_tx ON (block_tx.block_id = b.block_id)
167               JOIN tx ON (tx.tx_id = block_tx.tx_id)
168               JOIN txin ON (txin.tx_id = tx.tx_id)
169               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
170               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
171              WHERE pubkey.pubkey_hash = ?
172                AND cc.in_longest = 1
173              LIMIT ? """, (dbhash,self.sql_limit))
174
175         if len(out)==self.sql_limit: 
176             raise BaseException('limit reached')
177         return out
178
179     def get_address_out_rows_memorypool(self, dbhash):
180         out = self.safe_sql(""" SELECT
181                 1,
182                 tx.tx_hash,
183                 tx.tx_id,
184                 txin.txin_pos,
185                 -prevout.txout_value
186               FROM tx 
187               JOIN txin ON (txin.tx_id = tx.tx_id)
188               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
189               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
190              WHERE pubkey.pubkey_hash = ?
191              LIMIT ? """, (dbhash,self.sql_limit))
192
193         if len(out)==self.sql_limit: 
194             raise BaseException('limit reached')
195         return out
196
197     def get_address_in_rows(self, dbhash):
198         out = self.safe_sql(""" SELECT
199                 b.block_nTime,
200                 cc.chain_id,
201                 b.block_height,
202                 0,
203                 b.block_hash,
204                 tx.tx_hash,
205                 tx.tx_id,
206                 txout.txout_pos,
207                 txout.txout_value
208               FROM chain_candidate cc
209               JOIN block b ON (b.block_id = cc.block_id)
210               JOIN block_tx ON (block_tx.block_id = b.block_id)
211               JOIN tx ON (tx.tx_id = block_tx.tx_id)
212               JOIN txout ON (txout.tx_id = tx.tx_id)
213               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
214              WHERE pubkey.pubkey_hash = ?
215                AND cc.in_longest = 1
216                LIMIT ? """, (dbhash,self.sql_limit))
217
218         if len(out)==self.sql_limit: 
219             raise BaseException('limit reached')
220         return out
221
222     def get_address_in_rows_memorypool(self, dbhash):
223         out = self.safe_sql( """ SELECT
224                 0,
225                 tx.tx_hash,
226                 tx.tx_id,
227                 txout.txout_pos,
228                 txout.txout_value
229               FROM tx
230               JOIN txout ON (txout.tx_id = tx.tx_id)
231               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
232              WHERE pubkey.pubkey_hash = ?
233              LIMIT ? """, (dbhash,self.sql_limit))
234
235         if len(out)==self.sql_limit: 
236             raise BaseException('limit reached')
237         return out
238
239     def get_history(self, addr):
240
241         cached_version = self.tx_cache.get( addr )
242         if cached_version is not None:
243             return cached_version
244
245         version, binaddr = decode_check_address(addr)
246         if binaddr is None:
247             return None
248
249         dbhash = self.binin(binaddr)
250         rows = []
251         rows += self.get_address_out_rows( dbhash )
252         rows += self.get_address_in_rows( dbhash )
253
254         txpoints = []
255         known_tx = []
256
257         for row in rows:
258             try:
259                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
260             except:
261                 print "cannot unpack row", row
262                 break
263             tx_hash = self.hashout_hex(tx_hash)
264             txpoint = {
265                     "timestamp":    int(nTime),
266                     "height":   int(height),
267                     "is_input":    int(is_in),
268                     "block_hash": self.hashout_hex(blk_hash),
269                     "tx_hash":  tx_hash,
270                     "tx_id":    int(tx_id),
271                     "index":      int(pos),
272                     "value":    int(value),
273                     }
274
275             txpoints.append(txpoint)
276             known_tx.append(self.hashout_hex(tx_hash))
277
278
279         # todo: sort them really...
280         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
281
282         # read memory pool
283         rows = []
284         rows += self.get_address_in_rows_memorypool( dbhash )
285         rows += self.get_address_out_rows_memorypool( dbhash )
286         address_has_mempool = False
287
288         for row in rows:
289             is_in, tx_hash, tx_id, pos, value = row
290             tx_hash = self.hashout_hex(tx_hash)
291             if tx_hash in known_tx:
292                 continue
293
294             # discard transactions that are too old
295             if self.last_tx_id - tx_id > 50000:
296                 print "discarding tx id", tx_id
297                 continue
298
299             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
300             address_has_mempool = True
301
302             #print "mempool", tx_hash
303             txpoint = {
304                     "timestamp":    0,
305                     "height":   0,
306                     "is_input":    int(is_in),
307                     "block_hash": 'mempool', 
308                     "tx_hash":  tx_hash,
309                     "tx_id":    int(tx_id),
310                     "index":      int(pos),
311                     "value":    int(value),
312                     }
313             txpoints.append(txpoint)
314
315
316         for txpoint in txpoints:
317             tx_id = txpoint['tx_id']
318             
319             txinputs = []
320             inrows = self.get_tx_inputs(tx_id)
321             for row in inrows:
322                 _hash = self.binout(row[6])
323                 if not _hash:
324                     #print "WARNING: missing tx_in for tx", tx_id, addr
325                     continue
326                 address = hash_to_address(chr(self.addrtype), _hash)
327                 txinputs.append(address)
328             txpoint['inputs'] = txinputs
329             txoutputs = []
330             outrows = self.get_tx_outputs(tx_id)
331             for row in outrows:
332                 _hash = self.binout(row[6])
333                 if not _hash:
334                     #print "WARNING: missing tx_out for tx", tx_id, addr
335                     continue
336                 address = hash_to_address(chr(self.addrtype), _hash)
337                 txoutputs.append(address)
338             txpoint['outputs'] = txoutputs
339
340             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
341             if not txpoint['is_input']:
342                 # detect if already redeemed...
343                 for row in outrows:
344                     if row[6] == dbhash: break
345                 else:
346                     raise
347                 #row = self.get_tx_output(tx_id,dbhash)
348                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
349                 # if not redeemed, we add the script
350                 if row:
351                     if not row[4]: txpoint['raw_output_script'] = row[1]
352
353             txpoint.pop('tx_id')
354
355         # cache result
356         # do not cache mempool results because statuses are ambiguous
357         if not address_has_mempool:
358             self.tx_cache[addr] = txpoints
359         
360         return txpoints
361
362
363     def get_status(self,addr):
364         # get address status, i.e. the last block for that address.
365         tx_points = self.get_history(addr)
366         if not tx_points:
367             status = None
368         else:
369             lastpoint = tx_points[-1]
370             status = lastpoint['block_hash']
371             # this is a temporary hack; move it up once old clients have disappeared
372             if status == 'mempool': # and session['version'] != "old":
373                 status = status + ':%d'% len(tx_points)
374         return status
375
376
377
378     def memorypool_update(store):
379
380         ds = BCDataStream.BCDataStream()
381         postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
382
383         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
384         r = loads(respdata)
385         if r['error'] != None:
386             return
387
388         v = r['result'].get('transactions')
389         for hextx in v:
390             ds.clear()
391             ds.write(hextx.decode('hex'))
392             tx = deserialize.parse_Transaction(ds)
393             tx['hash'] = util.double_sha256(tx['tx'])
394             tx_hash = store.hashin(tx['hash'])
395
396             if store.tx_find_id_and_value(tx):
397                 pass
398             else:
399                 tx_id = store.import_tx(tx, False)
400                 store.update_tx_cache(tx_id)
401                 #print tx_hash
402     
403         store.commit()
404
405
406     def send_tx(self,tx):
407         postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
408         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
409         r = loads(respdata)
410         if r['error'] != None:
411             msg = r['error'].get('message')
412             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
413         else:
414             out = r['result']
415         return out
416
417
418     def main_iteration(store):
419         with store.dblock:
420             store.catch_up()
421             store.memorypool_update()
422             block_number = store.get_block_number(1)
423             return block_number
424
425
426
427
428     def catch_up(store):
429         # if there is an exception, do rollback and then re-raise the exception
430         for dircfg in store.datadirs:
431             try:
432                 store.catch_up_dir(dircfg)
433             except Exception, e:
434                 store.log.exception("Failed to catch up %s", dircfg)
435                 store.rollback()
436                 raise e
437
438
439
440
441 from processor import Processor
442
443 class BlockchainProcessor(Processor):
444
445     def __init__(self, config):
446         Processor.__init__(self)
447         self.store = AbeStore(config)
448         self.block_number = -1
449         self.watched_addresses = []
450
451         # catch_up first
452         n = self.store.main_iteration()
453         print "blockchain: %d blocks"%n
454
455         threading.Timer(10, self.run_store_iteration).start()
456
457     def process(self, request):
458         #print "abe process", request
459
460         message_id = request['id']
461         method = request['method']
462         params = request.get('params',[])
463         result = None
464         error = None
465
466         if method == 'blockchain.numblocks.subscribe':
467             result = self.block_number
468
469         elif method == 'blockchain.address.subscribe':
470             try:
471                 address = params[0]
472                 result = self.store.get_status(address)
473                 self.watch_address(address)
474             except BaseException, e:
475                 error = str(e) + ': ' + address
476                 print "error:", error
477
478         elif method == 'blockchain.address.get_history':
479             try:
480                 address = params[0]
481                 result = self.store.get_history( address ) 
482             except BaseException, e:
483                 error = str(e) + ': ' + address
484                 print "error:", error
485
486         elif method == 'blockchain.transaction.broadcast':
487             txo = self.store.send_tx(params[0])
488             print "sent tx:", txo
489             result = txo 
490
491         else:
492             error = "unknown method:%s"%method
493
494
495         if error:
496             response = { 'id':message_id, 'error':error }
497             self.push_response(response)
498         elif result != '':
499             response = { 'id':message_id, 'result':result }
500             self.push_response(response)
501
502
503     def watch_address(self, addr):
504         if addr not in self.watched_addresses:
505             self.watched_addresses.append(addr)
506
507
508     def run_store_iteration(self):
509         
510         try:
511             block_number = self.store.main_iteration()
512         except:
513             traceback.print_exc(file=sys.stdout)
514             print "terminating"
515             self.shared.stop()
516
517         if self.shared.stopped(): 
518             print "exit timer"
519             return
520
521         if self.block_number != block_number:
522             self.block_number = block_number
523             print "block number:", self.block_number
524             self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
525
526         while True:
527             try:
528                 addr = self.store.address_queue.get(False)
529             except:
530                 break
531             if addr in self.watched_addresses:
532                 status = self.store.get_status( addr )
533                 self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
534
535         threading.Timer(10, self.run_store_iteration).start()
536
537