2 from bitcoin import bind, _1, _2, _3
# Thread subclass that keeps a list of items and drops the ones that report
# themselves as stopped.
# NOTE(review): only part of this class is visible here; the method headers
# enclosing the statements below are not shown.
7 class ExpiryQueue(threading.Thread):
# Lock owned by the queue; presumably guards self.items -- TODO confirm.
10 self.lock = threading.Lock()
12 threading.Thread.__init__(self)
# Keep only the items whose stopped() returns a false value.
19 self.items = [i for i in self.items if not i.stopped()]
# Add one more item to the tracked list.
24 self.items.append(item)
# Module-level ExpiryQueue instance, created when the module is loaded.
26 expiry_queue = ExpiryQueue()
# Indexes transactions handed to recv_tx() so they can be found again by the
# output they spend or by the address they pay.
28 class MemoryPoolBuffer:
# NOTE(review): part of this constructor body is not visible here.
30 def __init__(self, txpool, chain):
# str(previous output) -> input point that spends it (see mempool_stored)
34 self.lookup_input = {}
35 # payment_address: outpoint
36 self.lookup_address = multimap.MultiMap()
37 # transaction timestamps
# Build a descriptor for tx -- (tx_hash, previous outputs, [(output index,
# address), ...]) -- and bind it into the confirmed / mempool_stored callbacks.
40 def recv_tx(self, tx):
41 tx_hash = bitcoin.hash_transaction(tx)
# desc[1] collects previous outputs, desc[2] collects (index, address) pairs.
42 desc = (tx_hash, [], [])
43 for input in tx.inputs:
44 desc[1].append(input.previous_output)
45 for idx, output in enumerate(tx.outputs):
46 address = bitcoin.payment_address()
# Only outputs whose script yields an address are recorded.
47 if address.extract(output.output_script):
48 desc[2].append((idx, address))
# NOTE(review): the opening line of the call that receives these two bound
# callbacks is not visible here.
50 bind(self.confirmed, _1, desc),
51 bind(self.mempool_stored, _1, desc))
def mempool_stored(self, ec, desc):
    """Index a transaction once it has been stored in the memory pool.

    ec is the error code delivered to the callback; when it is true the
    error is printed and nothing is indexed.  desc is the
    (tx_hash, prevouts, addrs) tuple bound in by recv_tx, where addrs
    holds (output index, address) pairs.
    """
    tx_hash, prevouts, addrs = desc
    if ec:
        # A single-argument print call behaves the same as a statement
        # and as a function.
        print("Error storing memory pool transaction %s %s" % (tx_hash, ec))
        return
    print("Accepted transaction %s" % (tx_hash,))
    # Previous output -> the input of this transaction that spends it.
    for idx, prevout in enumerate(prevouts):
        inpoint = bitcoin.input_point()
        inpoint.hash, inpoint.index = tx_hash, idx
        self.lookup_input[str(prevout)] = inpoint
    # Address -> the output of this transaction that pays it.
    for idx, address in addrs:
        outpoint = bitcoin.output_point()
        outpoint.hash, outpoint.index = tx_hash, idx
        self.lookup_address[str(address)] = outpoint
    # One timestamp per transaction, written even when no output address
    # could be extracted, because check() reads it for input matches too.
    self.timestamps[str(tx_hash)] = int(time.time())
def confirmed(self, ec, desc):
    """Remove a transaction from the memory-pool indexes once it confirms.

    ec is the error code delivered to the callback; when it is true the
    problem is printed and the indexes are left untouched.  desc is the
    (tx_hash, prevouts, addrs) tuple bound in by recv_tx.
    """
    tx_hash, prevouts, addrs = desc
    if ec:
        # A single-argument print call behaves the same as a statement
        # and as a function.
        print("Problem confirming transaction %s %s" % (tx_hash, ec))
        return
    print("Confirmed %s" % (tx_hash,))
    for idx, prevout in enumerate(prevouts):
        inpoint = bitcoin.input_point()
        inpoint.hash, inpoint.index = tx_hash, idx
        # The entry being removed must be the one mempool_stored recorded.
        assert self.lookup_input[str(prevout)] == inpoint
        del self.lookup_input[str(prevout)]
    for idx, address in addrs:
        outpoint = bitcoin.output_point()
        outpoint.hash, outpoint.index = tx_hash, idx
        self.lookup_address.delete(str(address), outpoint)
    # Deleted exactly once per transaction; inside the address loop a
    # second address would raise KeyError.
    del self.timestamps[str(tx_hash)]
