cd9a4dbfa26a9f06935d860ac1a5f77b20cc446a
[electrum-server.git] / modules / python_bitcoin / __init__.py
1 import bitcoin
2 import stratum
3 import threading
4 import time
5
6 import composed 
7
class Backend:
    """Bundle of the libbitcoin objects used by the server.

    Constructing an instance creates the services, hosts/handshake/network/
    protocol objects, the blockchain, the poller, the transaction pool and
    the session, and immediately starts the session.  Every handler in this
    class reports its outcome with print.
    """

    def __init__(self, db_prefix="/home/genjix/libbitcoin/database"):
        """Build all components and start the session.

        db_prefix -- path handed to bitcoin.bdb_blockchain as the database
                     location.  The default is the value that used to be
                     hard-coded here, so Backend() behaves as before.
        """
        # Create 3 thread-pools each with 1 thread
        self.network_service = bitcoin.async_service(1)
        self.disk_service = bitcoin.async_service(1)
        self.mempool_service = bitcoin.async_service(1)

        self.hosts = bitcoin.hosts(self.network_service)
        self.handshake = bitcoin.handshake(self.network_service)
        self.network = bitcoin.network(self.network_service)
        self.protocol = bitcoin.protocol(self.network_service, self.hosts,
                                         self.handshake, self.network)

        self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix)
        self.poller = bitcoin.poller(self.blockchain)
        self.transaction_pool = \
            bitcoin.transaction_pool(self.mempool_service, self.blockchain)

        self.protocol.subscribe_channel(self.monitor_tx)
        self.session = \
            bitcoin.session(self.hosts, self.handshake, self.network,
                            self.protocol, self.blockchain, self.poller,
                            self.transaction_pool)
        self.session.start(self.handle_start)

    def handle_start(self, ec):
        """Completion handler for session.start; reports a truthy ec."""
        if ec:
            print("Error starting backend: %s" % (ec,))

    def stop(self):
        """Ask the session to stop; handle_stop receives the result."""
        self.session.stop(self.handle_stop)

    def handle_stop(self, ec):
        """Completion handler for session.stop.

        "Stopped backend" is printed whether or not ec signals an error.
        """
        if ec:
            print("Error stopping backend: %s" % (ec,))
        print("Stopped backend")

    def monitor_tx(self, node):
        """Channel handler: start listening for transactions from node."""
        # We will be notified here when connected to new bitcoin nodes
        # Here we subscribe to new transactions from them which we
        # add to the transaction_pool. That way we can track which
        # transactions we are interested in.
        self._subscribe_tx(node)
        # Re-subscribe to next new node
        self.protocol.subscribe_channel(self.monitor_tx)

    def recv_tx(self, ec, tx, node):
        """Handle a transaction announced by node.

        On error the problem is printed and the node is not re-subscribed.
        Otherwise the transaction is stored in the transaction pool and the
        subscription on node is renewed.
        """
        if ec:
            print("Error with new transaction: %s" % (ec,))
            return
        tx_hash = bitcoin.hash_transaction(tx)
        # If we want to ignore this transaction, we can set
        # the 2 handlers to be null handlers that do nothing.
        self.transaction_pool.store(tx,
            bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash),
            bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash))
        # Re-subscribe to new transactions from node
        self._subscribe_tx(node)

    def _subscribe_tx(self, node):
        """Register recv_tx as the transaction handler on node."""
        # node is bound as the third argument so recv_tx can renew the
        # subscription on the same node.
        node.subscribe_transaction(
            bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))

    def handle_mempool_store(self, ec, tx_hash):
        """Report the result of transaction_pool.store for tx_hash."""
        if ec:
            print("Error storing memory pool transaction %s %s"
                  % (tx_hash, ec))
        else:
            print("Accepted transaction %s" % (tx_hash,))

    def tx_confirmed(self, ec, tx_hash):
        """Report the confirmation outcome for tx_hash."""
        if ec:
            print("Problem confirming transaction %s %s" % (tx_hash, ec))
        else:
            print("Confirmed %s" % (tx_hash,))
82
class GhostValue:
    """Holder for a value that may not have been produced yet.

    Readers calling get() wait on an internal threading.Event until some
    thread has called set().
    """

    def __init__(self):
        self.value = None
        self.event = threading.Event()

    def set(self, value):
        """Store value, then signal the event so waiting readers proceed."""
        # The value is assigned before the event is signalled, so a reader
        # released by the event always sees it.
        self.value = value
        self.event.set()

    def get(self):
        """Wait (without timeout) for the first set() and return the value."""
        self.event.wait()
        return self.value
96
def _missing_push_response(session, response):
    """Default response sink: fail loudly when none was supplied."""
    raise RuntimeError("no push_response callback configured")


