1 from json import dumps, loads
4 import ast, time, threading, hashlib
5 from Queue import Queue
9 return s.decode('hex')[::-1].encode('hex')
def int_to_hex(i, length=1):
    """Return `i` as a byte-reversed (little-endian) hex string.

    The integer is written in hexadecimal, left-padded with zeros to
    ``2 * length`` digits, and then reversed two digits (one byte) at a
    time, so ``int_to_hex(1, 4)`` gives ``'01000000'``.  This mirrors the
    ``int(rev_hex(...), 16)`` decoding applied to stored heights.

    Args:
        i: non-negative integer to encode.
        length: minimum width of the result, in bytes.

    Returns:
        Hex string, two characters per byte, least significant byte first.
    """
    s = hex(i)[2:].rstrip('L')  # drop the '0x' prefix and Python 2's long suffix
    s = "0" * (2 * length - len(s)) + s
    if len(s) % 2:
        # A value wider than `length` bytes can leave an odd digit count;
        # pad so the string still splits into whole bytes.
        s = "0" + s
    # Byte-wise reversal: walk the digit pairs from the end to the start.
    return "".join(s[pos:pos + 2] for pos in range(len(s) - 2, -1, -2))
18 from processor import Processor, print_log
21 class Blockchain2Processor(Processor):
# Constructor: creates the in-memory caches and locks, opens the LevelDB
# store, builds the bitcoind RPC URL from `config`, restores the last
# processed block from DB key '0', and schedules the first main_iteration().
# NOTE(review): several lines of this method are not visible here (for
# example the try/except lines that the two traceback.print_exc() calls
# presumably belong to) -- confirm the control flow against the full file.
23 def __init__(self, config):
24 Processor.__init__(self)
# Addresses for which main_iteration() pushes status notifications.
26 self.watched_addresses = []
# address -> history list built by get_history(); guarded by cache_lock.
27 self.history_cache = {}
29 self.cache_lock = threading.Lock()
# address -> list of txids seen in the memory pool (see memorypool_update()).
31 self.mempool_hist = {}
32 self.known_mempool_hashes = []
# Queue of addresses drained by main_iteration().
33 self.address_queue = Queue()
35 self.dblock = threading.Lock()
# The database path comes from the [leveldb] section of the config.
37 self.db = leveldb.LevelDB(config.get('leveldb', 'path'))
39 traceback.print_exc(file=sys.stdout)
# JSON-RPC endpoint used by bitcoind(); credentials are embedded in the URL.
42 self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
43 config.get('bitcoind','user'),
44 config.get('bitcoind','password'),
45 config.get('bitcoind','host'),
46 config.get('bitcoind','port'))
50 self.sent_header = None
# Key '0' holds a single serialized (block_hash, height) record written by
# import_block()/revert_block(); it tells us where to resume.
54 hist = self.deserialize(self.db.Get('0'))
55 hh, self.height = hist[0]
56 self.block_hashes = [hh]
57 print_log( "hist", hist )
59 traceback.print_exc(file=sys.stdout)
# Hash of the Bitcoin mainnet genesis block; presumably the fallback start
# point when no saved state could be read -- TODO confirm.
61 self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
# First iteration runs 10 seconds after construction.
63 threading.Timer(10, self.main_iteration).start()
def bitcoind(self, method, params=None):
    """Call a bitcoind JSON-RPC method and return its ``result`` member.

    Args:
        method: RPC method name, e.g. ``'getblockhash'``.
        params: list of positional RPC parameters; defaults to none.

    Returns:
        The ``result`` member of the decoded JSON reply.

    Raises:
        BaseException: carrying the reply's ``error`` member when it is
            not null.
    """
    if params is None:
        # Avoid sharing one mutable default list between calls.
        params = []
    postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
    respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
    r = loads(respdata)
    if r['error'] is not None:
        # BaseException is kept because callers in this class catch
        # exactly that type.
        raise BaseException(r['error'])
    return r.get('result')
def serialize(self, h):
    """Pack a history list into the binary form stored in the database.

    Args:
        h: iterable of ``(txid, height)`` pairs, where `txid` is a hex
            string and `height` an integer encoded on 4 bytes.

    Returns:
        Byte string with, per entry, the raw txid bytes followed by the
        4-byte height as produced by ``int_to_hex(height, 4)``.
    """
    # Build the hex text in one pass, then decode it to raw bytes
    # (Python 2 'hex' codec).
    s = ''.join(txid + int_to_hex(height, 4) for txid, height in h)
    return s.decode('hex')