# Look up memory-pool activity for the given output points and address.
# NOTE(review): several lines of this method are not visible here, including
# whatever collects the info dicts and invokes `handle` -- confirm against
# the full file.
86 def check(self, output_points, address, handle):
# dict subclass; presumably so attributes can be attached to an info
# (load_pool_tx assigns info.previous_output) -- TODO confirm
87 class ExtendableDict(dict):
# First pass: output points that an indexed transaction spends.
90 for outpoint in output_points:
91 if self.lookup_input.has_key(str(outpoint)):
92 point = self.lookup_input[str(outpoint)]
93 info = ExtendableDict()
94 info["tx_hash"] = str(point.hash)
95 info["index"] = point.index
# Timestamp recorded by mempool_stored under the same string key.
97 info["timestamp"] = self.timestamps[info["tx_hash"]]
# Second pass: outputs of indexed transactions that pay `address`.
99 if self.lookup_address.has_key(str(address)):
100 addr_points = self.lookup_address[str(address)]
101 for point in addr_points:
102 info = ExtendableDict()
103 info["tx_hash"] = str(point.hash)
104 info["index"] = point.index
106 info["timestamp"] = self.timestamps[info["tx_hash"]]
def __init__(self, output_point):
    """Create a statement entry for one output point.

    Everything except output_point starts out as None and is filled in
    by the History callbacks as data arrives.
    """
    self.lock = threading.Lock()
    self.output_point = output_point
    # Info dict for the output side, assigned once it has been loaded.
    self.output_loaded = None
    # None until the spend lookup answers; History.load_spend then stores
    # either the spending input point or False for an unspent output.
    self.input_point = None
    # Info dict for the input side, assigned once it has been loaded.
    self.input_loaded = None
    # Serialised output script, stored by History.load_chain_tx.
    self.raw_output_script = None
# NOTE(review): the def lines and the branch bodies of the PaymentEntry
# methods these statements belong to are not visible here.
# Case 1: the output info has not been recorded yet.
122 if self.output_loaded is None:
# Case 2: the entry has an input whose info has not been recorded yet.
124 elif self.has_input() and self.input_loaded is None:
# input_point is set to False by History.load_spend for an unspent output;
# any other value, including the initial None, counts as having an input.
129 return self.input_point is not False
# Set up a history loader over the given chain, transaction pool and memory
# buffer.
# NOTE(review): part of this constructor body is not visible here.
133 def __init__(self, chain, txpool, membuf):
137 self.lock = threading.Lock()
# Stop flag, initially clear.
138 self._stopped = False
# Begin loading the payment history of `address`. `handle_finish` is the
# callback that later receives the result list, or None after an error.
# NOTE(review): some lines of this method are not visible here.
140 def start(self, address, handle_finish):
# Assigned for real by start_loading.
142 self.membuf_result = None
# Kept in the form the caller passed; load_tx copies it into info["inputs"].
143 self.address = address
144 self.handle_finish = handle_finish
# Rebinds the local name only; self.address keeps the caller's value.
146 address = bitcoin.payment_address(address)
147 # To begin we fetch all the outputs (payments in)
148 # associated with this address
149 self.chain.fetch_outputs(address,
150 bind(self.check_membuf, _1, _2))
# NOTE(review): the def line enclosing this statement is not visible here.
# Checks that the stop flag has not been set already.
154 assert self._stopped == False
# Error helper for the fetch callbacks, which test its return value before
# doing any work.
# NOTE(review): the lines between the visible statements are not shown; the
# handle_finish(None) call is presumably conditional on `ec` -- confirm.
161 def stop_on_error(self, ec):
# Report failure to whoever supplied handle_finish.
163 self.handle_finish(None)
165 return self.stopped()
def check_membuf(self, ec, output_points):
    """Callback for chain.fetch_outputs: consult the memory buffer next.

    Does nothing when stop_on_error reports that loading must stop.
    Otherwise asks the memory buffer about the fetched output points and
    this history's address, continuing in start_loading.
    """
    if self.stop_on_error(ec):
        return
    # start_loading receives the memory buffer's result first, followed
    # by the output points bound in here.
    self.membuf.check(output_points, self.address,
        bind(self.start_loading, _1, output_points))
