X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Fbitcoind%2Fblockchain_processor.py;h=68b81172bd388b1bcbb4a2e102f80e881a8e1368;hb=00f536451ff1cdc6031c477e601b22ffda502120;hp=756946d6ed6a958e363879f05640519b5ec909e4;hpb=86b0c5b5b0e5a3e072104d3fbde3927bb584f699;p=electrum-server.git diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py index 756946d..68b8117 100644 --- a/backends/bitcoind/blockchain_processor.py +++ b/backends/bitcoind/blockchain_processor.py @@ -42,7 +42,7 @@ class BlockchainProcessor(Processor): self.dblock = threading.Lock() try: - self.db = leveldb.LevelDB(self.dbpath) + self.db = leveldb.LevelDB(self.dbpath, paranoid_checks=True) except: traceback.print_exc(file=sys.stdout) self.shared.stop() @@ -53,6 +53,15 @@ class BlockchainProcessor(Processor): config.get('bitcoind', 'host'), config.get('bitcoind', 'port')) + while True: + try: + self.bitcoind('getinfo') + break + except: + print_log('cannot contact bitcoind...') + time.sleep(5) + continue + self.height = 0 self.is_test = False self.sent_height = 0 @@ -241,14 +250,17 @@ class BlockchainProcessor(Processor): def get_mempool_transaction(self, txid): try: - raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1]) + raw_tx = self.bitcoind('getrawtransaction', [txid, 0]) except: return None vds = deserialize.BCDataStream() vds.write(raw_tx.decode('hex')) - - return deserialize.parse_Transaction(vds, is_coinbase=False) + try: + return deserialize.parse_Transaction(vds, is_coinbase=False) + except: + print_log("ERROR: cannot parse", txid) + return None def get_history(self, addr, cache_only=False): with self.cache_lock: @@ -448,10 +460,14 @@ class BlockchainProcessor(Processor): is_coinbase = True for raw_tx in txlist: tx_hash = hash_encode(Hash(raw_tx.decode('hex'))) - tx_hashes.append(tx_hash) vds = deserialize.BCDataStream() vds.write(raw_tx.decode('hex')) - tx = deserialize.parse_Transaction(vds, is_coinbase) + try: + tx = deserialize.parse_Transaction(vds, is_coinbase) + except: + print_log("ERROR: cannot parse", tx_hash) + continue + tx_hashes.append(tx_hash) txdict[tx_hash] = tx is_coinbase = False return tx_hashes, txdict @@ -746,8 +762,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.transaction.get': try: tx_hash = params[0] - height = params[1] - result = self.bitcoind('getrawtransaction', [tx_hash, 0, height]) + result = self.bitcoind('getrawtransaction', [tx_hash, 0]) except BaseException, e: error = str(e) + ': ' + repr(params) print_log("tx get error:", error) @@ -767,6 +782,37 @@ class BlockchainProcessor(Processor): if addr not in self.watched_addresses: self.watched_addresses.append(addr) + def getfullblock(self, block_hash): + block = self.bitcoind('getblock', [block_hash]) + + rawtxreq = [] + i = 0 + for txid in block['tx']: + rawtxreq.append({ + "method": "getrawtransaction", + "params": [txid], + "id": i, + }) + i += 1 + + postdata = dumps(rawtxreq) + try: + respdata = urllib.urlopen(self.bitcoind_url, postdata).read() + except: + traceback.print_exc(file=sys.stdout) + self.shared.stop() + + r = loads(respdata) + rawtxdata = [] + for ir in r: + if ir['error'] is not None: + self.shared.stop() + print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.") + raise BaseException(ir['error']) + rawtxdata.append(ir['result']) + block['tx'] = rawtxdata + return block + def catch_up(self, sync=True): t1 = time.time() @@ -782,7 +828,7 @@ class BlockchainProcessor(Processor): # not done.. self.up_to_date = False next_block_hash = self.bitcoind('getblockhash', [self.height + 1]) - next_block = self.bitcoind('getblock', [next_block_hash, 1]) + next_block = self.getfullblock(next_block_hash) # fixme: this is unsafe, if we revert when the undo info is not yet written revert = (random.randint(1, 100) == 1) if self.is_test else False @@ -801,7 +847,7 @@ class BlockchainProcessor(Processor): else: # revert current block - block = self.bitcoind('getblock', [self.last_hash, 1]) + block = self.getfullblock(self.last_hash) print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash) self.import_block(block, self.last_hash, self.height, sync, revert=True) self.pop_header() @@ -827,33 +873,21 @@ class BlockchainProcessor(Processor): if not tx: continue + mpa = self.mempool_addresses.get(tx_hash, []) for x in tx.get('inputs'): - txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex') - try: - addr = self.db.Get(txi) - except: - tx_prev = self.get_mempool_transaction(x.get('prevout_hash')) - try: - addr = tx_prev['outputs'][x.get('prevout_n')]['address'] - if not addr: continue - except: - continue - l = self.mempool_addresses.get(tx_hash, []) - if addr not in l: - l.append(addr) - self.mempool_addresses[tx_hash] = l - #if addr not in touched_addresses: + # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions + addr = x.get('address') + if addr and addr not in mpa: + mpa.append(addr) touched_addresses.append(addr) for x in tx.get('outputs'): addr = x.get('address') - l = self.mempool_addresses.get(tx_hash, []) - if addr not in l: - l.append(addr) - self.mempool_addresses[tx_hash] = l - #if addr not in touched_addresses: + if addr and addr not in mpa: + mpa.append(addr) touched_addresses.append(addr) + self.mempool_addresses[tx_hash] = mpa self.mempool_hashes.append(tx_hash) # remove older entries from mempool_hashes @@ -864,7 +898,6 @@ class BlockchainProcessor(Processor): if tx_hash not in self.mempool_hashes: self.mempool_addresses.pop(tx_hash) for addr in addresses: - #if addr not in touched_addresses: touched_addresses.append(addr) # rebuild mempool histories