def deserialize(self, s):
    """Unpack a stored history blob into a list of ``(txid, height)`` pairs.

    Args:
        s: byte string made of 36-byte records: 32 bytes of txid followed
            by a 4-byte height stored byte-reversed.

    Returns:
        List of ``(txid_hex, height)`` tuples in stored order; an empty
        list for empty input.
    """
    h = []
    for offset in range(0, len(s), 36):
        record = s[offset:offset + 36]
        txid = record[0:32].encode('hex')
        # The height is stored byte-reversed; undo that before parsing.
        height = int(rev_hex(record[32:36].encode('hex')), 16)
        h.append((txid, height))
    return h
def block2header(self, b):
    """Translate a bitcoind block dict into the header dict sent to clients.

    Each output key is filled from the matching bitcoind key via
    ``b.get``, so a missing field becomes None.
    """
    renamed_fields = (
        ("block_height", 'height'),
        ("version", 'version'),
        ("prev_block_hash", 'previousblockhash'),
        ("merkle_root", 'merkleroot'),
        ("timestamp", 'time'),
        ("bits", 'bits'),
        ("nonce", 'nonce'),
    )
    return dict((out_key, b.get(src_key)) for out_key, src_key in renamed_fields)
def get_header(self, height):
    """Return the header dict of the block at `height`.

    Asks bitcoind for the block hash at that height, fetches the block,
    and converts it with `block2header`.
    """
    hash_at_height = self.bitcoind('getblockhash', [height])
    return self.block2header(self.bitcoind('getblock', [hash_at_height]))
103 # store them on disk; store the current chunk in memory
def get_transaction(self, txid, block_height=-1):
    """Fetch transaction `txid` from bitcoind and return it parsed.

    The raw hex returned by ``getrawtransaction`` is decoded to bytes,
    written into a ``BCDataStream`` and handed to
    ``deserialize.parse_Transaction``.
    """
    hex_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
    stream = deserialize.BCDataStream()
    stream.write(hex_tx.decode('hex'))
    parsed_tx = deserialize.parse_Transaction(stream)
    return parsed_tx
def get_history(self, addr, cache_only=False):
    """Return the transaction history of `addr` as a list of dicts.

    Each entry is ``{'tx_hash': ..., 'height': ...}``.  Entries read from
    the database come first, ordered by height; transactions known only
    from the memory pool follow with height 0.  The result is cached per
    address.

    Args:
        addr: address whose history is wanted.
        cache_only: when true, never read the database; a cache miss
            yields -1.

    Returns:
        The history list, or -1 when `cache_only` is set and `addr` is
        not cached.
    """
    with self.cache_lock:
        hist = self.history_cache.get(addr)
    if hist is not None:
        return hist
    if cache_only:
        return -1

    try:
        hist = self.deserialize(self.db.Get(addr))
    except KeyError:
        # Nothing stored for this address yet; it may still have
        # memory-pool transactions.
        hist = []

    # Should already be ordered; kept as a cheap safety net.
    hist.sort(key=lambda tup: tup[1])

    # Unconfirmed transactions are reported with height 0.
    hist.extend((txid, 0) for txid in self.mempool_hist.get(addr, []))

    # A real list (not a lazy map object) so the cached value can be
    # read any number of times.
    hist = [{'tx_hash': txid, 'height': height} for txid, height in hist]
    with self.cache_lock:
        self.history_cache[addr] = hist
    return hist
def get_status(self, addr, cache_only=False):
    """Return a digest summarising the history of `addr`.

    The status is the hex SHA-256 of the concatenation of
    ``'<tx_hash>:<height>:'`` for every history entry, in order.

    Args:
        addr: address to summarise.
        cache_only: passed through to `get_history`.

    Returns:
        -1 when `cache_only` is set and the history is not cached, None
        when the history is empty, otherwise the hex digest string.
    """
    tx_points = self.get_history(addr, cache_only)
    if cache_only and tx_points == -1:
        return -1
    if not tx_points:
        return None

    status = ''.join(
        tx.get('tx_hash') + ':%d:' % tx.get('height') for tx in tx_points)
    if not isinstance(status, bytes):
        # hashlib needs bytes; on Python 2 a plain `str` already is bytes.
        status = status.encode('ascii')
    return hashlib.sha256(status).hexdigest()