# Callback for membuf.check: create one PaymentEntry per output point, start
# the chain lookups for each, then fetch every memory-pool transaction.
# NOTE(review): two lines of this method are not visible here (one before the
# statement.append call, one before the membuf_result assignment).
173 def start_loading(self, membuf_result, output_points):
174 # Create a bunch of entry lines which are outputs and
175 # then their corresponding input (if it exists)
176 for outpoint in output_points:
177 entry = PaymentEntry(outpoint)
179 self.statement.append(entry)
180 # Attempt to fetch the spend of this output
181 self.chain.fetch_spend(outpoint,
182 bind(self.load_spend, _1, _2, entry))
# Load the output side of the entry (is_input=False).
183 self.load_tx_info(outpoint, entry, False)
184 # Load memory pool transactions
186 self.membuf_result = membuf_result
187 for info in self.membuf_result:
# info["tx_hash"] is a string, so it is converted back to a hash_digest here.
188 self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]),
189 bind(self.load_pool_tx, _1, _2, info))
# Callback for chain.fetch_spend: record which input point, if any, spends
# the output held by `entry`.
# NOTE(review): several lines of this method (branch headers among them) are
# not visible here, so the nesting of the statements below is unconfirmed.
191 def load_spend(self, ec, inpoint, entry):
192 # Need a custom self.stop_on_error(...) as a missing spend
193 # is not an error in this case.
194 if not self.stopped() and ec and ec != bitcoin.error.unspent_output:
195 self.handle_finish(None)
200 if ec == bitcoin.error.unspent_output:
201 # This particular entry.output_point
202 # has not been spent yet
# False, unlike the initial None, marks the output as known to be unspent.
203 entry.input_point = False
205 entry.input_point = inpoint
206 if ec == bitcoin.error.unspent_output:
207 # Attempt to stop if all the info for the inputs and outputs
209 self.finish_if_done()
211 # We still have to load at least one more payment outwards
# Load the input side of the entry (is_input=True).
212 self.load_tx_info(inpoint, entry, True)
# Hand the combined history to handle_finish once every statement entry and
# every memory-pool info has finished loading.
# NOTE(review): several lines of this method are not visible here (the early
# returns and the creation of `result` / `mempool_result` among them).
214 def finish_if_done(self):
216 # Still have more entries to finish loading, so return
217 if any(not entry.is_loaded() for entry in self.statement):
219 # Memory buffer transactions finished loading?
# "height" is the last key load_input_pool_tx assigns to a pool info.
220 if any(not info.has_key("height") for info in self.membuf_result):
222 # Whole operation completed successfully! Finish up.
224 for entry in self.statement:
# Truthy only for a real input point: None and False both skip this branch.
225 if entry.input_point:
226 # value of the input is simply the inverse of
227 # the corresponding output
228 entry.input_loaded["value"] = -entry.output_loaded["value"]
229 # output should come before the input as it's chronological
230 result.append(entry.output_loaded)
231 result.append(entry.input_loaded)
233 # Unspent outputs have a raw_output_script field
234 assert entry.raw_output_script is not None
235 entry.output_loaded["raw_output_script"] = \
236 entry.raw_output_script
237 result.append(entry.output_loaded)
239 for info in self.membuf_result:
240 # Lookup prevout in result
242 if info["is_input"] == 1:
244 for prevout_info in result:
# NOTE(review): compares the stored "tx_hash" string with a hash object and
# ignores the output index -- confirm this matches as intended.
245 if prevout_info["tx_hash"] == info.previous_output.hash:
246 prevout_tx = prevout_info
247 assert prevout_tx is not None
# NOTE(review): reads the loop variable `prevout_info` (the last element
# iterated) rather than the matched `prevout_tx` -- looks unintended; confirm.
248 info["value"] = -prevout_info["value"]
249 mempool_result.append(info)
# Memory-pool infos go after the chain results.
250 result.extend(mempool_result)
251 self.handle_finish(result)
def load_tx_info(self, point, entry, is_input):
    """Start loading the info dict for one side of a statement entry.

    point supplies the transaction hash and index; is_input says whether
    it is the spending input (true) or the output (false) of entry.  The
    new dict travels down the callback chain that begins at tx_index.
    """
    info = {
        "tx_hash": str(point.hash),
        "index": point.index,
        # Stored as 1/0 rather than a bool; later checks compare to 1 and 0.
        "is_input": 1 if is_input else 0,
    }
    # Before loading the transaction, Stratum requires the hash
    # of the parent block, so we load the block depth and then
    # fetch the block header and hash it.
    self.chain.fetch_transaction_index(point.hash,
        bind(self.tx_index, _1, _2, _3, entry, info))
