import deserialize
import ast, time, threading, hashlib
from Queue import Queue
-import traceback, sys, os
+import traceback, sys, os, random
config.get('bitcoind','port'))
self.height = 0
+ self.is_test = False
self.sent_height = 0
self.sent_header = None
try:
- hist = self.deserialize(self.db.Get('0'))
- hh, self.height, _ = hist[0]
- self.block_hashes = [hh]
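+ # the chain tip (block hash, height) is persisted in the db under the key 'height'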
+ hist = self.deserialize(self.db.Get('height'))
+ self.last_hash, self.height, _ = hist[0]
print_log( "hist", hist )
except:
#traceback.print_exc(file=sys.stdout)
print_log('initializing database')
self.height = 0
- self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
+ self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
# catch_up headers
self.init_headers(self.height)
shared.stop()
sys.exit(0)
- print "blockchain is up to date."
+ print_log( "blockchain is up to date." )
threading.Timer(10, self.main_iteration).start()
self.chunk_cache = {}
self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')
- height = 0
if os.path.exists(self.headers_filename):
- height = os.path.getsize(self.headers_filename)/80
-
- if height:
- prev_header = self.read_header(height -1)
- prev_hash = self.hash_header(prev_header)
+ height = os.path.getsize(self.headers_filename)/80 - 1 # the current height
+ if height > 0:
+ prev_hash = self.hash_header(self.read_header(height))
+ else:
+ prev_hash = None
else:
open(self.headers_filename,'wb').close()
prev_hash = None
+ height = -1
- if height != db_height:
+ if height < db_height:
print_log( "catching up missing headers:", height, db_height)
- s = ''
try:
- for i in range(height, db_height):
- header = self.get_header(i)
- assert prev_hash == header.get('prev_block_hash')
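+ # append the missing headers one by one, checking that each one links to the previous block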
+ while height != db_height:
+ height = height + 1
+ header = self.get_header(height)
+ if height>1:
+ assert prev_hash == header.get('prev_block_hash')
self.write_header(header, sync=False)
prev_hash = self.hash_header(header)
- if i%1000==0: print_log("headers file:",i)
+ if height%1000==0: print_log("headers file:",height)
except KeyboardInterrupt:
self.flush_headers()
sys.exit()
def write_header(self, header, sync=True):
if not self.headers_data:
self.headers_offset = header.get('block_height')
+
self.headers_data += header_to_string(header).decode('hex')
if sync or len(self.headers_data) > 40*100:
self.flush_headers()
return {"block_height":height, "merkle":s, "pos":tx_pos}
- def add_to_batch(self, addr, tx_hash, tx_pos, tx_height):
- # we do it chronologically, so nothing wrong can happen...
+
+ def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
+
+ # keep the address history sorted by tx height
s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
- self.batch_list[addr] += s
+
+ serialized_hist = self.batch_list[addr]
+
+ l = len(serialized_hist)/40
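+ # scan the existing items from the end; each item is 40 bytes: 32-byte txid, 4-byte pos, 4-byte height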
+ for i in range(l-1, -1, -1):
+ item = serialized_hist[40*i:40*(i+1)]
+ item_height = int( rev_hex( item[36:40].encode('hex') ), 16 )
+ if item_height < tx_height:
+ serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):]
+ break
+ else:
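+ # no existing item has a lower height, so this entry goes at the front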
+ serialized_hist = s + serialized_hist
+
+ self.batch_list[addr] = serialized_hist
# backlink
txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
self.batch_txio[txo] = addr
- def remove_from_batch(self, tx_hash, tx_pos):
+ def remove_from_history(self, tx_hash, tx_pos):
txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
try:
addr = self.batch_txio[txi]
except:
- #raise BaseException(tx_hash, tx_pos)
+ raise BaseException(tx_hash, tx_pos)
- print "WARNING: cannot find address for", (tx_hash, tx_pos)
- return
l = len(serialized_hist)/40
for i in range(l):
- if serialized_hist[40*i:40*i+36] == txi:
+ item = serialized_hist[40*i:40*(i+1)]
+ if item[0:36] == txi:
+ height = int( rev_hex( item[36:40].encode('hex') ), 16 )
serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
break
else:
raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
+
self.batch_list[addr] = serialized_hist
+ return height, addr
def deserialize_block(self, block):
is_coinbase = False
return tx_hashes, txdict
+ def get_undo_info(self, height):
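+ # undo info is keyed by height mod 100, so only the last 100 blocks can be reverted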
+ s = self.db.Get("undo%d"%(height%100))
+ return ast.literal_eval(s)
+
+ def write_undo_info(self, batch, height, undo_info):
+ batch.Put("undo%d"%(height%100), repr(undo_info))
+
def import_block(self, block, block_hash, block_height, sync, revert=False):
t0 = time.time()
tx_hashes, txdict = self.deserialize_block(block)
- # read addresses of tx inputs
t00 = time.time()
- for tx in txdict.values():
- for x in tx.get('inputs'):
- txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
- inputs_to_read.append(txi)
- inputs_to_read.sort()
- for txi in inputs_to_read:
- try:
- addr = self.db.Get(txi)
- except:
- # the input could come from the same block
- continue
- self.batch_txio[txi] = addr
- addr_to_read.append(addr)
+ if revert:
+ # read addresses of tx outputs
+ for tx_hash, tx in txdict.items():
+ for x in tx.get('outputs'):
+ txo = (tx_hash + int_to_hex(x.get('index'), 4)).decode('hex')
+ self.batch_txio[txo] = x.get('address')
+ else:
+ # read addresses of tx inputs
+ for tx in txdict.values():
+ for x in tx.get('inputs'):
+ txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+ inputs_to_read.append(txi)
+
+ inputs_to_read.sort()
+ for txi in inputs_to_read:
+ try:
+ addr = self.db.Get(txi)
+ except:
+ # the input could come from the same block
+ continue
+ self.batch_txio[txi] = addr
+ addr_to_read.append(addr)
+
# read histories of addresses
for txid, tx in txdict.items():
self.batch_list[addr] = self.db.Get(addr)
except:
self.batch_list[addr] = ''
-
+
+
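+ # undo_info maps each txid to the (height, address) of every prevout it spent, so a reorg can restore them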
+ if revert:
+ undo_info = self.get_undo_info(block_height)
+ print "undo", block_height, undo_info
+ else: undo_info = {}
+
# process
t1 = time.time()
for txid in tx_hashes: # must be ordered
tx = txdict[txid]
if not revert:
+
+ undo = []
for x in tx.get('inputs'):
- self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
+ prevout_height, prevout_addr = self.remove_from_history( x.get('prevout_hash'), x.get('prevout_n'))
+ undo.append( (prevout_height, prevout_addr) )
+ undo_info[txid] = undo
+
for x in tx.get('outputs'):
- self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
+ self.add_to_history( x.get('address'), txid, x.get('index'), block_height)
+
else:
for x in tx.get('outputs'):
- self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
+ self.remove_from_history( txid, x.get('index'))
+
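+ # restore each spent prevout to its address history using the undo info saved at import time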
+ i = 0
for x in tx.get('inputs'):
- self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
+ prevout_height, prevout_addr = undo_info.get(txid)[i]
+ i += 1
+
+ # read the history into batch list
+ self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
+ # re-add the spent output to its address history
+ self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
+ print "new hist", self.deserialize(self.batch_list[prevout_addr])
# write
max_len = 0
# delete spent inputs
for txi in inputs_to_read:
batch.Delete(txi)
- batch.Put('0', self.serialize( [(block_hash, block_height, 0)] ) )
+
+ # add undo info
+ if not revert: self.write_undo_info(batch, block_height, undo_info)
+
+ # record the new chain tip (block hash and height)
+ batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) )
# actual write
self.db.Write(batch, sync = sync)
t3 = time.time()
- if t3 - t0 > 10:
+ if t3 - t0 > 10 and not sync:
print_log("block", block_height,
"parse:%0.2f "%(t00 - t0),
"read:%0.2f "%(t1 - t00),
- def last_hash(self):
- return self.block_hashes[-1]
-
-
def catch_up(self, sync = True):
+
t1 = time.time()
while not self.shared.stopped():
info = self.bitcoind('getinfo')
bitcoind_height = info.get('blocks')
bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
- if self.last_hash() == bitcoind_block_hash:
+ if self.last_hash == bitcoind_block_hash:
self.up_to_date = True
break
# not done..
self.up_to_date = False
- block_hash = self.bitcoind('getblockhash', [self.height+1])
- block = self.bitcoind('getblock', [block_hash, 1])
+ next_block_hash = self.bitcoind('getblockhash', [self.height+1])
+ next_block = self.bitcoind('getblock', [next_block_hash, 1])
- if block.get('previousblockhash') == self.last_hash():
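+ # in test mode, occasionally force a revert in order to exercise the reorg code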
+ revert = (random.randint(1, 1000)==1) if self.is_test else False
+ if (next_block.get('previousblockhash') == self.last_hash) and not revert:
- self.import_block(block, block_hash, self.height+1, sync)
+ self.import_block(next_block, next_block_hash, self.height+1, sync)
self.height = self.height + 1
- self.write_header(self.block2header(block), sync)
-
- self.block_hashes.append(block_hash)
- self.block_hashes = self.block_hashes[-10:]
+ self.write_header(self.block2header(next_block), sync)
+ self.last_hash = next_block_hash
- if (self.height+1)%100 == 0 and not sync:
+ if (self.height)%100 == 0 and not sync:
t2 = time.time()
print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
t1 = t2
-
else:
# revert current block
- print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
- block_hash = self.last_hash()
- block = self.bitcoind('getblock', [block_hash, 1])
- self.height = self.height -1
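+ # undo the tip: revert its transactions, drop its header, and step back one block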
+ block = self.bitcoind('getblock', [self.last_hash, 1])
+ print_log( "blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash )
+ self.import_block(block, self.last_hash, self.height, sync, revert=True)
self.pop_header()
+ self.flush_headers()
- self.block_hashes.remove(block_hash)
- self.import_block(block, self.last_hash(), self.height, revert=True)
+ self.height = self.height -1
+
+ # read previous header from disk
+ self.header = self.read_header(self.height)
+ self.last_hash = self.hash_header(self.header)
- self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))
+ self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
+
-
def memorypool_update(self):
self.mempool_addresses.pop(tx_hash)
# rebuild histories
- with self.mempool_lock:
- self.mempool_hist = {}
- for tx_hash, addresses in self.mempool_addresses.items():
- for addr in addresses:
- h = self.mempool_hist.get(addr, [])
- if tx_hash not in h:
- h.append( tx_hash )
- self.mempool_hist[addr] = h
- self.invalidate_cache(addr)
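+ # build the new mempool histories outside the lock, then invalidate only the addresses whose history changed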
+ new_mempool_hist = {}
+ for tx_hash, addresses in self.mempool_addresses.items():
+ for addr in addresses:
+ h = new_mempool_hist.get(addr, [])
+ if tx_hash not in h:
+ h.append( tx_hash )
+ new_mempool_hist[addr] = h
+
+ for addr in new_mempool_hist.keys():
+ if addr in self.mempool_hist:
+ if self.mempool_hist[addr] != new_mempool_hist[addr]:
+ self.invalidate_cache(addr)
+ else:
+ self.invalidate_cache(addr)
+ with self.mempool_lock:
+ self.mempool_hist = new_mempool_hist
print_log( "cache: invalidating", address )
self.history_cache.pop(address)
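+ # queue the address so that watching clients get notified of the change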
+ if address in self.watched_addresses:
+ self.address_queue.put(address)
+
def main_iteration(self):
if addr in self.watched_addresses:
status = self.get_status( addr )
self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
+ self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status] })
if not self.shared.stopped():