# Build a merkle-branch description for transaction `target_hash` in the
# block at `height`.  Returns a dict with keys "block_height", "merkle" and
# "pos".
# NOTE(review): the lines that initialise `merkle`, `s` and `tx_pos`, and
# most of the inner loop, are not visible in this excerpt -- the comments
# below describe only what the visible lines do.
149 def get_merkle(self, target_hash, height):
# Look the block up by height, then fetch it by hash.
151 block_hash = self.bitcoind('getblockhash', [height])
152 b = self.bitcoind('getblock', [block_hash])
# Reduce the current level until a single hash remains.
156 while len(merkle) != 1:
# A level with an odd number of hashes gets its last hash duplicated.
157 if len(merkle)%2: merkle.append( merkle[-1] )
# Parent of the first two hashes of the level.
160 new_hash = Hash( merkle[0] + merkle[1] )
# When the tracked hash is one of the pair, keep following its parent.
161 if merkle[0] == target_hash:
163 target_hash = new_hash
164 elif merkle[1] == target_hash:
166 target_hash = new_hash
171 return {"block_height":height, "merkle":s, "pos":tx_pos}
# Add the transactions of `block` to the per-address histories and persist
# the result, together with the new chain tip, in a single LevelDB batch.
# NOTE(review): many lines of this method are not visible in this excerpt
# (including where `batch_list`, `txid` and `last` are bound); comments
# cover only the visible statements.
176 def import_block(self, block, block_hash, block_height):
177 #print "importing block", block_hash, block_height
179 txlist = block.get('tx')
183 tx = self.get_transaction(txid, block_height)
# Every address appearing in an input or an output gets the tx recorded.
184 for x in tx.get('inputs') + tx.get('outputs'):
185 addr = x.get('address')
# Prefer the history already modified in this batch over the stored one.
186 serialized_hist = batch_list.get(addr)
187 if serialized_hist is None:
189 serialized_hist = self.db.Get(addr)
# New 36-byte record: raw txid followed by the 4-byte height.
193 s = (txid + int_to_hex(block_height, 4)).decode('hex')
# Walk the existing records from the newest (end of the blob) backwards.
196 for i in range(len(serialized_hist)/36):
# The slice runs to the end of the blob, but only its first 36 bytes are
# read below, i.e. the (i+1)-th record counted from the end.
197 item = serialized_hist[-36*(1+i):]
200 h = int( rev_hex( item[32:36].encode('hex') ), 16 )
202 txhash = item[0:32].encode('hex')
203 print_log('warning: non-chronological order at', addr, (txhash, h), (txid, block_height))
# Fall back to a full decode, sort by height and re-encode.
204 hist = self.deserialize(serialized_hist)
206 hist.sort( key=lambda tup: tup[1])
209 if last[1] > block_height:
213 found = (txhash, h) in hist
214 print_log('new sorted hist', hist, found)
215 serialized_hist = self.serialize(hist)
217 elif h < block_height:
226 batch_list[addr] = serialized_hist
# Write all touched addresses plus the chain-tip record atomically.
229 batch = leveldb.WriteBatch()
230 for addr, hist in batch_list.items():
# NOTE(review): this writes `serialized_hist` (whatever value the loops
# above left behind) rather than the loop variable `hist`, so every address
# appears to receive the same blob -- looks like a bug, confirm.
231 batch.Put(addr, serialized_hist)
# Key '0' records the last imported (block_hash, block_height).
232 batch.Put('0', self.serialize( [(block_hash, block_height)] ) )
233 self.db.Write(batch, sync = True)
# Drop cached histories of every address touched by this block.
236 for addr in batch_list.keys(): self.update_history_cache(addr)
# Undo a block import: remove the block's transactions from the per-address
# histories and persist the result, together with the chain-tip record, in a
# single LevelDB batch.
# NOTE(review): several lines are not visible in this excerpt (including
# where `batch_list` and `txid` are bound and the branch around the error
# print); comments cover only the visible statements.
242 def revert_block(self, block, block_hash, block_height):
244 txlist = block.get('tx')
248 tx = self.get_transaction(txid, block_height)
249 for x in tx.get('inputs') + tx.get('outputs'):
251 addr = x.get('address')
# Histories are kept decoded (lists of tuples) in batch_list here.
253 hist = batch_list.get(addr)
256 hist = self.deserialize(self.db.Get(addr))
# Remove the (txid, height) entry of the block being reverted.
260 if (txid, block_height) in hist:
261 hist.remove( (txid, block_height) )
263 print "error: txid not found during block revert", txid, block_height
265 batch_list[addr] = hist
268 batch = leveldb.WriteBatch()
269 for addr, hist in batch_list.items():
270 batch.Put(addr, self.serialize(hist))
# Key '0' is set to the (block_hash, block_height) passed by the caller.
271 batch.Put('0', self.serialize( [(block_hash, block_height)] ) )
272 self.db.Write(batch, sync = True)
# Drop cached histories of every address touched by the revert.
275 for addr in batch_list.keys(): self.update_history_cache(addr)
def add_request(self, request):
    """Answer `request` from cache when possible, otherwise queue it.

    `process` is first tried in cache-only mode; a return value of -1
    signals a cache miss, in which case the request is put on
    ``self.queue``.
    """
    cache_miss = self.process(request, cache_only=True) == -1
    if cache_miss:
        self.queue.put(request)