def tx_index(self, ec, block_depth, offset, entry, info):
    """Record the block depth of the transaction, then fetch that block's header.

    offset is part of the callback signature but is not used here.
    Does nothing when stop_on_error reports that loading must stop.
    """
    if self.stop_on_error(ec):
        return
    info["height"] = block_depth
    # And now for the block hash
    self.chain.fetch_block_header_by_depth(block_depth,
        bind(self.block_header, _1, _2, entry, info))
def block_header(self, ec, blk_head, entry, info):
    """Record the block timestamp and hash, then fetch the transaction itself.

    Does nothing when stop_on_error reports that loading must stop.
    """
    if self.stop_on_error(ec):
        return
    info["timestamp"] = blk_head.timestamp
    info["block_hash"] = str(bitcoin.hash_block_header(blk_head))
    # info["tx_hash"] holds a string, so convert it back for the fetch.
    tx_hash = bitcoin.hash_digest(info["tx_hash"])
    # Now load the actual main transaction for this input or output
    self.chain.fetch_transaction(tx_hash,
        bind(self.load_chain_tx, _1, _2, entry, info))
# Callback for txpool.fetch: fill in a memory-pool info from its transaction
# and start loading the addresses behind its inputs.
# NOTE(review): two lines of this method are not visible here (the body of
# the stop_on_error guard and, presumably, an `else:` before the second
# assert), so the nesting of the statements below is unconfirmed.
283 def load_pool_tx(self, ec, tx, info):
284 if self.stop_on_error(ec):
286 # block_hash = mempool:5
287 # inputs (load from prevtx)
288 # outputs (load from tx)
289 # raw_output_script (load from tx)
290 # height is always None
291 # value (get from finish_if_done)
292 self.load_tx(tx, info)
293 if info["is_input"] == 0:
294 our_output = tx.outputs[info["index"]]
295 info["value"] = our_output.value
296 # Save serialised output script in case this output is unspent
297 info["raw_output_script"] = \
298 str(bitcoin.save_script(our_output.output_script))
300 assert(info["is_input"] == 1)
# Stored as an attribute, not a key; finish_if_done reads it back.
301 info.previous_output = tx.inputs[info["index"]].previous_output
302 # If all the inputs are loaded
303 if self.inputs_all_loaded(info["inputs"]):
304 # We are the sole input
305 assert(info["is_input"] == 1)
# NOTE(review): info["height"] is not assigned on this path, yet
# finish_if_done waits for that key on every pool info -- confirm that a
# single-input pool transaction can still complete.
306 self.finish_if_done()
307 create_handler = lambda prevout_index, input_index: \
308 bind(self.load_input_pool_tx, _1, _2,
309 prevout_index, info, input_index)
310 self.fetch_input_txs(tx, info, create_handler)
def load_tx(self, tx, info):
    """Fill info["outputs"] and prepare info["inputs"] from tx.

    info["outputs"] gets one encoded address per output, or "Unknown"
    where no address can be extracted from the script.  info["inputs"]
    gets one None placeholder per input, which the input callbacks
    replace later.
    """
    # List of output addresses
    outputs = []
    for tx_out in tx.outputs:
        address = bitcoin.payment_address()
        # Attempt to extract address from output script
        if address.extract(tx_out.output_script):
            outputs.append(address.encoded())
        else:
            # ... otherwise append "Unknown"
            outputs.append("Unknown")
    info["outputs"] = outputs
    # For the inputs, we need the originator address which has to
    # be looked up in the blockchain.
    # Create a list of Nones and then populate it; loading has finished
    # once no None is left in it.
    info["inputs"] = [None for _ in tx.inputs]
    # If this transaction was loaded for an input, then we already
    # have a source address for at least one input.
    if info["is_input"] == 1:
        info["inputs"][info["index"]] = self.address
