Renamed: modules -> backends
[electrum-server.git] / backends / libbitcoin / __init__.py
1 import bitcoin
2 from processor import Processor
3 import threading
4 import time
5
6 import composed 
7
class Backend:
    """Owns the libbitcoin service objects and the session built on them.

    Constructing a Backend wires up the network, blockchain and transaction
    pool objects from the ``bitcoin`` module and immediately starts the
    session.  Transactions announced by connected nodes are stored in
    ``self.transaction_pool``.

    Diagnostics are written with ``print``.  Each call passes one
    parenthesised, %-formatted string so the emitted text is the same under
    the Python 2 print statement and the Python 3 print function.
    """

    def __init__(self, db_prefix="/home/genjix/libbitcoin/database"):
        """Build every service object and start the session.

        db_prefix -- path handed to ``bitcoin.bdb_blockchain``.  The default
                     is the value that used to be hard-coded here, so
                     ``Backend()`` behaves as before.
        """
        # Create 3 thread-pools each with 1 thread
        self.network_service = bitcoin.async_service(1)
        self.disk_service = bitcoin.async_service(1)
        self.mempool_service = bitcoin.async_service(1)

        self.hosts = bitcoin.hosts(self.network_service)
        self.handshake = bitcoin.handshake(self.network_service)
        self.network = bitcoin.network(self.network_service)
        self.protocol = bitcoin.protocol(self.network_service, self.hosts,
                                         self.handshake, self.network)

        self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix)
        self.poller = bitcoin.poller(self.blockchain)
        self.transaction_pool = \
            bitcoin.transaction_pool(self.mempool_service, self.blockchain)

        # Register for the first new channel before the session is started.
        self.protocol.subscribe_channel(self.monitor_tx)
        self.session = \
            bitcoin.session(self.hosts, self.handshake, self.network,
                            self.protocol, self.blockchain, self.poller,
                            self.transaction_pool)
        self.session.start(self.handle_start)

    def handle_start(self, ec):
        """Completion handler for session.start; reports a truthy ec."""
        if ec:
            print("Error starting backend: %s" % (ec,))

    def stop(self):
        """Ask the session to stop; handle_stop receives the outcome."""
        self.session.stop(self.handle_stop)

    def handle_stop(self, ec):
        """Completion handler for session.stop."""
        if ec:
            print("Error stopping backend: %s" % (ec,))
        # NOTE(review): printed even when ec is set, as before -- confirm
        # whether that is intended.
        print("Stopped backend")

    def _subscribe_transactions(self, node):
        """Ask node to deliver its next transaction to recv_tx."""
        node.subscribe_transaction(
            bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))

    def monitor_tx(self, node):
        """Channel handler: start listening for transactions from node."""
        # We will be notified here when connected to new bitcoin nodes
        # Here we subscribe to new transactions from them which we
        # add to the transaction_pool. That way we can track which
        # transactions we are interested in.
        self._subscribe_transactions(node)
        # Re-subscribe to next new node
        self.protocol.subscribe_channel(self.monitor_tx)

    def recv_tx(self, ec, tx, node):
        """Transaction handler: store tx in the pool and keep listening.

        When ec is truthy the error is printed and the method returns
        without re-subscribing to node.
        """
        if ec:
            print("Error with new transaction: %s" % (ec,))
            return
        tx_hash = bitcoin.hash_transaction(tx)
        # If we want to ignore this transaction, we can set
        # the 2 handlers to be null handlers that do nothing.
        self.transaction_pool.store(tx,
            bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash),
            bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash))
        # Re-subscribe to new transactions from node
        self._subscribe_transactions(node)

    def handle_mempool_store(self, ec, tx_hash):
        """Report whether the transaction pool accepted tx_hash."""
        if ec:
            print("Error storing memory pool transaction %s %s"
                  % (tx_hash, ec))
        else:
            print("Accepted transaction %s" % (tx_hash,))

    def tx_confirmed(self, ec, tx_hash):
        """Report the confirmation outcome for tx_hash."""
        if ec:
            print("Problem confirming transaction %s %s" % (tx_hash, ec))
        else:
            print("Confirmed %s" % (tx_hash,))
82
class GhostValue:
    """A slot whose readers wait until a value has been supplied.

    ``get`` waits on ``self.event``; ``set`` stores the value and sets the
    event.
    """

    def __init__(self):
        self.value = None
        self.event = threading.Event()

    def set(self, value):
        """Store value and signal the event."""
        # Store first, signal second, so a woken reader finds the value.
        self.value = value
        self.event.set()

    def get(self):
        """Wait for the event, then return the currently stored value."""
        ready = self.event
        ready.wait()
        return self.value
