self.monitor_output = {}
# key is address
self.monitor_address = set()
- # affected
- self.affected = {}
backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
addrs.append((output_index, str(address)))
return tx_hash, previous_outputs, addrs
- def tx_stored(self, tx):
+ def effect_notify(self, tx, delete_outs):
affected_addrs = set()
tx_hash, previous_outputs, addrs = self.unpack(tx)
for prevout in previous_outputs:
- with self.lock:
- if self.monitor_output.has_key(prevout):
+ try:
+ with self.lock:
affected_addrs.add(self.monitor_output[prevout])
+ if delete_outs:
+ del self.monitor_output[prevout]
+ except KeyError:
+ pass
for idx, address in addrs:
with self.lock:
if address in self.monitor_address:
affected_addrs.add(address)
- with self.lock:
- self.affected[tx_hash] = affected_addrs
self.cache.clear(affected_addrs)
self.notify(affected_addrs)
+ # Used in confirmed txs
+ return tx_hash, addrs, affected_addrs
+
+ def tx_stored(self, tx):
+ self.effect_notify(tx, False)
def tx_confirmed(self, tx):
- tx_hash, previous_outputs, addrs = self.unpack(tx)
- with self.lock:
- affected_addrs = self.affected[tx_hash]
- del self.affected[tx_hash]
- self.cache.clear(affected_addrs)
- self.notify(affected_addrs)
+ tx_hash, addrs, affected_addrs = self.effect_notify(tx, True)
# add new outputs to monitor
for idx, address in addrs:
outpoint = "%s:%s" % (tx_hash, idx)
- with self.lock:
- if address in affected_addrs:
+ if address in affected_addrs:
+ with self.lock:
self.monitor_output[outpoint] = address
- # delete spent outpoints
- for prevout in previous_outputs:
- with self.lock:
- if self.monitor_output.has_key(prevout):
- del self.monitor_output[prevout]
def notify(self, affected_addrs):
service = self.backend.mempool_service
def subscribe(self, request):
latest = self.latest.get()
response = {"id": request["id"],
- "method": "blockchain.numblocks.subscribe",
"result": latest,
"error": None}
self.processor.push_response(response)
response = {"id": request["id"], "result": tx_hash, "error": None}
self.push_response(response)
- def run(self):
- print "Warning: pre-alpha prototype. Full of bugs."
- while not self.shared.stopped():
- time.sleep(1)
-