def fetch_input_txs(self, tx, info, create_handler):
    """Fetch the previous transaction of every input still lacking an address.

    create_handler(prevout_index, input_index) must return the callback
    to pass to chain.fetch_transaction for that input.
    """
    for input_index, tx_input in enumerate(tx.inputs):
        # Our own input was already filled in by load_tx, so there is
        # nothing to look up for it.
        if info["is_input"] == 1 and info["index"] == input_index:
            continue
        prevout = tx_input.previous_output
        handler = create_handler(prevout.index, input_index)
        self.chain.fetch_transaction(prevout.hash, handler)
# Callback for chain.fetch_transaction: fill in a chain info from its
# transaction and start loading the addresses behind its inputs.
# NOTE(review): three lines of this method are not visible here (the body of
# the stop_on_error guard and one line before each assignment on `entry`),
# so the nesting of the statements below is unconfirmed.
344 def load_chain_tx(self, ec, tx, entry, info):
345 if self.stop_on_error(ec):
347 self.load_tx(tx, info)
348 if info["is_input"] == 0:
349 our_output = tx.outputs[info["index"]]
350 info["value"] = our_output.value
351 # Save serialised output script in case this output is unspent
353 entry.raw_output_script = \
354 str(bitcoin.save_script(our_output.output_script))
355 # If all the inputs are loaded
356 if self.inputs_all_loaded(info["inputs"]):
357 # We are the sole input
358 assert(info["is_input"] == 1)
# Nothing else to wait for: publish the info as the entry's input side.
360 entry.input_loaded = info
361 self.finish_if_done()
# Each handler carries the previous output index and our input index along.
362 create_handler = lambda prevout_index, input_index: \
363 bind(self.load_input_chain_tx, _1, _2,
364 prevout_index, entry, info, input_index)
365 self.fetch_input_txs(tx, info, create_handler)
def inputs_all_loaded(self, info_inputs):
    """Return True when no entry of info_inputs is still None.

    An empty list counts as fully loaded.
    """
    return all(loaded is not None for loaded in info_inputs)
def load_input_tx(self, tx, output_index, info, input_index):
    """Resolve the source address of one input from its previous transaction.

    tx is the previous transaction and output_index selects the output
    being spent.  The encoded address, or "Unknown" when none can be
    extracted, is stored at info["inputs"][input_index].
    """
    # For our input, we load the previous tx so we can get the
    # corresponding output.
    # We need the output to extract the address.
    script = tx.outputs[output_index].output_script
    address = bitcoin.payment_address()
    if address.extract(script):
        info["inputs"][input_index] = address.encoded()
    else:
        info["inputs"][input_index] = "Unknown"
