aa13238234461c1e2128c17f65b59a6245276cf0
[electrum-server.git] / backends / libbitcoin / __init__.py
1 import bitcoin
2 from bitcoin import bind, _1, _2, _3
3 from processor import Processor
4 import threading
5 import time
6
7 import history 
8
9 class Backend:
10
11     def __init__(self):
12         # Create 3 thread-pools each with 1 thread
13         self.network_service = bitcoin.async_service(1)
14         self.disk_service = bitcoin.async_service(1)
15         self.mempool_service = bitcoin.async_service(1)
16
17         self.hosts = bitcoin.hosts(self.network_service)
18         self.handshake = bitcoin.handshake(self.network_service)
19         self.network = bitcoin.network(self.network_service)
20         self.protocol = bitcoin.protocol(self.network_service, self.hosts,
21                                          self.handshake, self.network)
22
23         db_prefix = "/home/genjix/libbitcoin/database"
24         self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
25                                                  self.blockchain_started)
26         self.poller = bitcoin.poller(self.blockchain)
27         self.transaction_pool = \
28             bitcoin.transaction_pool(self.mempool_service, self.blockchain)
29
30         self.protocol.subscribe_channel(self.monitor_tx)
31         self.session = \
32             bitcoin.session(self.network_service, self.hosts, self.handshake,
33                             self.network, self.protocol, self.blockchain,
34                             self.poller, self.transaction_pool)
35         self.session.start(self.handle_start)
36
37         self.pool_buffer = history.MemoryPoolBuffer(self.transaction_pool,
38                                                     self.blockchain)
39
40     def handle_start(self, ec):
41         if ec:
42             print "Error starting backend:", ec
43
44     def blockchain_started(self, ec, chain):
45         print "Blockchain initialisation:", ec
46
47     def stop(self):
48         self.session.stop(self.handle_stop)
49
50     def handle_stop(self, ec):
51         if ec:
52             print "Error stopping backend:", ec
53         print "Stopped backend"
54
55     def monitor_tx(self, node):
56         # We will be notified here when connected to new bitcoin nodes
57         # Here we subscribe to new transactions from them which we
58         # add to the transaction_pool. That way we can track which
59         # transactions we are interested in.
60         node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
61         # Re-subscribe to next new node
62         self.protocol.subscribe_channel(self.monitor_tx)
63
64     def recv_tx(self, ec, tx, node):
65         if ec:
66             print "Error with new transaction:", ec
67             return
68         tx_hash = bitcoin.hash_transaction(tx)
69         self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash))
70         # Re-subscribe to new transactions from node
71         node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
72
73     def store_tx(self, ec, tx_hash):
74         if ec:
75             print "Error storing memory pool transaction", tx_hash, ec
76         else:
77             print "Accepted transaction", tx_hash
78
class GhostValue:
    """A value that readers can wait for until a writer has provided it.

    get() blocks until set() has been called at least once; after that it
    returns the most recently set value without blocking.
    """

    def __init__(self):
        self.event = threading.Event()
        self.value = None

    def get(self, timeout=None):
        """Return the stored value, waiting for the first set() if needed.

        timeout -- maximum number of seconds to wait, or None (the default)
                   to wait indefinitely, which is the original behaviour.

        If the timeout expires before any set() call, the initial value
        (None) is returned instead of blocking forever.
        """
        self.event.wait(timeout)
        return self.value

    def set(self, value):
        """Store value and wake up every thread blocked in get()."""
        # Assign before signalling so a woken reader never sees a stale value.
        self.value = value
        self.event.set()
