fix abe crash (blockindexing stops) on limit reached for get_address_out_rows in...
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13 import hashlib
14 encode = lambda x: x[::-1].encode('hex')
15 decode = lambda x: x.decode('hex')[::-1]
16 Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
17
18 def rev_hex(s):
19     return s.decode('hex')[::-1].encode('hex')
20
21 def int_to_hex(i, length=1):
22     s = hex(i)[2:].rstrip('L')
23     s = "0"*(2*length - len(s)) + s
24     return rev_hex(s)
25
26 def header_to_string(res):
27     s = int_to_hex(res.get('version'),4) \
28         + rev_hex(res.get('prev_block_hash')) \
29         + rev_hex(res.get('merkle_root')) \
30         + int_to_hex(int(res.get('timestamp')),4) \
31         + int_to_hex(int(res.get('bits')),4) \
32         + int_to_hex(int(res.get('nonce')),4)
33     return s
34
35
36 class AbeStore(Datastore_class):
37
38     def __init__(self, config):
39         conf = DataStore.CONFIG_DEFAULTS
40         args, argv = readconf.parse_argv( [], conf)
41         args.dbtype = config.get('database','type')
42         if args.dbtype == 'sqlite3':
43             args.connect_args = { 'database' : config.get('database','database') }
44         elif args.dbtype == 'MySQLdb':
45             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
46         elif args.dbtype == 'psycopg2':
47             args.connect_args = { 'database' : config.get('database','database') }
48
49         coin = config.get('server', 'coin')
50         self.addrtype = 0
51         if coin == 'litecoin':
52             print_log ('Litecoin settings:')
53             datadir = config.get('server','datadir')
54             print_log ('  datadir = ' + datadir)
55             args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
56             print_log ('  addrtype = 48')
57             self.addrtype = 48
58
59         Datastore_class.__init__(self,args)
60
61         # Use 1 (Bitcoin) if chain_id is not sent
62         self.chain_id = self.datadirs[0]["chain_id"] or 1
63         print_log ('Coin chain_id = %d' % self.chain_id)
64
65         self.sql_limit = int( config.get('database','limit') )
66
67         self.tx_cache = {}
68         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
69
70         self.chunk_cache = {}
71
72         self.address_queue = Queue()
73
74         self.lock = threading.Lock()        # for the database
75         self.cache_lock = threading.Lock()  # for the cache
76         self.last_tx_id = 0
77         self.known_mempool_hashes = []
78
79
80     
81     def import_tx(self, tx, is_coinbase):
82         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
83         self.last_tx_id = tx_id
84         return tx_id
85         
86
87
88
89     def import_block(self, b, chain_ids=frozenset()):
90         #print_log ("import block")
91         block_id = super(AbeStore, self).import_block(b, chain_ids)
92         for pos in xrange(len(b['transactions'])):
93             tx = b['transactions'][pos]
94             if 'hash' not in tx:
95                 tx['hash'] = util.double_sha256(tx['tx'])
96             tx_id = self.tx_find_id_and_value(tx)
97             if tx_id:
98                 self.update_tx_cache(tx_id)
99             else:
100                 print_log ("error: import_block: no tx_id")
101         return block_id
102
103
104     def update_tx_cache(self, txid):
105         inrows = self.get_tx_inputs(txid, False)
106         for row in inrows:
107             _hash = self.binout(row[6])
108             if not _hash:
109                 #print_log ("WARNING: missing tx_in for tx", txid)
110                 continue
111
112             address = hash_to_address(chr(self.addrtype), _hash)
113             with self.cache_lock:
114                 if self.tx_cache.has_key(address):
115                     print_log ("cache: invalidating", address)
116                     self.tx_cache.pop(address)
117
118             self.address_queue.put(address)
119
120         outrows = self.get_tx_outputs(txid, False)
121         for row in outrows:
122             _hash = self.binout(row[6])
123             if not _hash:
124                 #print_log ("WARNING: missing tx_out for tx", txid)
125                 continue
126
127             address = hash_to_address(chr(self.addrtype), _hash)
128             with self.cache_lock:
129                 if self.tx_cache.has_key(address):
130                     print_log ("cache: invalidating", address)
131                     self.tx_cache.pop(address)
132
133             self.address_queue.put(address)
134
135     def safe_sql(self,sql, params=(), lock=True):
136
137         error = False
138         try:
139             if lock: self.lock.acquire()
140             ret = self.selectall(sql,params)
141         except:
142             error = True
143             traceback.print_exc(file=sys.stdout)
144         finally:
145             if lock: self.lock.release()
146
147         if error: 
148             raise BaseException('sql error')
149
150         return ret
151             
152
153     def get_tx_outputs(self, tx_id, lock=True):
154         return self.safe_sql("""SELECT
155                 txout.txout_pos,
156                 txout.txout_scriptPubKey,
157                 txout.txout_value,
158                 nexttx.tx_hash,
159                 nexttx.tx_id,
160                 txin.txin_pos,
161                 pubkey.pubkey_hash
162               FROM txout
163               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
164               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
165               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
166              WHERE txout.tx_id = %d 
167              ORDER BY txout.txout_pos
168         """%(tx_id), (), lock)
169
170     def get_tx_inputs(self, tx_id, lock=True):
171         return self.safe_sql(""" SELECT
172                 txin.txin_pos,
173                 txin.txin_scriptSig,
174                 txout.txout_value,
175                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
176                 prevtx.tx_id,
177                 COALESCE(txout.txout_pos, u.txout_pos),
178                 pubkey.pubkey_hash
179               FROM txin
180               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
181               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
182               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
183               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
184              WHERE txin.tx_id = %d
185              ORDER BY txin.txin_pos
186              """%(tx_id,), (), lock)
187
188
189     def get_address_out_rows(self, dbhash):
190         out = self.safe_sql(""" SELECT
191                 b.block_nTime,
192                 cc.chain_id,
193                 b.block_height,
194                 1,
195                 b.block_hash,
196                 tx.tx_hash,
197                 tx.tx_id,
198                 txin.txin_pos,
199                 -prevout.txout_value
200               FROM chain_candidate cc
201               JOIN block b ON (b.block_id = cc.block_id)
202               JOIN block_tx ON (block_tx.block_id = b.block_id)
203               JOIN tx ON (tx.tx_id = block_tx.tx_id)
204               JOIN txin ON (txin.tx_id = tx.tx_id)
205               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
206               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
207              WHERE pubkey.pubkey_hash = ?
208                AND cc.chain_id = ?
209                AND cc.in_longest = 1
210              LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
211
212         if len(out)==self.sql_limit: 
213             raise BaseException('limit reached')
214         return out
215
216     def get_address_out_rows_memorypool(self, dbhash):
217         out = self.safe_sql(""" SELECT
218                 1,
219                 tx.tx_hash,
220                 tx.tx_id,
221                 txin.txin_pos,
222                 -prevout.txout_value
223               FROM tx 
224               JOIN txin ON (txin.tx_id = tx.tx_id)
225               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
226               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
227              WHERE pubkey.pubkey_hash = ?
228              LIMIT ? """, (dbhash,self.sql_limit))
229
230         if len(out)==self.sql_limit: 
231             raise BaseException('limit reached')
232         return out
233
234     def get_address_in_rows(self, dbhash):
235         out = self.safe_sql(""" SELECT
236                 b.block_nTime,
237                 cc.chain_id,
238                 b.block_height,
239                 0,
240                 b.block_hash,
241                 tx.tx_hash,
242                 tx.tx_id,
243                 txout.txout_pos,
244                 txout.txout_value
245               FROM chain_candidate cc
246               JOIN block b ON (b.block_id = cc.block_id)
247               JOIN block_tx ON (block_tx.block_id = b.block_id)
248               JOIN tx ON (tx.tx_id = block_tx.tx_id)
249               JOIN txout ON (txout.tx_id = tx.tx_id)
250               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
251              WHERE pubkey.pubkey_hash = ?
252                AND cc.chain_id = ?
253                AND cc.in_longest = 1
254                LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
255
256         if len(out)==self.sql_limit: 
257             raise BaseException('limit reached')
258         return out
259
260     def get_address_in_rows_memorypool(self, dbhash):
261         out = self.safe_sql( """ SELECT
262                 0,
263                 tx.tx_hash,
264                 tx.tx_id,
265                 txout.txout_pos,
266                 txout.txout_value
267               FROM tx
268               JOIN txout ON (txout.tx_id = tx.tx_id)
269               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
270              WHERE pubkey.pubkey_hash = ?
271              LIMIT ? """, (dbhash,self.sql_limit))
272
273         if len(out)==self.sql_limit: 
274             raise BaseException('limit reached')
275         return out
276
277
278
279     def get_history(self, addr, cache_only=False):
280         with self.cache_lock:
281             cached_version = self.tx_cache.get( addr )
282             if cached_version is not None:
283                 return cached_version
284
285         if cache_only: return -1
286
287         version, binaddr = decode_check_address(addr)
288         if binaddr is None:
289             return None
290
291         dbhash = self.binin(binaddr)
292         rows = []
293         rows += self.get_address_out_rows( dbhash )
294         rows += self.get_address_in_rows( dbhash )
295
296         txpoints = []
297         known_tx = []
298
299         for row in rows:
300             try:
301                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
302             except:
303                 print_log ("cannot unpack row", row)
304                 break
305             tx_hash = self.hashout_hex(tx_hash)
306             txpoint = {
307                     "timestamp":    int(nTime),
308                     "height":   int(height),
309                     "is_input":    int(is_in),
310                     "block_hash": self.hashout_hex(blk_hash),
311                     "tx_hash":  tx_hash,
312                     "tx_id":    int(tx_id),
313                     "index":      int(pos),
314                     "value":    int(value),
315                     }
316
317             txpoints.append(txpoint)
318             known_tx.append(self.hashout_hex(tx_hash))
319
320
321         # todo: sort them really...
322         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
323
324         # read memory pool
325         rows = []
326         rows += self.get_address_in_rows_memorypool( dbhash )
327         rows += self.get_address_out_rows_memorypool( dbhash )
328         address_has_mempool = False
329
330         for row in rows:
331             is_in, tx_hash, tx_id, pos, value = row
332             tx_hash = self.hashout_hex(tx_hash)
333             if tx_hash in known_tx:
334                 continue
335
336             # discard transactions that are too old
337             if self.last_tx_id - tx_id > 50000:
338                 print_log ("discarding tx id", tx_id)
339                 continue
340
341             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
342             address_has_mempool = True
343
344             #print_log ("mempool", tx_hash)
345             txpoint = {
346                     "timestamp":    0,
347                     "height":   0,
348                     "is_input":    int(is_in),
349                     "block_hash": 'mempool', 
350                     "tx_hash":  tx_hash,
351                     "tx_id":    int(tx_id),
352                     "index":      int(pos),
353                     "value":    int(value),
354                     }
355             txpoints.append(txpoint)
356
357
358         for txpoint in txpoints:
359             tx_id = txpoint['tx_id']
360             
361             txinputs = []
362             inrows = self.get_tx_inputs(tx_id)
363             for row in inrows:
364                 _hash = self.binout(row[6])
365                 if not _hash:
366                     #print_log ("WARNING: missing tx_in for tx", tx_id, addr)
367                     continue
368                 address = hash_to_address(chr(self.addrtype), _hash)
369                 txinputs.append(address)
370             txpoint['inputs'] = txinputs
371             txoutputs = []
372             outrows = self.get_tx_outputs(tx_id)
373             for row in outrows:
374                 _hash = self.binout(row[6])
375                 if not _hash:
376                     #print_log ("WARNING: missing tx_out for tx", tx_id, addr)
377                     continue
378                 address = hash_to_address(chr(self.addrtype), _hash)
379                 txoutputs.append(address)
380             txpoint['outputs'] = txoutputs
381
382             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
383             if not txpoint['is_input']:
384                 # detect if already redeemed...
385                 for row in outrows:
386                     if row[6] == dbhash: break
387                 else:
388                     raise
389                 #row = self.get_tx_output(tx_id,dbhash)
390                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
391                 # if not redeemed, we add the script
392                 if row:
393                     if not row[4]: txpoint['raw_output_script'] = row[1]
394
395             txpoint.pop('tx_id')
396
397
398         txpoints = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, txpoints)
399         out = []
400         for item in txpoints:
401             if item not in out: out.append(item)
402
403         # cache result
404         ## do not cache mempool results because statuses are ambiguous
405         #if not address_has_mempool:
406         with self.cache_lock:
407             self.tx_cache[addr] = out
408         
409         return out
410
411
412     def get_status(self, addr, cache_only=False):
413         # for 0.5 clients
414         tx_points = self.get_history(addr, cache_only)
415         if cache_only and tx_points == -1: return -1
416
417         if not tx_points: return None
418         status = ''
419         for tx in tx_points:
420             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
421         return hashlib.sha256( status ).digest().encode('hex')
422
423
424     def get_block_header(self, block_height):
425         out = self.safe_sql("""
426             SELECT
427                 block_hash,
428                 block_version,
429                 block_hashMerkleRoot,
430                 block_nTime,
431                 block_nBits,
432                 block_nNonce,
433                 block_height,
434                 prev_block_hash,
435                 block_id
436               FROM chain_summary
437              WHERE block_height = %d AND in_longest = 1"""%block_height)
438
439         if not out: raise BaseException("block not found")
440         row = out[0]
441         (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height,prev_block_hash, block_id) \
442             = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
443
444         out = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
445                 "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
446         return out
447         
448
449     def get_chunk(self, index):
450         with self.cache_lock:
451             msg = self.chunk_cache.get(index)
452             if msg: return msg
453
454         sql = """
455             SELECT
456                 block_hash,
457                 block_version,
458                 block_hashMerkleRoot,
459                 block_nTime,
460                 block_nBits,
461                 block_nNonce,
462                 block_height,
463                 prev_block_hash,
464                 block_height
465               FROM chain_summary
466              WHERE block_height >= %d AND block_height< %d AND in_longest = 1 ORDER BY block_height"""%(index*2016, (index+1)*2016)
467
468         out = self.safe_sql(sql)
469         msg = ''
470         for row in out:
471             (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \
472                 = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
473             h = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
474                    "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
475
476             if h.get('block_height')==0: h['prev_block_hash'] = "0"*64
477             msg += header_to_string(h)
478
479             #print_log ("hash", encode(Hash(msg.decode('hex'))))
480             #if h.get('block_height')==1:break
481
482         with self.cache_lock:
483             self.chunk_cache[index] = msg
484         print_log ("get_chunk", index, len(msg))
485         return msg
486
487
488
489     def get_raw_tx(self, tx_hash, height):
490         postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id':'jsonrpc'})
491         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
492         r = loads(respdata)
493         if r['error'] != None:
494             raise BaseException(r['error'])
495
496         hextx = r.get('result')
497         return hextx
498
499
500     def get_tx_merkle(self, tx_hash):
501
502         out = self.safe_sql("""
503              SELECT block_tx.block_id FROM tx 
504              JOIN block_tx on tx.tx_id = block_tx.tx_id 
505              JOIN chain_summary on chain_summary.block_id = block_tx.block_id
506              WHERE tx_hash='%s' AND in_longest = 1"""%tx_hash)
507
508         if not out: raise BaseException("not in a block")
509         block_id = int(out[0][0])
510
511         # get block height
512         out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1"%block_id)
513
514         if not out: raise BaseException("block not found")
515         block_height = int(out[0][0])
516
517         merkle = []
518         tx_pos = None
519
520         # list all tx in block
521         for row in self.safe_sql("""
522             SELECT DISTINCT tx_id, tx_pos, tx_hash
523               FROM txin_detail
524              WHERE block_id = ?
525              ORDER BY tx_pos""", (block_id,)):
526             _id, _pos, _hash = row
527             merkle.append(_hash)
528             if _hash == tx_hash: tx_pos = int(_pos)
529
530         # find subset.
531         # TODO: do not compute this on client request, better store the hash tree of each block in a database...
532
533         merkle = map(decode, merkle)
534         target_hash = decode(tx_hash)
535
536         s = []
537         while len(merkle) != 1:
538             if len(merkle)%2: merkle.append( merkle[-1] )
539             n = []
540             while merkle:
541                 new_hash = Hash( merkle[0] + merkle[1] )
542                 if merkle[0] == target_hash:
543                     s.append( encode(merkle[1]))
544                     target_hash = new_hash
545                 elif merkle[1] == target_hash:
546                     s.append( encode(merkle[0]))
547                     target_hash = new_hash
548                 n.append( new_hash )
549                 merkle = merkle[2:]
550             merkle = n
551
552         # send result
553         return {"block_height":block_height, "merkle":s, "pos":tx_pos}
554
555
556
557
558     def memorypool_update(store):
559
560         ds = BCDataStream.BCDataStream()
561         postdata = dumps({"method": 'getrawmempool', 'params': [], 'id':'jsonrpc'})
562         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
563
564         r = loads(respdata)
565         if r['error'] != None:
566             print_log (r['error'])
567             return
568
569         mempool_hashes = r.get('result')
570         num_new_tx = 0 
571
572         for tx_hash in mempool_hashes:
573
574             if tx_hash in store.known_mempool_hashes: continue
575             store.known_mempool_hashes.append(tx_hash)
576             num_new_tx += 1
577
578             postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id':'jsonrpc'})
579             respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
580             r = loads(respdata)
581             if r['error'] != None:
582                 continue
583             hextx = r.get('result')
584             ds.clear()
585             ds.write(hextx.decode('hex'))
586             tx = deserialize.parse_Transaction(ds)
587             tx['hash'] = util.double_sha256(tx['tx'])
588                 
589             if store.tx_find_id_and_value(tx):
590                 pass
591             else:
592                 tx_id = store.import_tx(tx, False)
593                 store.update_tx_cache(tx_id)
594                 #print_log (tx_hash)
595
596         store.commit()
597         store.known_mempool_hashes = mempool_hashes
598         return num_new_tx
599
600
601     def send_tx(self,tx):
602         postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id':'jsonrpc'})
603         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
604         r = loads(respdata)
605         if r['error'] != None:
606             msg = r['error'].get('message')
607             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
608         else:
609             out = r['result']
610         return out
611
612
613     def main_iteration(self):
614         with self.lock:
615             t1 = time.time()
616             self.catch_up()
617             t2 = time.time()
618             time_catch_up = t2 - t1
619             n = self.memorypool_update()
620             time_mempool = time.time() - t2
621             height = self.get_block_number( self.chain_id )
622
623         with self.cache_lock:
624             try: 
625                 self.chunk_cache.pop(height/2016) 
626             except: 
627                 pass
628
629         block_header = self.get_block_header( height )
630         return block_header, time_catch_up, time_mempool, n
631
632
633
634
635     def catch_up(store):
636         # if there is an exception, do rollback and then re-raise the exception
637         for dircfg in store.datadirs:
638             try:
639                 store.catch_up_dir(dircfg)
640             except Exception, e:
641                 store.log.exception("Failed to catch up %s", dircfg)
642                 store.rollback()
643                 raise e
644
645
646
647
648 from processor import Processor, print_log
649
650 class BlockchainProcessor(Processor):
651
652     def __init__(self, config, shared):
653         Processor.__init__(self)
654         self.store = AbeStore(config)
655         self.watched_addresses = []
656         self.shared = shared
657
658         # catch_up first
659         self.block_header, time_catch_up, time_mempool, n = self.store.main_iteration()
660         self.block_number = self.block_header.get('block_height')
661         print_log ("blockchain: %d blocks"%self.block_number)
662
663         threading.Timer(10, self.run_store_iteration).start()
664
665
666     def add_request(self, request):
667         # see if we can get if from cache. if not, add to queue
668         if self.process( request, cache_only = True) == -1:
669             self.queue.put(request)
670
671
672     def process(self, request, cache_only = False):
673         #print_log ("abe process", request)
674
675         message_id = request['id']
676         method = request['method']
677         params = request.get('params',[])
678         result = None
679         error = None
680
681         if method == 'blockchain.numblocks.subscribe':
682             result = self.block_number
683
684         elif method == 'blockchain.headers.subscribe':
685             result = self.block_header
686
687         elif method == 'blockchain.address.subscribe':
688             try:
689                 address = params[0]
690                 result = self.store.get_status(address, cache_only)
691                 self.watch_address(address)
692             except BaseException, e:
693                 error = str(e) + ': ' + address
694                 print_log ("error:", error)
695
696         elif method == 'blockchain.address.get_history':
697             try:
698                 address = params[0]
699                 result = self.store.get_history( address, cache_only )
700             except BaseException, e:
701                 error = str(e) + ': ' + address
702                 print_log ("error:", error)
703
704         elif method == 'blockchain.block.get_header':
705             if cache_only: 
706                 result = -1
707             else:
708                 try:
709                     height = params[0]
710                     result = self.store.get_block_header( height ) 
711                 except BaseException, e:
712                     error = str(e) + ': %d'% height
713                     print_log ("error:", error)
714                     
715         elif method == 'blockchain.block.get_chunk':
716             if cache_only:
717                 result = -1
718             else:
719                 try:
720                     index = params[0]
721                     result = self.store.get_chunk( index ) 
722                 except BaseException, e:
723                     error = str(e) + ': %d'% index
724                     print_log ("error:", error)
725                     
726         elif method == 'blockchain.transaction.broadcast':
727             txo = self.store.send_tx(params[0])
728             print_log ("sent tx:", txo)
729             result = txo 
730
731         elif method == 'blockchain.transaction.get_merkle':
732             if cache_only:
733                 result = -1
734             else:
735                 try:
736                     tx_hash = params[0]
737                     result = self.store.get_tx_merkle(tx_hash ) 
738                 except BaseException, e:
739                     error = str(e) + ': ' + tx_hash
740                     print_log ("error:", error)
741                     
742         elif method == 'blockchain.transaction.get':
743             try:
744                 tx_hash = params[0]
745                 height = params[1]
746                 result = self.store.get_raw_tx(tx_hash, height ) 
747             except BaseException, e:
748                 error = str(e) + ': ' + tx_hash
749                 print_log ("error:", error)
750
751         else:
752             error = "unknown method:%s"%method
753
754         if cache_only and result == -1: return -1
755
756         if error:
757             response = { 'id':message_id, 'error':error }
758             self.push_response(response)
759         elif result != '':
760             response = { 'id':message_id, 'result':result }
761             self.push_response(response)
762
763
764     def watch_address(self, addr):
765         if addr not in self.watched_addresses:
766             self.watched_addresses.append(addr)
767
768
769     def run_store_iteration(self):
770         
771         try:
772             block_header, time_catch_up, time_mempool, n = self.store.main_iteration()
773         except:
774             traceback.print_exc(file=sys.stdout)
775             print_log ("terminating")
776             self.shared.stop()
777
778         if self.shared.stopped(): 
779             print_log ("exit timer")
780             return
781
782         #print_log ("block number: %d  (%.3fs)  mempool:%d (%.3fs)"%(self.block_number, time_catch_up, n, time_mempool))
783
784         if self.block_number != block_header.get('block_height'):
785             self.block_number = block_header.get('block_height')
786             print_log ("block number: %d  (%.3fs)"%(self.block_number, time_catch_up))
787             self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
788
789         if self.block_header != block_header:
790             self.block_header = block_header
791             self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.block_header] })
792
793
794         while True:
795             try:
796                 addr = self.store.address_queue.get(False)
797             except:
798                 break
799             if addr in self.watched_addresses:
800                 try:
801                     status = self.store.get_status( addr )
802                     self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
803                 except:
804                     break
805
806         threading.Timer(10, self.run_store_iteration).start()
807
808