generic processor; register backends
[electrum-server.git] / modules / python_bitcoin / __init__.py
1 import bitcoin
2 from processor import Processor
3 import threading
4 import time
5
6 import composed 
7
class Backend:
    """Owns the libbitcoin services, blockchain, pool and network session.

    Constructing an instance wires the components together and starts the
    session; incoming transactions from connected nodes are forwarded to
    the transaction pool.
    """

    def __init__(self, db_prefix="/home/genjix/libbitcoin/database"):
        """Build all components and start the session.

        db_prefix -- directory passed to bitcoin.bdb_blockchain.  The
                     default is the location that used to be hard-coded
                     here, so existing callers are unaffected.
        """
        # Create 3 thread-pools each with 1 thread
        self.network_service = bitcoin.async_service(1)
        self.disk_service = bitcoin.async_service(1)
        self.mempool_service = bitcoin.async_service(1)

        self.hosts = bitcoin.hosts(self.network_service)
        self.handshake = bitcoin.handshake(self.network_service)
        self.network = bitcoin.network(self.network_service)
        self.protocol = bitcoin.protocol(self.network_service, self.hosts,
                                         self.handshake, self.network)

        self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix)
        self.poller = bitcoin.poller(self.blockchain)
        self.transaction_pool = \
            bitcoin.transaction_pool(self.mempool_service, self.blockchain)

        # Register for new channels before the session is started.
        self.protocol.subscribe_channel(self.monitor_tx)
        self.session = \
            bitcoin.session(self.hosts, self.handshake, self.network,
                            self.protocol, self.blockchain, self.poller,
                            self.transaction_pool)
        self.session.start(self.handle_start)

    def handle_start(self, ec):
        """Completion handler for session.start; reports an error if any."""
        if ec:
            print("Error starting backend: %s" % (ec,))

    def stop(self):
        """Ask the session to stop; handle_stop receives the outcome."""
        self.session.stop(self.handle_stop)

    def handle_stop(self, ec):
        """Completion handler for session.stop."""
        if ec:
            print("Error stopping backend: %s" % (ec,))
        # Printed unconditionally, also after an error was reported.
        print("Stopped backend")

    def monitor_tx(self, node):
        """Channel handler: start listening for transactions from node."""
        # We will be notified here when connected to new bitcoin nodes
        # Here we subscribe to new transactions from them which we
        # add to the transaction_pool. That way we can track which
        # transactions we are interested in.
        node.subscribe_transaction(
            bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))
        # Re-subscribe to next new node
        self.protocol.subscribe_channel(self.monitor_tx)

    def recv_tx(self, ec, tx, node):
        """Transaction handler: store tx in the pool and keep listening.

        On error the transaction is dropped and the subscription to this
        node is not renewed.
        """
        if ec:
            print("Error with new transaction: %s" % (ec,))
            return
        tx_hash = bitcoin.hash_transaction(tx)
        # If we want to ignore this transaction, we can set
        # the 2 handlers to be null handlers that do nothing.
        self.transaction_pool.store(tx,
            bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash),
            bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash))
        # Re-subscribe to new transactions from node
        node.subscribe_transaction(
            bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node))

    def handle_mempool_store(self, ec, tx_hash):
        """Report whether the pool accepted the transaction."""
        if ec:
            print("Error storing memory pool transaction %s %s"
                  % (tx_hash, ec))
        else:
            print("Accepted transaction %s" % (tx_hash,))

    def tx_confirmed(self, ec, tx_hash):
        """Report the confirmation result for a stored transaction."""
        if ec:
            print("Problem confirming transaction %s %s" % (tx_hash, ec))
        else:
            print("Confirmed %s" % (tx_hash,))
82
class GhostValue:
    """A value that may not exist yet; readers wait until it is set."""

    def __init__(self):
        self.event = threading.Event()
        self.value = None

    def get(self, timeout=None):
        """Return the stored value, waiting until set() has been called.

        timeout -- maximum number of seconds to wait; None (the default)
                   waits indefinitely, as before.  If the wait times out
                   before any set(), the initial value None is returned,
                   which cannot be told apart from a stored None.
        """
        self.event.wait(timeout)
        return self.value

    def set(self, value):
        """Store value and wake every waiting get()."""
        # Assign before signalling so a woken reader sees the new value.
        self.value = value
        self.event.set()
