Notify on new blocks.
[electrum-server.git] / server-genjix.py
1 import bitcoin
2 import stratum
3 import threading
4 import time
5
class Backend:
    """Wires together the libbitcoin services, blockchain and session.

    Transactions announced by connected nodes are handed to the
    transaction pool; the outcome of every step is reported on stdout.
    """

    def __init__(self, db_prefix="/home/genjix/libbitcoin/database"):
        """Create all libbitcoin components and start the session.

        db_prefix -- directory handed to bitcoin.bdb_blockchain. The default
                     is the path that used to be hard-coded here, so callers
                     that pass nothing keep the previous behaviour.
        """
        # Create 3 thread-pools each with 1 thread
        self.network_service = bitcoin.async_service(1)
        self.disk_service = bitcoin.async_service(1)
        self.mempool_service = bitcoin.async_service(1)

        self.hosts = bitcoin.hosts(self.network_service)
        self.handshake = bitcoin.handshake(self.network_service)
        self.network = bitcoin.network(self.network_service)
        self.protocol = bitcoin.protocol(self.network_service, self.hosts,
                                         self.handshake, self.network)

        self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix)
        self.poller = bitcoin.poller(self.blockchain)
        self.transaction_pool = \
            bitcoin.transaction_pool(self.mempool_service, self.blockchain)

        # monitor_tx only touches self.protocol and self.transaction_pool,
        # both of which exist by now.
        self.protocol.subscribe_channel(self.monitor_tx)
        self.session = \
            bitcoin.session(self.hosts, self.handshake, self.network,
                            self.protocol, self.blockchain, self.poller,
                            self.transaction_pool)
        self.session.start(self.handle_start)

    def handle_start(self, ec):
        """Completion handler for session.start; reports a truthy error code."""
        if ec:
            print("Error starting backend: %s" % (ec,))

    def stop(self):
        """Ask the session to stop; handle_stop receives the result."""
        self.session.stop(self.handle_stop)

    def handle_stop(self, ec):
        """Completion handler for session.stop.

        "Stopped backend" is printed whether or not an error was reported.
        """
        if ec:
            print("Error stopping backend: %s" % (ec,))
        print("Stopped backend")

    def monitor_tx(self, node):
        """Channel handler: start listening for transactions from `node`."""
        # We will be notified here when connected to new bitcoin nodes
        # Here we subscribe to new transactions from them which we
        # add to the transaction_pool. That way we can track which
        # transactions we are interested in.
        node.subscribe_transaction(
            bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))
        # Re-subscribe to next new node
        self.protocol.subscribe_channel(self.monitor_tx)

    def recv_tx(self, ec, tx, node):
        """Transaction handler: store `tx` in the pool and keep listening.

        On error the problem is printed and the handler returns without
        re-subscribing to `node`.
        """
        if ec:
            print("Error with new transaction: %s" % (ec,))
            return
        tx_hash = bitcoin.hash_transaction(tx)
        # If we want to ignore this transaction, we can set
        # the 2 handlers to be null handlers that do nothing.
        self.transaction_pool.store(tx,
            bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash),
            bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash))
        # Re-subscribe to new transactions from node
        node.subscribe_transaction(
            bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))

    def handle_mempool_store(self, ec, tx_hash):
        """Report whether the transaction pool accepted `tx_hash`."""
        if ec:
            print("Error storing memory pool transaction %s %s"
                  % (tx_hash, ec))
        else:
            print("Accepted transaction %s" % (tx_hash,))

    def tx_confirmed(self, ec, tx_hash):
        """Report the confirmation result for `tx_hash`."""
        if ec:
            print("Problem confirming transaction %s %s" % (tx_hash, ec))
        else:
            print("Confirmed %s" % (tx_hash,))
