do not queue requests that can be answered using the cache
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13 import hashlib
14 encode = lambda x: x[::-1].encode('hex')
15 decode = lambda x: x.decode('hex')[::-1]
16 Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
17
18 def rev_hex(s):
19     return s.decode('hex')[::-1].encode('hex')
20
21 def int_to_hex(i, length=1):
22     s = hex(i)[2:].rstrip('L')
23     s = "0"*(2*length - len(s)) + s
24     return rev_hex(s)
25
26 def header_to_string(res):
27     s = int_to_hex(res.get('version'),4) \
28         + rev_hex(res.get('prev_block_hash')) \
29         + rev_hex(res.get('merkle_root')) \
30         + int_to_hex(int(res.get('timestamp')),4) \
31         + int_to_hex(int(res.get('bits')),4) \
32         + int_to_hex(int(res.get('nonce')),4)
33     return s
34
35
36 class AbeStore(Datastore_class):
37
38     def __init__(self, config):
39         conf = DataStore.CONFIG_DEFAULTS
40         args, argv = readconf.parse_argv( [], conf)
41         args.dbtype = config.get('database','type')
42         if args.dbtype == 'sqlite3':
43             args.connect_args = { 'database' : config.get('database','database') }
44         elif args.dbtype == 'MySQLdb':
45             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
46         elif args.dbtype == 'psycopg2':
47             args.connect_args = { 'database' : config.get('database','database') }
48
49         coin = config.get('server', 'coin')
50         self.addrtype = 0
51         if coin == 'litecoin':
52             print 'Litecoin settings:'
53             datadir = config.get('server','datadir')
54             print '  datadir = ' + datadir
55             args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
56             print '  addrtype = 48'
57             self.addrtype = 48
58
59         Datastore_class.__init__(self,args)
60
61         # Use 1 (Bitcoin) if chain_id is not sent
62         self.chain_id = self.datadirs[0]["chain_id"] or 1
63         print 'Coin chain_id = %d' % self.chain_id
64
65         self.sql_limit = int( config.get('database','limit') )
66
67         self.tx_cache = {}
68         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
69
70         self.chunk_cache = {}
71
72         self.address_queue = Queue()
73
74         self.lock = threading.Lock()        # for the database
75         self.cache_lock = threading.Lock()  # for the cache
76         self.last_tx_id = 0
77         self.known_mempool_hashes = []
78
79
80     
81     def import_tx(self, tx, is_coinbase):
82         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
83         self.last_tx_id = tx_id
84         return tx_id
85         
86
87
88
89     def import_block(self, b, chain_ids=frozenset()):
90         #print "import block"
91         block_id = super(AbeStore, self).import_block(b, chain_ids)
92         for pos in xrange(len(b['transactions'])):
93             tx = b['transactions'][pos]
94             if 'hash' not in tx:
95                 tx['hash'] = util.double_sha256(tx['tx'])
96             tx_id = self.tx_find_id_and_value(tx)
97             if tx_id:
98                 self.update_tx_cache(tx_id)
99             else:
100                 print "error: import_block: no tx_id"
101         return block_id
102
103
104     def update_tx_cache(self, txid):
105         inrows = self.get_tx_inputs(txid, False)
106         for row in inrows:
107             _hash = self.binout(row[6])
108             if not _hash:
109                 #print "WARNING: missing tx_in for tx", txid
110                 continue
111
112             address = hash_to_address(chr(self.addrtype), _hash)
113             with self.cache_lock:
114                 if self.tx_cache.has_key(address):
115                     print "cache: invalidating", address
116                     self.tx_cache.pop(address)
117
118             self.address_queue.put(address)
119
120         outrows = self.get_tx_outputs(txid, False)
121         for row in outrows:
122             _hash = self.binout(row[6])
123             if not _hash:
124                 #print "WARNING: missing tx_out for tx", txid
125                 continue
126
127             address = hash_to_address(chr(self.addrtype), _hash)
128             with self.cache_lock:
129                 if self.tx_cache.has_key(address):
130                     print "cache: invalidating", address
131                     self.tx_cache.pop(address)
132
133             self.address_queue.put(address)
134
135     def safe_sql(self,sql, params=(), lock=True):
136
137         error = False
138         try:
139             if lock: self.lock.acquire()
140             ret = self.selectall(sql,params)
141         except:
142             error = True
143             traceback.print_exc(file=sys.stdout)
144         finally:
145             if lock: self.lock.release()
146
147         if error: 
148             raise BaseException('sql error')
149
150         return ret
151             
152
153     def get_tx_outputs(self, tx_id, lock=True):
154         return self.safe_sql("""SELECT
155                 txout.txout_pos,
156                 txout.txout_scriptPubKey,
157                 txout.txout_value,
158                 nexttx.tx_hash,
159                 nexttx.tx_id,
160                 txin.txin_pos,
161                 pubkey.pubkey_hash
162               FROM txout
163               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
164               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
165               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
166              WHERE txout.tx_id = %d 
167              ORDER BY txout.txout_pos
168         """%(tx_id), (), lock)
169
170     def get_tx_inputs(self, tx_id, lock=True):
171         return self.safe_sql(""" SELECT
172                 txin.txin_pos,
173                 txin.txin_scriptSig,
174                 txout.txout_value,
175                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
176                 prevtx.tx_id,
177                 COALESCE(txout.txout_pos, u.txout_pos),
178                 pubkey.pubkey_hash
179               FROM txin
180               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
181               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
182               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
183               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
184              WHERE txin.tx_id = %d
185              ORDER BY txin.txin_pos
186              """%(tx_id,), (), lock)
187
188
189     def get_address_out_rows(self, dbhash):
190         out = self.safe_sql(""" SELECT
191                 b.block_nTime,
192                 cc.chain_id,
193                 b.block_height,
194                 1,
195                 b.block_hash,
196                 tx.tx_hash,
197                 tx.tx_id,
198                 txin.txin_pos,
199                 -prevout.txout_value
200               FROM chain_candidate cc
201               JOIN block b ON (b.block_id = cc.block_id)
202               JOIN block_tx ON (block_tx.block_id = b.block_id)
203               JOIN tx ON (tx.tx_id = block_tx.tx_id)
204               JOIN txin ON (txin.tx_id = tx.tx_id)
205               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
206               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
207              WHERE pubkey.pubkey_hash = ?
208                AND cc.chain_id = ?
209                AND cc.in_longest = 1
210              LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
211
212         if len(out)==self.sql_limit: 
213             raise BaseException('limit reached')
214         return out
215
216     def get_address_out_rows_memorypool(self, dbhash):
217         out = self.safe_sql(""" SELECT
218                 1,
219                 tx.tx_hash,
220                 tx.tx_id,
221                 txin.txin_pos,
222                 -prevout.txout_value
223               FROM tx 
224               JOIN txin ON (txin.tx_id = tx.tx_id)
225               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
226               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
227              WHERE pubkey.pubkey_hash = ?
228              LIMIT ? """, (dbhash,self.sql_limit))
229
230         if len(out)==self.sql_limit: 
231             raise BaseException('limit reached')
232         return out
233
234     def get_address_in_rows(self, dbhash):
235         out = self.safe_sql(""" SELECT
236                 b.block_nTime,
237                 cc.chain_id,
238                 b.block_height,
239                 0,
240                 b.block_hash,
241                 tx.tx_hash,
242                 tx.tx_id,
243                 txout.txout_pos,
244                 txout.txout_value
245               FROM chain_candidate cc
246               JOIN block b ON (b.block_id = cc.block_id)
247               JOIN block_tx ON (block_tx.block_id = b.block_id)
248               JOIN tx ON (tx.tx_id = block_tx.tx_id)
249               JOIN txout ON (txout.tx_id = tx.tx_id)
250               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
251              WHERE pubkey.pubkey_hash = ?
252                AND cc.chain_id = ?
253                AND cc.in_longest = 1
254                LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
255
256         if len(out)==self.sql_limit: 
257             raise BaseException('limit reached')
258         return out
259
260     def get_address_in_rows_memorypool(self, dbhash):
261         out = self.safe_sql( """ SELECT
262                 0,
263                 tx.tx_hash,
264                 tx.tx_id,
265                 txout.txout_pos,
266                 txout.txout_value
267               FROM tx
268               JOIN txout ON (txout.tx_id = tx.tx_id)
269               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
270              WHERE pubkey.pubkey_hash = ?
271              LIMIT ? """, (dbhash,self.sql_limit))
272
273         if len(out)==self.sql_limit: 
274             raise BaseException('limit reached')
275         return out
276
277
278
279     def get_history(self, addr, cache_only=False):
280         with self.cache_lock:
281             cached_version = self.tx_cache.get( addr )
282             if cached_version is not None:
283                 return cached_version
284
285         if cache_only: return -1
286
287         version, binaddr = decode_check_address(addr)
288         if binaddr is None:
289             return None
290
291         dbhash = self.binin(binaddr)
292         rows = []
293         rows += self.get_address_out_rows( dbhash )
294         rows += self.get_address_in_rows( dbhash )
295
296         txpoints = []
297         known_tx = []
298
299         for row in rows:
300             try:
301                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
302             except:
303                 print "cannot unpack row", row
304                 break
305             tx_hash = self.hashout_hex(tx_hash)
306             txpoint = {
307                     "timestamp":    int(nTime),
308                     "height":   int(height),
309                     "is_input":    int(is_in),
310                     "block_hash": self.hashout_hex(blk_hash),
311                     "tx_hash":  tx_hash,
312                     "tx_id":    int(tx_id),
313                     "index":      int(pos),
314                     "value":    int(value),
315                     }
316
317             txpoints.append(txpoint)
318             known_tx.append(self.hashout_hex(tx_hash))
319
320
321         # todo: sort them really...
322         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
323
324         # read memory pool
325         rows = []
326         rows += self.get_address_in_rows_memorypool( dbhash )
327         rows += self.get_address_out_rows_memorypool( dbhash )
328         address_has_mempool = False
329
330         for row in rows:
331             is_in, tx_hash, tx_id, pos, value = row
332             tx_hash = self.hashout_hex(tx_hash)
333             if tx_hash in known_tx:
334                 continue
335
336             # discard transactions that are too old
337             if self.last_tx_id - tx_id > 50000:
338                 print "discarding tx id", tx_id
339                 continue
340
341             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
342             address_has_mempool = True
343
344             #print "mempool", tx_hash
345             txpoint = {
346                     "timestamp":    0,
347                     "height":   0,
348                     "is_input":    int(is_in),
349                     "block_hash": 'mempool', 
350                     "tx_hash":  tx_hash,
351                     "tx_id":    int(tx_id),
352                     "index":      int(pos),
353                     "value":    int(value),
354                     }
355             txpoints.append(txpoint)
356
357
358         for txpoint in txpoints:
359             tx_id = txpoint['tx_id']
360             
361             txinputs = []
362             inrows = self.get_tx_inputs(tx_id)
363             for row in inrows:
364                 _hash = self.binout(row[6])
365                 if not _hash:
366                     #print "WARNING: missing tx_in for tx", tx_id, addr
367                     continue
368                 address = hash_to_address(chr(self.addrtype), _hash)
369                 txinputs.append(address)
370             txpoint['inputs'] = txinputs
371             txoutputs = []
372             outrows = self.get_tx_outputs(tx_id)
373             for row in outrows:
374                 _hash = self.binout(row[6])
375                 if not _hash:
376                     #print "WARNING: missing tx_out for tx", tx_id, addr
377                     continue
378                 address = hash_to_address(chr(self.addrtype), _hash)
379                 txoutputs.append(address)
380             txpoint['outputs'] = txoutputs
381
382             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
383             if not txpoint['is_input']:
384                 # detect if already redeemed...
385                 for row in outrows:
386                     if row[6] == dbhash: break
387                 else:
388                     raise
389                 #row = self.get_tx_output(tx_id,dbhash)
390                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
391                 # if not redeemed, we add the script
392                 if row:
393                     if not row[4]: txpoint['raw_output_script'] = row[1]
394
395             txpoint.pop('tx_id')
396
397         # cache result
398         # do not cache mempool results because statuses are ambiguous
399         if not address_has_mempool:
400             with self.cache_lock:
401                 self.tx_cache[addr] = txpoints
402         
403         return txpoints
404
405     def get_history2(self, addr, cache_only=False):
406         h = self.get_history(addr, cache_only)
407         if cache_only and h==-1: return -1
408
409         out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, h)
410         out2 = []
411         for item in out:
412             if item not in out2: out2.append(item)
413         return out2
414
415
416     def get_status(self, addr, cache_only=False):
417         # get address status, i.e. the last block for that address.
418         tx_points = self.get_history(addr, cache_only)
419         if cache_only and tx_points == -1: return -1
420
421         if not tx_points:
422             status = None
423         else:
424             lastpoint = tx_points[-1]
425             status = lastpoint['block_hash']
426             # this is a temporary hack; move it up once old clients have disappeared
427             if status == 'mempool': # and session['version'] != "old":
428                 status = status + ':%d'% len(tx_points)
429         return status
430
431     def get_status2(self, addr, cache_only=False):
432         # for 0.5 clients
433         tx_points = self.get_history2(addr)
434         if cache_only and tx_points == -1: return -1
435
436         if not tx_points: return None
437         status = ''
438         for tx in tx_points:
439             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
440         return hashlib.sha256( status ).digest().encode('hex')
441
442
443     def get_block_header(self, block_height):
444         out = self.safe_sql("""
445             SELECT
446                 block_hash,
447                 block_version,
448                 block_hashMerkleRoot,
449                 block_nTime,
450                 block_nBits,
451                 block_nNonce,
452                 block_height,
453                 prev_block_hash,
454                 block_id
455               FROM chain_summary
456              WHERE block_height = %d AND in_longest = 1"""%block_height)
457
458         if not out: raise BaseException("block not found")
459         row = out[0]
460         (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height,prev_block_hash, block_id) \
461             = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
462
463         out = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
464                 "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
465         return out
466         
467
468     def get_chunk(self, index):
469         with self.cache_lock:
470             msg = self.chunk_cache.get(index)
471             if msg: return msg
472
473         sql = """
474             SELECT
475                 block_hash,
476                 block_version,
477                 block_hashMerkleRoot,
478                 block_nTime,
479                 block_nBits,
480                 block_nNonce,
481                 block_height,
482                 prev_block_hash,
483                 block_height
484               FROM chain_summary
485              WHERE block_height >= %d AND block_height< %d AND in_longest = 1"""%(index*2016, (index+1)*2016)
486
487         out = self.safe_sql(sql)
488         msg = ''
489         for row in out:
490             (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \
491                 = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
492             h = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
493                    "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
494
495             if h.get('block_height')==0: h['prev_block_hash'] = "0"*64
496             msg += header_to_string(h)
497
498             #print "hash", encode(Hash(msg.decode('hex')))
499             #if h.get('block_height')==1:break
500
501         with self.cache_lock:
502             self.chunk_cache[index] = msg
503         print "get_chunk", index, len(msg)
504         return msg
505
506
507
508     def get_raw_tx(self, tx_hash, height):
509         postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id':'jsonrpc'})
510         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
511         r = loads(respdata)
512         if r['error'] != None:
513             raise BaseException(r['error'])
514
515         hextx = r.get('result')
516         return hextx
517
518
519     def get_tx_merkle(self, tx_hash):
520
521         out = self.safe_sql("""
522              SELECT block_tx.block_id FROM tx 
523              JOIN block_tx on tx.tx_id = block_tx.tx_id 
524              JOIN chain_summary on chain_summary.block_id = block_tx.block_id
525              WHERE tx_hash='%s' AND in_longest = 1"""%tx_hash)
526
527         if not out: raise BaseException("not in a block")
528         block_id = int(out[0][0])
529
530         # get block height
531         out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1"%block_id)
532
533         if not out: raise BaseException("block not found")
534         block_height = int(out[0][0])
535
536         merkle = []
537         tx_pos = None
538
539         # list all tx in block
540         for row in self.safe_sql("""
541             SELECT DISTINCT tx_id, tx_pos, tx_hash
542               FROM txin_detail
543              WHERE block_id = ?
544              ORDER BY tx_pos""", (block_id,)):
545             _id, _pos, _hash = row
546             merkle.append(_hash)
547             if _hash == tx_hash: tx_pos = int(_pos)
548
549         # find subset.
550         # TODO: do not compute this on client request, better store the hash tree of each block in a database...
551
552         merkle = map(decode, merkle)
553         target_hash = decode(tx_hash)
554
555         s = []
556         while len(merkle) != 1:
557             if len(merkle)%2: merkle.append( merkle[-1] )
558             n = []
559             while merkle:
560                 new_hash = Hash( merkle[0] + merkle[1] )
561                 if merkle[0] == target_hash:
562                     s.append( encode(merkle[1]))
563                     target_hash = new_hash
564                 elif merkle[1] == target_hash:
565                     s.append( encode(merkle[0]))
566                     target_hash = new_hash
567                 n.append( new_hash )
568                 merkle = merkle[2:]
569             merkle = n
570
571         # send result
572         return {"block_height":block_height, "merkle":s, "pos":tx_pos}
573
574
575
576
577     def memorypool_update(store):
578
579         ds = BCDataStream.BCDataStream()
580         postdata = dumps({"method": 'getrawmempool', 'params': [], 'id':'jsonrpc'})
581         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
582         r = loads(respdata)
583         if r['error'] != None:
584             print r['error']
585             return
586
587         mempool_hashes = r.get('result')
588         for tx_hash in mempool_hashes:
589
590             if tx_hash in store.known_mempool_hashes: continue
591             store.known_mempool_hashes.append(tx_hash)
592
593             postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id':'jsonrpc'})
594             respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
595             r = loads(respdata)
596             if r['error'] != None:
597                 continue
598             hextx = r.get('result')
599             ds.clear()
600             ds.write(hextx.decode('hex'))
601             tx = deserialize.parse_Transaction(ds)
602             tx['hash'] = util.double_sha256(tx['tx'])
603                 
604             if store.tx_find_id_and_value(tx):
605                 pass
606             else:
607                 tx_id = store.import_tx(tx, False)
608                 store.update_tx_cache(tx_id)
609                 #print tx_hash
610
611         store.commit()
612         store.known_mempool_hashes = mempool_hashes
613
614
615     def send_tx(self,tx):
616         postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id':'jsonrpc'})
617         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
618         r = loads(respdata)
619         if r['error'] != None:
620             msg = r['error'].get('message')
621             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
622         else:
623             out = r['result']
624         return out
625
626
627     def main_iteration(self):
628         with self.lock:
629             self.catch_up()
630             self.memorypool_update()
631             height = self.get_block_number( self.chain_id )
632             try: self.chunk_cache.pop(height/2016) 
633             except: pass
634
635         block_header = self.get_block_header( height )
636         return block_header
637
638
639
640
641     def catch_up(store):
642         # if there is an exception, do rollback and then re-raise the exception
643         for dircfg in store.datadirs:
644             try:
645                 store.catch_up_dir(dircfg)
646             except Exception, e:
647                 store.log.exception("Failed to catch up %s", dircfg)
648                 store.rollback()
649                 raise e
650
651
652
653
654 from processor import Processor
655
656 class BlockchainProcessor(Processor):
657
658     def __init__(self, config):
659         Processor.__init__(self)
660         self.store = AbeStore(config)
661         self.watched_addresses = []
662
663         # catch_up first
664         self.block_header = self.store.main_iteration()
665         self.block_number = self.block_header.get('block_height')
666         print "blockchain: %d blocks"%self.block_number
667
668         threading.Timer(10, self.run_store_iteration).start()
669
670
671     def add_request(self, request):
672         # see if we can get if from cache. if not, add to queue
673         if self.process( request, cache_only=True) == -1:
674             self.queue.put(request)
675
676
677     def process(self, request, cache_only = False):
678         #print "abe process", request
679
680         message_id = request['id']
681         method = request['method']
682         params = request.get('params',[])
683         result = None
684         error = None
685
686         if method == 'blockchain.numblocks.subscribe':
687             result = self.block_number
688
689         elif method == 'blockchain.headers.subscribe':
690             result = self.block_header
691
692         elif method == 'blockchain.address.subscribe':
693             try:
694                 address = params[0]
695                 result = self.store.get_status(address, cache_only)
696                 self.watch_address(address)
697             except BaseException, e:
698                 error = str(e) + ': ' + address
699                 print "error:", error
700
701         elif method == 'blockchain.address.subscribe2':
702             try:
703                 address = params[0]
704                 result = self.store.get_status2(address, cache_only)
705                 self.watch_address(address)
706             except BaseException, e:
707                 error = str(e) + ': ' + address
708                 print "error:", error
709
710         elif method == 'blockchain.address.get_history':
711             try:
712                 address = params[0]
713                 result = self.store.get_history( address, cache_only )
714             except BaseException, e:
715                 error = str(e) + ': ' + address
716                 print "error:", error
717
718         elif method == 'blockchain.address.get_history2':
719             try:
720                 address = params[0]
721                 result = self.store.get_history2( address, cache_only )
722             except BaseException, e:
723                 error = str(e) + ': ' + address
724                 print "error:", error
725
726         elif method == 'blockchain.block.get_header':
727             if cache_only: 
728                 result = -1
729             else:
730                 try:
731                     height = params[0]
732                     result = self.store.get_block_header( height ) 
733                 except BaseException, e:
734                     error = str(e) + ': %d'% height
735                     print "error:", error
736                     
737         elif method == 'blockchain.block.get_chunk':
738             if cache_only:
739                 result = -1
740             else:
741                 try:
742                     index = params[0]
743                     result = self.store.get_chunk( index ) 
744                 except BaseException, e:
745                     error = str(e) + ': %d'% index
746                     print "error:", error
747                     
748         elif method == 'blockchain.transaction.broadcast':
749             txo = self.store.send_tx(params[0])
750             print "sent tx:", txo
751             result = txo 
752
753         elif method == 'blockchain.transaction.get_merkle':
754             if cache_only:
755                 result = -1
756             else:
757                 try:
758                     tx_hash = params[0]
759                     result = self.store.get_tx_merkle(tx_hash ) 
760                 except BaseException, e:
761                     error = str(e) + ': ' + tx_hash
762                     print "error:", error
763                     
764         elif method == 'blockchain.transaction.get':
765             try:
766                 tx_hash = params[0]
767                 height = params[1]
768                 result = self.store.get_raw_tx(tx_hash, height ) 
769             except BaseException, e:
770                 error = str(e) + ': ' + tx_hash
771                 print "error:", error
772
773         else:
774             error = "unknown method:%s"%method
775
776         if cache_only and result == -1: return -1
777
778         if error:
779             response = { 'id':message_id, 'error':error }
780             self.push_response(response)
781         elif result != '':
782             response = { 'id':message_id, 'result':result }
783             self.push_response(response)
784
785
786     def watch_address(self, addr):
787         if addr not in self.watched_addresses:
788             self.watched_addresses.append(addr)
789
790
791     def run_store_iteration(self):
792         
793         try:
794             t1 = time.time()
795             block_header = self.store.main_iteration()
796             t2 = time.time() - t1
797         except:
798             traceback.print_exc(file=sys.stdout)
799             print "terminating"
800             self.shared.stop()
801
802         if self.shared.stopped(): 
803             print "exit timer"
804             return
805
806         if self.block_number != block_header.get('block_height'):
807             self.block_number = block_header.get('block_height')
808             print "block number: %d  (%.3f seconds)"%(self.block_number, t2)
809             self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
810
811         if self.block_header != block_header:
812             self.block_header = block_header
813             self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.block_header] })
814
815
816         while True:
817             try:
818                 addr = self.store.address_queue.get(False)
819             except:
820                 break
821             if addr in self.watched_addresses:
822                 status = self.store.get_status( addr )
823                 status2 = self.store.get_status2( addr )
824                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
825                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status2] })
826
827         threading.Timer(10, self.run_store_iteration).start()
828
829