X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Fbitcoind%2Fblockchain_processor.py;h=a1f382268dc5c1edfef64430372b1d58497a337f;hb=b9d74456c47ed3a54c73185d449ff7f23cb6edf4;hp=1de5db643846ef92787611e26d132e483f78c53d;hpb=dec881f99cdda62dfe0459d55a2be129ba491bae;p=electrum-server.git diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py index 1de5db6..a1f3822 100644 --- a/backends/bitcoind/blockchain_processor.py +++ b/backends/bitcoind/blockchain_processor.py @@ -38,6 +38,7 @@ class BlockchainProcessor(Processor): self.headers_data = '' self.headers_path = config.get('leveldb', 'path_fulltree') + self.mempool_values = {} self.mempool_addresses = {} self.mempool_hist = {} self.mempool_hashes = set([]) @@ -87,7 +88,8 @@ class BlockchainProcessor(Processor): self.memorypool_update() print_log("Memory pool initialized.") - threading.Timer(10, self.main_iteration).start() + self.timer = threading.Timer(10, self.main_iteration) + self.timer.start() @@ -111,6 +113,7 @@ class BlockchainProcessor(Processor): try: respdata = urllib.urlopen(self.bitcoind_url, postdata).read() except: + print_log("error calling bitcoind") traceback.print_exc(file=sys.stdout) self.shared.stop() @@ -252,6 +255,7 @@ class BlockchainProcessor(Processor): hist = self.storage.get_history(addr) is_known = True except: + print_log("error get_history") self.shared.stop() raise if hist: @@ -262,7 +266,7 @@ class BlockchainProcessor(Processor): # add memory pool with self.mempool_lock: - for txid in self.mempool_hist.get(addr, []): + for txid, delta in self.mempool_hist.get(addr, []): hist.append({'tx_hash':txid, 'height':0}) # add something to distinguish between unused and empty addresses @@ -274,6 +278,14 @@ class BlockchainProcessor(Processor): return hist + def get_unconfirmed_value(self, addr): + v = 0 + with self.mempool_lock: + for txid, delta in self.mempool_hist.get(addr, []): + v += delta + return v + + def get_status(self, addr, cache_only=False): tx_points = self.get_history(addr, cache_only) if cache_only and tx_points == -1: @@ -446,7 +458,7 @@ class BlockchainProcessor(Processor): if session in l: l.remove(session) if session in l: - print "error rc!!" + print_log("error rc!!") self.shared.stop() if l == []: self.watched_addresses.pop(addr) @@ -482,18 +494,28 @@ class BlockchainProcessor(Processor): error = str(e) + ': ' + address print_log("error:", error) + elif method == 'blockchain.address.get_mempool': + try: + address = str(params[0]) + result = self.get_unconfirmed_history(address, cache_only) + except BaseException, e: + error = str(e) + ': ' + address + print_log("error:", error) + elif method == 'blockchain.address.get_balance': try: address = str(params[0]) - result = self.storage.get_balance(address) + confirmed = self.storage.get_balance(address) + unconfirmed = self.get_unconfirmed_value(address) + result = { 'confirmed':confirmed, 'unconfirmed':unconfirmed } except BaseException, e: error = str(e) + ': ' + address print_log("error:", error) - elif method == 'blockchain.address.get_path': + elif method == 'blockchain.address.get_proof': try: address = str(params[0]) - result = self.storage.get_address_path(address) + result = self.storage.get_proof(address) except BaseException, e: error = str(e) + ': ' + address print_log("error:", error) @@ -506,12 +528,22 @@ class BlockchainProcessor(Processor): error = str(e) + ': ' + address print_log("error:", error) + elif method == 'blockchain.utxo.get_address': + try: + txid = str(params[0]) + pos = int(params[1]) + txi = (txid + int_to_hex(pos, 4)).decode('hex') + result = self.storage.get_address(txi) + except BaseException, e: + error = str(e) + print_log("error:", error, params) + elif method == 'blockchain.block.get_header': if cache_only: result = -1 else: try: - height = params[0] + height = int(params[0]) result = self.get_header(height) except BaseException, e: error = str(e) + ': %d' % height @@ -522,7 +554,7 @@ class BlockchainProcessor(Processor): result = -1 else: try: - index = params[0] + index = int(params[0]) result = self.get_chunk(index) except BaseException, e: error = str(e) + ': %d' % index @@ -586,6 +618,7 @@ class BlockchainProcessor(Processor): try: respdata = urllib.urlopen(self.bitcoind_url, postdata).read() except: + print_log("bitcoind error (getfullblock)") traceback.print_exc(file=sys.stdout) self.shared.stop() @@ -602,7 +635,7 @@ class BlockchainProcessor(Processor): def catch_up(self, sync=True): - prh = None + prev_root_hash = None while not self.shared.stopped(): self.mtime('') @@ -626,6 +659,8 @@ class BlockchainProcessor(Processor): if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert: + prev_root_hash = self.storage.get_root_hash() + self.import_block(next_block, next_block_hash, self.storage.height+1, sync) self.storage.height = self.storage.height + 1 self.write_header(self.block2header(next_block), sync) @@ -639,15 +674,7 @@ class BlockchainProcessor(Processor): self.mtimes['daemon'] = 0 self.mtimes['import'] = 0 - if prh: - if prh != self.storage.get_root_hash().encode('hex'): - print_log("root hash error", prh) - self.shared.stop() - raise - prh = None - else: - prh = self.storage.get_root_hash().encode('hex') # revert current block block = self.getfullblock(self.storage.last_hash) @@ -662,8 +689,13 @@ class BlockchainProcessor(Processor): self.header = self.read_header(self.storage.height) self.storage.last_hash = self.hash_header(self.header) + if prev_root_hash: + assert prev_root_hash == self.storage.get_root_hash() + prev_root_hash = None + self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash])) + self.header['utxo_root'] = self.storage.get_root_hash().encode('hex') if self.shared.stopped(): print_log( "closing database" ) @@ -674,6 +706,8 @@ class BlockchainProcessor(Processor): mempool_hashes = set(self.bitcoind('getrawmempool')) touched_addresses = set([]) + # get new transactions + new_tx = {} for tx_hash in mempool_hashes: if tx_hash in self.mempool_hashes: continue @@ -682,40 +716,70 @@ class BlockchainProcessor(Processor): if not tx: continue - mpa = self.mempool_addresses.get(tx_hash, []) + new_tx[tx_hash] = tx + self.mempool_hashes.add(tx_hash) + + # remove older entries from mempool_hashes + self.mempool_hashes = mempool_hashes + + + # check all tx outputs + for tx_hash, tx in new_tx.items(): + mpa = self.mempool_addresses.get(tx_hash, {}) + out_values = [] + for x in tx.get('outputs'): + out_values.append( x['value'] ) + + addr = x.get('address') + if not addr: + continue + v = mpa.get(addr,0) + v += x['value'] + mpa[addr] = v + touched_addresses.add(addr) + + self.mempool_addresses[tx_hash] = mpa + self.mempool_values[tx_hash] = out_values + + # check all inputs + for tx_hash, tx in new_tx.items(): + mpa = self.mempool_addresses.get(tx_hash, {}) for x in tx.get('inputs'): # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions addr = x.get('address') - if addr and addr not in mpa: - mpa.append(addr) - touched_addresses.add(addr) + if not addr: + continue - for x in tx.get('outputs'): - addr = x.get('address') - if addr and addr not in mpa: - mpa.append(addr) - touched_addresses.add(addr) + v = self.mempool_values.get(x.get('prevout_hash')) + if v: + value = v[ x.get('prevout_n')] + else: + txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex') + value = self.storage.get_utxo_value(addr,txi) + + v = mpa.get(addr,0) + v -= value + mpa[addr] = v + touched_addresses.add(addr) self.mempool_addresses[tx_hash] = mpa - self.mempool_hashes.add(tx_hash) - # remove older entries from mempool_hashes - self.mempool_hashes = mempool_hashes # remove deprecated entries from mempool_addresses for tx_hash, addresses in self.mempool_addresses.items(): if tx_hash not in self.mempool_hashes: self.mempool_addresses.pop(tx_hash) + self.mempool_values.pop(tx_hash) for addr in addresses: touched_addresses.add(addr) # rebuild mempool histories new_mempool_hist = {} for tx_hash, addresses in self.mempool_addresses.items(): - for addr in addresses: + for addr, delta in addresses.items(): h = new_mempool_hist.get(addr, []) if tx_hash not in h: - h.append(tx_hash) + h.append((tx_hash, delta)) new_mempool_hist[addr] = h with self.mempool_lock: @@ -739,10 +803,17 @@ class BlockchainProcessor(Processor): # TODO: update cache here. if new value equals cached value, do not send notification self.address_queue.put((address,sessions)) + + def close(self): + self.timer.join() + print_log("Closing database...") + self.storage.close() + print_log("Database is closed") + + def main_iteration(self): if self.shared.stopped(): - print_log("blockchain processor terminating") - self.storage.close() + print_log("Stopping timer") return with self.dblock: @@ -785,7 +856,7 @@ class BlockchainProcessor(Processor): 'params': [addr, status], }) - if not self.shared.stopped(): - threading.Timer(10, self.main_iteration).start() - else: - print_log("blockchain processor terminating") + # next iteration + self.timer = threading.Timer(10, self.main_iteration) + self.timer.start() +