import ast
import hashlib
from json import dumps, loads
-import leveldb
import os
from Queue import Queue
import random
self.shared = shared
self.config = config
self.up_to_date = False
- self.watched_addresses = []
+
+ self.watch_lock = threading.Lock()
+ self.watch_blocks = []
+ self.watch_headers = []
+ self.watched_addresses = {}
+
self.history_cache = {}
self.chunk_cache = {}
self.cache_lock = threading.Lock()
self.mempool_addresses = {}
self.mempool_hist = {}
- self.mempool_hashes = []
+ self.mempool_hashes = set([])
self.mempool_lock = threading.Lock()
self.address_queue = Queue()
- self.dbpath = config.get('leveldb', 'path')
+
+ try:
+ self.use_plyvel = config.getboolean('leveldb', 'use_plyvel')
+ except:
+ self.use_plyvel = False
+ print_log('use_plyvel:', self.use_plyvel)
+
+ # don't use the same database for plyvel, because python-leveldb uses snappy compression
+ self.dbpath = config.get('leveldb', 'path_plyvel' if self.use_plyvel else 'path')
+
self.pruning_limit = config.getint('leveldb', 'pruning_limit')
self.db_version = 1 # increase this when database needs to be updated
self.dblock = threading.Lock()
try:
- self.db = leveldb.LevelDB(self.dbpath, paranoid_checks=True)
+ if self.use_plyvel:
+ import plyvel
+ self.db = plyvel.DB(self.dbpath, create_if_missing=True, paranoid_checks=None, compression=None)
+ else:
+ import leveldb
+ self.db = leveldb.LevelDB(self.dbpath, paranoid_checks=False)
except:
traceback.print_exc(file=sys.stdout)
self.shared.stop()
self.sent_header = None
try:
- hist = self.deserialize(self.db.Get('height'))
+ hist = self.deserialize(self.db_get('height'))
self.last_hash, self.height, db_version = hist[0]
print_log("Database version", self.db_version)
print_log("Blockchain height", self.height)
threading.Timer(10, self.main_iteration).start()
+
+ # Backend-agnostic point lookup. plyvel's get() already returns None
+ # for a missing key; python-leveldb's Get() raises KeyError instead,
+ # which is normalized to None so callers only handle one convention.
+ def db_get(self, key):
+ if self.use_plyvel:
+ return self.db.get(key)
+ else:
+ try:
+ return self.db.Get(key)
+ except KeyError:
+ return None
+
+ # Queue a key/value write on a pending batch, dispatching on the
+ # backend's method name (plyvel: put(), python-leveldb: Put()).
+ def batch_put(self, batch, key, value):
+ if self.use_plyvel:
+ batch.put(key, value)
+ else:
+ batch.Put(key, value)
+
+ # Queue a key deletion on a pending batch, dispatching on the
+ # backend's method name (plyvel: delete(), python-leveldb: Delete()).
+ def batch_delete(self, batch, key):
+ if self.use_plyvel:
+ batch.delete(key)
+ else:
+ batch.Delete(key)
+
+ # Flush a pending batch to the database.
+ # NOTE(review): on the plyvel path the sync flag is dropped (the
+ # sync=sync argument is commented out below) -- plyvel selects sync
+ # when the batch is created via db.write_batch(), not at write time.
+ # Confirm whether losing the sync guarantee here is acceptable.
+ def batch_write(self, batch, sync):
+ if self.use_plyvel:
+ batch.write()#, sync=sync)
+ else:
+ self.db.Write(batch, sync=sync)
+
+
def bitcoind(self, method, params=[]):
postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
try:
def get_mempool_transaction(self, txid):
try:
- raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1])
+ raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
except:
return None
with self.dblock:
try:
- hist = self.deserialize(self.db.Get(addr))
+ hist = self.deserialize(self.db_get(str((addr))))
is_known = True
except:
+ self.shared.stop()
+ raise
+ if hist:
+ is_known = True
+ else:
hist = []
is_known = False
return tx_hashes, txdict
def get_undo_info(self, height):
- s = self.db.Get("undo%d" % (height % 100))
+ s = self.db_get("undo%d" % (height % 100))
return eval(s)
def write_undo_info(self, batch, height, undo_info):
if self.is_test or height > self.bitcoind_height - 100:
- batch.Put("undo%d" % (height % 100), repr(undo_info))
+ self.batch_put(batch, "undo%d" % (height % 100), repr(undo_info))
def import_block(self, block, block_hash, block_height, sync, revert=False):
self.batch_list = {} # address -> history
self.batch_txio = {} # transaction i/o -> address
- block_inputs = []
- block_outputs = []
- addr_to_read = []
+ block_inputs = set([])
+ block_outputs = set([])
+ addr_to_read = set([])
# deserialize transactions
t0 = time.time()
for tx in txdict.values():
for x in tx.get('inputs'):
txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
- block_inputs.append(txi)
+ block_inputs.add(txi)
- block_inputs.sort()
- for txi in block_inputs:
+ #block_inputs.sort()
+ for txi in sorted(block_inputs):
try:
- addr = self.db.Get(txi)
- except KeyError:
- # the input could come from the same block
- continue
+ addr = self.db_get(txi)
+ if addr is None:
+ # the input could come from the same block
+ continue
except:
traceback.print_exc(file=sys.stdout)
self.shared.stop()
raise
self.batch_txio[txi] = addr
- addr_to_read.append(addr)
+ addr_to_read.add(addr)
else:
for txid, tx in txdict.items():
for x in tx.get('outputs'):
txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
- block_outputs.append(txo)
- addr_to_read.append( x.get('address') )
+ block_outputs.add(txo)
+ addr_to_read.add( x.get('address') )
undo = undo_info.get(txid)
for i, x in enumerate(tx.get('inputs')):
addr = undo['prev_addr'][i]
- addr_to_read.append(addr)
-
-
+ addr_to_read.add(addr)
+ #time spent reading txio
+ t000 = time.time()
# read histories of addresses
for txid, tx in txdict.items():
for x in tx.get('outputs'):
- addr_to_read.append(x.get('address'))
+ addr_to_read.add(x.get('address'))
- addr_to_read.sort()
- for addr in addr_to_read:
+ #addr_to_read.sort()
+ for addr in sorted(addr_to_read):
try:
- self.batch_list[addr] = self.db.Get(addr)
- except KeyError:
- self.batch_list[addr] = ''
+ h = self.db_get(addr)
+ self.batch_list[addr] = '' if h is None else h
except:
+ print "db get error", addr
traceback.print_exc(file=sys.stdout)
self.shared.stop()
raise
max_addr = ''
t2 = time.time()
- batch = leveldb.WriteBatch()
+ if self.use_plyvel:
+ batch = self.db.write_batch()
+ else:
+ import leveldb
+ batch = leveldb.WriteBatch()
+
for addr, serialized_hist in self.batch_list.items():
- batch.Put(addr, serialized_hist)
+ self.batch_put(batch, addr, serialized_hist)
l = len(serialized_hist)/80
if l > max_len:
max_len = l
if not revert:
# add new created outputs
for txio, addr in self.batch_txio.items():
- batch.Put(txio, addr)
+ self.batch_put(batch, txio, addr)
# delete spent inputs
for txi in block_inputs:
- batch.Delete(txi)
+ self.batch_delete(batch, txi)
# add undo info
self.write_undo_info(batch, block_height, undo_info)
else:
# restore spent inputs
for txio, addr in self.batch_txio.items():
# print "restoring spent input", repr(txio)
- batch.Put(txio, addr)
+ self.batch_put(batch, txio, addr)
# delete spent outputs
for txo in block_outputs:
- batch.Delete(txo)
+ self.batch_delete(batch, txo)
# add the max
- batch.Put('height', self.serialize([(block_hash, block_height, self.db_version)]))
+ self.batch_put(batch, 'height', self.serialize([(block_hash, block_height, self.db_version)]))
# actual write
- self.db.Write(batch, sync=sync)
+ self.batch_write(batch, sync)
t3 = time.time()
if t3 - t0 > 10 and not sync:
- print_log("block", block_height,
- "parse:%0.2f " % (t00 - t0),
- "read:%0.2f " % (t1 - t00),
- "proc:%.2f " % (t2-t1),
+ print_log("block %d "%block_height,
+ "total:%0.2f " % (t3 - t0),
+ #"parse:%0.2f " % (t00 - t0),
+ "read_txio[%4d]:%0.2f " % (len(block_inputs), t000 - t00),
+ "read_addr[%4d]:%0.2f " % (len(addr_to_read), t1 - t000),
+ #"proc:%.2f " % (t2-t1),
"write:%.2f " % (t3-t2),
"max:", max_len, max_addr)
for addr in self.batch_list.keys():
self.invalidate_cache(addr)
- def add_request(self, request):
+ def add_request(self, session, request):
# see if we can get if from cache. if not, add to queue
- if self.process(request, cache_only=True) == -1:
- self.queue.put(request)
+ if self.process(session, request, cache_only=True) == -1:
+ self.queue.put((session, request))
- def process(self, request, cache_only=False):
- #print "abe process", request
+ # Register a session for push notifications. All three subscription
+ # kinds are handled under watch_lock so concurrent subscribe /
+ # unsubscribe calls from different sessions cannot corrupt the lists.
+ # Membership is checked before appending, so repeat subscriptions
+ # from the same session are idempotent.
+ def do_subscribe(self, method, params, session):
+ with self.watch_lock:
+ if method == 'blockchain.numblocks.subscribe':
+ if session not in self.watch_blocks:
+ self.watch_blocks.append(session)
+
+ elif method == 'blockchain.headers.subscribe':
+ if session not in self.watch_headers:
+ self.watch_headers.append(session)
+
+ elif method == 'blockchain.address.subscribe':
+ address = params[0]
+ # watched_addresses maps address -> list of subscribed sessions.
+ l = self.watched_addresses.get(address)
+ if l is None:
+ self.watched_addresses[address] = [session]
+ elif session not in l:
+ l.append(session)
+
+
+ # Remove a session from the notification lists (inverse of
+ # do_subscribe), under the same watch_lock.
+ def do_unsubscribe(self, method, params, session):
+ with self.watch_lock:
+ if method == 'blockchain.numblocks.subscribe':
+ if session in self.watch_blocks:
+ self.watch_blocks.remove(session)
+ elif method == 'blockchain.headers.subscribe':
+ if session in self.watch_headers:
+ self.watch_headers.remove(session)
+ elif method == "blockchain.address.subscribe":
+ addr = params[0]
+ l = self.watched_addresses.get(addr)
+ if not l:
+ return
+ if session in l:
+ l.remove(session)
+ # Sanity check: the session must not appear twice; a duplicate
+ # indicates a race on the watch lists, so the server is stopped.
+ if session in l:
+ print "error rc!!"
+ self.shared.stop()
+ # Drop the address entry entirely once no session watches it.
+ if l == []:
+ self.watched_addresses.pop(addr)
+
+
+ def process(self, session, request, cache_only=False):
+
message_id = request['id']
method = request['method']
params = request.get('params', [])
try:
address = params[0]
result = self.get_status(address, cache_only)
- self.watch_address(address)
- except BaseException, e:
- error = str(e) + ': ' + address
- print_log("error:", error)
-
- elif method == 'blockchain.address.unsubscribe':
- try:
- password = params[0]
- address = params[1]
- if password == self.config.get('server', 'password'):
- self.watched_addresses.remove(address)
- # print_log('unsubscribed', address)
- result = "ok"
- else:
- print_log('incorrect password')
- result = "authentication error"
except BaseException, e:
error = str(e) + ': ' + address
print_log("error:", error)
elif method == 'blockchain.transaction.get':
try:
tx_hash = params[0]
- height = params[1]
- result = self.bitcoind('getrawtransaction', [tx_hash, 0, height])
+ result = self.bitcoind('getrawtransaction', [tx_hash, 0])
except BaseException, e:
error = str(e) + ': ' + repr(params)
print_log("tx get error:", error)
return -1
if error:
- self.push_response({'id': message_id, 'error': error})
+ self.push_response(session, {'id': message_id, 'error': error})
elif result != '':
- self.push_response({'id': message_id, 'result': result})
+ self.push_response(session, {'id': message_id, 'result': result})
+
+
+ # Fetch a block and replace its txid list with raw transaction hex,
+ # using one batched JSON-RPC request (a JSON array of
+ # getrawtransaction calls posted in a single HTTP round trip) instead
+ # of one request per transaction.
+ def getfullblock(self, block_hash):
+ block = self.bitcoind('getblock', [block_hash])
+
+ rawtxreq = []
+ i = 0
+ for txid in block['tx']:
+ rawtxreq.append({
+ "method": "getrawtransaction",
+ "params": [txid],
+ "id": i,
+ })
+ i += 1
- def watch_address(self, addr):
- if addr not in self.watched_addresses:
- self.watched_addresses.append(addr)
+ postdata = dumps(rawtxreq)
+ # NOTE(review): bare except here swallows the specific failure; the
+ # traceback is printed and the server is stopped, but respdata is
+ # then unbound and loads() below will raise NameError -- confirm
+ # intended shutdown path.
+ try:
+ respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
+ except:
+ traceback.print_exc(file=sys.stdout)
+ self.shared.stop()
+
+ r = loads(respdata)
+ rawtxdata = []
+ for ir in r:
+ # Any per-transaction error aborts: without txindex=1 bitcoind
+ # cannot serve arbitrary raw transactions.
+ if ir['error'] is not None:
+ self.shared.stop()
+ print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
+ raise BaseException(ir['error'])
+ rawtxdata.append(ir['result'])
+ block['tx'] = rawtxdata
+ return block
def catch_up(self, sync=True):
t1 = time.time()
# not done..
self.up_to_date = False
next_block_hash = self.bitcoind('getblockhash', [self.height + 1])
- next_block = self.bitcoind('getblock', [next_block_hash, 1])
+ next_block = self.getfullblock(next_block_hash)
# fixme: this is unsafe, if we revert when the undo info is not yet written
revert = (random.randint(1, 100) == 1) if self.is_test else False
else:
# revert current block
- block = self.bitcoind('getblock', [self.last_hash, 1])
+ block = self.getfullblock(self.last_hash)
print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash)
self.import_block(block, self.last_hash, self.height, sync, revert=True)
self.pop_header()
self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
+ if self.shared.stopped() and self.use_plyvel:
+ print_log( "closing database" )
+ self.db.close()
+
+
def memorypool_update(self):
- mempool_hashes = self.bitcoind('getrawmempool')
+ mempool_hashes = set(self.bitcoind('getrawmempool'))
+ touched_addresses = set([])
- touched_addresses = []
for tx_hash in mempool_hashes:
if tx_hash in self.mempool_hashes:
continue
addr = x.get('address')
if addr and addr not in mpa:
mpa.append(addr)
- touched_addresses.append(addr)
+ touched_addresses.add(addr)
for x in tx.get('outputs'):
addr = x.get('address')
if addr and addr not in mpa:
mpa.append(addr)
- touched_addresses.append(addr)
+ touched_addresses.add(addr)
self.mempool_addresses[tx_hash] = mpa
- self.mempool_hashes.append(tx_hash)
+ self.mempool_hashes.add(tx_hash)
# remove older entries from mempool_hashes
self.mempool_hashes = mempool_hashes
if tx_hash not in self.mempool_hashes:
self.mempool_addresses.pop(tx_hash)
for addr in addresses:
- touched_addresses.append(addr)
+ touched_addresses.add(addr)
# rebuild mempool histories
new_mempool_hist = {}
print_log("cache: invalidating", address)
self.history_cache.pop(address)
- if address in self.watched_addresses:
+ with self.watch_lock:
+ sessions = self.watched_addresses.get(address)
+
+ if sessions:
# TODO: update cache here. if new value equals cached value, do not send notification
- self.address_queue.put(address)
+ self.address_queue.put((address,sessions))
def main_iteration(self):
if self.shared.stopped():
print_log("blockchain processor terminating")
+ if self.use_plyvel:
+ self.db.close()
return
with self.dblock:
if self.sent_height != self.height:
self.sent_height = self.height
- self.push_response({
- 'id': None,
- 'method': 'blockchain.numblocks.subscribe',
- 'params': [self.height],
- })
+ for session in self.watch_blocks:
+ self.push_response(session, {
+ 'id': None,
+ 'method': 'blockchain.numblocks.subscribe',
+ 'params': [self.height],
+ })
if self.sent_header != self.header:
print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
self.sent_header = self.header
- self.push_response({
- 'id': None,
- 'method': 'blockchain.headers.subscribe',
- 'params': [self.header],
- })
+ for session in self.watch_headers:
+ self.push_response(session, {
+ 'id': None,
+ 'method': 'blockchain.headers.subscribe',
+ 'params': [self.header],
+ })
while True:
try:
- addr = self.address_queue.get(False)
+ addr, sessions = self.address_queue.get(False)
except:
break
- if addr in self.watched_addresses:
- status = self.get_status(addr)
- self.push_response({
- 'id': None,
- 'method': 'blockchain.address.subscribe',
- 'params': [addr, status],
- })
+
+ status = self.get_status(addr)
+ for session in sessions:
+ self.push_response(session, {
+ 'id': None,
+ 'method': 'blockchain.address.subscribe',
+ 'params': [addr, status],
+ })
if not self.shared.stopped():
threading.Timer(10, self.main_iteration).start()