import history
-class Backend:
+class HistoryCache:
def __init__(self):
+ self.lock = threading.Lock()
+ self.cache = {}
+
+ def store(self, address, result):
+ with self.lock:
+ self.cache[address] = result
+
+ def fetch(self, address):
+ try:
+ with self.lock:
+ return self.cache[address]
+ except KeyError:
+ return None
+
+ def clear(self, addresses):
+ with self.lock:
+ for address in addresses:
+ if self.cache.has_key(address):
+ del self.cache[address]
+
+class MonitorAddress:
+
+ def __init__(self, processor, cache):
+ self.processor = processor
+ self.cache = cache
+ self.lock = threading.Lock()
+ # key is hash:index, value is address
+ self.monitor_output = {}
+ # key is address
+ self.monitor_address = set()
+ # affected
+ self.affected = {}
+
+ def monitor(self, address, result):
+ for info in result:
+ if not info.has_key("raw_output_script"):
+ continue
+ assert info["is_input"] == 0
+ tx_hash = info["tx_hash"]
+ output_index = info["index"]
+ outpoint = "%s:%s" % (tx_hash, output_index)
+ with self.lock:
+ self.monitor_output[outpoint] = address
+ with self.lock:
+ self.monitor_address.add(address)
+
+ def tx_stored(self, tx_desc):
+ tx_hash, prevouts, addrs = tx_desc
+ affected_addrs = set()
+ for prevout_hash, prevout_index in prevouts:
+ prevout = "%s:%s" % (prevout_hash, prevout_index)
+ with self.lock:
+ if self.monitor_output.has_key(prevout):
+ affected_addrs.add(self.monitor_output[prevout])
+ for idx, address in addrs:
+ with self.lock:
+ if address in self.monitor_address:
+ affected_addrs.add(address)
+ with self.lock:
+ self.affected[tx_hash] = affected_addrs
+ self.cache.clear(affected_addrs)
+ self.notify(affected_addrs)
+
+ def tx_confirmed(self, tx_desc):
+ tx_hash, prevouts, addrs = tx_desc
+ with self.lock:
+ affected_addrs = self.affected[tx_hash]
+ del self.affected[tx_hash]
+ self.cache.clear(affected_addrs)
+ self.notify(affected_addrs)
+ # add new outputs to monitor
+ for idx, address in addrs:
+ outpoint = "%s:%s" % (tx_hash, idx)
+ with self.lock:
+ if address in affected_addrs:
+ self.monitor_output[outpoint] = address
+ # delete spent outpoints
+ for prevout_hash, prevout_index in prevouts:
+ prevout = "%s:%s" % (prevout_hash, prevout_index)
+ with self.lock:
+ if self.monitor_output.has_key(prevout):
+ del self.monitor_output[prevout]
+
+ def notify(self, affected_addrs):
+ templ_response = {"id": None,
+ "method": "blockchain.address.subscribe",
+ "params": []}
+ chain = self.backend.blockchain
+ txpool = self.backend.transaction_pool
+ membuf = self.backend.pool_buffer
+ for address in affected_addrs:
+ response = templ_response.copy()
+ response["params"].append(address)
+ history.payment_history(chain, txpool, membuf, address,
+ bind(self.send_notify, _1, response))
+
+ def mempool_n(self, result):
+ assert result is not None
+ if len(result) == 0:
+ return None
+ # mempool:n status
+ # Order by time, grab last item (latest)
+ last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
+ if last_info["block_hash"] == "mempool":
+ last_id = "mempool:%s" % len(result)
+ else:
+ last_id = last_info["block_hash"]
+ return last_id
+
+ def send_notify(self, result, response):
+ response["params"].append(self.mempool_n(result))
+ self.processor.push_response(response)
+
+class Backend:
+
+ def __init__(self, monitor):
# Create 3 thread-pools each with 1 thread
self.network_service = bitcoin.async_service(1)
self.disk_service = bitcoin.async_service(1)
self.poller, self.transaction_pool)
self.session.start(self.handle_start)
- self.pool_buffer = history.MemoryPoolBuffer(self.transaction_pool,
- self.blockchain)
+ self.pool_buffer = \
+ history.MemoryPoolBuffer(self.transaction_pool,
+ self.blockchain, monitor)
def handle_start(self, ec):
if ec:
class AddressSubscribe:
- def __init__(self, backend, processor):
+ def __init__(self, backend, processor, cache, monitor):
self.backend = backend
self.processor = processor
+ self.cache = cache
+ self.monitor = monitor
- def subscribe(self, session, request):
+ self.backend.pool_buffer.cheat = self
+
+ def subscribe(self, request):
address = str(request["params"][0])
chain = self.backend.blockchain
txpool = self.backend.transaction_pool
membuf = self.backend.pool_buffer
history.payment_history(chain, txpool, membuf, address,
- bind(self.respond, _1, request))
+ bind(self.construct, _1, request))
def construct(self, result, request):
if result is None:
response = {"id": request["id"], "result": None,
"error": {"message": "Error", "code": -4}}
return
- else:
- response = {"id": request["id"], "result": result, "error": None}
+ last_id = self.monitor.mempool_n(result)
+ response = {"id": request["id"], "result": last_id, "error": None}
+ self.processor.push_response(response)
+ address = request["params"][0]
+ self.monitor.monitor(address, result)
+ # Cache result for get_history
+ self.cache.store(address, result)
+
+ def fetch_cached(self, request):
+ address = request["params"][0]
+ cached = self.cache.fetch(address)
+ if cached is None:
+ return False
+ response = {"id": request["id"], "result": cached, "error": None}
self.processor.push_response(response)
+ return True
class BlockchainProcessor(Processor):
def __init__(self, config):
Processor.__init__(self)
- self.backend = Backend()
+ cache = HistoryCache()
+ monitor = MonitorAddress(self, cache)
+ self.backend = Backend(monitor)
+ monitor.backend = self.backend
self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
self.address_get_history = AddressGetHistory(self.backend, self)
- self.address_subscribe = AddressSubscribe(self.backend, self)
+ self.address_subscribe = \
+ AddressSubscribe(self.backend, self, cache, monitor)
def stop(self):
self.backend.stop()
if request["method"] == "blockchain.numblocks.subscribe":
self.numblocks_subscribe.subscribe(request)
elif request["method"] == "blockchain.address.subscribe":
- pass
+ self.address_subscribe.subscribe(request)
elif request["method"] == "blockchain.address.get_history":
- self.address_get_history.get(request)
+ if not self.address_subscribe.fetch_cached(request):
+ self.address_get_history.get(request)
elif request["method"] == "blockchain.transaction.broadcast":
self.broadcast_transaction(request)
class MemoryPoolBuffer:
- def __init__(self, txpool, chain):
+ def __init__(self, txpool, chain, monitor):
self.txpool = txpool
self.chain = chain
+ self.monitor = monitor
# prevout: inpoint
self.lookup_input = {}
# payment_address: outpoint
self.timestamps = {}
def recv_tx(self, tx, handle_store):
- tx_hash = bitcoin.hash_transaction(tx)
+ tx_hash = str(bitcoin.hash_transaction(tx))
desc = (tx_hash, [], [])
for input in tx.inputs:
- desc[1].append(input.previous_output)
+ prevout = input.previous_output
+ desc[1].append((str(prevout.hash), prevout.index))
for idx, output in enumerate(tx.outputs):
address = bitcoin.payment_address()
if address.extract(output.output_script):
- desc[2].append((idx, address))
+ desc[2].append((idx, str(address)))
self.txpool.store(tx,
bind(self.confirmed, _1, desc),
bind(self.mempool_stored, _1, desc, handle_store))
def mempool_stored(self, ec, desc, handle_store):
tx_hash, prevouts, addrs = desc
+ tx_hash = bitcoin.hash_digest(tx_hash)
if ec:
handle_store(ec)
return
self.lookup_address[str(address)] = outpoint
self.timestamps[str(tx_hash)] = int(time.time())
handle_store(ec)
+ self.monitor.tx_stored(desc)
def confirmed(self, ec, desc):
tx_hash, prevouts, addrs = desc
+ tx_hash = bitcoin.hash_digest(tx_hash)
if ec:
print "Problem confirming transaction", tx_hash, ec
return
outpoint.hash, outpoint.index = tx_hash, idx
self.lookup_address.delete(str(address), outpoint)
del self.timestamps[str(tx_hash)]
+ self.monitor.tx_confirmed(desc)
def check(self, output_points, address, handle):
class ExtendableDict(dict):
bind(self.start_loading, _1, output_points))
def start_loading(self, membuf_result, output_points):
+ if len(membuf_result) == 0 and len(output_points) == 0:
+ self.handle_finish([])
+ self.stopped()
# Create a bunch of entry lines which are outputs and
# then their corresponding input (if it exists)
for outpoint in output_points:
print begin, " " * (12 - len(begin)), v
print
+ class FakeMonitor:
+ def tx_stored(self, tx):
+ pass
+ def tx_confirmed(self, tx):
+ pass
+
service = bitcoin.async_service(1)
prefix = "/home/genjix/libbitcoin/database.old"
chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started)
txpool = bitcoin.transaction_pool(service, chain)
- membuf = MemoryPoolBuffer(txpool, chain)
+ membuf = MemoryPoolBuffer(txpool, chain, FakeMonitor())
membuf.recv_tx(tx_a, store_tx)
membuf.recv_tx(tx_b, store_tx)
raw_input()
- address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "1EMnecJFwihf2pf4nE2m8fUNFKVRMWKqhR"
+ address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "18auo3rqfsjth3w2H9zyEz467DDFNNpMJP"
#address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE"
print "Looking up", address
payment_history(chain, txpool, membuf, address[0], finish)