96
class NumblocksSubscribe:
    """Tracks the latest block depth and pushes it on every reorganize.

    The depth is seeded through ``blockchain.fetch_last_depth`` and updated
    from ``blockchain.subscribe_reorganize`` callbacks.  Each reorganize
    also sends a "numblocks.subscribe" response through the processor.
    """

    def __init__(self, backend, processor):
        """Register the blockchain callbacks.

        backend   -- object exposing ``blockchain``
        processor -- object exposing ``push_response``
        """
        self.backend = backend
        self.processor = processor
        # Not used by any method of this class; kept because it is part of
        # the instance's existing attributes.
        self.lock = threading.Lock()
        # Create the slot BEFORE handing out callbacks.  Both callbacks
        # call self.latest.set(), so registering them first would let an
        # early callback hit a missing attribute.
        self.latest = GhostValue()
        self.backend.blockchain.subscribe_reorganize(self.reorganize)
        self.backend.blockchain.fetch_last_depth(self.set_last_depth)

    def set_last_depth(self, ec, last_depth):
        """Handler for fetch_last_depth: record last_depth unless ec is set."""
        if ec:
            print("Error retrieving last depth %s" % (ec,))
        else:
            self.latest.set(last_depth)

    def reorganize(self, ec, fork_point, arrivals, replaced):
        """Handler for subscribe_reorganize: publish the new depth.

        The new depth is ``fork_point + len(arrivals)``; ``replaced`` is not
        used.
        """
        # NOTE(review): ec is not inspected here, as before -- confirm
        # whether the other arguments are meaningful when ec is set.
        latest = fork_point + len(arrivals)
        self.latest.set(latest)
        response = {"method": "numblocks.subscribe", "result": latest}
        self.processor.push_response(response)
        # Re-register so the next reorganize is delivered too.
        self.backend.blockchain.subscribe_reorganize(self.reorganize)
119
120
class AddressGetHistory:
    """Serves history requests by delegating to composed.payment_history."""

    def __init__(self, backend, processor):
        self.backend, self.processor = backend, processor

    def get(self, request):
        """Start a payment-history lookup for the address in request.

        The address is ``str(request["params"])``.  The outcome is routed
        to ``respond`` with the original request bound as first argument.
        """
        address = str(request["params"])
        blockchain = self.backend.blockchain
        on_history = bitcoin.bind(self.respond, request, bitcoin._1)
        composed.payment_history(blockchain, address, on_history)

    def respond(self, request, result):
        """Push a response echoing the request's id/method/params."""
        response = {}
        for key in ("id", "method", "params"):
            response[key] = request[key]
        response["result"] = result
        self.processor.push_response(response)
136
137
class BlockchainProcessor(Processor):
    """Processor that answers blockchain requests through a Backend.

    Handled methods: "numblocks.subscribe", "address.get_history",
    "server.banner" and "transaction.broadcast".  Any other method is
    ignored without a response.
    """

    def __init__(self, config):
        """Create the backend and the per-method helpers.

        config -- accepted for interface compatibility; not read here.
        """
        Processor.__init__(self)
        self.backend = Backend()
        self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
        self.address_get_history = AddressGetHistory(self.backend, self)

    def stop(self):
        """Stop the backend."""
        self.backend.stop()

    def _result_response(self, request, result):
        """Return a response dict echoing request plus the given result."""
        return {"id": request["id"], "method": request["method"],
                "params": request["params"], "result": result}

    def process(self, request):
        """Dispatch one request dict on its "method" key."""
        print("New request (lib) %s" % (request,))
        method = request["method"]
        if method == "numblocks.subscribe":
            # This branch used to call
            # self.numblocks_subscribe.subscribe(session, request), but no
            # `session` name exists in this scope and no subscribe method
            # is visible on NumblocksSubscribe in this file, so the request
            # always raised.  Reply with the latest known depth instead;
            # later changes are pushed by NumblocksSubscribe.reorganize.
            # NOTE(review): latest.get() waits until a depth has been
            # recorded -- confirm that blocking here is acceptable.
            depth = self.numblocks_subscribe.latest.get()
            self.push_response(self._result_response(request, depth))
        elif method == "address.get_history":
            self.address_get_history.get(request)
        elif method == "server.banner":
            self.push_response(self._result_response(
                request, "libbitcoin using python-bitcoin bindings"))
        elif method == "transaction.broadcast":
            self.broadcast_transaction(request)

    def broadcast_transaction(self, request):
        """Parse the raw transaction in request["params"] and broadcast it.

        On success the response result is the transaction hash as a
        string.  If parsing raises RuntimeError, the result is None and an
        "error" entry with code -4 is attached instead.
        """
        raw_tx = bitcoin.data_chunk(str(request["params"]))
        exporter = bitcoin.satoshi_exporter()
        try:
            tx = exporter.load_transaction(raw_tx)
        except RuntimeError:
            response = {"id": request["id"], "method": request["method"],
                        "params": request["params"], "result": None,
                        "error": {"message":
                            "Exception while parsing the transaction data.",
                            "code": -4}}
        else:
            self.backend.protocol.broadcast_transaction(tx)
            tx_hash = str(bitcoin.hash_transaction(tx))
            response = self._result_response(request, tx_hash)
        self.push_response(response)

    def run(self):
        """Idle in one-second sleeps until self.shared reports stopped."""
        print("Warning: pre-alpha prototype. Full of bugs.")
        while not self.shared.stopped():
            time.sleep(1)
184