class NumblocksSubscribe:
    """Serves "numblocks.subscribe": reports the chain depth to sessions.

    The depth is pushed once when a session subscribes and again to every
    live subscriber after each blockchain reorganization.
    """

    def __init__(self, backend, push_response=_missing_push_response):
        """Set up state, then register the blockchain callbacks.

        backend       -- object exposing a ``blockchain`` attribute.
        push_response -- callable(session, response) used to deliver
                         responses, normally the processor's push_response.
                         This class has no push_response of its own, so
                         one has to be supplied for responses to be sent.
        """
        self.backend = backend
        self.push_response = push_response
        self.lock = threading.Lock()
        # State must exist before the callbacks are registered: they may
        # run as soon as they are handed to the blockchain.
        self.latest = GhostValue()
        self.subscribed = []
        self.backend.blockchain.subscribe_reorganize(self.reorganize)
        self.backend.blockchain.fetch_last_depth(self.set_last_depth)

    def subscribe(self, session, request):
        """Send the current depth to session and remember the subscription.

        Waits until a depth has been stored by set_last_depth or
        reorganize; if neither ever succeeds this call does not return.
        """
        last = self.latest.get()
        self.push_response(session, {"id": request["id"], "result": last})
        with self.lock:
            self.subscribed.append((session, request))

    def set_last_depth(self, ec, last_depth):
        """Handler for fetch_last_depth: record the depth unless ec is set."""
        if ec:
            print("Error retrieving last depth %s" % (ec,))
        else:
            self.latest.set(last_depth)

    def reorganize(self, ec, fork_point, arrivals, replaced):
        """Handler for a reorganization: publish the new depth.

        The new depth is fork_point + len(arrivals).  On error the problem
        is printed and nothing else happens (no re-subscription), the same
        way Backend.recv_tx treats errors.
        """
        if ec:
            print("Error handling reorganize %s" % (ec,))
            return
        latest = fork_point + len(arrivals)
        self.latest.set(latest)
        for session, request in self.spring_clean():
            self.push_response(session,
                               {"id": request["id"], "result": latest})
        # Re-register to be told about the next reorganization.
        self.backend.blockchain.subscribe_reorganize(self.reorganize)

    def spring_clean(self):
        """Drop subscriptions whose session has stopped; return a copy."""
        with self.lock:
            self.subscribed = [sub for sub in self.subscribed
                               if not sub[0].stopped()]
            # Copy so callers can iterate without holding the lock.
            return self.subscribed[:]


class AddressGetHistory:
    """Serves "address.get_history" through composed.payment_history."""

    def __init__(self, backend, push_response=_missing_push_response):
        """Remember the backend and the response sink.

        push_response -- callable(session, response); see
                         NumblocksSubscribe.__init__ for why it is needed.
        """
        self.backend = backend
        self.push_response = push_response

    def get(self, session, request):
        """Start a history lookup for the address in request["params"]."""
        address = str(request["params"])
        # The bound handler receives session and request first; the value
        # supplied by payment_history fills the third slot.
        composed.payment_history(self.backend.blockchain, address,
            bitcoin.bind(self.respond, session, request, bitcoin._1))

    def respond(self, session, request, result):
        """Deliver result to session as the answer to request."""
        self.push_response(session, {"id": request["id"], "result": result})


class LibbitcoinProcessor(stratum.Processor):
    """stratum processor that answers requests using the libbitcoin Backend."""

    def __init__(self):
        self.backend = Backend()
        # The helpers cannot push responses themselves, so they are given
        # this processor's push_response.
        self.numblocks_subscribe = NumblocksSubscribe(self.backend,
                                                      self.push_response)
        self.address_get_history = AddressGetHistory(self.backend,
                                                     self.push_response)
        stratum.Processor.__init__(self)

    def stop(self):
        """Stop the backend."""
        self.backend.stop()

    def process(self, session, request):
        """Dispatch request on its "method"; other methods get no reply."""
        print("New request (lib) %s" % (request,))
        method = request["method"]
        if method == "numblocks.subscribe":
            self.numblocks_subscribe.subscribe(session, request)
        elif method == "address.get_history":
            self.address_get_history.get(session, request)
        elif method == "server.banner":
            self.push_response(session, {"id": request["id"],
                "result": "libbitcoin using python-bitcoin bindings"})
        elif method == "transaction.broadcast":
            self.broadcast_transaction(session, request)
        # Execute and when ready, you call
        # self.push_response(session,response)

    def broadcast_transaction(self, session, request):
        """Parse the raw transaction in request["params"] and broadcast it.

        Replies with the transaction hash on success, or with an error
        object (code -4) when load_transaction raises RuntimeError.
        """
        raw_tx = bitcoin.data_chunk(str(request["params"]))
        exporter = bitcoin.satoshi_exporter()
        try:
            tx = exporter.load_transaction(raw_tx)
        except RuntimeError:
            response = {"id": request["id"], "result": None,
                "error": {"message":
                    "Exception while parsing the transaction data.",
                    "code": -4}}
        else:
            self.backend.protocol.broadcast_transaction(tx)
            tx_hash = str(bitcoin.hash_transaction(tx))
            response = {"id": request["id"], "result": tx_hash}
        self.push_response(session, response)
187
def run(stratum):
    """Print the pre-alpha warning and start a LibbitcoinProcessor.

    stratum -- object providing start(processor).  Note that inside this
               function the parameter shadows the module-level ``stratum``
               import.
    """
    print("Warning: pre-alpha prototype. Full of bugs.")
    stratum.start(LibbitcoinProcessor())
192