92
93 class NumblocksSubscribe:
94
95     def __init__(self, backend, processor):
96         self.backend = backend
97         self.processor = processor
98         self.lock = threading.Lock()
99         self.backend.blockchain.subscribe_reorganize(self.reorganize)
100         self.backend.blockchain.fetch_last_depth(self.set_last_depth)
101         self.latest = GhostValue()
102
103     def set_last_depth(self, ec, last_depth):
104         if ec:
105             print "Error retrieving last depth", ec
106         else:
107             self.latest.set(last_depth)
108
109     def reorganize(self, ec, fork_point, arrivals, replaced):
110         latest = fork_point + len(arrivals)
111         self.latest.set(latest)
112         response = {"id": None, "method": "blockchain.numblocks.subscribe",
113                     "result": latest}
114         self.processor.push_response(response)
115         self.backend.blockchain.subscribe_reorganize(self.reorganize)
116
117     def subscribe(self, request):
118         latest = self.latest.get()
119         response = {"id": request["id"],
120                     "method": "blockchain.numblocks.subscribe",
121                     "result": latest,
122                     "error": None}
123         self.processor.push_response(response)
124
class AddressGetHistory:
    """Looks up the payment history of an address and returns it.

    BlockchainProcessor routes "blockchain.address.get_history" requests to
    get(); the result is delivered asynchronously through respond().
    """

    def __init__(self, backend, processor):
        self.backend = backend
        self.processor = processor

    def get(self, request):
        """Start a history lookup for the first entry of request["params"]."""
        target = str(request["params"][0])
        source = self.backend
        on_result = bind(self.respond, _1, request)
        history.payment_history(source.blockchain, source.transaction_pool,
                                source.pool_buffer, target, on_result)

    def respond(self, result, request):
        """Push the reply for request; a None result becomes an error."""
        failure = None
        if result is None:
            failure = {"message": "Error", "code": -4}
        self.processor.push_response(
            {"id": request["id"], "result": result, "error": failure})
146
class AddressSubscribe:
    """Looks up an address history on behalf of a subscribe request.

    The lookup is asynchronous; construct() builds and pushes the reply.
    """

    def __init__(self, backend, processor):
        self.backend = backend
        self.processor = processor

    def subscribe(self, session, request):
        """Start a history lookup for the first entry of request["params"].

        session -- accepted for the caller's benefit; not used here.
        """
        address = str(request["params"][0])
        chain = self.backend.blockchain
        txpool = self.backend.transaction_pool
        membuf = self.backend.pool_buffer
        # The callback must be construct(): this class has no respond()
        # method, so binding self.respond raised AttributeError.
        history.payment_history(chain, txpool, membuf, address,
            bind(self.construct, _1, request))

    def construct(self, result, request):
        """Push the reply for request; a None result becomes an error.

        The error reply is pushed as well. Previously it was built and then
        dropped by an early return, so the client never got an answer.
        """
        if result is None:
            response = {"id": request["id"], "result": None,
                        "error": {"message": "Error", "code": -4}}
        else:
            response = {"id": request["id"], "result": result, "error": None}
        self.processor.push_response(response)
169
170 class BlockchainProcessor(Processor):
171
172     def __init__(self, config):
173         Processor.__init__(self)
174         self.backend = Backend()
175         self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
176         self.address_get_history = AddressGetHistory(self.backend, self)
177         self.address_subscribe = AddressSubscribe(self.backend, self)
178
179     def stop(self):
180         self.backend.stop()
181
182     def process(self, request):
183         print "New request (lib)", request
184         if request["method"] == "blockchain.numblocks.subscribe":
185             self.numblocks_subscribe.subscribe(request)
186         elif request["method"] == "blockchain.address.subscribe":
187             pass
188         elif request["method"] == "blockchain.address.get_history":
189             self.address_get_history.get(request)
190         elif request["method"] == "blockchain.transaction.broadcast":
191             self.broadcast_transaction(request)
192
193     def broadcast_transaction(self, request):
194         raw_tx = bitcoin.data_chunk(str(request["params"]))
195         exporter = bitcoin.satoshi_exporter()
196         try:
197             tx = exporter.load_transaction(raw_tx)
198         except RuntimeError:
199             response = {"id": request["id"], "result": None,
200                         "error": {"message": 
201                             "Exception while parsing the transaction data.",
202                             "code": -4}}
203         else:
204             self.backend.protocol.broadcast_transaction(tx)
205             tx_hash = str(bitcoin.hash_transaction(tx))
206             response = {"id": request["id"], "result": tx_hash, "error": None}
207         self.push_response(response)
208
209     def run(self):
210         print "Warning: pre-alpha prototype. Full of bugs."
211         while not self.shared.stopped():
212             time.sleep(1)
213