80
class GhostValue:
    """Holder for one value whose reader blocks until a value was stored."""

    def __init__(self):
        # Nothing stored yet; the event stays clear until the first set().
        self.value = None
        self.event = threading.Event()

    def set(self, value):
        """Store `value`, then signal the event so get() can return."""
        self.value = value
        ready = self.event
        ready.set()

    def get(self):
        """Wait for the event to be signalled and return the stored value."""
        ready = self.event
        ready.wait()
        return self.value
94
class NumblocksSubscribe:
    """Tracks the latest block depth and pushes it to subscribed sessions."""

    def __init__(self, backend):
        """Register with the blockchain of `backend` and fetch the depth.

        backend -- object exposing a `blockchain` attribute with
                   subscribe_reorganize() and fetch_last_depth().
        """
        self.backend = backend
        self.lock = threading.Lock()
        # All state must exist BEFORE any handler is registered: a handler
        # that fires right away (or from a service thread) would otherwise
        # find self.latest / self.subscribed missing.
        self.latest = GhostValue()
        self.subscribed = []
        self.backend.blockchain.subscribe_reorganize(self.reorganize)
        self.backend.blockchain.fetch_last_depth(self.set_last_depth)

    def subscribe(self, session, request):
        """Answer `request` with the current depth and remember the session.

        Blocks until a depth is available (self.latest.get() waits for the
        first value).
        """
        last = self.latest.get()
        session.push_response({"id": request["id"], "result": last})
        with self.lock:
            self.subscribed.append((session, request))

    def set_last_depth(self, ec, last_depth):
        """Handler for fetch_last_depth: record the depth unless `ec` is set."""
        if ec:
            print("Error retrieving last depth %s" % (ec,))
        else:
            self.latest.set(last_depth)

    def reorganize(self, ec, fork_point, arrivals, replaced):
        """Handler for blockchain reorganisations.

        The new depth is fork_point + len(arrivals); it is stored and pushed
        to every subscriber whose session has not stopped. `replaced` is
        unused. When `ec` is set nothing is updated or pushed.
        """
        if ec:
            # Same policy as the other handlers in this file: report the
            # error and do not act on the remaining arguments.
            print("Error in blockchain reorganize %s" % (ec,))
            return
        latest = fork_point + len(arrivals)
        self.latest.set(latest)
        for session, request in self.spring_clean():
            session.push_response({"id": request["id"], "result": latest})
        # Re-subscribe for the next reorganisation.
        # NOTE(review): assumes subscribe_reorganize is one-shot, like the
        # subscribe_channel / subscribe_transaction handlers that re-register
        # themselves elsewhere in this file -- confirm against libbitcoin.
        self.backend.blockchain.subscribe_reorganize(self.reorganize)

    def spring_clean(self):
        """Drop subscribers whose session reports stopped(); return a copy
        of the remaining list taken under the lock."""
        with self.lock:
            self.subscribed = [sub for sub in self.subscribed
                               if not sub[0].stopped()]
            return self.subscribed[:]
129
class LibbitcoinProcessor(stratum.Processor):
    """Stratum processor that serves requests from the libbitcoin backend."""

    def __init__(self):
        """Create the backend and the numblocks subscription service, then
        initialise the stratum.Processor base."""
        self.backend = Backend()
        self.numblocks_subscribe = NumblocksSubscribe(self.backend)
        stratum.Processor.__init__(self)

    def stop(self):
        """Stop the backend."""
        self.backend.stop()

    def process(self, session):
        """Pop one request from `session` and dispatch it by method name.

        Only "numblocks.subscribe" is handled; every request is printed.
        """
        request = session.pop_request()
        # .get(): a request that carries no "method" key is simply not
        # dispatched, instead of raising KeyError here.
        if request.get("method") == "numblocks.subscribe":
            self.numblocks_subscribe.subscribe(session, request)
        print("New request (lib) %s" % (request,))
        # Execute and when ready, you call
        # session.push_response(response)
147
if __name__ == "__main__":
    # Script entry point: the processor is built first, then the Stratum
    # application, which is started with that processor.
    processor = LibbitcoinProcessor()
    server = stratum.Stratum()
    server.start(processor)
152