# Callback for the previous-transaction fetches started by load_chain_tx:
# store the input's source address and publish the info on `entry` once all
# inputs are known.
# NOTE(review): three lines of this method are not visible here (the body of
# the stop_on_error guard, one line after the inputs_all_loaded test and,
# presumably, an `else:`), so the nesting below is unconfirmed.
381 def load_input_chain_tx(self, ec, tx, output_index,
382 entry, info, input_index):
383 if self.stop_on_error(ec):
385 self.load_input_tx(tx, output_index, info, input_index)
386 # If all the inputs are loaded, then we have finished loading
387 # the info for this input-output entry pair
388 if self.inputs_all_loaded(info["inputs"]):
# The info belongs to the input or the output side depending on is_input.
390 if info["is_input"] == 1:
391 entry.input_loaded = info
393 entry.output_loaded = info
394 self.finish_if_done()
def load_input_pool_tx(self, ec, tx, output_index, info, input_index):
    """Store one input address of a memory-pool info; finalise it when complete.

    Does nothing when stop_on_error reports that loading must stop.
    """
    if self.stop_on_error(ec):
        return
    self.load_input_tx(tx, output_index, info, input_index)
    # Same completeness test the chain-side callback uses.
    if self.inputs_all_loaded(info["inputs"]):
        # No more inputs left to load
        # This info has finished loading
        # The "height" key is what finish_if_done looks for on pool infos.
        info["height"] = None
        info["block_hash"] = "mempool"
        self.finish_if_done()
# Convenience wrapper: create a History over the given chain, transaction
# pool and memory buffer, and start it for `address`. `handle_finish` is
# passed straight through to History.start.
# NOTE(review): one line between the two statements below is not visible here.
407 def payment_history(chain, txpool, membuf, address, handle_finish):
408 h = History(chain, txpool, membuf)
410 h.start(address, handle_finish)
# Everything from here on runs only when the file is executed as a script.
412 if __name__ == "__main__":
# Exporter used below to deserialise the raw transaction fixtures.
413 ex = bitcoin.satoshi_exporter()
414 tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000")
415 tx_a = ex.load_transaction(tx_a)
416 assert bitcoin.hash_transaction(tx_a) == "e72e4f025695446cfd5c5349d1720beb38801f329a00281f350cb7e847153397"
417 tx_b = bitcoin.data_chunk("0100000001e269f0d74b8e6849233953715bc0be3ba6727afe0bc5000d015758f9e67dde34000000008c4930460221008e305e3fdf4420203a8cced5be20b73738a3b51186dfda7c6294ee6bebe331b7022100c812ded044196132f5e796dbf4b566b6ee3246cc4915eca3cf07047bcdf24a9301410493b6ce24182a58fc3bd0cbee0ddf5c282e00c0c10b1293c7a3567e95bfaaf6c9a431114c493ba50398ad0a82df06254605d963d6c226db615646fadd083ddfd9ffffffff020f9c1208000000001976a91492fffb2cb978d539b6bcd12c968b263896c6aacf88ac8e3f7600000000001976a914654dc745e9237f86b5fcdfd7e01165af2d72909588ac00000000")
418 tx_b = ex.load_transaction(tx_b)
419 assert bitcoin.hash_transaction(tx_b) == "acfda6dbf4ae1b102326bfb7c9541702d5ebb0339bc57bd74d36746855be8eac"
# Callback handed to bdb_blockchain below; prints the error code it receives.
421 def blockchain_started(ec, chain):
422 print "Blockchain initialisation:", ec
# NOTE(review): the def line of the `finish` callback used below and the
# lines around this loop are not visible here; `line` and `begin` are
# defined in that missing part.
428 for k, v in line.iteritems():
430 print begin, " " * (12 - len(begin)), v
433 service = bitcoin.async_service(1)
# NOTE(review): hard-coded, machine-specific database location.
434 prefix = "/home/genjix/libbitcoin/database.old"
435 chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started)
436 txpool = bitcoin.transaction_pool(service, chain)
437 membuf = MemoryPoolBuffer(txpool, chain)
# A tuple of two addresses; each gets its own history lookup below.
441 address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "1EMnecJFwihf2pf4nE2m8fUNFKVRMWKqhR"
442 #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE"
443 print "Looking up", address
444 payment_history(chain, txpool, membuf, address[0], finish)
445 payment_history(chain, txpool, membuf, address[1], finish)