new protocol (version 0.5), sending serialized transactions
[electrum-server.git] / backends / abe / __init__.py
1 from Abe.util import hash_to_address, decode_check_address
2 from Abe.DataStore import DataStore as Datastore_class
3 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
4
5 import binascii
6
7 import thread, traceback, sys, urllib, operator
8 from json import dumps, loads
9 from Queue import Queue
10 import time, threading
11
12
13 import hashlib
14 encode = lambda x: x[::-1].encode('hex')
15 decode = lambda x: x.decode('hex')[::-1]
16 Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
17
18 def rev_hex(s):
19     return s.decode('hex')[::-1].encode('hex')
20
21 def int_to_hex(i, length=1):
22     s = hex(i)[2:].rstrip('L')
23     s = "0"*(2*length - len(s)) + s
24     return rev_hex(s)
25
26 def header_to_string(res):
27     s = int_to_hex(res.get('version'),4) \
28         + rev_hex(res.get('prev_block_hash')) \
29         + rev_hex(res.get('merkle_root')) \
30         + int_to_hex(int(res.get('timestamp')),4) \
31         + int_to_hex(int(res.get('bits')),4) \
32         + int_to_hex(int(res.get('nonce')),4)
33     return s
34
35
36 class AbeStore(Datastore_class):
37
38     def __init__(self, config):
39         conf = DataStore.CONFIG_DEFAULTS
40         args, argv = readconf.parse_argv( [], conf)
41         args.dbtype = config.get('database','type')
42         if args.dbtype == 'sqlite3':
43             args.connect_args = { 'database' : config.get('database','database') }
44         elif args.dbtype == 'MySQLdb':
45             args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
46         elif args.dbtype == 'psycopg2':
47             args.connect_args = { 'database' : config.get('database','database') }
48
49         coin = config.get('server', 'coin')
50         self.addrtype = 0
51         if coin == 'litecoin':
52             print 'Litecoin settings:'
53             datadir = config.get('server','datadir')
54             print '  datadir = ' + datadir
55             args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}]
56             print '  addrtype = 48'
57             self.addrtype = 48
58
59         Datastore_class.__init__(self,args)
60
61         # Use 1 (Bitcoin) if chain_id is not sent
62         self.chain_id = self.datadirs[0]["chain_id"] or 1
63         print 'Coin chain_id = %d' % self.chain_id
64
65         self.sql_limit = int( config.get('database','limit') )
66
67         self.tx_cache = {}
68         self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
69
70         self.chunk_cache = {}
71
72         self.address_queue = Queue()
73
74         self.lock = threading.Lock()
75         self.last_tx_id = 0
76         self.known_mempool_hashes = []
77
78
79     
80     def import_tx(self, tx, is_coinbase):
81         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
82         self.last_tx_id = tx_id
83         return tx_id
84         
85
86
87
88     def import_block(self, b, chain_ids=frozenset()):
89         #print "import block"
90         block_id = super(AbeStore, self).import_block(b, chain_ids)
91         for pos in xrange(len(b['transactions'])):
92             tx = b['transactions'][pos]
93             if 'hash' not in tx:
94                 tx['hash'] = util.double_sha256(tx['tx'])
95             tx_id = self.tx_find_id_and_value(tx)
96             if tx_id:
97                 self.update_tx_cache(tx_id)
98             else:
99                 print "error: import_block: no tx_id"
100         return block_id
101
102
103     def update_tx_cache(self, txid):
104         inrows = self.get_tx_inputs(txid, False)
105         for row in inrows:
106             _hash = self.binout(row[6])
107             if not _hash:
108                 #print "WARNING: missing tx_in for tx", txid
109                 continue
110
111             address = hash_to_address(chr(self.addrtype), _hash)
112             if self.tx_cache.has_key(address):
113                 print "cache: invalidating", address
114                 self.tx_cache.pop(address)
115             self.address_queue.put(address)
116
117         outrows = self.get_tx_outputs(txid, False)
118         for row in outrows:
119             _hash = self.binout(row[6])
120             if not _hash:
121                 #print "WARNING: missing tx_out for tx", txid
122                 continue
123
124             address = hash_to_address(chr(self.addrtype), _hash)
125             if self.tx_cache.has_key(address):
126                 print "cache: invalidating", address
127                 self.tx_cache.pop(address)
128             self.address_queue.put(address)
129
130     def safe_sql(self,sql, params=(), lock=True):
131
132         error = False
133         try:
134             if lock: self.lock.acquire()
135             ret = self.selectall(sql,params)
136         except:
137             error = True
138             traceback.print_exc(file=sys.stdout)
139         finally:
140             if lock: self.lock.release()
141
142         if error: 
143             raise BaseException('sql error')
144
145         return ret
146             
147
148     def get_tx_outputs(self, tx_id, lock=True):
149         return self.safe_sql("""SELECT
150                 txout.txout_pos,
151                 txout.txout_scriptPubKey,
152                 txout.txout_value,
153                 nexttx.tx_hash,
154                 nexttx.tx_id,
155                 txin.txin_pos,
156                 pubkey.pubkey_hash
157               FROM txout
158               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
159               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
160               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
161              WHERE txout.tx_id = %d 
162              ORDER BY txout.txout_pos
163         """%(tx_id), (), lock)
164
165     def get_tx_inputs(self, tx_id, lock=True):
166         return self.safe_sql(""" SELECT
167                 txin.txin_pos,
168                 txin.txin_scriptSig,
169                 txout.txout_value,
170                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
171                 prevtx.tx_id,
172                 COALESCE(txout.txout_pos, u.txout_pos),
173                 pubkey.pubkey_hash
174               FROM txin
175               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
176               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
177               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
178               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
179              WHERE txin.tx_id = %d
180              ORDER BY txin.txin_pos
181              """%(tx_id,), (), lock)
182
183
184     def get_address_out_rows(self, dbhash):
185         out = self.safe_sql(""" SELECT
186                 b.block_nTime,
187                 cc.chain_id,
188                 b.block_height,
189                 1,
190                 b.block_hash,
191                 tx.tx_hash,
192                 tx.tx_id,
193                 txin.txin_pos,
194                 -prevout.txout_value
195               FROM chain_candidate cc
196               JOIN block b ON (b.block_id = cc.block_id)
197               JOIN block_tx ON (block_tx.block_id = b.block_id)
198               JOIN tx ON (tx.tx_id = block_tx.tx_id)
199               JOIN txin ON (txin.tx_id = tx.tx_id)
200               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
201               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
202              WHERE pubkey.pubkey_hash = ?
203                AND cc.chain_id = ?
204                AND cc.in_longest = 1
205              LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
206
207         if len(out)==self.sql_limit: 
208             raise BaseException('limit reached')
209         return out
210
211     def get_address_out_rows_memorypool(self, dbhash):
212         out = self.safe_sql(""" SELECT
213                 1,
214                 tx.tx_hash,
215                 tx.tx_id,
216                 txin.txin_pos,
217                 -prevout.txout_value
218               FROM tx 
219               JOIN txin ON (txin.tx_id = tx.tx_id)
220               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
221               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
222              WHERE pubkey.pubkey_hash = ?
223              LIMIT ? """, (dbhash,self.sql_limit))
224
225         if len(out)==self.sql_limit: 
226             raise BaseException('limit reached')
227         return out
228
229     def get_address_in_rows(self, dbhash):
230         out = self.safe_sql(""" SELECT
231                 b.block_nTime,
232                 cc.chain_id,
233                 b.block_height,
234                 0,
235                 b.block_hash,
236                 tx.tx_hash,
237                 tx.tx_id,
238                 txout.txout_pos,
239                 txout.txout_value
240               FROM chain_candidate cc
241               JOIN block b ON (b.block_id = cc.block_id)
242               JOIN block_tx ON (block_tx.block_id = b.block_id)
243               JOIN tx ON (tx.tx_id = block_tx.tx_id)
244               JOIN txout ON (txout.tx_id = tx.tx_id)
245               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
246              WHERE pubkey.pubkey_hash = ?
247                AND cc.chain_id = ?
248                AND cc.in_longest = 1
249                LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
250
251         if len(out)==self.sql_limit: 
252             raise BaseException('limit reached')
253         return out
254
255     def get_address_in_rows_memorypool(self, dbhash):
256         out = self.safe_sql( """ SELECT
257                 0,
258                 tx.tx_hash,
259                 tx.tx_id,
260                 txout.txout_pos,
261                 txout.txout_value
262               FROM tx
263               JOIN txout ON (txout.tx_id = tx.tx_id)
264               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
265              WHERE pubkey.pubkey_hash = ?
266              LIMIT ? """, (dbhash,self.sql_limit))
267
268         if len(out)==self.sql_limit: 
269             raise BaseException('limit reached')
270         return out
271
272     def get_history(self, addr):
273
274         with self.lock:
275             cached_version = self.tx_cache.get( addr )
276             if cached_version is not None:
277                 return cached_version
278
279         version, binaddr = decode_check_address(addr)
280         if binaddr is None:
281             return None
282
283         dbhash = self.binin(binaddr)
284         rows = []
285         rows += self.get_address_out_rows( dbhash )
286         rows += self.get_address_in_rows( dbhash )
287
288         txpoints = []
289         known_tx = []
290
291         for row in rows:
292             try:
293                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
294             except:
295                 print "cannot unpack row", row
296                 break
297             tx_hash = self.hashout_hex(tx_hash)
298             txpoint = {
299                     "timestamp":    int(nTime),
300                     "height":   int(height),
301                     "is_input":    int(is_in),
302                     "block_hash": self.hashout_hex(blk_hash),
303                     "tx_hash":  tx_hash,
304                     "tx_id":    int(tx_id),
305                     "index":      int(pos),
306                     "value":    int(value),
307                     }
308
309             txpoints.append(txpoint)
310             known_tx.append(self.hashout_hex(tx_hash))
311
312
313         # todo: sort them really...
314         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
315
316         # read memory pool
317         rows = []
318         rows += self.get_address_in_rows_memorypool( dbhash )
319         rows += self.get_address_out_rows_memorypool( dbhash )
320         address_has_mempool = False
321
322         for row in rows:
323             is_in, tx_hash, tx_id, pos, value = row
324             tx_hash = self.hashout_hex(tx_hash)
325             if tx_hash in known_tx:
326                 continue
327
328             # discard transactions that are too old
329             if self.last_tx_id - tx_id > 50000:
330                 print "discarding tx id", tx_id
331                 continue
332
333             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
334             address_has_mempool = True
335
336             #print "mempool", tx_hash
337             txpoint = {
338                     "timestamp":    0,
339                     "height":   0,
340                     "is_input":    int(is_in),
341                     "block_hash": 'mempool', 
342                     "tx_hash":  tx_hash,
343                     "tx_id":    int(tx_id),
344                     "index":      int(pos),
345                     "value":    int(value),
346                     }
347             txpoints.append(txpoint)
348
349
350         for txpoint in txpoints:
351             tx_id = txpoint['tx_id']
352             
353             txinputs = []
354             inrows = self.get_tx_inputs(tx_id)
355             for row in inrows:
356                 _hash = self.binout(row[6])
357                 if not _hash:
358                     #print "WARNING: missing tx_in for tx", tx_id, addr
359                     continue
360                 address = hash_to_address(chr(self.addrtype), _hash)
361                 txinputs.append(address)
362             txpoint['inputs'] = txinputs
363             txoutputs = []
364             outrows = self.get_tx_outputs(tx_id)
365             for row in outrows:
366                 _hash = self.binout(row[6])
367                 if not _hash:
368                     #print "WARNING: missing tx_out for tx", tx_id, addr
369                     continue
370                 address = hash_to_address(chr(self.addrtype), _hash)
371                 txoutputs.append(address)
372             txpoint['outputs'] = txoutputs
373
374             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
375             if not txpoint['is_input']:
376                 # detect if already redeemed...
377                 for row in outrows:
378                     if row[6] == dbhash: break
379                 else:
380                     raise
381                 #row = self.get_tx_output(tx_id,dbhash)
382                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
383                 # if not redeemed, we add the script
384                 if row:
385                     if not row[4]: txpoint['raw_output_script'] = row[1]
386
387             txpoint.pop('tx_id')
388
389         # cache result
390         # do not cache mempool results because statuses are ambiguous
391         if not address_has_mempool:
392             with self.lock:
393                 self.tx_cache[addr] = txpoints
394         
395         return txpoints
396
397     def get_history2(self, addr):
398         h = self.get_history(addr)
399         out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, h)
400         out2 = []
401         for item in out:
402             if item not in out2: out2.append(item)
403         return out2
404
405
406     def get_status(self,addr):
407         # get address status, i.e. the last block for that address.
408         tx_points = self.get_history(addr)
409         if not tx_points:
410             status = None
411         else:
412             lastpoint = tx_points[-1]
413             status = lastpoint['block_hash']
414             # this is a temporary hack; move it up once old clients have disappeared
415             if status == 'mempool': # and session['version'] != "old":
416                 status = status + ':%d'% len(tx_points)
417         return status
418
419     def get_status2(self,addr):
420         # for 0.5 clients
421         tx_points = self.get_history2(addr)
422         if not tx_points:
423             return None
424
425         status = ''
426         for tx in tx_points:
427             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
428         return hashlib.sha256( status ).digest().encode('hex')
429
430
431     def get_block_header(self, block_height):
432         out = self.safe_sql("""
433             SELECT
434                 block_hash,
435                 block_version,
436                 block_hashMerkleRoot,
437                 block_nTime,
438                 block_nBits,
439                 block_nNonce,
440                 block_height,
441                 prev_block_hash,
442                 block_id
443               FROM chain_summary
444              WHERE block_height = %d AND in_longest = 1"""%block_height)
445
446         if not out: raise BaseException("block not found")
447         row = out[0]
448         (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height,prev_block_hash, block_id) \
449             = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
450
451         out = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
452                 "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
453         return out
454         
455
456     def get_chunk(self, index):
457         with self.lock:
458             msg = self.chunk_cache.get(index)
459             if msg: return msg
460
461         sql = """
462             SELECT
463                 block_hash,
464                 block_version,
465                 block_hashMerkleRoot,
466                 block_nTime,
467                 block_nBits,
468                 block_nNonce,
469                 block_height,
470                 prev_block_hash,
471                 block_height
472               FROM chain_summary
473              WHERE block_height >= %d AND block_height< %d AND in_longest = 1"""%(index*2016, (index+1)*2016)
474
475         out = self.safe_sql(sql)
476         msg = ''
477         for row in out:
478             (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \
479                 = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) )
480             h = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, 
481                    "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce}
482
483             if h.get('block_height')==0: h['prev_block_hash'] = "0"*64
484             msg += header_to_string(h)
485
486             #print "hash", encode(Hash(msg.decode('hex')))
487             #if h.get('block_height')==1:break
488
489         with self.lock:
490             self.chunk_cache[index] = msg
491         print "get_chunk", index, len(msg)
492         return msg
493
494
495
496     def get_raw_tx(self, tx_hash, height):
497         postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id':'jsonrpc'})
498         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
499         r = loads(respdata)
500         if r['error'] != None:
501             raise BaseException(r['error'])
502
503         hextx = r.get('result')
504         return hextx
505
506
507     def get_tx_merkle(self, tx_hash):
508
509         out = self.safe_sql("""
510              SELECT block_tx.block_id FROM tx 
511              JOIN block_tx on tx.tx_id = block_tx.tx_id 
512              JOIN chain_summary on chain_summary.block_id = block_tx.block_id
513              WHERE tx_hash='%s' AND in_longest = 1"""%tx_hash)
514
515         if not out: raise BaseException("not in a block")
516         block_id = int(out[0][0])
517
518         # get block height
519         out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1"%block_id)
520
521         if not out: raise BaseException("block not found")
522         block_height = int(out[0][0])
523
524         merkle = []
525         tx_pos = None
526
527         # list all tx in block
528         for row in self.safe_sql("""
529             SELECT DISTINCT tx_id, tx_pos, tx_hash
530               FROM txin_detail
531              WHERE block_id = ?
532              ORDER BY tx_pos""", (block_id,)):
533             _id, _pos, _hash = row
534             merkle.append(_hash)
535             if _hash == tx_hash: tx_pos = int(_pos)
536
537         # find subset.
538         # TODO: do not compute this on client request, better store the hash tree of each block in a database...
539
540         merkle = map(decode, merkle)
541         target_hash = decode(tx_hash)
542
543         s = []
544         while len(merkle) != 1:
545             if len(merkle)%2: merkle.append( merkle[-1] )
546             n = []
547             while merkle:
548                 new_hash = Hash( merkle[0] + merkle[1] )
549                 if merkle[0] == target_hash:
550                     s.append( encode(merkle[1]))
551                     target_hash = new_hash
552                 elif merkle[1] == target_hash:
553                     s.append( encode(merkle[0]))
554                     target_hash = new_hash
555                 n.append( new_hash )
556                 merkle = merkle[2:]
557             merkle = n
558
559         # send result
560         return {"block_height":block_height, "merkle":s, "pos":tx_pos}
561
562
563
564
565     def memorypool_update(store):
566
567         ds = BCDataStream.BCDataStream()
568         postdata = dumps({"method": 'getrawmempool', 'params': [], 'id':'jsonrpc'})
569         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
570         r = loads(respdata)
571         if r['error'] != None:
572             print r['error']
573             return
574
575         mempool_hashes = r.get('result')
576         for tx_hash in mempool_hashes:
577
578             if tx_hash in store.known_mempool_hashes: continue
579             store.known_mempool_hashes.append(tx_hash)
580
581             postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id':'jsonrpc'})
582             respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
583             r = loads(respdata)
584             if r['error'] != None:
585                 continue
586             hextx = r.get('result')
587             ds.clear()
588             ds.write(hextx.decode('hex'))
589             tx = deserialize.parse_Transaction(ds)
590             tx['hash'] = util.double_sha256(tx['tx'])
591                 
592             if store.tx_find_id_and_value(tx):
593                 pass
594             else:
595                 tx_id = store.import_tx(tx, False)
596                 store.update_tx_cache(tx_id)
597                 #print tx_hash
598
599         store.commit()
600         store.known_mempool_hashes = mempool_hashes
601
602
603     def send_tx(self,tx):
604         postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id':'jsonrpc'})
605         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
606         r = loads(respdata)
607         if r['error'] != None:
608             msg = r['error'].get('message')
609             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
610         else:
611             out = r['result']
612         return out
613
614
615     def main_iteration(self):
616         with self.lock:
617             self.catch_up()
618             self.memorypool_update()
619             height = self.get_block_number( self.chain_id )
620             try: self.chunk_cache.pop(height/2016) 
621             except: pass
622
623         block_header = self.get_block_header( height )
624         return block_header
625
626
627
628
629     def catch_up(store):
630         # if there is an exception, do rollback and then re-raise the exception
631         for dircfg in store.datadirs:
632             try:
633                 store.catch_up_dir(dircfg)
634             except Exception, e:
635                 store.log.exception("Failed to catch up %s", dircfg)
636                 store.rollback()
637                 raise e
638
639
640
641
642 from processor import Processor
643
644 class BlockchainProcessor(Processor):
645
646     def __init__(self, config):
647         Processor.__init__(self)
648         self.store = AbeStore(config)
649         self.watched_addresses = []
650
651         # catch_up first
652         self.block_header = self.store.main_iteration()
653         self.block_number = self.block_header.get('block_height')
654         print "blockchain: %d blocks"%self.block_number
655
656         threading.Timer(10, self.run_store_iteration).start()
657
658     def process(self, request):
659         #print "abe process", request
660
661         message_id = request['id']
662         method = request['method']
663         params = request.get('params',[])
664         result = None
665         error = None
666
667         if method == 'blockchain.numblocks.subscribe':
668             result = self.block_number
669
670         elif method == 'blockchain.headers.subscribe':
671             result = self.block_header
672
673         elif method == 'blockchain.address.subscribe':
674             try:
675                 address = params[0]
676                 result = self.store.get_status(address)
677                 self.watch_address(address)
678             except BaseException, e:
679                 error = str(e) + ': ' + address
680                 print "error:", error
681
682         elif method == 'blockchain.address.subscribe2':
683             try:
684                 address = params[0]
685                 result = self.store.get_status2(address)
686                 self.watch_address(address)
687             except BaseException, e:
688                 error = str(e) + ': ' + address
689                 print "error:", error
690
691         elif method == 'blockchain.address.get_history':
692             try:
693                 address = params[0]
694                 result = self.store.get_history( address ) 
695             except BaseException, e:
696                 error = str(e) + ': ' + address
697                 print "error:", error
698
699         elif method == 'blockchain.address.get_history2':
700             try:
701                 address = params[0]
702                 result = self.store.get_history2( address ) 
703             except BaseException, e:
704                 error = str(e) + ': ' + address
705                 print "error:", error
706
707         elif method == 'blockchain.block.get_header':
708             try:
709                 height = params[0]
710                 result = self.store.get_block_header( height ) 
711             except BaseException, e:
712                 error = str(e) + ': %d'% height
713                 print "error:", error
714
715         elif method == 'blockchain.block.get_chunk':
716             try:
717                 index = params[0]
718                 result = self.store.get_chunk( index ) 
719             except BaseException, e:
720                 error = str(e) + ': %d'% index
721                 print "error:", error
722
723         elif method == 'blockchain.transaction.broadcast':
724             txo = self.store.send_tx(params[0])
725             print "sent tx:", txo
726             result = txo 
727
728         elif method == 'blockchain.transaction.get_merkle':
729             try:
730                 tx_hash = params[0]
731                 result = self.store.get_tx_merkle(tx_hash ) 
732             except BaseException, e:
733                 error = str(e) + ': ' + tx_hash
734                 print "error:", error
735
736         elif method == 'blockchain.transaction.get':
737             try:
738                 tx_hash = params[0]
739                 height = params[1]
740                 result = self.store.get_raw_tx(tx_hash, height ) 
741             except BaseException, e:
742                 error = str(e) + ': ' + tx_hash
743                 print "error:", error
744
745         else:
746             error = "unknown method:%s"%method
747
748
749         if error:
750             response = { 'id':message_id, 'error':error }
751             self.push_response(response)
752         elif result != '':
753             response = { 'id':message_id, 'result':result }
754             self.push_response(response)
755
756
757     def watch_address(self, addr):
758         if addr not in self.watched_addresses:
759             self.watched_addresses.append(addr)
760
761
762     def run_store_iteration(self):
763         
764         try:
765             t1 = time.time()
766             block_header = self.store.main_iteration()
767             t2 = time.time() - t1
768         except:
769             traceback.print_exc(file=sys.stdout)
770             print "terminating"
771             self.shared.stop()
772
773         if self.shared.stopped(): 
774             print "exit timer"
775             return
776
777         if self.block_number != block_header.get('block_height'):
778             self.block_number = block_header.get('block_height')
779             print "block number: %d  (%.3f seconds)"%(self.block_number, t2)
780             self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
781
782         if self.block_header != block_header:
783             self.block_header = block_header
784             self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.block_header] })
785
786
787         while True:
788             try:
789                 addr = self.store.address_queue.get(False)
790             except:
791                 break
792             if addr in self.watched_addresses:
793                 status = self.store.get_status( addr )
794                 status2 = self.store.get_status2( addr )
795                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
796                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status2] })
797
798         threading.Timer(10, self.run_store_iteration).start()
799
800