# Dispatch one client request by its 'method' name and push either a
# {'id', 'result'} or a {'id', 'error'} response.  With cache_only set, a
# result of -1 means "not answerable from cache" and is returned to the
# caller instead.
# NOTE(review): many lines of this method are not visible in this excerpt
# (the `try:` lines matching each `except`, the extraction of `address`,
# `height`, `index` and `tx_hash` from `params`, and the initialisation of
# `result`/`error`); comments cover only the visible statements.
# `except BaseException, e` is Python 2 syntax.
288 def process(self, request, cache_only = False):
289 #print "abe process", request
291 message_id = request['id']
292 method = request['method']
293 params = request.get('params',[])
297 if method == 'blockchain2.numblocks.subscribe':
300 elif method == 'blockchain2.headers.subscribe':
# Subscribing returns the current status and starts watching the address.
303 elif method == 'blockchain2.address.subscribe':
306 result = self.get_status(address, cache_only)
307 self.watch_address(address)
308 except BaseException, e:
309 error = str(e) + ': ' + address
310 print_log( "error:", error )
# NOTE(review): get_status2 is not defined in the visible part of the file.
312 elif method == 'blockchain2.address.subscribe2':
315 result = self.get_status2(address, cache_only)
316 self.watch_address(address)
317 except BaseException, e:
318 error = str(e) + ': ' + address
319 print_log( "error:", error )
321 elif method == 'blockchain2.address.get_history':
324 result = self.get_history( address, cache_only )
325 except BaseException, e:
326 error = str(e) + ': ' + address
327 print_log( "error:", error )
329 elif method == 'blockchain2.block.get_header':
335 result = self.get_header( height )
336 except BaseException, e:
337 error = str(e) + ': %d'% height
338 print_log( "error:", error )
# NOTE(review): get_chunk is not defined in the visible part of the file.
340 elif method == 'blockchain2.block.get_chunk':
346 result = self.get_chunk( index )
347 except BaseException, e:
348 error = str(e) + ': %d'% index
349 print_log( "error:", error)
351 elif method == 'blockchain2.transaction.broadcast':
# NOTE(review): params[0] is passed as the RPC parameter value itself,
# whereas every other bitcoind() call in this file passes a list -- confirm
# this is intended.
352 txo = self.bitcoind('sendrawtransaction', params[0])
353 print_log( "sent tx:", txo )
356 elif method == 'blockchain2.transaction.get_merkle':
362 tx_height = params[1]
363 result = self.get_merkle(tx_hash, tx_height)
364 except BaseException, e:
365 error = str(e) + ': ' + tx_hash
366 print_log( "error:", error )
368 elif method == 'blockchain2.transaction.get':
372 result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] )
373 except BaseException, e:
374 error = str(e) + ': ' + tx_hash
375 print_log( "error:", error )
378 error = "unknown method:%s"%method
# Cache miss in cache-only mode: tell the caller, push nothing.
380 if cache_only and result == -1: return -1
383 response = { 'id':message_id, 'error':error }
384 self.push_response(response)
386 response = { 'id':message_id, 'result':result }
387 self.push_response(response)
def watch_address(self, addr):
    """Register `addr` in ``self.watched_addresses`` unless already present."""
    already_watched = addr in self.watched_addresses
    if already_watched:
        return
    self.watched_addresses.append(addr)