96
class NumblocksSubscribe:
    """Keeps the latest block depth and announces it on each reorganize."""

    def __init__(self, backend, processor=None):
        """Register with the backend's blockchain.

        backend   -- object exposing a ``blockchain`` attribute.
        processor -- optional object with a push_response(response)
                     method that receives the notifications.
        """
        self.backend = backend
        self.processor = processor
        self.lock = threading.Lock()
        # Must exist before the handlers below are registered: they write
        # to self.latest and may be invoked as soon as they are known to
        # the blockchain.
        self.latest = GhostValue()
        self.backend.blockchain.subscribe_reorganize(self.reorganize)
        self.backend.blockchain.fetch_last_depth(self.set_last_depth)

    def push_response(self, response):
        """Forward response to the processor, if one was supplied."""
        if self.processor is None:
            print("numblocks.subscribe: no processor attached, "
                  "dropping response")
            return
        self.processor.push_response(response)

    def set_last_depth(self, ec, last_depth):
        """Handler for fetch_last_depth: record the initial depth."""
        if ec:
            print("Error retrieving last depth %s" % (ec,))
        else:
            self.latest.set(last_depth)

    def reorganize(self, ec, fork_point, arrivals, replaced):
        """Handler for subscribe_reorganize: publish the new depth.

        The new depth is fork_point plus the number of arrived blocks.
        On error nothing is recorded or published.  The subscription is
        renewed in either case.
        """
        if ec:
            # Same convention as the other handlers in this module: an
            # error code means the remaining arguments are not used.
            print("Error in reorganize notification %s" % (ec,))
        else:
            latest = fork_point + len(arrivals)
            self.latest.set(latest)
            self.push_response(
                {"method": "numblocks.subscribe", "result": latest})
        self.backend.blockchain.subscribe_reorganize(self.reorganize)
117
118
class AddressGetHistory:
    """Answers address history requests using the backend's blockchain."""

    def __init__(self, backend, processor=None):
        """Remember the backend and, optionally, where to send responses.

        backend   -- object exposing a ``blockchain`` attribute.
        processor -- optional object with a push_response(response)
                     method that receives the finished responses.
        """
        self.backend = backend
        self.processor = processor

    def push_response(self, response):
        """Forward response to the processor, if one was supplied."""
        if self.processor is None:
            print("address.get_history: no processor attached, "
                  "dropping response")
            return
        self.processor.push_response(response)

    def get(self, request):
        """Start a history lookup for the address in request["params"].

        The result is delivered later through respond().
        """
        address = str(request["params"])
        composed.payment_history(self.backend.blockchain, address,
            bitcoin.bind(self.respond, request, bitcoin._1))

    def respond(self, request, result):
        """Push a response that echoes the request and carries result."""
        self.push_response({
            "id": request["id"],
            "method": request["method"],
            "params": request["params"],
            "result": result,
        })
131
class LibbitcoinProcessor(Processor):
    """Processor that serves requests through the libbitcoin backend."""

    def __init__(self):
        # Initialise the base class before the backend is started, so
        # that push_response is usable by the time callbacks can fire.
        # NOTE(review): assumes Processor.__init__ does not depend on the
        # attributes assigned below -- confirm against processor.py.
        Processor.__init__(self)
        self.backend = Backend()
        self.numblocks_subscribe = NumblocksSubscribe(self.backend)
        self.address_get_history = AddressGetHistory(self.backend)
        # Both helpers deliver their results through their own
        # self.push_response(...); route that to this processor.
        self.numblocks_subscribe.push_response = self.push_response
        self.address_get_history.push_response = self.push_response

    @staticmethod
    def _build_response(request, result):
        """Return a response dict echoing request and carrying result."""
        return {
            "id": request["id"],
            "method": request["method"],
            "params": request["params"],
            "result": result,
        }

    def stop(self):
        """Stop the backend."""
        self.backend.stop()

    def process(self, request):
        """Handle one request; requests with other methods are ignored.

        Responses are delivered through self.push_response, either
        directly here or later from a backend callback.
        """
        print("New request (lib) %s" % (request,))
        method = request["method"]
        if method == "numblocks.subscribe":
            # Reply with the current depth.  latest.get() waits until a
            # depth has been recorded.  Later changes are announced by
            # the reorganize handler of numblocks_subscribe.
            latest = self.numblocks_subscribe.latest.get()
            self.push_response(self._build_response(request, latest))
        elif method == "address.get_history":
            self.address_get_history.get(request)
        elif method == "server.banner":
            self.push_response(self._build_response(
                request, "libbitcoin using python-bitcoin bindings"))
        elif method == "transaction.broadcast":
            self.broadcast_transaction(request)

    def broadcast_transaction(self, request):
        """Parse the raw transaction in request["params"] and broadcast it.

        Pushes the transaction hash on success, or a response with an
        "error" entry (code -4) when the data cannot be parsed.
        """
        raw_tx = bitcoin.data_chunk(str(request["params"]))
        exporter = bitcoin.satoshi_exporter()
        try:
            tx = exporter.load_transaction(raw_tx)
        except RuntimeError:
            response = self._build_response(request, None)
            response["error"] = {
                "message": "Exception while parsing the transaction data.",
                "code": -4,
            }
        else:
            self.backend.protocol.broadcast_transaction(tx)
            tx_hash = str(bitcoin.hash_transaction(tx))
            response = self._build_response(request, tx_hash)
        self.push_response(response)
173
174
175
def run(processor):
    """Read console lines until "quit" is entered or the server stops.

    processor -- object whose ``shared`` attribute provides stopped()
                 and stop().
    """
    # raw_input only exists on Python 2; input is its Python 3 equivalent.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    print("Warning: pre-alpha prototype. Full of bugs.")
    while not processor.shared.stopped():
        if read_line() == "quit":
            # The shared state lives on the processor; there is no
            # module-level ``shared``.
            processor.shared.stop()
            # Re-check the loop condition right away instead of sleeping.
            continue
        time.sleep(1)
183