2 from bitcoin import bind, _1, _2, _3
3 from processor import Processor
12 # Create 3 thread-pools each with 1 thread
13 self.network_service = bitcoin.async_service(1)
14 self.disk_service = bitcoin.async_service(1)
15 self.mempool_service = bitcoin.async_service(1)
17 self.hosts = bitcoin.hosts(self.network_service)
18 self.handshake = bitcoin.handshake(self.network_service)
19 self.network = bitcoin.network(self.network_service)
20 self.protocol = bitcoin.protocol(self.network_service, self.hosts,
21 self.handshake, self.network)
23 db_prefix = "/home/genjix/libbitcoin/database"
24 self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
25 self.blockchain_started)
26 self.poller = bitcoin.poller(self.blockchain)
27 self.transaction_pool = \
28 bitcoin.transaction_pool(self.mempool_service, self.blockchain)
30 self.protocol.subscribe_channel(self.monitor_tx)
32 bitcoin.session(self.network_service, self.hosts, self.handshake,
33 self.network, self.protocol, self.blockchain,
34 self.poller, self.transaction_pool)
35 self.session.start(self.handle_start)
37 self.pool_buffer = history.MemoryPoolBuffer(self.transaction_pool,
40 def handle_start(self, ec):
42 print "Error starting backend:", ec
44 def blockchain_started(self, ec, chain):
45 print "Blockchain initialisation:", ec
48 self.session.stop(self.handle_stop)
50 def handle_stop(self, ec):
52 print "Error stopping backend:", ec
53 print "Stopped backend"
55 def monitor_tx(self, node):
56 # We will be notified here when connected to new bitcoin nodes
57 # Here we subscribe to new transactions from them which we
58 # add to the transaction_pool. That way we can track which
59 # transactions we are interested in.
60 node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
61 # Re-subscribe to next new node
62 self.protocol.subscribe_channel(self.monitor_tx)
64 def recv_tx(self, ec, tx, node):
66 print "Error with new transaction:", ec
68 tx_hash = bitcoin.hash_transaction(tx)
69 self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash))
70 # Re-subscribe to new transactions from node
71 node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
73 def store_tx(self, ec, tx_hash):
75 print "Error storing memory pool transaction", tx_hash, ec
77 print "Accepted transaction", tx_hash
82 self.event = threading.Event()
93 class NumblocksSubscribe:
95 def __init__(self, backend, processor):
96 self.backend = backend
97 self.processor = processor
98 self.lock = threading.Lock()
99 self.backend.blockchain.subscribe_reorganize(self.reorganize)
100 self.backend.blockchain.fetch_last_depth(self.set_last_depth)
101 self.latest = GhostValue()
103 def set_last_depth(self, ec, last_depth):
105 print "Error retrieving last depth", ec
107 self.latest.set(last_depth)
109 def reorganize(self, ec, fork_point, arrivals, replaced):
110 latest = fork_point + len(arrivals)
111 self.latest.set(latest)
112 response = {"id": None, "method": "blockchain.numblocks.subscribe",
114 self.processor.push_response(response)
115 self.backend.blockchain.subscribe_reorganize(self.reorganize)
117 def subscribe(self, request):
118 latest = self.latest.get()
119 response = {"id": request["id"],
120 "method": "blockchain.numblocks.subscribe",
123 self.processor.push_response(response)
125 class AddressGetHistory:
127 def __init__(self, backend, processor):
128 self.backend = backend
129 self.processor = processor
131 def get(self, request):
132 address = str(request["params"][0])
133 chain = self.backend.blockchain
134 txpool = self.backend.transaction_pool
135 membuf = self.backend.pool_buffer
136 history.payment_history(chain, txpool, membuf, address,
137 bind(self.respond, _1, request))
139 def respond(self, result, request):
141 response = {"id": request["id"], "result": None,
142 "error": {"message": "Error", "code": -4}}
144 response = {"id": request["id"], "result": result, "error": None}
145 self.processor.push_response(response)
147 class AddressSubscribe:
149 def __init__(self, backend, processor):
150 self.backend = backend
151 self.processor = processor
153 def subscribe(self, session, request):
154 address = str(request["params"][0])
155 chain = self.backend.blockchain
156 txpool = self.backend.transaction_pool
157 membuf = self.backend.pool_buffer
158 history.payment_history(chain, txpool, membuf, address,
159 bind(self.respond, _1, request))
161 def construct(self, result, request):
163 response = {"id": request["id"], "result": None,
164 "error": {"message": "Error", "code": -4}}
167 response = {"id": request["id"], "result": result, "error": None}
168 self.processor.push_response(response)
170 class BlockchainProcessor(Processor):
172 def __init__(self, config):
173 Processor.__init__(self)
174 self.backend = Backend()
175 self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
176 self.address_get_history = AddressGetHistory(self.backend, self)
177 self.address_subscribe = AddressSubscribe(self.backend, self)
182 def process(self, request):
183 print "New request (lib)", request
184 if request["method"] == "blockchain.numblocks.subscribe":
185 self.numblocks_subscribe.subscribe(request)
186 elif request["method"] == "blockchain.address.subscribe":
188 elif request["method"] == "blockchain.address.get_history":
189 self.address_get_history.get(request)
190 elif request["method"] == "blockchain.transaction.broadcast":
191 self.broadcast_transaction(request)
193 def broadcast_transaction(self, request):
194 raw_tx = bitcoin.data_chunk(str(request["params"]))
195 exporter = bitcoin.satoshi_exporter()
197 tx = exporter.load_transaction(raw_tx)
199 response = {"id": request["id"], "result": None,
201 "Exception while parsing the transaction data.",
204 self.backend.protocol.broadcast_transaction(tx)
205 tx_hash = str(bitcoin.hash_transaction(tx))
206 response = {"id": request["id"], "result": tx_hash, "error": None}
207 self.push_response(response)
210 print "Warning: pre-alpha prototype. Full of bugs."
211 while not self.shared.stopped():