dc322fdd7c3ae7a5d3f2509f6c59e5f3f27660ad
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13 import hashlib
14 encode = lambda x: x[::-1].encode('hex')
15 decode = lambda x: x.decode('hex')[::-1]
16 Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
17
18 def rev_hex(s):
19     return s.decode('hex')[::-1].encode('hex')
20
21 def int_to_hex(i, length=1):
22     s = hex(i)[2:].rstrip('L')
23     s = "0"*(2*length - len(s)) + s
24     return rev_hex(s)
25
26 def header_to_string(res):
27     s = int_to_hex(res.get('version'),4) \
28         + rev_hex(res.get('prev_block_hash')) \
29         + rev_hex(res.get('merkle_root')) \
30         + int_to_hex(int(res.get('timestamp')),4) \
31         + int_to_hex(int(res.get('bits')),4) \
32         + int_to_hex(int(res.get('nonce')),4)
33     return s
34
35
36 class AbeStore(Datastore_class):
37
38     def __init__(self, config):
39         conf = DataStore.CONFIG_DEFAULTS
40         args, argv = readconf.parse_argv( [], conf)
41         args.dbtype = config.get('database','type')
42         if args.dbtype == 'sqlite3':
43             args.connect_args = { 'database' : config.get('database','database') }
44         elif args.dbtype == 'MySQLdb':
45             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
46         elif args.dbtype == 'psycopg2':
47             args.connect_args = { 'database' : config.get('database','database') }
48
49         coin = config.get('server', 'coin')
50         self.addrtype = 0
51         if coin == 'litecoin':
52             print 'Litecoin settings:'
53             datadir = config.get('server','datadir')
54             print '  datadir = ' + datadir
55             args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
56             print '  addrtype = 48'
57             self.addrtype = 48
58
59         Datastore_class.__init__(self,args)
60
61         # Use 1 (Bitcoin) if chain_id is not sent
62         self.chain_id = self.datadirs[0]["chain_id"] or 1
63         print 'Coin chain_id = %d' % self.chain_id
64
65         self.sql_limit = int( config.get('database','limit') )
66
67         self.tx_cache = {}
68         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
69
70         self.chunk_cache = {}
71
72         self.address_queue = Queue()
73
74         self.lock = threading.Lock()        # for the database
75         self.cache_lock = threading.Lock()  # for the cache
76         self.last_tx_id = 0
77         self.known_mempool_hashes = []
78
79
80     
81     def import_tx(self, tx, is_coinbase):
82         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
83         self.last_tx_id = tx_id
84         return tx_id
85         
86
87
88
89     def import_block(self, b, chain_ids=frozenset()):
90         #print "import block"
91         block_id = super(AbeStore, self).import_block(b, chain_ids)
92         for pos in xrange(len(b['transactions'])):
93             tx = b['transactions'][pos]
94             if 'hash' not in tx:
95                 tx['hash'] = util.double_sha256(tx['tx'])
96             tx_id = self.tx_find_id_and_value(tx)
97             if tx_id:
98                 self.update_tx_cache(tx_id)
99             else:
100                 print "error: import_block: no tx_id"
101         return block_id
102
103
104     def update_tx_cache(self, txid):
105         inrows = self.get_tx_inputs(txid, False)
106         for row in inrows:
107             _hash = self.binout(row[6])
108             if not _hash:
109                 #print "WARNING: missing tx_in for tx", txid
110                 continue
111
112             address = hash_to_address(chr(self.addrtype), _hash)
113             with self.cache_lock:
114                 if self.tx_cache.has_key(address):
115                     print "cache: invalidating", address
116                     self.tx_cache.pop(address)
117
118             self.address_queue.put(address)
119
120         outrows = self.get_tx_outputs(txid, False)
121         for row in outrows:
122             _hash = self.binout(row[6])
123             if not _hash:
124                 #print "WARNING: missing tx_out for tx", txid
125                 continue
126
127             address = hash_to_address(chr(self.addrtype), _hash)
128             with self.cache_lock:
129                 if self.tx_cache.has_key(address):
130                     print "cache: invalidating", address
131                     self.tx_cache.pop(address)
132
133             self.address_queue.put(address)
134
135     def safe_sql(self,sql, params=(), lock=True):
136
137         error = False
138         try:
139             if lock: self.lock.acquire()
140             ret = self.selectall(sql,params)
141         except:
142             error = True
143             traceback.print_exc(file=sys.stdout)
144         finally:
145             if lock: self.lock.release()
146
147         if error: 
148             raise BaseException('sql error')
149
150         return ret
151             
152
153     def get_tx_outputs(self, tx_id, lock=True):
154         return self.safe_sql("""SELECT
155                 txout.txout_pos,
156                 txout.txout_scriptPubKey,
157                 txout.txout_value,
158                 nexttx.tx_hash,
159                 nexttx.tx_id,
160                 txin.txin_pos,
161                 pubkey.pubkey_hash
162               FROM txout
163               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
164               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
165               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
166              WHERE txout.tx_id = %d 
167              ORDER BY txout.txout_pos
168         """%(tx_id), (), lock)
169
170     def get_tx_inputs(self, tx_id, lock=True):
171         return self.safe_sql(""" SELECT
172                 txin.txin_pos,
173                 txin.txin_scriptSig,
174                 txout.txout_value,
175                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
176                 prevtx.tx_id,
177                 COALESCE(txout.txout_pos, u.txout_pos),
178                 pubkey.pubkey_hash
179               FROM txin
180               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
181               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
182               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
183               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
184              WHERE txin.tx_id = %d
185              ORDER BY txin.txin_pos
186              """%(tx_id,), (), lock)
187
188
189     def get_address_out_rows(self, dbhash):
190         out = self.safe_sql(""" SELECT
191                 b.block_nTime,
192                 cc.chain_id,
193                 b.block_height,
194                 1,
195                 b.block_hash,
196                 tx.tx_hash,
197                 tx.tx_id,
198                 txin.txin_pos,
199                 -prevout.txout_value
200               FROM chain_candidate cc
201               JOIN block b ON (b.block_id = cc.block_id)
202               JOIN block_tx ON (block_tx.block_id = b.block_id)
203               JOIN tx ON (tx.tx_id = block_tx.tx_id)
204               JOIN txin ON (txin.tx_id = tx.tx_id)
205               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
206               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
207              WHERE pubkey.pubkey_hash = ?
208                AND cc.chain_id = ?
209                AND cc.in_longest = 1
210              LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
211
212         if len(out)==self.sql_limit: 
213             raise BaseException('limit reached')
214         return out
215
216     def get_address_out_rows_memorypool(self, dbhash):
217         out = self.safe_sql(""" SELECT
218                 1,
219                 tx.tx_hash,
220                 tx.tx_id,
221                 txin.txin_pos,
222                 -prevout.txout_value
223               FROM tx 
224               JOIN txin ON (txin.tx_id = tx.tx_id)
225               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
226               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
227              WHERE pubkey.pubkey_hash = ?
228              LIMIT ? """, (dbhash,self.sql_limit))
229
230         if len(out)==self.sql_limit: 
231             raise BaseException('limit reached')
232         return out
233
234     def get_address_in_rows(self, dbhash):
235         out = self.safe_sql(""" SELECT
236                 b.block_nTime,
237                 cc.chain_id,
238                 b.block_height,
239                 0,
240                 b.block_hash,
241                 tx.tx_hash,
242                 tx.tx_id,
243                 txout.txout_pos,
244                 txout.txout_value
245               FROM chain_candidate cc
246               JOIN block b ON (b.block_id = cc.block_id)
247               JOIN block_tx ON (block_tx.block_id = b.block_id)
248               JOIN tx ON (tx.tx_id = block_tx.tx_id)
249               JOIN txout ON (txout.tx_id = tx.tx_id)
250               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
251              WHERE pubkey.pubkey_hash = ?
252                AND cc.chain_id = ?
253                AND cc.in_longest = 1
254                LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
255
256         if len(out)==self.sql_limit: 
257             raise BaseException('limit reached')
258         return out
259
260     def get_address_in_rows_memorypool(self, dbhash):
261         out = self.safe_sql( """ SELECT
262                 0,
263                 tx.tx_hash,
264                 tx.tx_id,
265                 txout.txout_pos,
266                 txout.txout_value
267               FROM tx
268               JOIN txout ON (txout.tx_id = tx.tx_id)
269               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
270              WHERE pubkey.pubkey_hash = ?
271              LIMIT ? """, (dbhash,self.sql_limit))
272
273         if len(out)==self.sql_limit: 
274             raise BaseException('limit reached')
275         return out
276
277     def get_history(self, addr):
278
279         with self.cache_lock:
280             cached_version = self.tx_cache.get( addr )
281             if cached_version is not None:
282                 return cached_version
283
284         version, binaddr = decode_check_address(addr)
285         if binaddr is None:
286             return None
287
288         dbhash = self.binin(binaddr)
289         rows = []
290         rows += self.get_address_out_rows( dbhash )
291         rows += self.get_address_in_rows( dbhash )
292
293         txpoints = []
294         known_tx = []
295
296         for row in rows:
297             try:
298                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
299             except:
300                 print "cannot unpack row", row
301                 break
302             tx_hash = self.hashout_hex(tx_hash)
303             txpoint = {
304                     "timestamp":    int(nTime),
305                     "height":   int(height),
306                     "is_input":    int(is_in),
307                     "block_hash": self.hashout_hex(blk_hash),
308                     "tx_hash":  tx_hash,
309                     "tx_id":    int(tx_id),
310                     "index":      int(pos),
311                     "value":    int(value),
312                     }
313
314             txpoints.append(txpoint)
315             known_tx.append(self.hashout_hex(tx_hash))
316
317
318         # todo: sort them really...
319         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
320
321         # read memory pool
322         rows = []
323         rows += self.get_address_in_rows_memorypool( dbhash )
324         rows += self.get_address_out_rows_memorypool( dbhash )
325         address_has_mempool = False
326
327         for row in rows:
328             is_in, tx_hash, tx_id, pos, value = row
329             tx_hash = self.hashout_hex(tx_hash)
330             if tx_hash in known_tx:
331                 continue
332
333             # discard transactions that are too old
334             if self.last_tx_id - tx_id > 50000:
335                 print "discarding tx id", tx_id
336                 continue
337
338             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
339             address_has_mempool = True
340
341             #print "mempool", tx_hash
342             txpoint = {
343                     "timestamp":    0,
344                     "height":   0,
345                     "is_input":    int(is_in),
346                     "block_hash": 'mempool', 
347                     "tx_hash":  tx_hash,
348                     "tx_id":    int(tx_id),
349                     "index":      int(pos),
350                     "value":    int(value),
351                     }
352             txpoints.append(txpoint)
353
354
355         for txpoint in txpoints:
356             tx_id = txpoint['tx_id']
357             
358             txinputs = []
359             inrows = self.get_tx_inputs(tx_id)
360             for row in inrows:
361                 _hash = self.binout(row[6])
362                 if not _hash:
363                     #print "WARNING: missing tx_in for tx", tx_id, addr
364                     continue
365                 address = hash_to_address(chr(self.addrtype), _hash)
366                 txinputs.append(address)
367             txpoint['inputs'] = txinputs
368             txoutputs = []
369             outrows = self.get_tx_outputs(tx_id)
370             for row in outrows:
371                 _hash = self.binout(row[6])
372                 if not _hash:
373                     #print "WARNING: missing tx_out for tx", tx_id, addr
374                     continue
375                 address = hash_to_address(chr(self.addrtype), _hash)
376                 txoutputs.append(address)
377             txpoint['outputs'] = txoutputs
378
379             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
380             if not txpoint['is_input']:
381                 # detect if already redeemed...
382                 for row in outrows:
383                     if row[6] == dbhash: break
384                 else:
385                     raise
386                 #row = self.get_tx_output(tx_id,dbhash)
387                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
388                 # if not redeemed, we add the script
389                 if row:
390                     if not row[4]: txpoint['raw_output_script'] = row[1]
391
392             txpoint.pop('tx_id')
393
394         # cache result
395         # do not cache mempool results because statuses are ambiguous
396         if not address_has_mempool:
397             with self.cache_lock:
398                 self.tx_cache[addr] = txpoints
399         
400         return txpoints
401
402     def get_history2(self, addr):
403         h = self.get_history(addr)
404         out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, h)
405         out2 = []
406         for item in out:
407             if item not in out2: out2.append(item)
408         return out2
409
410
411     def get_status(self,addr):
412         # get address status, i.e. the last block for that address.
413         tx_points = self.get_history(addr)
414         if not tx_points:
415             status = None
416         else:
417             lastpoint = tx_points[-1]
418             status = lastpoint['block_hash']
419             # this is a temporary hack; move it up once old clients have disappeared
420             if status == 'mempool': # and session['version'] != "old":
421                 status = status + ':%d'% len(tx_points)
422         return status
423
424     def get_status2(self,addr):
425         # for 0.5 clients
426         tx_points = self.get_history2(addr)
427         if not tx_points:
428             return None
429
430         status = ''
431         for tx in tx_points:
432             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
433         return hashlib.sha256( status ).digest().encode('hex')
434
435
436     def get_block_header(self, block_height):
437         out = self.safe_sql("""
438             SELECT
439                 block_hash,
440                 block_version,
441                 block_hashMerkleRoot,
442                 block_nTime,
443                 block_nBits,
444                 block_nNonce,
445                 block_height,
446                 prev_block_hash,
447                 block_id
448               FROM chain_summary
449              WHERE block_height = %d AND in_longest = 1"""%block_height)
450
451         if not out: raise BaseException("block not found")
452         row = out[0]
453         (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height,prev_block_hash, block_id) \
454             = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
455
456         out = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
457                 "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
458         return out
459         
460
461     def get_chunk(self, index):
462         with self.cache_lock:
463             msg = self.chunk_cache.get(index)
464             if msg: return msg
465
466         sql = """
467             SELECT
468                 block_hash,
469                 block_version,
470                 block_hashMerkleRoot,
471                 block_nTime,
472                 block_nBits,
473                 block_nNonce,
474                 block_height,
475                 prev_block_hash,
476                 block_height
477               FROM chain_summary
478              WHERE block_height >= %d AND block_height< %d AND in_longest = 1"""%(index*2016, (index+1)*2016)
479
480         out = self.safe_sql(sql)
481         msg = ''
482         for row in out:
483             (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \
484                 = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
485             h = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
486                    "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
487
488             if h.get('block_height')==0: h['prev_block_hash'] = "0"*64
489             msg += header_to_string(h)
490
491             #print "hash", encode(Hash(msg.decode('hex')))
492             #if h.get('block_height')==1:break
493
494         with self.cache_lock:
495             self.chunk_cache[index] = msg
496         print "get_chunk", index, len(msg)
497         return msg
498
499
500
501     def get_raw_tx(self, tx_hash, height):
502         postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id':'jsonrpc'})
503         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
504         r = loads(respdata)
505         if r['error'] != None:
506             raise BaseException(r['error'])
507
508         hextx = r.get('result')
509         return hextx
510
511
512     def get_tx_merkle(self, tx_hash):
513
514         out = self.safe_sql("""
515              SELECT block_tx.block_id FROM tx 
516              JOIN block_tx on tx.tx_id = block_tx.tx_id 
517              JOIN chain_summary on chain_summary.block_id = block_tx.block_id
518              WHERE tx_hash='%s' AND in_longest = 1"""%tx_hash)
519
520         if not out: raise BaseException("not in a block")
521         block_id = int(out[0][0])
522
523         # get block height
524         out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1"%block_id)
525
526         if not out: raise BaseException("block not found")
527         block_height = int(out[0][0])
528
529         merkle = []
530         tx_pos = None
531
532         # list all tx in block
533         for row in self.safe_sql("""
534             SELECT DISTINCT tx_id, tx_pos, tx_hash
535               FROM txin_detail
536              WHERE block_id = ?
537              ORDER BY tx_pos""", (block_id,)):
538             _id, _pos, _hash = row
539             merkle.append(_hash)
540             if _hash == tx_hash: tx_pos = int(_pos)
541
542         # find subset.
543         # TODO: do not compute this on client request, better store the hash tree of each block in a database...
544
545         merkle = map(decode, merkle)
546         target_hash = decode(tx_hash)
547
548         s = []
549         while len(merkle) != 1:
550             if len(merkle)%2: merkle.append( merkle[-1] )
551             n = []
552             while merkle:
553                 new_hash = Hash( merkle[0] + merkle[1] )
554                 if merkle[0] == target_hash:
555                     s.append( encode(merkle[1]))
556                     target_hash = new_hash
557                 elif merkle[1] == target_hash:
558                     s.append( encode(merkle[0]))
559                     target_hash = new_hash
560                 n.append( new_hash )
561                 merkle = merkle[2:]
562             merkle = n
563
564         # send result
565         return {"block_height":block_height, "merkle":s, "pos":tx_pos}
566
567
568
569
570     def memorypool_update(store):
571
572         ds = BCDataStream.BCDataStream()
573         postdata = dumps({"method": 'getrawmempool', 'params': [], 'id':'jsonrpc'})
574         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
575         r = loads(respdata)
576         if r['error'] != None:
577             print r['error']
578             return
579
580         mempool_hashes = r.get('result')
581         for tx_hash in mempool_hashes:
582
583             if tx_hash in store.known_mempool_hashes: continue
584             store.known_mempool_hashes.append(tx_hash)
585
586             postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id':'jsonrpc'})
587             respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
588             r = loads(respdata)
589             if r['error'] != None:
590                 continue
591             hextx = r.get('result')
592             ds.clear()
593             ds.write(hextx.decode('hex'))
594             tx = deserialize.parse_Transaction(ds)
595             tx['hash'] = util.double_sha256(tx['tx'])
596                 
597             if store.tx_find_id_and_value(tx):
598                 pass
599             else:
600                 tx_id = store.import_tx(tx, False)
601                 store.update_tx_cache(tx_id)
602                 #print tx_hash
603
604         store.commit()
605         store.known_mempool_hashes = mempool_hashes
606
607
608     def send_tx(self,tx):
609         postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id':'jsonrpc'})
610         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
611         r = loads(respdata)
612         if r['error'] != None:
613             msg = r['error'].get('message')
614             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
615         else:
616             out = r['result']
617         return out
618
619
620     def main_iteration(self):
621         with self.lock:
622             self.catch_up()
623             self.memorypool_update()
624             height = self.get_block_number( self.chain_id )
625             try: self.chunk_cache.pop(height/2016) 
626             except: pass
627
628         block_header = self.get_block_header( height )
629         return block_header
630
631
632
633
634     def catch_up(store):
635         # if there is an exception, do rollback and then re-raise the exception
636         for dircfg in store.datadirs:
637             try:
638                 store.catch_up_dir(dircfg)
639             except Exception, e:
640                 store.log.exception("Failed to catch up %s", dircfg)
641                 store.rollback()
642                 raise e
643
644
645
646
647 from processor import Processor
648
649 class BlockchainProcessor(Processor):
650
651     def __init__(self, config):
652         Processor.__init__(self)
653         self.store = AbeStore(config)
654         self.watched_addresses = []
655
656         # catch_up first
657         self.block_header = self.store.main_iteration()
658         self.block_number = self.block_header.get('block_height')
659         print "blockchain: %d blocks"%self.block_number
660
661         threading.Timer(10, self.run_store_iteration).start()
662
663     def process(self, request):
664         #print "abe process", request
665
666         message_id = request['id']
667         method = request['method']
668         params = request.get('params',[])
669         result = None
670         error = None
671
672         if method == 'blockchain.numblocks.subscribe':
673             result = self.block_number
674
675         elif method == 'blockchain.headers.subscribe':
676             result = self.block_header
677
678         elif method == 'blockchain.address.subscribe':
679             try:
680                 address = params[0]
681                 result = self.store.get_status(address)
682                 self.watch_address(address)
683             except BaseException, e:
684                 error = str(e) + ': ' + address
685                 print "error:", error
686
687         elif method == 'blockchain.address.subscribe2':
688             try:
689                 address = params[0]
690                 result = self.store.get_status2(address)
691                 self.watch_address(address)
692             except BaseException, e:
693                 error = str(e) + ': ' + address
694                 print "error:", error
695
696         elif method == 'blockchain.address.get_history':
697             try:
698                 address = params[0]
699                 result = self.store.get_history( address ) 
700             except BaseException, e:
701                 error = str(e) + ': ' + address
702                 print "error:", error
703
704         elif method == 'blockchain.address.get_history2':
705             try:
706                 address = params[0]
707                 result = self.store.get_history2( address ) 
708             except BaseException, e:
709                 error = str(e) + ': ' + address
710                 print "error:", error
711
712         elif method == 'blockchain.block.get_header':
713             try:
714                 height = params[0]
715                 result = self.store.get_block_header( height ) 
716             except BaseException, e:
717                 error = str(e) + ': %d'% height
718                 print "error:", error
719
720         elif method == 'blockchain.block.get_chunk':
721             try:
722                 index = params[0]
723                 result = self.store.get_chunk( index ) 
724             except BaseException, e:
725                 error = str(e) + ': %d'% index
726                 print "error:", error
727
728         elif method == 'blockchain.transaction.broadcast':
729             txo = self.store.send_tx(params[0])
730             print "sent tx:", txo
731             result = txo 
732
733         elif method == 'blockchain.transaction.get_merkle':
734             try:
735                 tx_hash = params[0]
736                 result = self.store.get_tx_merkle(tx_hash ) 
737             except BaseException, e:
738                 error = str(e) + ': ' + tx_hash
739                 print "error:", error
740
741         elif method == 'blockchain.transaction.get':
742             try:
743                 tx_hash = params[0]
744                 height = params[1]
745                 result = self.store.get_raw_tx(tx_hash, height ) 
746             except BaseException, e:
747                 error = str(e) + ': ' + tx_hash
748                 print "error:", error
749
750         else:
751             error = "unknown method:%s"%method
752
753
754         if error:
755             response = { 'id':message_id, 'error':error }
756             self.push_response(response)
757         elif result != '':
758             response = { 'id':message_id, 'result':result }
759             self.push_response(response)
760
761
762     def watch_address(self, addr):
763         if addr not in self.watched_addresses:
764             self.watched_addresses.append(addr)
765
766
767     def run_store_iteration(self):
768         
769         try:
770             t1 = time.time()
771             block_header = self.store.main_iteration()
772             t2 = time.time() - t1
773         except:
774             traceback.print_exc(file=sys.stdout)
775             print "terminating"
776             self.shared.stop()
777
778         if self.shared.stopped(): 
779             print "exit timer"
780             return
781
782         if self.block_number != block_header.get('block_height'):
783             self.block_number = block_header.get('block_height')
784             print "block number: %d  (%.3f seconds)"%(self.block_number, t2)
785             self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
786
787         if self.block_header != block_header:
788             self.block_header = block_header
789             self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.block_header] })
790
791
792         while True:
793             try:
794                 addr = self.store.address_queue.get(False)
795             except:
796                 break
797             if addr in self.watched_addresses:
798                 status = self.store.get_status( addr )
799                 status2 = self.store.get_status2( addr )
800                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
801                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status2] })
802
803         threading.Timer(10, self.run_store_iteration).start()
804
805