no more patch
[electrum-server.git] / backends / abe / __init__.py
1 import binascii
2 from json import dumps, loads
3 import operator
4 from Queue import Queue
5 import sys
6 import thread
7 import threading
8 import time
9 import traceback
10 import urllib
11
12 from Abe import DataStore, readconf, BCDataStream, deserialize
13 from Abe.util import hash_to_address, decode_check_address
14
15 from processor import Processor, print_log
16 from utils import *
17
18
19 class AbeStore(DataStore.DataStore):
20
21     def __init__(self, config):
22         conf = DataStore.CONFIG_DEFAULTS
23         args, argv = readconf.parse_argv([], conf)
24         args.dbtype = config.get('database', 'type')
25         if args.dbtype == 'sqlite3':
26             args.connect_args = {'database': config.get('database', 'database')}
27         elif args.dbtype == 'MySQLdb':
28             args.connect_args = {'db': config.get('database', 'database'), 'user': config.get('database', 'username'), 'passwd': config.get('database', 'password')}
29         elif args.dbtype == 'psycopg2':
30             args.connect_args = {'database': config.get('database', 'database')}
31
32         coin = config.get('server', 'coin')
33         self.addrtype = 0
34         if coin == 'litecoin':
35             print_log('Litecoin settings:')
36             datadir = config.get('server', 'datadir')
37             print_log('  datadir = ' + datadir)
38             args.datadir = [{"dirname": datadir, "chain": "Litecoin", "code3": "LTC", "address_version": "\u0030"}]
39             print_log('  addrtype = 48')
40             self.addrtype = 48
41
42         DataStore.DataStore.__init__(self, args)
43
44         # Use 1 (Bitcoin) if chain_id is not sent
45         self.chain_id = self.datadirs[0]["chain_id"] or 1
46         print_log('Coin chain_id = %d' % self.chain_id)
47
48         self.sql_limit = int(config.get('database', 'limit'))
49
50         self.tx_cache = {}
51         self.bitcoind_url = 'http://%s:%s@%s:%s/' % (config.get('bitcoind', 'user'), config.get('bitcoind', 'password'), config.get('bitcoind', 'host'), config.get('bitcoind', 'port'))
52
53         self.chunk_cache = {}
54
55         self.address_queue = Queue()
56
57         self.lock = threading.Lock()        # for the database
58         self.cache_lock = threading.Lock()  # for the cache
59         self.last_tx_id = 0
60         self.known_mempool_hashes = []
61
62     def import_tx(self, tx, is_coinbase):
63         tx_id = super(AbeStore, self).import_tx(tx, is_coinbase)
64         self.last_tx_id = tx_id
65         return tx_id
66
67     def import_block(self, b, chain_ids=frozenset()):
68         #print_log("import block")
69         block_id = super(AbeStore, self).import_block(b, chain_ids)
70         for pos in xrange(len(b['transactions'])):
71             tx = b['transactions'][pos]
72             if 'hash' not in tx:
73                 tx['hash'] = Hash(tx['tx'])
74             tx_id = self.tx_find_id_and_value(tx)
75             if tx_id:
76                 self.update_tx_cache(tx_id)
77             else:
78                 print_log("error: import_block: no tx_id")
79         return block_id
80
81     def update_tx_cache(self, txid):
82         inrows = self.get_tx_inputs(txid, False)
83         for row in inrows:
84             _hash = self.binout(row[6])
85             if not _hash:
86                 #print_log("WARNING: missing tx_in for tx", txid)
87                 continue
88
89             address = hash_to_address(chr(self.addrtype), _hash)
90             with self.cache_lock:
91                 if address in self.tx_cache:
92                     print_log("cache: invalidating", address)
93                     self.tx_cache.pop(address)
94
95             self.address_queue.put(address)
96
97         outrows = self.get_tx_outputs(txid, False)
98         for row in outrows:
99             _hash = self.binout(row[6])
100             if not _hash:
101                 #print_log("WARNING: missing tx_out for tx", txid)
102                 continue
103
104             address = hash_to_address(chr(self.addrtype), _hash)
105             with self.cache_lock:
106                 if address in self.tx_cache:
107                     print_log("cache: invalidating", address)
108                     self.tx_cache.pop(address)
109
110             self.address_queue.put(address)
111
112     def safe_sql(self, sql, params=(), lock=True):
113         error = False
114         try:
115             if lock:
116                 self.lock.acquire()
117             ret = self.selectall(sql, params)
118         except:
119             error = True
120             traceback.print_exc(file=sys.stdout)
121         finally:
122             if lock:
123                 self.lock.release()
124
125         if error:
126             raise Exception('sql error')
127
128         return ret
129
130     def get_tx_outputs(self, tx_id, lock=True):
131         return self.safe_sql("""SELECT
132                 txout.txout_pos,
133                 txout.txout_scriptPubKey,
134                 txout.txout_value,
135                 nexttx.tx_hash,
136                 nexttx.tx_id,
137                 txin.txin_pos,
138                 pubkey.pubkey_hash
139               FROM txout
140               LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
141               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
142               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
143              WHERE txout.tx_id = %d
144              ORDER BY txout.txout_pos
145         """ % (tx_id), (), lock)
146
147     def get_tx_inputs(self, tx_id, lock=True):
148         return self.safe_sql(""" SELECT
149                 txin.txin_pos,
150                 txin.txin_scriptSig,
151                 txout.txout_value,
152                 COALESCE(prevtx.tx_hash, u.txout_tx_hash),
153                 prevtx.tx_id,
154                 COALESCE(txout.txout_pos, u.txout_pos),
155                 pubkey.pubkey_hash
156               FROM txin
157               LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
158               LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
159               LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
160               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
161              WHERE txin.tx_id = %d
162              ORDER BY txin.txin_pos
163              """ % (tx_id,), (), lock)
164
165     def get_address_out_rows(self, dbhash):
166         out = self.safe_sql(""" SELECT
167                 b.block_nTime,
168                 cc.chain_id,
169                 b.block_height,
170                 1,
171                 b.block_hash,
172                 tx.tx_hash,
173                 tx.tx_id,
174                 txin.txin_pos,
175                 -prevout.txout_value
176               FROM chain_candidate cc
177               JOIN block b ON (b.block_id = cc.block_id)
178               JOIN block_tx ON (block_tx.block_id = b.block_id)
179               JOIN tx ON (tx.tx_id = block_tx.tx_id)
180               JOIN txin ON (txin.tx_id = tx.tx_id)
181               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
182               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
183              WHERE pubkey.pubkey_hash = ?
184                AND cc.chain_id = ?
185                AND cc.in_longest = 1
186              LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
187
188         if len(out) == self.sql_limit:
189             raise Exception('limit reached')
190         return out
191
192     def get_address_out_rows_memorypool(self, dbhash):
193         out = self.safe_sql(""" SELECT
194                 1,
195                 tx.tx_hash,
196                 tx.tx_id,
197                 txin.txin_pos,
198                 -prevout.txout_value
199               FROM tx
200               JOIN txin ON (txin.tx_id = tx.tx_id)
201               JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
202               JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
203              WHERE pubkey.pubkey_hash = ?
204              LIMIT ? """, (dbhash, self.sql_limit))
205
206         if len(out) == self.sql_limit:
207             raise Exception('limit reached')
208         return out
209
210     def get_address_in_rows(self, dbhash):
211         out = self.safe_sql(""" SELECT
212                 b.block_nTime,
213                 cc.chain_id,
214                 b.block_height,
215                 0,
216                 b.block_hash,
217                 tx.tx_hash,
218                 tx.tx_id,
219                 txout.txout_pos,
220                 txout.txout_value
221               FROM chain_candidate cc
222               JOIN block b ON (b.block_id = cc.block_id)
223               JOIN block_tx ON (block_tx.block_id = b.block_id)
224               JOIN tx ON (tx.tx_id = block_tx.tx_id)
225               JOIN txout ON (txout.tx_id = tx.tx_id)
226               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
227              WHERE pubkey.pubkey_hash = ?
228                AND cc.chain_id = ?
229                AND cc.in_longest = 1
230                LIMIT ? """, (dbhash, self.chain_id, self.sql_limit))
231
232         if len(out) == self.sql_limit:
233             raise Exception('limit reached')
234         return out
235
236     def get_address_in_rows_memorypool(self, dbhash):
237         out = self.safe_sql(""" SELECT
238                 0,
239                 tx.tx_hash,
240                 tx.tx_id,
241                 txout.txout_pos,
242                 txout.txout_value
243               FROM tx
244               JOIN txout ON (txout.tx_id = tx.tx_id)
245               JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
246              WHERE pubkey.pubkey_hash = ?
247              LIMIT ? """, (dbhash, self.sql_limit))
248
249         if len(out) == self.sql_limit:
250             raise Exception('limit reached')
251         return out
252
253     def get_history(self, addr, cache_only=False):
254         # todo: make this more efficient. it iterates over txpoints multiple times
255         with self.cache_lock:
256             cached_version = self.tx_cache.get(addr)
257             if cached_version is not None:
258                 return cached_version
259
260         if cache_only:
261             return -1
262
263         version, binaddr = decode_check_address(addr)
264         if binaddr is None:
265             return None
266
267         dbhash = self.binin(binaddr)
268         rows = []
269         rows += self.get_address_out_rows(dbhash)
270         rows += self.get_address_in_rows(dbhash)
271
272         txpoints = []
273         known_tx = []
274
275         for row in rows:
276             try:
277                 nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
278             except:
279                 print_log("cannot unpack row", row)
280                 break
281             tx_hash = self.hashout_hex(tx_hash)
282
283             txpoints.append({
284                 "timestamp": int(nTime),
285                 "height": int(height),
286                 "is_input": int(is_in),
287                 "block_hash": self.hashout_hex(blk_hash),
288                 "tx_hash": tx_hash,
289                 "tx_id": int(tx_id),
290                 "index": int(pos),
291                 "value": int(value),
292             })
293             known_tx.append(tx_hash)
294
295         # todo: sort them really...
296         txpoints = sorted(txpoints, key=operator.itemgetter("timestamp"))
297
298         # read memory pool
299         rows = []
300         rows += self.get_address_in_rows_memorypool(dbhash)
301         rows += self.get_address_out_rows_memorypool(dbhash)
302         address_has_mempool = False
303
304         for row in rows:
305             is_in, tx_hash, tx_id, pos, value = row
306             tx_hash = self.hashout_hex(tx_hash)
307             if tx_hash in known_tx:
308                 continue
309
310             # discard transactions that are too old
311             if self.last_tx_id - tx_id > 50000:
312                 print_log("discarding tx id", tx_id)
313                 continue
314
315             # this means that pending transactions were added to the db, even if they are not returned by getmemorypool
316             address_has_mempool = True
317
318             #print_log("mempool", tx_hash)
319             txpoints.append({
320                 "timestamp": 0,
321                 "height": 0,
322                 "is_input": int(is_in),
323                 "block_hash": 'mempool',
324                 "tx_hash": tx_hash,
325                 "tx_id": int(tx_id),
326                 "index": int(pos),
327                 "value": int(value),
328             })
329
330         for txpoint in txpoints:
331             tx_id = txpoint['tx_id']
332
333             txinputs = []
334             inrows = self.get_tx_inputs(tx_id)
335             for row in inrows:
336                 _hash = self.binout(row[6])
337                 if not _hash:
338                     #print_log("WARNING: missing tx_in for tx", tx_id, addr)
339                     continue
340                 address = hash_to_address(chr(self.addrtype), _hash)
341                 txinputs.append(address)
342             txpoint['inputs'] = txinputs
343             txoutputs = []
344             outrows = self.get_tx_outputs(tx_id)
345             for row in outrows:
346                 _hash = self.binout(row[6])
347                 if not _hash:
348                     #print_log("WARNING: missing tx_out for tx", tx_id, addr)
349                     continue
350                 address = hash_to_address(chr(self.addrtype), _hash)
351                 txoutputs.append(address)
352             txpoint['outputs'] = txoutputs
353
354             # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
355             if not txpoint['is_input']:
356                 # detect if already redeemed...
357                 for row in outrows:
358                     if row[6] == dbhash:
359                         break
360                 else:
361                     raise
362                 #row = self.get_tx_output(tx_id,dbhash)
363                 # pos, script, value, o_hash, o_id, o_pos, binaddr = row
364                 # if not redeemed, we add the script
365                 if row:
366                     if not row[4]:
367                         txpoint['raw_output_script'] = row[1]
368
369             txpoint.pop('tx_id')
370
371         txpoints = map(lambda x: {'tx_hash': x['tx_hash'], 'height': x['height']}, txpoints)
372         out = []
373         for item in txpoints:
374             if item not in out:
375                 out.append(item)
376
377         # cache result
378         ## do not cache mempool results because statuses are ambiguous
379         #if not address_has_mempool:
380         with self.cache_lock:
381             self.tx_cache[addr] = out
382
383         return out
384
385     def get_status(self, addr, cache_only=False):
386         # for 0.5 clients
387         tx_points = self.get_history(addr, cache_only)
388         if cache_only and tx_points == -1:
389             return -1
390
391         if not tx_points:
392             return None
393         status = ''
394         for tx in tx_points:
395             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
396         return hashlib.sha256(status).digest().encode('hex')
397
398     def get_block_header(self, block_height):
399         out = self.safe_sql("""
400             SELECT
401                 block_hash,
402                 block_version,
403                 block_hashMerkleRoot,
404                 block_nTime,
405                 block_nBits,
406                 block_nNonce,
407                 block_height,
408                 prev_block_hash,
409                 block_id
410               FROM chain_summary
411              WHERE block_height = %d AND in_longest = 1""" % block_height)
412
413         if not out:
414             raise Exception("block not found")
415         row = out[0]
416         (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_id) \
417             = (self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]))
418
419         return {
420             "block_height": block_height,
421             "version": block_version,
422             "prev_block_hash": prev_block_hash,
423             "merkle_root": hashMerkleRoot,
424             "timestamp": nTime,
425             "bits": nBits,
426             "nonce": nNonce,
427         }
428
429     def get_chunk(self, index):
430         with self.cache_lock:
431             msg = self.chunk_cache.get(index)
432             if msg:
433                 return msg
434
435         sql = """
436             SELECT
437                 block_hash,
438                 block_version,
439                 block_hashMerkleRoot,
440                 block_nTime,
441                 block_nBits,
442                 block_nNonce,
443                 block_height,
444                 prev_block_hash,
445                 block_height
446               FROM chain_summary
447              WHERE block_height >= %d AND block_height< %d AND in_longest = 1 ORDER BY block_height""" % (index * 2016, (index+1) * 2016)
448
449         out = self.safe_sql(sql)
450         msg = ''
451         for row in out:
452             (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \
453                 = (self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]))
454             h = {
455                 "block_height": block_height,
456                 "version": block_version,
457                 "prev_block_hash": prev_block_hash,
458                 "merkle_root": hashMerkleRoot,
459                 "timestamp": nTime,
460                 "bits": nBits,
461                 "nonce": nNonce,
462             }
463
464             if h.get('block_height') == 0:
465                 h['prev_block_hash'] = "0" * 64
466             msg += header_to_string(h)
467
468             #print_log("hash", encode(Hash(msg.decode('hex'))))
469             #if h.get('block_height')==1:break
470
471         with self.cache_lock:
472             self.chunk_cache[index] = msg
473         print_log("get_chunk", index, len(msg))
474         return msg
475
476     def get_raw_tx(self, tx_hash):
477         postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0], 'id': 'jsonrpc'})
478         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
479         r = loads(respdata)
480         if r['error'] is not None:
481             raise Exception(r['error'])
482
483         return r.get('result')
484
485     def get_tx_merkle(self, tx_hash):
486         out = self.safe_sql("""
487              SELECT block_tx.block_id FROM tx
488              JOIN block_tx on tx.tx_id = block_tx.tx_id
489              JOIN chain_summary on chain_summary.block_id = block_tx.block_id
490              WHERE tx_hash='%s' AND in_longest = 1""" % tx_hash)
491
492         if not out:
493             raise Exception("not in a block")
494         block_id = int(out[0][0])
495
496         # get block height
497         out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1" % block_id)
498
499         if not out:
500             raise Exception("block not found")
501         block_height = int(out[0][0])
502
503         merkle = []
504         tx_pos = None
505
506         # list all tx in block
507         for row in self.safe_sql("""
508             SELECT DISTINCT tx_id, tx_pos, tx_hash
509               FROM txin_detail
510              WHERE block_id = ?
511              ORDER BY tx_pos""", (block_id,)):
512             _id, _pos, _hash = row
513             merkle.append(_hash)
514             if _hash == tx_hash:
515                 tx_pos = int(_pos)
516
517         # find subset.
518         # TODO: do not compute this on client request, better store the hash tree of each block in a database...
519
520         merkle = map(hash_decode, merkle)
521         target_hash = hash_decode(tx_hash)
522
523         s = []
524         while len(merkle) != 1:
525             if len(merkle) % 2:
526                 merkle.append(merkle[-1])
527             n = []
528             while merkle:
529                 new_hash = Hash(merkle[0] + merkle[1])
530                 if merkle[0] == target_hash:
531                     s.append(hash_encode(merkle[1]))
532                     target_hash = new_hash
533                 elif merkle[1] == target_hash:
534                     s.append(hash_encode(merkle[0]))
535                     target_hash = new_hash
536                 n.append(new_hash)
537                 merkle = merkle[2:]
538             merkle = n
539
540         # send result
541         return {"block_height": block_height, "merkle": s, "pos": tx_pos}
542
543     def memorypool_update(store):
544         ds = BCDataStream.BCDataStream()
545         postdata = dumps({"method": 'getrawmempool', 'params': [], 'id': 'jsonrpc'})
546         respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
547
548         r = loads(respdata)
549         if r['error'] is not None:
550             print_log(r['error'])
551             return
552
553         mempool_hashes = r.get('result')
554         num_new_tx = 0
555
556         for tx_hash in mempool_hashes:
557
558             if tx_hash in store.known_mempool_hashes:
559                 continue
560             store.known_mempool_hashes.append(tx_hash)
561             num_new_tx += 1
562
563             postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id': 'jsonrpc'})
564             respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
565             r = loads(respdata)
566             if r['error'] is not None:
567                 continue
568             hextx = r.get('result')
569             ds.clear()
570             ds.write(hextx.decode('hex'))
571             tx = deserialize.parse_Transaction(ds)
572             tx['hash'] = Hash(tx['tx'])
573
574             if store.tx_find_id_and_value(tx):
575                 pass
576             else:
577                 tx_id = store.import_tx(tx, False)
578                 store.update_tx_cache(tx_id)
579                 #print_log(tx_hash)
580
581         store.commit()
582         store.known_mempool_hashes = mempool_hashes
583         return num_new_tx
584
585     def send_tx(self, tx):
586         postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id': 'jsonrpc'})
587         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
588         r = loads(respdata)
589         if r['error'] is not None:
590             msg = r['error'].get('message')
591             out = "error: transaction rejected by memorypool: " + msg + "\n" + tx
592         else:
593             out = r['result']
594         return out
595
596     def main_iteration(self):
597         with self.lock:
598             t1 = time.time()
599             self.catch_up()
600             t2 = time.time()
601             time_catch_up = t2 - t1
602             n = self.memorypool_update()
603             time_mempool = time.time() - t2
604             height = self.get_block_number(self.chain_id)
605
606         with self.cache_lock:
607             try:
608                 self.chunk_cache.pop(height/2016)
609             except:
610                 pass
611
612         block_header = self.get_block_header(height)
613         return block_header, time_catch_up, time_mempool, n
614
615     def catch_up(store):
616         # if there is an exception, do rollback and then re-raise the exception
617         for dircfg in store.datadirs:
618             try:
619                 store.catch_up_dir(dircfg)
620             except Exception, e:
621                 store.log.exception("Failed to catch up %s", dircfg)
622                 store.rollback()
623                 raise e
624
625
626 class BlockchainProcessor(Processor):
627
628     def __init__(self, config, shared):
629         Processor.__init__(self)
630         self.store = AbeStore(config)
631         self.watched_addresses = []
632         self.shared = shared
633
634         # catch_up first
635         self.block_header, time_catch_up, time_mempool, n = self.store.main_iteration()
636         self.block_number = self.block_header.get('block_height')
637         print_log("blockchain: %d blocks" % self.block_number)
638
639         threading.Timer(10, self.run_store_iteration).start()
640
641     def add_request(self, request):
642         # see if we can get if from cache. if not, add to queue
643         if self.process(request, cache_only=True) == -1:
644             self.queue.put(request)
645
646     def process(self, request, cache_only=False):
647         #print_log("abe process", request)
648
649         message_id = request['id']
650         method = request['method']
651         params = request.get('params', [])
652         result = None
653         error = None
654
655         if method == 'blockchain.numblocks.subscribe':
656             result = self.block_number
657
658         elif method == 'blockchain.headers.subscribe':
659             result = self.block_header
660
661         elif method == 'blockchain.address.subscribe':
662             try:
663                 address = params[0]
664                 result = self.store.get_status(address, cache_only)
665                 self.watch_address(address)
666             except Exception, e:
667                 error = str(e) + ': ' + address
668                 print_log("error:", error)
669
670         elif method == 'blockchain.address.get_history':
671             try:
672                 address = params[0]
673                 result = self.store.get_history(address, cache_only)
674             except Exception, e:
675                 error = str(e) + ': ' + address
676                 print_log("error:", error)
677
678         elif method == 'blockchain.block.get_header':
679             if cache_only:
680                 result = -1
681             else:
682                 try:
683                     height = params[0]
684                     result = self.store.get_block_header(height)
685                 except Exception, e:
686                     error = str(e) + ': %d' % height
687                     print_log("error:", error)
688
689         elif method == 'blockchain.block.get_chunk':
690             if cache_only:
691                 result = -1
692             else:
693                 try:
694                     index = params[0]
695                     result = self.store.get_chunk(index)
696                 except Exception, e:
697                     error = str(e) + ': %d' % index
698                     print_log("error:", error)
699
700         elif method == 'blockchain.transaction.broadcast':
701             txo = self.store.send_tx(params[0])
702             print_log("sent tx:", txo)
703             result = txo
704
705         elif method == 'blockchain.transaction.get_merkle':
706             if cache_only:
707                 result = -1
708             else:
709                 try:
710                     tx_hash = params[0]
711                     result = self.store.get_tx_merkle(tx_hash)
712                 except Exception, e:
713                     error = str(e) + ': ' + tx_hash
714                     print_log("error:", error)
715
716         elif method == 'blockchain.transaction.get':
717             try:
718                 tx_hash = params[0]
719                 height = params[1]
720                 result = self.store.get_raw_tx(tx_hash)
721             except Exception, e:
722                 error = str(e) + ': ' + tx_hash
723                 print_log("error:", error)
724
725         else:
726             error = "unknown method:%s" % method
727
728         if cache_only and result == -1:
729             return -1
730
731         if error:
732             response = {'id': message_id, 'error': error}
733             self.push_response(response)
734         elif result != '':
735             response = {'id': message_id, 'result': result}
736             self.push_response(response)
737
738     def watch_address(self, addr):
739         if addr not in self.watched_addresses:
740             self.watched_addresses.append(addr)
741
742     def run_store_iteration(self):
743         try:
744             block_header, time_catch_up, time_mempool, n = self.store.main_iteration()
745         except:
746             traceback.print_exc(file=sys.stdout)
747             print_log("terminating")
748             self.shared.stop()
749
750         if self.shared.stopped():
751             print_log("exit timer")
752             return
753
754         #print_log("block number: %d  (%.3fs)  mempool:%d (%.3fs)"%(self.block_number, time_catch_up, n, time_mempool))
755
756         if self.block_number != block_header.get('block_height'):
757             self.block_number = block_header.get('block_height')
758             print_log("block number: %d  (%.3fs)" % (self.block_number, time_catch_up))
759             self.push_response({'id': None, 'method': 'blockchain.numblocks.subscribe', 'params': [self.block_number]})
760
761         if self.block_header != block_header:
762             self.block_header = block_header
763             self.push_response({'id': None, 'method': 'blockchain.headers.subscribe', 'params': [self.block_header]})
764
765         while True:
766             try:
767                 addr = self.store.address_queue.get(False)
768             except:
769                 break
770             if addr in self.watched_addresses:
771                 try:
772                     status = self.store.get_status(addr)
773                     self.push_response({'id': None, 'method': 'blockchain.address.subscribe', 'params': [addr, status]})
774                 except:
775                     break
776
777         threading.Timer(10, self.run_store_iteration).start()