import threading
import time
+import composed
+
class Backend:
def __init__(self):
+ # Create 3 thread-pools each with 1 thread
self.network_service = bitcoin.async_service(1)
self.disk_service = bitcoin.async_service(1)
self.mempool_service = bitcoin.async_service(1)
print "Error with new transaction:", ec
return
tx_hash = bitcoin.hash_transaction(tx)
+ # If we want to ignore this transaction, we can set
+ # the 2 handlers to be null handlers that do nothing.
self.transaction_pool.store(tx,
bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash),
bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash))
else:
print "Confirmed", tx_hash
+class GhostValue:
+
+ def __init__(self):
+ self.event = threading.Event()
+ self.value = None
+
+ def get(self):
+ self.event.wait()
+ return self.value
+
+ def set(self, value):
+ self.value = value
+ self.event.set()
+
class NumblocksSubscribe:
def __init__(self, backend):
self.lock = threading.Lock()
self.backend.blockchain.subscribe_reorganize(self.reorganize)
self.backend.blockchain.fetch_last_depth(self.set_last_depth)
- self.latest = None
+ self.latest = GhostValue()
+ self.subscribed = []
def subscribe(self, session, request):
- last = self.get_last_depth()
+ last = self.latest.get()
session.push_response({"id": request["id"], "result": last})
-
- def get_last_depth(self):
- # Stall until last depth has been set...
- # Should use condition variable here instead
- while True:
- with self.lock:
- last = self.latest
- if last is not None:
- return last
- time.sleep(0.1)
+ with self.lock:
+ self.subscribed.append((session, request))
def set_last_depth(self, ec, last_depth):
if ec:
print "Error retrieving last depth", ec
else:
- with self.lock:
- self.latest = last_depth
+ self.latest.set(last_depth)
def reorganize(self, ec, fork_point, arrivals, replaced):
- pass
+ latest = fork_point + len(arrivals)
+ self.latest.set(latest)
+ subscribed = self.spring_clean()
+ for session, request in subscribed:
+ session.push_response({"id": request["id"], "result": latest})
+
+ def spring_clean(self):
+ with self.lock:
+ self.subscribed = [sub for sub in self.subscribed
+ if not sub[0].stopped()]
+ return self.subscribed[:]
+
+class AddressGetHistory:
+
+ def __init__(self, backend):
+ self.backend = backend
+
+ def get(self, session, request):
+ address = str(request["params"])
+ composed.payment_history(self.backend.blockchain, address,
+ bitcoin.bind(self.respond, session, request, bitcoin._1))
+
+ def respond(self, session, request, result):
+ session.push_response({"id": request["id"], "result": result})
class LibbitcoinProcessor(stratum.Processor):
def __init__(self):
self.backend = Backend()
self.numblocks_subscribe = NumblocksSubscribe(self.backend)
+ self.address_get_history = AddressGetHistory(self.backend)
stratum.Processor.__init__(self)
def stop(self):
def process(self, session):
request = session.pop_request()
+ print "New request (lib)", request
if request["method"] == "numblocks.subscribe":
self.numblocks_subscribe.subscribe(session, request)
- print "New request (lib)", request
+ elif request["method"] == "address.get_history":
+ self.address_get_history.get(session, request)
# Execute and when ready, you call
# session.push_response(response)