-import bitcoin
-from processor import Processor
import threading
import time
-import composed
+import bitcoin
+from bitcoin import bind, _1, _2, _3
+
+from processor import Processor
+import history1 as history
+import membuf
+
+
+class HistoryCache:
+
+ def __init__(self):
+ self.lock = threading.Lock()
+ self.cache = {}
+
+ def store(self, address, result):
+ with self.lock:
+ self.cache[address] = result
+
+ def fetch(self, address):
+ try:
+ with self.lock:
+ return self.cache[address]
+ except KeyError:
+ return None
+
+ def clear(self, addresses):
+ with self.lock:
+ for address in addresses:
+ if address in self.cache:
+ del self.cache[address]
+
+
+class MonitorAddress:
+
+ def __init__(self, processor, cache, backend):
+ self.processor = processor
+ self.cache = cache
+ self.backend = backend
+ self.lock = threading.Lock()
+ # key is hash:index, value is address
+ self.monitor_output = {}
+ # key is address
+ self.monitor_address = set()
+
+ backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
+
+ def monitor(self, address, result):
+ for info in result:
+ if "raw_output_script" not in info:
+ continue
+ assert info["is_input"] == 0
+ tx_hash = info["tx_hash"]
+ output_index = info["index"]
+ outpoint = "%s:%s" % (tx_hash, output_index)
+ with self.lock:
+ self.monitor_output[outpoint] = address
+ with self.lock:
+ self.monitor_address.add(address)
+
+ def unpack(self, tx):
+ tx_hash = bitcoin.hash_transaction(tx)
+ previous_outputs = []
+ for input in tx.inputs:
+ prevout = input.previous_output
+ prevout = "%s:%s" % (prevout.hash, prevout.index)
+ previous_outputs.append(prevout)
+ addrs = []
+ for output_index, output in enumerate(tx.outputs):
+ address = bitcoin.payment_address()
+ if address.extract(output.output_script):
+ addrs.append((output_index, str(address)))
+ return tx_hash, previous_outputs, addrs
+
+ def effect_notify(self, tx, delete_outs):
+ affected_addrs = set()
+ tx_hash, previous_outputs, addrs = self.unpack(tx)
+ for prevout in previous_outputs:
+ try:
+ with self.lock:
+ affected_addrs.add(self.monitor_output[prevout])
+ if delete_outs:
+ del self.monitor_output[prevout]
+ except KeyError:
+ pass
+ for idx, address in addrs:
+ with self.lock:
+ if address in self.monitor_address:
+ affected_addrs.add(address)
+ self.cache.clear(affected_addrs)
+ self.notify(affected_addrs)
+ # Used in confirmed txs
+ return tx_hash, addrs, affected_addrs
+
+ def tx_stored(self, tx):
+ self.effect_notify(tx, False)
+
+ def tx_confirmed(self, tx):
+ tx_hash, addrs, affected_addrs = self.effect_notify(tx, True)
+ # add new outputs to monitor
+ for idx, address in addrs:
+ outpoint = "%s:%s" % (tx_hash, idx)
+ if address in affected_addrs:
+ with self.lock:
+ self.monitor_output[outpoint] = address
+
+ def notify(self, affected_addrs):
+ service = self.backend.mempool_service
+ chain = self.backend.blockchain
+ txpool = self.backend.transaction_pool
+ memory_buff = self.backend.memory_buffer
+ for address in affected_addrs:
+ response = {"id": None,
+ "method": "blockchain.address.subscribe",
+ "params": [str(address)]}
+ history.payment_history(service, chain, txpool, memory_buff, address,
+ bind(self.send_notify, _1, _2, response))
+
+ def mempool_n(self, result):
+ assert result is not None
+ if len(result) == 0:
+ return None
+ # mempool:n status
+ # Order by time, grab last item (latest)
+ last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
+ if last_info["block_hash"] == "mempool":
+ last_id = "mempool:%s" % len(result)
+ else:
+ last_id = last_info["block_hash"]
+ return last_id
+
+ def send_notify(self, ec, result, response):
+ if ec:
+ print "Error: Monitor.send_notify()", ec
+ return
+ assert len(response["params"]) == 1
+ response["params"].append(self.mempool_n(result))
+ self.processor.push_response(response)
+
class Backend:
self.handshake, self.network)
db_prefix = "/home/genjix/libbitcoin/database"
- self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix)
- self.poller = bitcoin.poller(self.blockchain)
+ self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
+ self.blockchain_started)
+ self.poller = bitcoin.poller(self.mempool_service, self.blockchain)
self.transaction_pool = \
bitcoin.transaction_pool(self.mempool_service, self.blockchain)
self.protocol.subscribe_channel(self.monitor_tx)
self.session = \
- bitcoin.session(self.hosts, self.handshake, self.network,
- self.protocol, self.blockchain, self.poller,
- self.transaction_pool)
+ bitcoin.session(self.network_service, self.hosts, self.handshake,
+ self.network, self.protocol, self.blockchain,
+ self.poller, self.transaction_pool)
self.session.start(self.handle_start)
+ self.memory_buffer = \
+ membuf.memory_buffer(self.mempool_service.internal_ptr,
+ self.blockchain.internal_ptr,
+ self.transaction_pool.internal_ptr)
+
def handle_start(self, ec):
if ec:
print "Error starting backend:", ec
+ def blockchain_started(self, ec, chain):
+ print "Blockchain initialisation:", ec
+
def stop(self):
self.session.stop(self.handle_stop)
# Here we subscribe to new transactions from them which we
# add to the transaction_pool. That way we can track which
# transactions we are interested in.
- node.subscribe_transaction(
- bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))
+ node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
# Re-subscribe to next new node
self.protocol.subscribe_channel(self.monitor_tx)
print "Error with new transaction:", ec
return
tx_hash = bitcoin.hash_transaction(tx)
- # If we want to ignore this transaction, we can set
- # the 2 handlers to be null handlers that do nothing.
- self.transaction_pool.store(tx,
- bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash),
- bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash))
+ self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
# Re-subscribe to new transactions from node
- node.subscribe_transaction(
- bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))
+ node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
- def handle_mempool_store(self, ec, tx_hash):
+ def store_tx(self, ec, tx_hash):
if ec:
print "Error storing memory pool transaction", tx_hash, ec
else:
print "Accepted transaction", tx_hash
- def tx_confirmed(self, ec, tx_hash):
- if ec:
- print "Problem confirming transaction", tx_hash, ec
- else:
- print "Confirmed", tx_hash
class GhostValue:
self.value = value
self.event.set()
+
class NumblocksSubscribe:
def __init__(self, backend, processor):
def reorganize(self, ec, fork_point, arrivals, replaced):
latest = fork_point + len(arrivals)
self.latest.set(latest)
- response = {"method": "numblocks.subscribe", "result": latest}
+ response = {"id": None, "method": "blockchain.numblocks.subscribe",
+ "params": [latest]}
self.processor.push_response(response)
self.backend.blockchain.subscribe_reorganize(self.reorganize)
+ def subscribe(self, request):
+ latest = self.latest.get()
+ response = {"id": request["id"],
+ "result": latest,
+ "error": None}
+ self.processor.push_response(response)
+
class AddressGetHistory:
self.processor = processor
def get(self, request):
- address = str(request["params"])
- composed.payment_history(self.backend.blockchain, address,
- bitcoin.bind(self.respond, request, bitcoin._1))
+ address = str(request["params"][0])
+ service = self.backend.mempool_service
+ chain = self.backend.blockchain
+ txpool = self.backend.transaction_pool
+ memory_buff = self.backend.memory_buffer
+ history.payment_history(service, chain, txpool, memory_buff, address,
+ bind(self.respond, _1, _2, request))
+
+ def respond(self, ec, result, request):
+ if ec:
+ response = {"id": request["id"], "result": None,
+ "error": {"message": str(ec), "code": -4}}
+ else:
+ response = {"id": request["id"], "result": result, "error": None}
+ self.processor.push_response(response)
+
- def respond(self, request, result):
- response = {"id": request["id"], "method": request["method"],
- "params": request["params"], "result": result}
+class AddressSubscribe:
+
+ def __init__(self, backend, processor, cache, monitor):
+ self.backend = backend
+ self.processor = processor
+ self.cache = cache
+ self.monitor = monitor
+
+ def subscribe(self, request):
+ address = str(request["params"][0])
+ service = self.backend.mempool_service
+ chain = self.backend.blockchain
+ txpool = self.backend.transaction_pool
+ memory_buff = self.backend.memory_buffer
+ history.payment_history(service, chain, txpool, memory_buff, address,
+ bind(self.construct, _1, _2, request))
+
+ def construct(self, ec, result, request):
+ if ec:
+ response = {"id": request["id"], "result": None,
+ "error": {"message": str(ec), "code": -4}}
+ self.processor.push_response(response)
+ return
+ last_id = self.monitor.mempool_n(result)
+ response = {"id": request["id"], "result": last_id, "error": None}
+ self.processor.push_response(response)
+ address = request["params"][0]
+ self.monitor.monitor(address, result)
+ # Cache result for get_history
+ self.cache.store(address, result)
+
+ def fetch_cached(self, request):
+ address = request["params"][0]
+ cached = self.cache.fetch(address)
+ if cached is None:
+ return False
+ response = {"id": request["id"], "result": cached, "error": None}
self.processor.push_response(response)
+ return True
class BlockchainProcessor(Processor):
def __init__(self, config):
Processor.__init__(self)
+ cache = HistoryCache()
self.backend = Backend()
+ monitor = MonitorAddress(self, cache, self.backend)
self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
self.address_get_history = AddressGetHistory(self.backend, self)
+ self.address_subscribe = \
+ AddressSubscribe(self.backend, self, cache, monitor)
def stop(self):
self.backend.stop()
def process(self, request):
print "New request (lib)", request
- if request["method"] == "numblocks.subscribe":
- self.numblocks_subscribe.subscribe(session, request)
- elif request["method"] == "address.get_history":
- self.address_get_history.get(request)
- elif request["method"] == "server.banner":
- self.push_response({"id": request["id"],
- "method": request["method"], "params": request["params"],
- "result": "libbitcoin using python-bitcoin bindings"})
- elif request["method"] == "transaction.broadcast":
+ if request["method"] == "blockchain.numblocks.subscribe":
+ self.numblocks_subscribe.subscribe(request)
+ elif request["method"] == "blockchain.address.subscribe":
+ self.address_subscribe.subscribe(request)
+ elif request["method"] == "blockchain.address.get_history":
+ if not self.address_subscribe.fetch_cached(request):
+ self.address_get_history.get(request)
+ elif request["method"] == "blockchain.transaction.broadcast":
self.broadcast_transaction(request)
def broadcast_transaction(self, request):
- raw_tx = bitcoin.data_chunk(str(request["params"]))
+ raw_tx = bitcoin.data_chunk(str(request["params"][0]))
exporter = bitcoin.satoshi_exporter()
try:
tx = exporter.load_transaction(raw_tx)
except RuntimeError:
- response = {"id": request["id"], "method": request["method"],
- "params": request["params"], "result": None,
- "error": {"message":
- "Exception while parsing the transaction data.",
- "code": -4}}
+ response = {
+ "id": request["id"],
+ "result": None,
+ "error": {
+ "message": "Exception while parsing the transaction data.",
+ "code": -4,
+ }
+ }
else:
self.backend.protocol.broadcast_transaction(tx)
tx_hash = str(bitcoin.hash_transaction(tx))
- response = {"id": request["id"], "method": request["method"],
- "params": request["params"], "result": tx_hash}
+ response = {"id": request["id"], "result": tx_hash, "error": None}
self.push_response(response)
-
- def run(self):
- print "Warning: pre-alpha prototype. Full of bugs."
- while not self.shared.stopped():
- time.sleep(1)
-