rm backward compatibility for old clients on abe
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13 import hashlib
14 encode = lambda x: x[::-1].encode('hex')
15 decode = lambda x: x.decode('hex')[::-1]
16 Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
17
18 def rev_hex(s):
19     return s.decode('hex')[::-1].encode('hex')
20
21 def int_to_hex(i, length=1):
22     s = hex(i)[2:].rstrip('L')
23     s = "0"*(2*length - len(s)) + s
24     return rev_hex(s)
25
26 def header_to_string(res):
27     s = int_to_hex(res.get('version'),4) \
28         + rev_hex(res.get('prev_block_hash')) \
29         + rev_hex(res.get('merkle_root')) \
30         + int_to_hex(int(res.get('timestamp')),4) \
31         + int_to_hex(int(res.get('bits')),4) \
32         + int_to_hex(int(res.get('nonce')),4)
33     return s
34
35
36 class AbeStore(Datastore_class):
37
38     def __init__(self, config):
39         conf = DataStore.CONFIG_DEFAULTS
40         args, argv = readconf.parse_argv( [], conf)
41         args.dbtype = config.get('database','type')
42         if args.dbtype == 'sqlite3':
43             args.connect_args = { 'database' : config.get('database','database') }
44         elif args.dbtype == 'MySQLdb':
45             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
46         elif args.dbtype == 'psycopg2':
47             args.connect_args = { 'database' : config.get('database','database') }
48
49         coin = config.get('server', 'coin')
50         self.addrtype = 0
51         if coin == 'litecoin':
52             print 'Litecoin settings:'
53             datadir = config.get('server','datadir')
54             print '  datadir = ' + datadir
55             args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
56             print '  addrtype = 48'
57             self.addrtype = 48
58
59         Datastore_class.__init__(self,args)
60
61         # Use 1 (Bitcoin) if chain_id is not sent
62         self.chain_id = self.datadirs[0]["chain_id"] or 1
63         print 'Coin chain_id = %d' % self.chain_id
64
65         self.sql_limit = int( config.get('database','limit') )
66
67         self.tx_cache = {}
68         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
69
70         self.chunk_cache = {}
71
72         self.address_queue = Queue()
73
74         self.lock = threading.Lock()        # for the database
75         self.cache_lock = threading.Lock()  # for the cache
76         self.last_tx_id = 0
77         self.known_mempool_hashes = []
78
79
80     
81     def import_tx(self, tx, is_coinbase):
82         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
83         self.last_tx_id = tx_id
84         return tx_id
85         
86
87
88
89     def import_block(self, b, chain_ids=frozenset()):
90         #print "import block"
91         block_id = super(AbeStore, self).import_block(b, chain_ids)
92         for pos in xrange(len(b['transactions'])):
93             tx = b['transactions'][pos]
94             if 'hash' not in tx:
95                 tx['hash'] = util.double_sha256(tx['tx'])
96             tx_id = self.tx_find_id_and_value(tx)
97             if tx_id:
98                 self.update_tx_cache(tx_id)
99             else:
100                 print "error: import_block: no tx_id"
101         return block_id
102
103
104     def update_tx_cache(self, txid):
105         inrows = self.get_tx_inputs(txid, False)
106         for row in inrows:
107             _hash = self.binout(row[6])
108             if not _hash:
109                 #print "WARNING: missing tx_in for tx", txid
110                 continue
111
112             address = hash_to_address(chr(self.addrtype), _hash)
113             with self.cache_lock:
114                 if self.tx_cache.has_key(address):
115                     print "cache: invalidating", address
116                     self.tx_cache.pop(address)
117
118             self.address_queue.put(address)
119
120         outrows = self.get_tx_outputs(txid, False)
121         for row in outrows:
122             _hash = self.binout(row[6])
123             if not _hash:
124                 #print "WARNING: missing tx_out for tx", txid
125                 continue
126
127             address = hash_to_address(chr(self.addrtype), _hash)
128             with self.cache_lock:
129                 if self.tx_cache.has_key(address):
130                     print "cache: invalidating", address
131                     self.tx_cache.pop(address)
132
133             self.address_queue.put(address)
134
135     def safe_sql(self,sql, params=(), lock=True):
136
137         error = False
138         try:
139             if lock: self.lock.acquire()
140             ret = self.selectall(sql,params)
141         except:
142             error = True
143             traceback.print_exc(file=sys.stdout)
144         finally:
145             if lock: self.lock.release()
146
147         if error: 
148             raise BaseException('sql error')
149
150         return ret
151             
152
153     def get_tx_outputs(self, tx_id, lock=True):
154         return self.safe_sql("""SELECT
155                 txout.txout_pos,
156                 txout.txout_scriptPubKey,
157                 txout.txout_value,
158                 nexttx.tx_hash,
159                 nexttx.tx_id,
160                 txin.txin_pos,
161                 pubkey.pubkey_hash
162               FROM txout
163               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
164               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
165               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
166              WHERE txout.tx_id = %d 
167              ORDER BY txout.txout_pos
168         """%(tx_id), (), lock)
169
170     def get_tx_inputs(self, tx_id, lock=True):
171         return self.safe_sql(""" SELECT
172                 txin.txin_pos,
173                 txin.txin_scriptSig,
174                 txout.txout_value,
175                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
176                 prevtx.tx_id,
177                 COALESCE(txout.txout_pos, u.txout_pos),
178                 pubkey.pubkey_hash
179               FROM txin
180               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
181               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
182               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
183               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
184              WHERE txin.tx_id = %d
185              ORDER BY txin.txin_pos
186              """%(tx_id,), (), lock)
187
188
189     def get_address_out_rows(self, dbhash):
190         out = self.safe_sql(""" SELECT
191                 b.block_nTime,
192                 cc.chain_id,
193                 b.block_height,
194                 1,
195                 b.block_hash,
196                 tx.tx_hash,
197                 tx.tx_id,
198                 txin.txin_pos,
199                 -prevout.txout_value
200               FROM chain_candidate cc
201               JOIN block b ON (b.block_id = cc.block_id)
202               JOIN block_tx ON (block_tx.block_id = b.block_id)
203               JOIN tx ON (tx.tx_id = block_tx.tx_id)
204               JOIN txin ON (txin.tx_id = tx.tx_id)
205               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
206               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
207              WHERE pubkey.pubkey_hash = ?
208                AND cc.chain_id = ?
209                AND cc.in_longest = 1
210              LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
211
212         if len(out)==self.sql_limit: 
213             raise BaseException('limit reached')
214         return out
215
216     def get_address_out_rows_memorypool(self, dbhash):
217         out = self.safe_sql(""" SELECT
218                 1,
219                 tx.tx_hash,
220                 tx.tx_id,
221                 txin.txin_pos,
222                 -prevout.txout_value
223               FROM tx 
224               JOIN txin ON (txin.tx_id = tx.tx_id)
225               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
226               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
227              WHERE pubkey.pubkey_hash = ?
228              LIMIT ? """, (dbhash,self.sql_limit))
229
230         if len(out)==self.sql_limit: 
231             raise BaseException('limit reached')
232         return out
233
234     def get_address_in_rows(self, dbhash):
235         out = self.safe_sql(""" SELECT
236                 b.block_nTime,
237                 cc.chain_id,
238                 b.block_height,
239                 0,
240                 b.block_hash,
241                 tx.tx_hash,
242                 tx.tx_id,
243                 txout.txout_pos,
244                 txout.txout_value
245               FROM chain_candidate cc
246               JOIN block b ON (b.block_id = cc.block_id)
247               JOIN block_tx ON (block_tx.block_id = b.block_id)
248               JOIN tx ON (tx.tx_id = block_tx.tx_id)
249               JOIN txout ON (txout.tx_id = tx.tx_id)
250               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
251              WHERE pubkey.pubkey_hash = ?
252                AND cc.chain_id = ?
253                AND cc.in_longest = 1
254                LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
255
256         if len(out)==self.sql_limit: 
257             raise BaseException('limit reached')
258         return out
259
260     def get_address_in_rows_memorypool(self, dbhash):
261         out = self.safe_sql( """ SELECT
262                 0,
263                 tx.tx_hash,
264                 tx.tx_id,
265                 txout.txout_pos,
266                 txout.txout_value
267               FROM tx
268               JOIN txout ON (txout.tx_id = tx.tx_id)
269               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
270              WHERE pubkey.pubkey_hash = ?
271              LIMIT ? """, (dbhash,self.sql_limit))
272
273         if len(out)==self.sql_limit: 
274             raise BaseException('limit reached')
275         return out
276
277
278
279     def get_history(self, addr, cache_only=False):
280         with self.cache_lock:
281             cached_version = self.tx_cache.get( addr )
282             if cached_version is not None:
283                 return cached_version
284
285         if cache_only: return -1
286
287         version, binaddr = decode_check_address(addr)
288         if binaddr is None:
289             return None
290
291         dbhash = self.binin(binaddr)
292         rows = []
293         rows += self.get_address_out_rows( dbhash )
294         rows += self.get_address_in_rows( dbhash )
295
296         txpoints = []
297         known_tx = []
298
299         for row in rows:
300             try:
301                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
302             except:
303                 print "cannot unpack row", row
304                 break
305             tx_hash = self.hashout_hex(tx_hash)
306             txpoint = {
307                     "timestamp":    int(nTime),
308                     "height":   int(height),
309                     "is_input":    int(is_in),
310                     "block_hash": self.hashout_hex(blk_hash),
311                     "tx_hash":  tx_hash,
312                     "tx_id":    int(tx_id),
313                     "index":      int(pos),
314                     "value":    int(value),
315                     }
316
317             txpoints.append(txpoint)
318             known_tx.append(self.hashout_hex(tx_hash))
319
320
321         # todo: sort them really...
322         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
323
324         # read memory pool
325         rows = []
326         rows += self.get_address_in_rows_memorypool( dbhash )
327         rows += self.get_address_out_rows_memorypool( dbhash )
328         address_has_mempool = False
329
330         for row in rows:
331             is_in, tx_hash, tx_id, pos, value = row
332             tx_hash = self.hashout_hex(tx_hash)
333             if tx_hash in known_tx:
334                 continue
335
336             # discard transactions that are too old
337             if self.last_tx_id - tx_id > 50000:
338                 print "discarding tx id", tx_id
339                 continue
340
341             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
342             address_has_mempool = True
343
344             #print "mempool", tx_hash
345             txpoint = {
346                     "timestamp":    0,
347                     "height":   0,
348                     "is_input":    int(is_in),
349                     "block_hash": 'mempool', 
350                     "tx_hash":  tx_hash,
351                     "tx_id":    int(tx_id),
352                     "index":      int(pos),
353                     "value":    int(value),
354                     }
355             txpoints.append(txpoint)
356
357
358         for txpoint in txpoints:
359             tx_id = txpoint['tx_id']
360             
361             txinputs = []
362             inrows = self.get_tx_inputs(tx_id)
363             for row in inrows:
364                 _hash = self.binout(row[6])
365                 if not _hash:
366                     #print "WARNING: missing tx_in for tx", tx_id, addr
367                     continue
368                 address = hash_to_address(chr(self.addrtype), _hash)
369                 txinputs.append(address)
370             txpoint['inputs'] = txinputs
371             txoutputs = []
372             outrows = self.get_tx_outputs(tx_id)
373             for row in outrows:
374                 _hash = self.binout(row[6])
375                 if not _hash:
376                     #print "WARNING: missing tx_out for tx", tx_id, addr
377                     continue
378                 address = hash_to_address(chr(self.addrtype), _hash)
379                 txoutputs.append(address)
380             txpoint['outputs'] = txoutputs
381
382             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
383             if not txpoint['is_input']:
384                 # detect if already redeemed...
385                 for row in outrows:
386                     if row[6] == dbhash: break
387                 else:
388                     raise
389                 #row = self.get_tx_output(tx_id,dbhash)
390                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
391                 # if not redeemed, we add the script
392                 if row:
393                     if not row[4]: txpoint['raw_output_script'] = row[1]
394
395             txpoint.pop('tx_id')
396
397         # cache result
398         # do not cache mempool results because statuses are ambiguous
399         if not address_has_mempool:
400             with self.cache_lock:
401                 self.tx_cache[addr] = txpoints
402         
403
404         out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, txpoints)
405         out2 = []
406         for item in out:
407             if item not in out2: out2.append(item)
408         return out2
409
410
411     def get_status(self, addr, cache_only=False):
412         # for 0.5 clients
413         tx_points = self.get_history(addr, cache_only)
414         if cache_only and tx_points == -1: return -1
415
416         if not tx_points: return None
417         status = ''
418         for tx in tx_points:
419             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
420         return hashlib.sha256( status ).digest().encode('hex')
421
422
423     def get_block_header(self, block_height):
424         out = self.safe_sql("""
425             SELECT
426                 block_hash,
427                 block_version,
428                 block_hashMerkleRoot,
429                 block_nTime,
430                 block_nBits,
431                 block_nNonce,
432                 block_height,
433                 prev_block_hash,
434                 block_id
435               FROM chain_summary
436              WHERE block_height = %d AND in_longest = 1"""%block_height)
437
438         if not out: raise BaseException("block not found")
439         row = out[0]
440         (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height,prev_block_hash, block_id) \
441             = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
442
443         out = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
444                 "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
445         return out
446         
447
448     def get_chunk(self, index):
449         with self.cache_lock:
450             msg = self.chunk_cache.get(index)
451             if msg: return msg
452
453         sql = """
454             SELECT
455                 block_hash,
456                 block_version,
457                 block_hashMerkleRoot,
458                 block_nTime,
459                 block_nBits,
460                 block_nNonce,
461                 block_height,
462                 prev_block_hash,
463                 block_height
464               FROM chain_summary
465              WHERE block_height >= %d AND block_height< %d AND in_longest = 1 ORDER BY block_height"""%(index*2016, (index+1)*2016)
466
467         out = self.safe_sql(sql)
468         msg = ''
469         for row in out:
470             (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \
471                 = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
472             h = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
473                    "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
474
475             if h.get('block_height')==0: h['prev_block_hash'] = "0"*64
476             msg += header_to_string(h)
477
478             #print "hash", encode(Hash(msg.decode('hex')))
479             #if h.get('block_height')==1:break
480
481         with self.cache_lock:
482             self.chunk_cache[index] = msg
483         print "get_chunk", index, len(msg)
484         return msg
485
486
487
488     def get_raw_tx(self, tx_hash, height):
489         postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id':'jsonrpc'})
490         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
491         r = loads(respdata)
492         if r['error'] != None:
493             raise BaseException(r['error'])
494
495         hextx = r.get('result')
496         return hextx
497
498
499     def get_tx_merkle(self, tx_hash):
500
501         out = self.safe_sql("""
502              SELECT block_tx.block_id FROM tx 
503              JOIN block_tx on tx.tx_id = block_tx.tx_id 
504              JOIN chain_summary on chain_summary.block_id = block_tx.block_id
505              WHERE tx_hash='%s' AND in_longest = 1"""%tx_hash)
506
507         if not out: raise BaseException("not in a block")
508         block_id = int(out[0][0])
509
510         # get block height
511         out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1"%block_id)
512
513         if not out: raise BaseException("block not found")
514         block_height = int(out[0][0])
515
516         merkle = []
517         tx_pos = None
518
519         # list all tx in block
520         for row in self.safe_sql("""
521             SELECT DISTINCT tx_id, tx_pos, tx_hash
522               FROM txin_detail
523              WHERE block_id = ?
524              ORDER BY tx_pos""", (block_id,)):
525             _id, _pos, _hash = row
526             merkle.append(_hash)
527             if _hash == tx_hash: tx_pos = int(_pos)
528
529         # find subset.
530         # TODO: do not compute this on client request, better store the hash tree of each block in a database...
531
532         merkle = map(decode, merkle)
533         target_hash = decode(tx_hash)
534
535         s = []
536         while len(merkle) != 1:
537             if len(merkle)%2: merkle.append( merkle[-1] )
538             n = []
539             while merkle:
540                 new_hash = Hash( merkle[0] + merkle[1] )
541                 if merkle[0] == target_hash:
542                     s.append( encode(merkle[1]))
543                     target_hash = new_hash
544                 elif merkle[1] == target_hash:
545                     s.append( encode(merkle[0]))
546                     target_hash = new_hash
547                 n.append( new_hash )
548                 merkle = merkle[2:]
549             merkle = n
550
551         # send result
552         return {"block_height":block_height, "merkle":s, "pos":tx_pos}
553
554
555
556
557     def memorypool_update(store):
558
559         ds = BCDataStream.BCDataStream()
560         postdata = dumps({"method": 'getrawmempool', 'params': [], 'id':'jsonrpc'})
561         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
562
563         r = loads(respdata)
564         if r['error'] != None:
565             print r['error']
566             return
567
568         mempool_hashes = r.get('result')
569         num_new_tx = 0 
570
571         for tx_hash in mempool_hashes:
572
573             if tx_hash in store.known_mempool_hashes: continue
574             store.known_mempool_hashes.append(tx_hash)
575             num_new_tx += 1
576
577             postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id':'jsonrpc'})
578             respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
579             r = loads(respdata)
580             if r['error'] != None:
581                 continue
582             hextx = r.get('result')
583             ds.clear()
584             ds.write(hextx.decode('hex'))
585             tx = deserialize.parse_Transaction(ds)
586             tx['hash'] = util.double_sha256(tx['tx'])
587                 
588             if store.tx_find_id_and_value(tx):
589                 pass
590             else:
591                 tx_id = store.import_tx(tx, False)
592                 store.update_tx_cache(tx_id)
593                 #print tx_hash
594
595         store.commit()
596         store.known_mempool_hashes = mempool_hashes
597         return num_new_tx
598
599
600     def send_tx(self,tx):
601         postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id':'jsonrpc'})
602         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
603         r = loads(respdata)
604         if r['error'] != None:
605             msg = r['error'].get('message')
606             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
607         else:
608             out = r['result']
609         return out
610
611
612     def main_iteration(self):
613         with self.lock:
614             t1 = time.time()
615             self.catch_up()
616             t2 = time.time()
617             time_catch_up = t2 - t1
618             n = self.memorypool_update()
619             time_mempool = time.time() - t2
620             height = self.get_block_number( self.chain_id )
621
622         with self.cache_lock:
623             try: 
624                 self.chunk_cache.pop(height/2016) 
625             except: 
626                 pass
627
628         block_header = self.get_block_header( height )
629         return block_header, time_catch_up, time_mempool, n
630
631
632
633
634     def catch_up(store):
635         # if there is an exception, do rollback and then re-raise the exception
636         for dircfg in store.datadirs:
637             try:
638                 store.catch_up_dir(dircfg)
639             except Exception, e:
640                 store.log.exception("Failed to catch up %s", dircfg)
641                 store.rollback()
642                 raise e
643
644
645
646
647 from processor import Processor
648
649 class BlockchainProcessor(Processor):
650
651     def __init__(self, config, shared):
652         Processor.__init__(self)
653         self.store = AbeStore(config)
654         self.watched_addresses = []
655         self.shared = shared
656
657         # catch_up first
658         self.block_header, time_catch_up, time_mempool, n = self.store.main_iteration()
659         self.block_number = self.block_header.get('block_height')
660         print "blockchain: %d blocks"%self.block_number
661
662         threading.Timer(10, self.run_store_iteration).start()
663
664
665     def add_request(self, request):
666         # see if we can get if from cache. if not, add to queue
667         if self.process( request, cache_only = True) == -1:
668             self.queue.put(request)
669
670
671     def process(self, request, cache_only = False):
672         #print "abe process", request
673
674         message_id = request['id']
675         method = request['method']
676         params = request.get('params',[])
677         result = None
678         error = None
679
680         if method == 'blockchain.numblocks.subscribe':
681             result = self.block_number
682
683         elif method == 'blockchain.headers.subscribe':
684             result = self.block_header
685
686         elif method == 'blockchain.address.subscribe':
687             try:
688                 address = params[0]
689                 result = self.store.get_status(address, cache_only)
690                 self.watch_address(address)
691             except BaseException, e:
692                 error = str(e) + ': ' + address
693                 print "error:", error
694
695         elif method == 'blockchain.address.get_history':
696             try:
697                 address = params[0]
698                 result = self.store.get_history( address, cache_only )
699             except BaseException, e:
700                 error = str(e) + ': ' + address
701                 print "error:", error
702
703         elif method == 'blockchain.block.get_header':
704             if cache_only: 
705                 result = -1
706             else:
707                 try:
708                     height = params[0]
709                     result = self.store.get_block_header( height ) 
710                 except BaseException, e:
711                     error = str(e) + ': %d'% height
712                     print "error:", error
713                     
714         elif method == 'blockchain.block.get_chunk':
715             if cache_only:
716                 result = -1
717             else:
718                 try:
719                     index = params[0]
720                     result = self.store.get_chunk( index ) 
721                 except BaseException, e:
722                     error = str(e) + ': %d'% index
723                     print "error:", error
724                     
725         elif method == 'blockchain.transaction.broadcast':
726             txo = self.store.send_tx(params[0])
727             print "sent tx:", txo
728             result = txo 
729
730         elif method == 'blockchain.transaction.get_merkle':
731             if cache_only:
732                 result = -1
733             else:
734                 try:
735                     tx_hash = params[0]
736                     result = self.store.get_tx_merkle(tx_hash ) 
737                 except BaseException, e:
738                     error = str(e) + ': ' + tx_hash
739                     print "error:", error
740                     
741         elif method == 'blockchain.transaction.get':
742             try:
743                 tx_hash = params[0]
744                 height = params[1]
745                 result = self.store.get_raw_tx(tx_hash, height ) 
746             except BaseException, e:
747                 error = str(e) + ': ' + tx_hash
748                 print "error:", error
749
750         else:
751             error = "unknown method:%s"%method
752
753         if cache_only and result == -1: return -1
754
755         if error:
756             response = { 'id':message_id, 'error':error }
757             self.push_response(response)
758         elif result != '':
759             response = { 'id':message_id, 'result':result }
760             self.push_response(response)
761
762
763     def watch_address(self, addr):
764         if addr not in self.watched_addresses:
765             self.watched_addresses.append(addr)
766
767
768     def run_store_iteration(self):
769         
770         try:
771             block_header, time_catch_up, time_mempool, n = self.store.main_iteration()
772         except:
773             traceback.print_exc(file=sys.stdout)
774             print "terminating"
775             self.shared.stop()
776
777         if self.shared.stopped(): 
778             print "exit timer"
779             return
780
781         #print "block number: %d  (%.3fs)  mempool:%d (%.3fs)"%(self.block_number, time_catch_up, n, time_mempool)
782
783         if self.block_number != block_header.get('block_height'):
784             self.block_number = block_header.get('block_height')
785             print "block number: %d  (%.3fs)"%(self.block_number, time_catch_up)
786             self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
787
788         if self.block_header != block_header:
789             self.block_header = block_header
790             self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.block_header] })
791
792
793         while True:
794             try:
795                 addr = self.store.address_queue.get(False)
796             except:
797                 break
798             if addr in self.watched_addresses:
799                 status = self.store.get_status( addr )
800                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
801
802         threading.Timer(10, self.run_store_iteration).start()
803
804