397 return self.block_hashes[-1]
404 while not self.shared.stopped():
407 info = self.bitcoind('getinfo')
408 bitcoind_height = info.get('blocks')
409 bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
410 if self.last_hash() == bitcoind_block_hash: break
413 block_hash = self.bitcoind('getblockhash', [self.height+1])
414 block = self.bitcoind('getblock', [block_hash])
416 if block.get('previousblockhash') == self.last_hash():
418 self.import_block(block, block_hash, self.height+1)
420 if (self.height+1)%100 == 0:
422 print_log( "bc2: block %d (%.3fs)"%( self.height+1, t2 - t1 ) )
425 self.height = self.height + 1
426 self.block_hashes.append(block_hash)
427 self.block_hashes = self.block_hashes[-10:]
430 # revert current block
431 print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
432 block_hash = self.last_hash()
433 block = self.bitcoind('getblock', [block_hash])
434 self.height = self.height -1
435 self.block_hashes.remove(block_hash)
436 self.revert_block(block, self.last_hash(), self.height)
439 self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))
def memorypool_update(self):
    """Index transactions that newly appeared in bitcoind's memory pool.

    For every mempool txid not seen on the previous run, the transaction
    is fetched and its txid is added to ``self.mempool_hist`` under each
    address found in its inputs and outputs; the cached history of each
    address that gained an entry is invalidated.  Finally
    ``self.known_mempool_hashes`` is replaced by the current mempool list.
    """
    mempool_hashes = self.bitcoind('getrawmempool')

    # Set copy for constant-time membership tests.  The list attribute is
    # still appended to, so its contents are unchanged if a call below
    # raises part-way through.
    seen = set(self.known_mempool_hashes)
    for tx_hash in mempool_hashes:
        if tx_hash in seen:
            continue
        seen.add(tx_hash)
        self.known_mempool_hashes.append(tx_hash)

        tx = self.get_transaction(tx_hash)
        for x in tx.get('inputs') + tx.get('outputs'):
            addr = x.get('address')
            hist = self.mempool_hist.setdefault(addr, [])
            if tx_hash not in hist:
                hist.append(tx_hash)
                self.update_history_cache(addr)

    self.known_mempool_hashes = mempool_hashes
def update_history_cache(self, address):
    """Drop the cached history of `address`, if there is one.

    The next `get_history` call for that address then rebuilds the entry.
    Does nothing when the address is not cached.
    """
    with self.cache_lock:
        # `in` replaces dict.has_key(), which no longer exists on Python 3.
        if address in self.history_cache:
            print_log("cache: invalidating", address)
            self.history_cache.pop(address)
# One pass of the periodic worker: refresh the memory-pool index, push
# height/header notifications when they changed, notify watched addresses
# taken from address_queue, then re-arm a 10-second timer unless the shared
# state reports a stop.
# NOTE(review): several lines are not visible in this excerpt (including
# where `t1`/`t2` are taken and the loop around address_queue.get);
# comments cover only the visible statements.
474 def main_iteration(self):
476 if self.shared.stopped():
477 print_log( "bc2 terminating")
484 print_log( "blockchain: %d (%.3fs)"%( self.height+1, t2 - t1 ) )
485 self.memorypool_update()
# Notify subscribers only when the height changed since the last push.
# NOTE(review): `sent_height` is not initialised in the visible part of
# __init__ -- confirm it is set before the first comparison.
487 if self.sent_height != self.height:
488 self.sent_height = self.height
489 self.push_response({ 'id': None, 'method':'blockchain2.numblocks.subscribe', 'params':[self.height] })
# Same for the tip header.
491 if self.sent_header != self.header:
492 self.sent_header = self.header
493 self.push_response({ 'id': None, 'method':'blockchain2.headers.subscribe', 'params':[self.header] })
# Non-blocking get: Queue.get(False) raises when the queue is empty.
497 addr = self.address_queue.get(False)
500 if addr in self.watched_addresses:
501 status = self.get_status( addr )
# NOTE(review): this notification uses the 'blockchain.' prefix while the
# methods handled in process() use 'blockchain2.' -- confirm it is intended.
502 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
# Re-arm the timer for the next pass.
505 if not self.shared.stopped():
506 threading.Timer(10, self.main_iteration).start()
508 print_log( "bc2 terminating" )