self.shared = shared
self.config = config
self.up_to_date = False
- self.watched_addresses = []
+
+ self.watch_lock = threading.Lock()
+ self.watch_blocks = []
+ self.watch_headers = []
+ self.watched_addresses = {}
+
self.history_cache = {}
self.chunk_cache = {}
self.cache_lock = threading.Lock()
self.dblock = threading.Lock()
try:
- self.db = leveldb.LevelDB(self.dbpath)
+ self.db = leveldb.LevelDB(self.dbpath, paranoid_checks=True)
except:
traceback.print_exc(file=sys.stdout)
self.shared.stop()
config.get('bitcoind', 'host'),
config.get('bitcoind', 'port'))
+ while True:
+ try:
+ self.bitcoind('getinfo')
+ break
+ except:
+ print_log('cannot contact bitcoind...')
+ time.sleep(5)
+ continue
+
self.height = 0
self.is_test = False
self.sent_height = 0
print_log('initializing database')
self.height = 0
self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
+ db_version = self.db_version
# check version
if self.db_version != db_version:
shared.stop()
sys.exit(0)
- print_log("blockchain is up to date.")
+ print_log("Blockchain is up to date.")
+ self.memorypool_update()
+ print_log("Memory pool initialized.")
threading.Timer(10, self.main_iteration).start()
def get_mempool_transaction(self, txid):
try:
- raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1])
+ raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
except:
return None
vds = deserialize.BCDataStream()
vds.write(raw_tx.decode('hex'))
-
- return deserialize.parse_Transaction(vds, is_coinbase=False)
+ try:
+ return deserialize.parse_Transaction(vds, is_coinbase=False)
+ except:
+ print_log("ERROR: cannot parse", txid)
+ return None
def get_history(self, addr, cache_only=False):
with self.cache_lock:
for i in range(l-1, -1, -1):
item = serialized_hist[80*i:80*(i+1)]
item_height = int(rev_hex(item[36:39].encode('hex')), 16)
- if item_height < tx_height:
+ if item_height <= tx_height:
serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
break
else:
serialized_hist = serialized_hist[0:80*i] + new_item + serialized_hist[80*(i+1):]
break
else:
+ self.shared.stop()
hist = self.deserialize(serialized_hist)
raise BaseException("prevout not found", addr, hist, txi.encode('hex'))
is_coinbase = True
for raw_tx in txlist:
tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
- tx_hashes.append(tx_hash)
vds = deserialize.BCDataStream()
vds.write(raw_tx.decode('hex'))
- tx = deserialize.parse_Transaction(vds, is_coinbase)
+ try:
+ tx = deserialize.parse_Transaction(vds, is_coinbase)
+ except:
+ print_log("ERROR: cannot parse", tx_hash)
+ continue
+ tx_hashes.append(tx_hash)
txdict[tx_hash] = tx
is_coinbase = False
return tx_hashes, txdict
for txi in block_inputs:
try:
addr = self.db.Get(txi)
- except:
- # print "addr not in db", txi.encode('hex')
+ except KeyError:
# the input could come from the same block
continue
+ except:
+ traceback.print_exc(file=sys.stdout)
+ self.shared.stop()
+ raise
+
self.batch_txio[txi] = addr
addr_to_read.append(addr)
for addr in addr_to_read:
try:
self.batch_list[addr] = self.db.Get(addr)
- except:
+ except KeyError:
self.batch_list[addr] = ''
+ except:
+ traceback.print_exc(file=sys.stdout)
+ self.shared.stop()
+ raise
# process
for addr in self.batch_list.keys():
self.invalidate_cache(addr)
- def add_request(self, request):
+ def add_request(self, session, request):
# see if we can get if from cache. if not, add to queue
- if self.process(request, cache_only=True) == -1:
- self.queue.put(request)
+ if self.process(session, request, cache_only=True) == -1:
+ self.queue.put((session, request))
+
- def process(self, request, cache_only=False):
- #print "abe process", request
+
+ def process(self, session, request, cache_only=False):
+
message_id = request['id']
method = request['method']
params = request.get('params', [])
error = None
if method == 'blockchain.numblocks.subscribe':
+ with self.watch_lock:
+ if session not in self.watch_blocks:
+ self.watch_blocks.append(session)
result = self.height
elif method == 'blockchain.headers.subscribe':
+ with self.watch_lock:
+ if session not in self.watch_headers:
+ self.watch_headers.append(session)
result = self.header
elif method == 'blockchain.address.subscribe':
try:
address = params[0]
result = self.get_status(address, cache_only)
- self.watch_address(address)
- except BaseException, e:
- error = str(e) + ': ' + address
- print_log("error:", error)
+ with self.watch_lock:
+ l = self.watched_addresses.get(address)
+ if l is None:
+ self.watched_addresses[address] = [session]
+ elif session not in l:
+ l.append(session)
- elif method == 'blockchain.address.unsubscribe':
- try:
- password = params[0]
- address = params[1]
- if password == self.config.get('server', 'password'):
- self.watched_addresses.remove(address)
- # print_log('unsubscribed', address)
- result = "ok"
- else:
- print_log('incorrect password')
- result = "authentication error"
except BaseException, e:
error = str(e) + ': ' + address
print_log("error:", error)
+
elif method == 'blockchain.address.get_history':
try:
address = params[0]
elif method == 'blockchain.transaction.get':
try:
tx_hash = params[0]
- height = params[1]
- result = self.bitcoind('getrawtransaction', [tx_hash, 0, height])
+ result = self.bitcoind('getrawtransaction', [tx_hash, 0])
except BaseException, e:
error = str(e) + ': ' + repr(params)
print_log("tx get error:", error)
return -1
if error:
- self.push_response({'id': message_id, 'error': error})
+ self.push_response(session, {'id': message_id, 'error': error})
elif result != '':
- self.push_response({'id': message_id, 'result': result})
+ self.push_response(session, {'id': message_id, 'result': result})
+
+
+ def getfullblock(self, block_hash):
+ block = self.bitcoind('getblock', [block_hash])
+
+ rawtxreq = []
+ i = 0
+ for txid in block['tx']:
+ rawtxreq.append({
+ "method": "getrawtransaction",
+ "params": [txid],
+ "id": i,
+ })
+ i += 1
+
+ postdata = dumps(rawtxreq)
+ try:
+ respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
+ except:
+ traceback.print_exc(file=sys.stdout)
+ self.shared.stop()
- def watch_address(self, addr):
- if addr not in self.watched_addresses:
- self.watched_addresses.append(addr)
+ r = loads(respdata)
+ rawtxdata = []
+ for ir in r:
+ if ir['error'] is not None:
+ self.shared.stop()
+ print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
+ raise BaseException(ir['error'])
+ rawtxdata.append(ir['result'])
+ block['tx'] = rawtxdata
+ return block
def catch_up(self, sync=True):
t1 = time.time()
# not done..
self.up_to_date = False
next_block_hash = self.bitcoind('getblockhash', [self.height + 1])
- next_block = self.bitcoind('getblock', [next_block_hash, 1])
+ next_block = self.getfullblock(next_block_hash)
# fixme: this is unsafe, if we revert when the undo info is not yet written
revert = (random.randint(1, 100) == 1) if self.is_test else False
else:
# revert current block
- block = self.bitcoind('getblock', [self.last_hash, 1])
+ block = self.getfullblock(self.last_hash)
print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash)
self.import_block(block, self.last_hash, self.height, sync, revert=True)
self.pop_header()
def memorypool_update(self):
mempool_hashes = self.bitcoind('getrawmempool')
+ touched_addresses = []
for tx_hash in mempool_hashes:
if tx_hash in self.mempool_hashes:
continue
if not tx:
continue
+ mpa = self.mempool_addresses.get(tx_hash, [])
for x in tx.get('inputs'):
- txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
- try:
- addr = self.db.Get(txi)
- except:
- continue
- l = self.mempool_addresses.get(tx_hash, [])
- if addr not in l:
- l.append(addr)
- self.mempool_addresses[tx_hash] = l
+ # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
+ addr = x.get('address')
+ if addr and addr not in mpa:
+ mpa.append(addr)
+ touched_addresses.append(addr)
for x in tx.get('outputs'):
addr = x.get('address')
- l = self.mempool_addresses.get(tx_hash, [])
- if addr not in l:
- l.append(addr)
- self.mempool_addresses[tx_hash] = l
+ if addr and addr not in mpa:
+ mpa.append(addr)
+ touched_addresses.append(addr)
+ self.mempool_addresses[tx_hash] = mpa
self.mempool_hashes.append(tx_hash)
# remove older entries from mempool_hashes
for tx_hash, addresses in self.mempool_addresses.items():
if tx_hash not in self.mempool_hashes:
self.mempool_addresses.pop(tx_hash)
+ for addr in addresses:
+ touched_addresses.append(addr)
# rebuild mempool histories
new_mempool_hist = {}
h.append(tx_hash)
new_mempool_hist[addr] = h
- # invalidate cache for mempool addresses whose mempool history has changed
- new_mempool_hist_keys = new_mempool_hist.keys()
- self_mempool_hist_keys = self.mempool_hist.keys()
-
- for addr in new_mempool_hist_keys:
- if addr in self_mempool_hist_keys:
- if self.mempool_hist[addr] != new_mempool_hist[addr]:
- self.invalidate_cache(addr)
- else:
- self.invalidate_cache(addr)
-
- # invalidate cache for addresses that are removed from mempool ?
- # this should not be necessary if they go into a block, but they might not
- for addr in self_mempool_hist_keys:
- if addr not in new_mempool_hist_keys:
- self.invalidate_cache(addr)
-
-
with self.mempool_lock:
self.mempool_hist = new_mempool_hist
+ # invalidate cache for touched addresses
+ for addr in touched_addresses:
+ self.invalidate_cache(addr)
+
+
def invalidate_cache(self, address):
with self.cache_lock:
if address in self.history_cache:
self.history_cache.pop(address)
if address in self.watched_addresses:
+ # TODO: update cache here. if new value equals cached value, do not send notification
self.address_queue.put(address)
def main_iteration(self):
t2 = time.time()
self.memorypool_update()
- t3 = time.time()
- # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)
if self.sent_height != self.height:
self.sent_height = self.height
- self.push_response({
- 'id': None,
- 'method': 'blockchain.numblocks.subscribe',
- 'params': [self.height],
- })
+ for session in self.watch_blocks:
+ self.push_response(session, {
+ 'id': None,
+ 'method': 'blockchain.numblocks.subscribe',
+ 'params': [self.height],
+ })
if self.sent_header != self.header:
print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
self.sent_header = self.header
- self.push_response({
- 'id': None,
- 'method': 'blockchain.headers.subscribe',
- 'params': [self.header],
- })
+ for session in self.watch_headers:
+ self.push_response(session, {
+ 'id': None,
+ 'method': 'blockchain.headers.subscribe',
+ 'params': [self.header],
+ })
while True:
try:
addr = self.address_queue.get(False)
except:
break
- if addr in self.watched_addresses:
- status = self.get_status(addr)
- self.push_response({
- 'id': None,
- 'method': 'blockchain.address.subscribe',
- 'params': [addr, status],
- })
+
+ status = self.get_status(addr)
+ for session in self.watched_addresses[addr]:
+ self.push_response(session, {
+ 'id': None,
+ 'method': 'blockchain.address.subscribe',
+ 'params': [addr, status],
+ })
if not self.shared.stopped():
threading.Timer(10, self.main_iteration).start()