import deserialize
import ast, time, threading, hashlib
from Queue import Queue
-import traceback, sys
+import traceback, sys, os
s = "0"*(2*length - len(s)) + s
return rev_hex(s)
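+# Serialize a header dict (version, prev_block_hash, merkle_root, timestamp,
+# bits, nonce) to 160 hex chars; decoded, this is the 80-byte block header.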
+def header_to_string(res):
+ pbh = res.get('prev_block_hash')
+ if pbh is None: pbh = '0'*64
+ s = int_to_hex(res.get('version'),4) \
+ + rev_hex(pbh) \
+ + rev_hex(res.get('merkle_root')) \
+ + int_to_hex(int(res.get('timestamp')),4) \
+ + int_to_hex(int(res.get('bits')),4) \
+ + int_to_hex(int(res.get('nonce')),4)
+ return s
+
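+# Inverse of header_to_string: parse an 80-byte binary header into a dict.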
+def header_from_string(s):
+    hex_to_int = lambda x: int(x[::-1].encode('hex'), 16)
+ h = {}
+ h['version'] = hex_to_int(s[0:4])
+ h['prev_block_hash'] = hash_encode(s[4:36])
+ h['merkle_root'] = hash_encode(s[36:68])
+ h['timestamp'] = hex_to_int(s[68:72])
+ h['bits'] = hex_to_int(s[72:76])
+ h['nonce'] = hex_to_int(s[76:80])
+ return h
+
+
+
from processor import Processor, print_log
self.history_cache = {}
self.chunk_cache = {}
self.cache_lock = threading.Lock()
+ self.headers_data = ''
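+        # mempool_addresses: tx_hash -> addresses it touches;
+        # mempool_hist: address -> mempool tx hashes (rebuilt on each update)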
+ self.mempool_addresses = {}
self.mempool_hist = {}
- self.known_mempool_hashes = []
+ self.mempool_hashes = []
+ self.mempool_lock = threading.Lock()
+
self.address_queue = Queue()
+ self.dbpath = config.get('leveldb', 'path')
self.dblock = threading.Lock()
try:
- self.db = leveldb.LevelDB(config.get('leveldb', 'path'))
+ self.db = leveldb.LevelDB(self.dbpath)
except:
traceback.print_exc(file=sys.stdout)
self.shared.stop()
self.sent_height = 0
self.sent_header = None
+
try:
            hist = self.deserialize(self.db.Get('0'))
            hh, self.height, _ = hist[0]
            self.block_hashes = [hh]
        except:
            self.height = 0
            self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
- # catch_up first
+ # catch_up headers
+ self.init_headers(self.height)
+
threading.Timer(0, lambda: self.catch_up(sync=False)).start()
while not shared.stopped() and not self.up_to_date:
            try:
                time.sleep(1)
            except KeyboardInterrupt:
                shared.stop()
                sys.exit(0)
+ print "blockchain is up to date."
+
+ threading.Timer(10, self.main_iteration).start()
+
def bitcoind(self, method, params=[]):
return self.block2header(b)
- def get_chunk(self):
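+    # Build or extend the flat 'blockchain_headers' file until it matches
+    # db_height, checking that each header links to its predecessor's hash.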
+ def init_headers(self, db_height):
+ self.chunk_cache = {}
+ self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')
+
+ height = 0
+ if os.path.exists(self.headers_filename):
+ height = os.path.getsize(self.headers_filename)/80
+
+ if height:
+ prev_header = self.read_header(height -1)
+ prev_hash = self.hash_header(prev_header)
+ else:
+ open(self.headers_filename,'wb').close()
+ prev_hash = None
+
+ if height != db_height:
+ print_log( "catching up missing headers:", height, db_height)
+
+ try:
+ for i in range(height, db_height):
+ header = self.get_header(i)
+ assert prev_hash == header.get('prev_block_hash')
+ self.write_header(header, sync=False)
+ prev_hash = self.hash_header(header)
+ if i%1000==0: print_log("headers file:",i)
+ except KeyboardInterrupt:
+ self.flush_headers()
+ sys.exit()
+
+ self.flush_headers()
+
+
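+    # Block hash = Hash() of the 80-byte header (Hash is assumed to be the
+    # double-SHA256 helper from the utils module), displayed byte-reversed.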
+ def hash_header(self, header):
+ return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
+
+
+ def read_header(self, block_height):
+ if os.path.exists(self.headers_filename):
+ f = open(self.headers_filename,'rb')
+ f.seek(block_height*80)
+ h = f.read(80)
+ f.close()
+ if len(h) == 80:
+ h = header_from_string(h)
+ return h
+
+
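+    # A chunk is one retargeting period: 2016 headers of 80 bytes each,
+    # returned hex-encoded to the client.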
+ def read_chunk(self, index):
+ f = open(self.headers_filename,'rb')
+ f.seek(index*2016*80)
+ chunk = f.read(2016*80)
+ f.close()
+ return chunk.encode('hex')
+
+
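+    # Buffer serialized headers in memory and write them in one seek at
+    # offset headers_offset*80; flushed when sync is set or the buffer grows.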
+ def write_header(self, header, sync=True):
+ if not self.headers_data:
+ self.headers_offset = header.get('block_height')
+ self.headers_data += header_to_string(header).decode('hex')
+        if sync or len(self.headers_data) > 80*100:  # flush every 100 headers
+ self.flush_headers()
+
+ def pop_header(self):
+ # we need to do this only if we have not flushed
+ if self.headers_data:
+            self.headers_data = self.headers_data[:-80]  # one header is 80 bytes
+
+ def flush_headers(self):
+ if not self.headers_data: return
+ f = open(self.headers_filename,'rb+')
+ f.seek(self.headers_offset*80)
+ f.write(self.headers_data)
+ f.close()
+ self.headers_data = ''
+
+
+ def get_chunk(self, i):
# store them on disk; store the current chunk in memory
- pass
+ chunk = self.chunk_cache.get(i)
+ if not chunk:
+ chunk = self.read_chunk(i)
+ self.chunk_cache[i] = chunk
+ return chunk
def get_transaction(self, txid, block_height=-1, is_coinbase = False):
- t0 = time.time()
raw_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
- t1 = time.time()
vds = deserialize.BCDataStream()
vds.write(raw_tx.decode('hex'))
out = deserialize.parse_Transaction(vds, is_coinbase)
- t2 = time.time()
- return out, t1 - t0, t2 - t1
+ return out
def get_history(self, addr, cache_only=False):
with self.dblock:
try:
hist = self.deserialize(self.db.Get(addr))
+ is_known = True
except:
hist = []
+ is_known = False
# should not be necessary
hist.sort( key=lambda tup: tup[1])
# check uniqueness too...
# add memory pool
- for txid in self.mempool_hist.get(addr,[]):
- hist.append((txid, 0))
+ with self.mempool_lock:
+ for txid in self.mempool_hist.get(addr,[]):
+ hist.append((txid, 0, 0))
hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
+ # add something to distinguish between unused and empty addresses
+ if hist == [] and is_known: hist = ['*']
+
with self.cache_lock: self.history_cache[addr] = hist
return hist
if cache_only and tx_points == -1: return -1
if not tx_points: return None
+ if tx_points == ['*']: return '*'
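+        # the status is the sha256 of the ordered "tx_hash:height:" string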
status = ''
for tx in tx_points:
status += tx.get('tx_hash') + ':%d:' % tx.get('height')
return hashlib.sha256( status ).digest().encode('hex')
- def get_merkle(self, target_hash, height):
+ def get_merkle(self, tx_hash, height):
block_hash = self.bitcoind('getblockhash', [height])
b = self.bitcoind('getblock', [block_hash])
- merkle = b.get('tx')
-
+ tx_list = b.get('tx')
+ tx_pos = tx_list.index(tx_hash)
+
+ merkle = map(hash_decode, tx_list)
+ target_hash = hash_decode(tx_hash)
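+        # climb the merkle tree: pair the hashes at each level and record the
+        # sibling of the branch containing target_hash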
s = []
while len(merkle) != 1:
if len(merkle)%2: merkle.append( merkle[-1] )
            n = []
            while merkle:
new_hash = Hash( merkle[0] + merkle[1] )
if merkle[0] == target_hash:
- s.append( merkle[1])
+ s.append( hash_encode( merkle[1]))
target_hash = new_hash
elif merkle[1] == target_hash:
- s.append( merkle[0])
+ s.append( hash_encode( merkle[0]))
target_hash = new_hash
n.append( new_hash )
                merkle = merkle[2:]
            merkle = n
"write:%.2f "%(t3-t2),
"max:", max_len, max_addr)
- # invalidate cache
- for addr in self.batch_list.keys(): self.update_history_cache(addr)
+ for addr in self.batch_list.keys(): self.invalidate_cache(addr)
print_log( "error:", error)
elif method == 'blockchain.transaction.broadcast':
- txo = self.bitcoind('sendrawtransaction', params[0])
+ txo = self.bitcoind('sendrawtransaction', params)
print_log( "sent tx:", txo )
result = txo
if block.get('previousblockhash') == self.last_hash():
self.import_block(block, block_hash, self.height+1, sync)
+ self.height = self.height + 1
+ self.write_header(self.block2header(block), sync)
+
+ self.block_hashes.append(block_hash)
+ self.block_hashes = self.block_hashes[-10:]
-            if (self.height+1)%100 == 0 and not sync:
+            if self.height % 100 == 0 and not sync:
t2 = time.time()
- print_log( "catch_up: block %d (%.3fs)"%( self.height+1, t2 - t1 ) )
+ print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
t1 = t2
- self.height = self.height + 1
- self.block_hashes.append(block_hash)
- self.block_hashes = self.block_hashes[-10:]
else:
# revert current block
block_hash = self.last_hash()
block = self.bitcoind('getblock', [block_hash, 1])
self.height = self.height -1
+ self.pop_header()
+
self.block_hashes.remove(block_hash)
self.import_block(block, self.last_hash(), self.height, revert=True)
mempool_hashes = self.bitcoind('getrawmempool')
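+        # index transactions that entered the mempool since the last pass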
for tx_hash in mempool_hashes:
- if tx_hash in self.known_mempool_hashes: continue
- self.known_mempool_hashes.append(tx_hash)
+ if tx_hash in self.mempool_hashes: continue
tx = self.get_transaction(tx_hash)
if not tx: continue
- for x in tx.get('inputs') + tx.get('outputs'):
+ for x in tx.get('inputs'):
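+                # the db maps "prevout txid + little-endian vout index" to the
+                # address holding that output; skip prevouts we don't know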
+ txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+ try:
+ addr = self.db.Get(txi)
+ except:
+ continue
+ l = self.mempool_addresses.get(tx_hash, [])
+ if addr not in l:
+ l.append( addr )
+ self.mempool_addresses[tx_hash] = l
+
+ for x in tx.get('outputs'):
addr = x.get('address')
- hist = self.mempool_hist.get(addr, [])
- if tx_hash not in hist:
- hist.append( tx_hash )
- self.mempool_hist[addr] = hist
- self.update_history_cache(addr)
+ l = self.mempool_addresses.get(tx_hash, [])
+ if addr not in l:
+ l.append( addr )
+ self.mempool_addresses[tx_hash] = l
+
+ self.mempool_hashes.append(tx_hash)
+
+ # remove older entries from mempool_hashes
+ self.mempool_hashes = mempool_hashes
+
+ # remove deprecated entries from mempool_addresses
+ for tx_hash, addresses in self.mempool_addresses.items():
+ if tx_hash not in self.mempool_hashes:
+ self.mempool_addresses.pop(tx_hash)
- self.known_mempool_hashes = mempool_hashes
+ # rebuild histories
+ with self.mempool_lock:
+ self.mempool_hist = {}
+ for tx_hash, addresses in self.mempool_addresses.items():
+ for addr in addresses:
+ h = self.mempool_hist.get(addr, [])
+ if tx_hash not in h:
+ h.append( tx_hash )
+ self.mempool_hist[addr] = h
+ self.invalidate_cache(addr)
- def update_history_cache(self, address):
+
+
+ def invalidate_cache(self, address):
with self.cache_lock:
if self.history_cache.has_key(address):
print_log( "cache: invalidating", address )
t1 = time.time()
self.catch_up()
t2 = time.time()
- print_log( "blockchain: %d (%.3fs)"%( self.height+1, t2 - t1 ) )
+
self.memorypool_update()
+ t3 = time.time()
+ # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)
+
if self.sent_height != self.height:
self.sent_height = self.height
self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.height] })
if self.sent_header != self.header:
+ print_log( "blockchain: %d (%.3fs)"%( self.height, t2 - t1 ) )
self.sent_header = self.header
self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.header] })