import deserialize
import ast, time, threading, hashlib
from Queue import Queue
-import traceback, sys, os
+import traceback, sys, os, random
config.get('bitcoind','port'))
self.height = 0
+ self.is_test = False
self.sent_height = 0
self.sent_header = None
print_log( "catching up missing headers:", height, db_height)
try:
- while height != db_height:
+ while height < db_height:
height = height + 1
header = self.get_header(height)
if height>1:
self.batch_txio[txo] = addr
- def remove_from_history(self, tx_hash, tx_pos):
+ def remove_from_history(self, addr, tx_hash, tx_pos):
txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
- try:
- addr = self.batch_txio[txi]
- except:
- raise BaseException(tx_hash, tx_pos)
- print "WARNING: cannot find address for", (tx_hash, tx_pos)
- return
+ if addr is None:
+ try:
+ addr = self.batch_txio[txi]
+ except:
+ raise BaseException(tx_hash, tx_pos)
+
serialized_hist = self.batch_list[addr]
l = len(serialized_hist)/40
for i in range(l):
- if serialized_hist[40*i:40*i+36] == txi:
+ item = serialized_hist[40*i:40*(i+1)]
+ if item[0:36] == txi:
+ height = int( rev_hex( item[36:40].encode('hex') ), 16 )
serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
break
else:
+ hist = self.deserialize(serialized_hist)
raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
+
self.batch_list[addr] = serialized_hist
+ return height, addr
def deserialize_block(self, block):
is_coinbase = False
return tx_hashes, txdict
+ def get_undo_info(self, height):
+ s = self.db.Get("undo%d"%(height%100))
+ return eval(s)
+
+ def write_undo_info(self, batch, height, undo_info):
+ batch.Put("undo%d"%(height%100), repr(undo_info))
+
def import_block(self, block, block_hash, block_height, sync, revert=False):
self.batch_list = {} # address -> history
self.batch_txio = {} # transaction i/o -> address
- inputs_to_read = []
+ block_inputs = []
+ block_outputs = []
addr_to_read = []
# deserialize transactions
t00 = time.time()
- if revert:
- # read addresses of tx outputs
- for tx_hash, tx in txdict.items():
- for x in tx.get('outputs'):
- txo = (tx_hash + int_to_hex(x.get('index'), 4)).decode('hex')
- self.batch_txio[txo] = x.get('address')
- else:
+
+ if not revert:
# read addresses of tx inputs
for tx in txdict.values():
for x in tx.get('inputs'):
txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
- inputs_to_read.append(txi)
+ block_inputs.append(txi)
- inputs_to_read.sort()
- for txi in inputs_to_read:
+ block_inputs.sort()
+ for txi in block_inputs:
try:
addr = self.db.Get(txi)
except:
self.batch_txio[txi] = addr
addr_to_read.append(addr)
+ else:
+ for txid, tx in txdict.items():
+ for x in tx.get('outputs'):
+ txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
+ block_outputs.append(txo)
+
+
# read histories of addresses
for txid, tx in txdict.items():
self.batch_list[addr] = self.db.Get(addr)
except:
self.batch_list[addr] = ''
-
+
+
+ if revert:
+ undo_info = self.get_undo_info(block_height)
+ print "undo", block_height, undo_info
+ else: undo_info = {}
+
# process
t1 = time.time()
for txid in tx_hashes: # must be ordered
tx = txdict[txid]
if not revert:
+
+ undo = []
for x in tx.get('inputs'):
- self.remove_from_history( x.get('prevout_hash'), x.get('prevout_n'))
+ prevout_height, prevout_addr = self.remove_from_history( None, x.get('prevout_hash'), x.get('prevout_n'))
+ undo.append( (prevout_height, prevout_addr) )
+ undo_info[txid] = undo
+
for x in tx.get('outputs'):
self.add_to_history( x.get('address'), txid, x.get('index'), block_height)
+
else:
for x in tx.get('outputs'):
- self.remove_from_history( txid, x.get('index'))
+ self.remove_from_history( x.get('address'), txid, x.get('index'))
+ i = 0
for x in tx.get('inputs'):
- prevout_height = self.db.Get(x['prevout_hash'].decode('hex'))
- try:
- # note: this will fail if the block containing txi is part of the reorg and has been orphaned by bitcoind
- txi = self.get_transaction(x.get('prevout_hash'), prevout_height )
- except:
- # so, if if it fails, we need to read the block containing txi
- prevout_block_hash = self.db.Get('%d'%prevout_height)
- prevout_block = self.bitcoind('getblock', [prevout_block_hash, 1])
- for txc in prevout_block['tx']:
- if hash_encode(Hash(txc)) == prevout_hash:
- raw_txi = txc
- break
- else:
- raise BaseException('txi not found')
-
- vds = deserialize.BCDataStream()
- vds.write(raw_txi.decode('hex'))
- txi = deserialize.parse_Transaction(vds, False)
-
- print "txi", txi
- output = txi.get('outputs')[x.get('prevout_n')]
- prevout_addr = output['address']
+ prevout_height, prevout_addr = undo_info.get(txid)[i]
+ i += 1
+
+ # read the history into batch list
self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
- # no longer chronological..
+ # re-add them to the history
self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
- print "new hist", self.deserialize(self.batch_list[prevout_addr])
+ print "new hist for", prevout_addr, self.deserialize(self.batch_list[prevout_addr])
# write
max_len = 0
max_len = l
max_addr = addr
- for txio, addr in self.batch_txio.items():
- batch.Put(txio, addr)
- # delete spent inputs
- for txi in inputs_to_read:
- batch.Delete(txi)
-
- # add tx -> height
- for txid in tx_hashes:
- batch.Put(txid.decode('hex'), "%d"%block_height)
- # add height -> block_hash
- batch.Put("%d"%block_height, block_hash)
+ if not revert:
+ # add new created outputs
+ for txio, addr in self.batch_txio.items():
+ batch.Put(txio, addr)
+ # delete spent inputs
+ for txi in block_inputs:
+ batch.Delete(txi)
+ # add undo info
+ self.write_undo_info(batch, block_height, undo_info)
+ else:
+ # restore spent inputs
+ for txio, addr in self.batch_txio.items():
+ batch.Put(txio, addr)
+ # delete spent outputs
+ for txo in block_outputs:
+ batch.Delete(txo)
+
+
# add the max
batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) )
def catch_up(self, sync = True):
- # a reorg in bitcoind id not synchronous with my database
- #
- # -------> F ------> G -------> H
- # /
- # /
- # A ------> B --------> C ------> E
- #
- # we always compare the hash in the headers file to the hash returned by bitcoind
-
t1 = time.time()
next_block_hash = self.bitcoind('getblockhash', [self.height+1])
next_block = self.bitcoind('getblock', [next_block_hash, 1])
- if next_block.get('previousblockhash') == self.last_hash:
+ revert = (random.randint(1, 100)==1) if self.is_test else False
+ if (next_block.get('previousblockhash') == self.last_hash) and not revert:
self.import_block(next_block, next_block_hash, self.height+1, sync)
self.height = self.height + 1
else:
# revert current block
block = self.bitcoind('getblock', [self.last_hash, 1])
- print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash )
+ print_log( "blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash )
self.import_block(block, self.last_hash, self.height, sync, revert=True)
self.pop_header()
+ self.flush_headers()
self.height = self.height -1
# read previous header from disk
- self.header = self.read_header(self.height)
+ self.header = self.read_header(self.height)
self.last_hash = self.hash_header(self.header)