self.shared = shared
self.config = config
self.up_to_date = False
- self.watched_addresses = []
+
+ self.watch_lock = threading.Lock()
+ self.watch_blocks = []
+ self.watch_headers = []
+ self.watched_addresses = {}
+
self.history_cache = {}
self.chunk_cache = {}
self.cache_lock = threading.Lock()
self.mempool_addresses = {}
self.mempool_hist = {}
- self.mempool_hashes = []
+ self.mempool_hashes = set([])
self.mempool_lock = threading.Lock()
self.address_queue = Queue()
for addr in self.batch_list.keys():
self.invalidate_cache(addr)
- def add_request(self, request):
+ def add_request(self, session, request):
# see if we can get if from cache. if not, add to queue
- if self.process(request, cache_only=True) == -1:
- self.queue.put(request)
+ if self.process(session, request, cache_only=True) == -1:
+ self.queue.put((session, request))
+
+
- def process(self, request, cache_only=False):
- #print "abe process", request
+ def process(self, session, request, cache_only=False):
+
message_id = request['id']
method = request['method']
params = request.get('params', [])
error = None
if method == 'blockchain.numblocks.subscribe':
+ with self.watch_lock:
+ if session not in self.watch_blocks:
+ self.watch_blocks.append(session)
result = self.height
elif method == 'blockchain.headers.subscribe':
+ with self.watch_lock:
+ if session not in self.watch_headers:
+ self.watch_headers.append(session)
result = self.header
elif method == 'blockchain.address.subscribe':
try:
address = params[0]
result = self.get_status(address, cache_only)
- self.watch_address(address)
- except BaseException, e:
- error = str(e) + ': ' + address
- print_log("error:", error)
+ with self.watch_lock:
+ l = self.watched_addresses.get(address)
+ if l is None:
+ self.watched_addresses[address] = [session]
+ elif session not in l:
+ l.append(session)
- elif method == 'blockchain.address.unsubscribe':
- try:
- password = params[0]
- address = params[1]
- if password == self.config.get('server', 'password'):
- self.watched_addresses.remove(address)
- # print_log('unsubscribed', address)
- result = "ok"
- else:
- print_log('incorrect password')
- result = "authentication error"
except BaseException, e:
error = str(e) + ': ' + address
print_log("error:", error)
+
elif method == 'blockchain.address.get_history':
try:
address = params[0]
return -1
if error:
- self.push_response({'id': message_id, 'error': error})
+ self.push_response(session, {'id': message_id, 'error': error})
elif result != '':
- self.push_response({'id': message_id, 'result': result})
+ self.push_response(session, {'id': message_id, 'result': result})
- def watch_address(self, addr):
- if addr not in self.watched_addresses:
- self.watched_addresses.append(addr)
def getfullblock(self, block_hash):
block = self.bitcoind('getblock', [block_hash])
self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
+
def memorypool_update(self):
- mempool_hashes = self.bitcoind('getrawmempool')
+ mempool_hashes = set(self.bitcoind('getrawmempool'))
+ touched_addresses = set([])
- touched_addresses = []
for tx_hash in mempool_hashes:
if tx_hash in self.mempool_hashes:
continue
addr = x.get('address')
if addr and addr not in mpa:
mpa.append(addr)
- touched_addresses.append(addr)
+ touched_addresses.add(addr)
for x in tx.get('outputs'):
addr = x.get('address')
if addr and addr not in mpa:
mpa.append(addr)
- touched_addresses.append(addr)
+ touched_addresses.add(addr)
self.mempool_addresses[tx_hash] = mpa
- self.mempool_hashes.append(tx_hash)
+ self.mempool_hashes.add(tx_hash)
# remove older entries from mempool_hashes
self.mempool_hashes = mempool_hashes
if tx_hash not in self.mempool_hashes:
self.mempool_addresses.pop(tx_hash)
for addr in addresses:
- touched_addresses.append(addr)
+ touched_addresses.add(addr)
# rebuild mempool histories
new_mempool_hist = {}
print_log("cache: invalidating", address)
self.history_cache.pop(address)
- if address in self.watched_addresses:
+ with self.watch_lock:
+ sessions = self.watched_addresses.get(address)
+
+ if sessions:
# TODO: update cache here. if new value equals cached value, do not send notification
- self.address_queue.put(address)
+ self.address_queue.put((address,sessions))
def main_iteration(self):
if self.shared.stopped():
if self.sent_height != self.height:
self.sent_height = self.height
- self.push_response({
- 'id': None,
- 'method': 'blockchain.numblocks.subscribe',
- 'params': [self.height],
- })
+ for session in self.watch_blocks:
+ self.push_response(session, {
+ 'id': None,
+ 'method': 'blockchain.numblocks.subscribe',
+ 'params': [self.height],
+ })
if self.sent_header != self.header:
print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
self.sent_header = self.header
- self.push_response({
- 'id': None,
- 'method': 'blockchain.headers.subscribe',
- 'params': [self.header],
- })
+ for session in self.watch_headers:
+ self.push_response(session, {
+ 'id': None,
+ 'method': 'blockchain.headers.subscribe',
+ 'params': [self.header],
+ })
while True:
try:
- addr = self.address_queue.get(False)
+ addr, sessions = self.address_queue.get(False)
except:
break
- if addr in self.watched_addresses:
- status = self.get_status(addr)
- self.push_response({
- 'id': None,
- 'method': 'blockchain.address.subscribe',
- 'params': [addr, status],
- })
+
+ status = self.get_status(addr)
+ for session in sessions:
+ self.push_response(session, {
+ 'id': None,
+ 'method': 'blockchain.address.subscribe',
+ 'params': [addr, status],
+ })
if not self.shared.stopped():
threading.Timer(10, self.main_iteration).start()