import deserialize
import ast, time, threading, hashlib
from Queue import Queue
-import traceback, sys, os
-
-
-
-Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
-hash_encode = lambda x: x[::-1].encode('hex')
-hash_decode = lambda x: x.decode('hex')[::-1]
-
-
-
-def rev_hex(s):
- return s.decode('hex')[::-1].encode('hex')
-
-
-def int_to_hex(i, length=1):
- s = hex(i)[2:].rstrip('L')
- s = "0"*(2*length - len(s)) + s
- return rev_hex(s)
-
-def header_to_string(res):
- pbh = res.get('prev_block_hash')
- if pbh is None: pbh = '0'*64
- s = int_to_hex(res.get('version'),4) \
- + rev_hex(pbh) \
- + rev_hex(res.get('merkle_root')) \
- + int_to_hex(int(res.get('timestamp')),4) \
- + int_to_hex(int(res.get('bits')),4) \
- + int_to_hex(int(res.get('nonce')),4)
- return s
-
-def header_from_string( s):
- hex_to_int = lambda s: eval('0x' + s[::-1].encode('hex'))
- h = {}
- h['version'] = hex_to_int(s[0:4])
- h['prev_block_hash'] = hash_encode(s[4:36])
- h['merkle_root'] = hash_encode(s[36:68])
- h['timestamp'] = hex_to_int(s[68:72])
- h['bits'] = hex_to_int(s[72:76])
- h['nonce'] = hex_to_int(s[76:80])
- return h
-
-
+import traceback, sys, os, random
+from util import Hash, hash_encode, hash_decode, rev_hex, int_to_hex
+from util import bc_address_to_hash_160, hash_160_to_bc_address, header_to_string, header_from_string
from processor import Processor, print_log
class BlockchainProcessor(Processor):
Processor.__init__(self)
self.shared = shared
+ self.config = config
self.up_to_date = False
self.watched_addresses = []
self.history_cache = {}
config.get('bitcoind','port'))
self.height = 0
+ self.is_test = False
self.sent_height = 0
self.sent_header = None
print_log( "catching up missing headers:", height, db_height)
try:
- while height != db_height:
+ while height < db_height:
height = height + 1
header = self.get_header(height)
if height>1:
with self.dblock:
try:
- hist = self.deserialize(self.db.Get(addr))
+ hash_160 = bc_address_to_hash_160(addr)
+ hist = self.deserialize(self.db.Get(hash_160))
is_known = True
except:
hist = []
self.batch_txio[txo] = addr
- def remove_from_history(self, tx_hash, tx_pos):
+ def remove_from_history(self, addr, tx_hash, tx_pos):
txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
- try:
- addr = self.batch_txio[txi]
- except:
- raise BaseException(tx_hash, tx_pos)
- print "WARNING: cannot find address for", (tx_hash, tx_pos)
- return
+ if addr is None:
+ try:
+ addr = self.batch_txio[txi]
+ except:
+ raise BaseException(tx_hash, tx_pos)
+
serialized_hist = self.batch_list[addr]
l = len(serialized_hist)/40
for i in range(l):
- if serialized_hist[40*i:40*i+36] == txi:
+ item = serialized_hist[40*i:40*(i+1)]
+ if item[0:36] == txi:
+ height = int( rev_hex( item[36:40].encode('hex') ), 16 )
serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
break
else:
+ hist = self.deserialize(serialized_hist)
raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
+
self.batch_list[addr] = serialized_hist
+ return height, addr
def deserialize_block(self, block):
is_coinbase = False
return tx_hashes, txdict
+    def get_undo_info(self, height):
+        # Undo records are keyed modulo 100: only the ~100 most recent
+        # blocks can be reverted (see write_undo_info), so older slots
+        # are recycled.  The value was serialized with repr(), so
+        # literal_eval is sufficient -- and unlike eval() it cannot
+        # execute arbitrary code if the database is tampered with.
+        s = self.db.Get("undo%d"%(height%100))
+        return ast.literal_eval(s)
+
+ def write_undo_info(self, batch, height, undo_info):
+ if self.is_test or height > self.bitcoind_height - 100:
+ batch.Put("undo%d"%(height%100), repr(undo_info))
+
def import_block(self, block, block_hash, block_height, sync, revert=False):
self.batch_list = {} # address -> history
self.batch_txio = {} # transaction i/o -> address
- inputs_to_read = []
+ block_inputs = []
+ block_outputs = []
addr_to_read = []
# deserialize transactions
t00 = time.time()
- if revert:
- # read addresses of tx outputs
- for tx_hash, tx in txdict.items():
- for x in tx.get('outputs'):
- txo = (tx_hash + int_to_hex(x.get('index'), 4)).decode('hex')
- self.batch_txio[txo] = x.get('address')
- else:
+
+ if not revert:
# read addresses of tx inputs
for tx in txdict.values():
for x in tx.get('inputs'):
txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
- inputs_to_read.append(txi)
+ block_inputs.append(txi)
- inputs_to_read.sort()
- for txi in inputs_to_read:
+ block_inputs.sort()
+ for txi in block_inputs:
try:
addr = self.db.Get(txi)
except:
self.batch_txio[txi] = addr
addr_to_read.append(addr)
-
+ else:
+ for txid, tx in txdict.items():
+ for x in tx.get('outputs'):
+ txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
+ block_outputs.append(txo)
+
# read histories of addresses
for txid, tx in txdict.items():
for x in tx.get('outputs'):
- addr_to_read.append(x.get('address'))
+ hash_160 = bc_address_to_hash_160(x.get('address'))
+ addr_to_read.append(hash_160)
addr_to_read.sort()
for addr in addr_to_read:
self.batch_list[addr] = self.db.Get(addr)
except:
self.batch_list[addr] = ''
-
+
+
+ if revert:
+ undo_info = self.get_undo_info(block_height)
+ # print "undo", block_height, undo_info
+ else: undo_info = {}
+
# process
t1 = time.time()
+ if revert: tx_hashes = tx_hashes[::-1]
for txid in tx_hashes: # must be ordered
tx = txdict[txid]
if not revert:
+
+ undo = []
for x in tx.get('inputs'):
- self.remove_from_history( x.get('prevout_hash'), x.get('prevout_n'))
+ prevout_height, prevout_addr = self.remove_from_history( None, x.get('prevout_hash'), x.get('prevout_n'))
+ undo.append( (prevout_height, prevout_addr) )
+ undo_info[txid] = undo
+
for x in tx.get('outputs'):
- self.add_to_history( x.get('address'), txid, x.get('index'), block_height)
+ hash_160 = bc_address_to_hash_160(x.get('address'))
+ self.add_to_history( hash_160, txid, x.get('index'), block_height)
+
else:
for x in tx.get('outputs'):
- self.remove_from_history( txid, x.get('index'))
+ hash_160 = bc_address_to_hash_160(x.get('address'))
+ self.remove_from_history( hash_160, txid, x.get('index'))
+ i = 0
for x in tx.get('inputs'):
- prevout_height = self.db.Get(x['prevout_hash'].decode('hex'))
- try:
- # note: this will fail if the block containing txi is part of the reorg and has been orphaned by bitcoind
- txi = self.get_transaction(x.get('prevout_hash'), prevout_height )
- except:
- # so, if if it fails, we need to read the block containing txi
- prevout_block_hash = self.db.Get('%d'%prevout_height)
- prevout_block = self.bitcoind('getblock', [prevout_block_hash, 1])
- for txc in prevout_block['tx']:
- if hash_encode(Hash(txc)) == prevout_hash:
- raw_txi = txc
- break
- else:
- raise BaseException('txi not found')
-
- vds = deserialize.BCDataStream()
- vds.write(raw_txi.decode('hex'))
- txi = deserialize.parse_Transaction(vds, False)
-
- print "txi", txi
- output = txi.get('outputs')[x.get('prevout_n')]
- prevout_addr = output['address']
- self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
- # no longer chronological..
+ prevout_height, prevout_addr = undo_info.get(txid)[i]
+ i += 1
+
+ # read the history into batch list
+ if self.batch_list.get(prevout_addr) is None:
+ self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
+
+ # re-add them to the history
self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
- print "new hist", self.deserialize(self.batch_list[prevout_addr])
+ print_log( "new hist for", hash_160_to_bc_address(prevout_addr), self.deserialize(self.batch_list[prevout_addr]) )
# write
max_len = 0
max_len = l
max_addr = addr
- for txio, addr in self.batch_txio.items():
- batch.Put(txio, addr)
- # delete spent inputs
- for txi in inputs_to_read:
- batch.Delete(txi)
-
- # add tx -> height
- for txid in tx_hashes:
- batch.Put(txid.decode('hex'), "%d"%block_height)
- # add height -> block_hash
- batch.Put("%d"%block_height, block_hash)
+ if not revert:
+ # add new created outputs
+ for txio, addr in self.batch_txio.items():
+ batch.Put(txio, addr)
+ # delete spent inputs
+ for txi in block_inputs:
+ batch.Delete(txi)
+ # add undo info
+ self.write_undo_info(batch, block_height, undo_info)
+ else:
+ # restore spent inputs
+ for txio, addr in self.batch_txio.items():
+ batch.Put(txio, addr)
+ # delete spent outputs
+ for txo in block_outputs:
+ batch.Delete(txo)
+
+
# add the max
batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) )
"read:%0.2f "%(t1 - t00),
"proc:%.2f "%(t2-t1),
"write:%.2f "%(t3-t2),
- "max:", max_len, max_addr)
+ "max:", max_len, hash_160_to_bc_address(max_addr))
- for addr in self.batch_list.keys(): self.invalidate_cache(addr)
+ for h160 in self.batch_list.keys():
+ addr = hash_160_to_bc_address(h160)
+ self.invalidate_cache(addr)
error = str(e) + ': ' + address
print_log( "error:", error )
- elif method == 'blockchain.address.subscribe2':
+ elif method == 'blockchain.address.unsubscribe':
try:
- address = params[0]
- result = self.get_status(address, cache_only)
- self.watch_address(address)
+ password = params[0]
+ address = params[1]
+ if password == self.config.get('server','password'):
+ self.watched_addresses.remove(address)
+ print_log('unsubscribed', address)
+ result = "ok"
+ else:
+ print_log('incorrect password')
+ result = "authentication error"
except BaseException, e:
error = str(e) + ': ' + address
print_log( "error:", error )
- elif method == 'blockchain.address.get_history2':
+ elif method == 'blockchain.address.get_history':
try:
address = params[0]
result = self.get_history( address, cache_only )
def catch_up(self, sync = True):
- # a reorg in bitcoind id not synchronous with my database
- #
- # -------> F ------> G -------> H
- # /
- # /
- # A ------> B --------> C ------> E
- #
- # we always compare the hash in the headers file to the hash returned by bitcoind
-
t1 = time.time()
# are we done yet?
info = self.bitcoind('getinfo')
- bitcoind_height = info.get('blocks')
- bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
+ self.bitcoind_height = info.get('blocks')
+ bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
if self.last_hash == bitcoind_block_hash:
self.up_to_date = True
break
next_block_hash = self.bitcoind('getblockhash', [self.height+1])
next_block = self.bitcoind('getblock', [next_block_hash, 1])
- if next_block.get('previousblockhash') == self.last_hash:
+            # FIXME: this is unsafe -- a revert can be triggered before the undo info for this height has been written
+ revert = (random.randint(1, 100)==1) if self.is_test else False
+
+ if (next_block.get('previousblockhash') == self.last_hash) and not revert:
self.import_block(next_block, next_block_hash, self.height+1, sync)
self.height = self.height + 1
else:
# revert current block
block = self.bitcoind('getblock', [self.last_hash, 1])
- print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash )
+ print_log( "blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash )
self.import_block(block, self.last_hash, self.height, sync, revert=True)
self.pop_header()
+ self.flush_headers()
self.height = self.height -1
# read previous header from disk
- self.header = self.read_header(self.height)
+ self.header = self.read_header(self.height)
self.last_hash = self.hash_header(self.header)
for x in tx.get('inputs'):
txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
try:
- addr = self.db.Get(txi)
+ h160 = self.db.Get(txi)
+ addr = hash_160_to_bc_address(h160)
except:
continue
l = self.mempool_addresses.get(tx_hash, [])
if addr in self.watched_addresses:
status = self.get_status( addr )
self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
- self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status] })
-
if not self.shared.stopped():
threading.Timer(10, self.main_iteration).start()