From: ThomasV
Date: Sun, 11 Nov 2012 16:45:50 +0000 (+0400)
Subject: new bitcoind backend (experimental)
X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=a6c7750da047d13465a040edfa5fa4a9d3d28328

new bitcoind backend (experimental)
---

diff --git a/backends/bitcoind/__init__.py b/backends/bitcoind/__init__.py
new file mode 100644
index 0000000..7a67281
--- /dev/null
+++ b/backends/bitcoind/__init__.py
@@ -0,0 +1 @@
+from blockchain_processor import Blockchain2Processor
diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py
new file mode 100644
index 0000000..24aa0d0
--- /dev/null
+++ b/backends/bitcoind/blockchain_processor.py
@@ -0,0 +1,512 @@
+from json import dumps, loads
+import leveldb, urllib
+import deserialize
+import ast, time, threading, hashlib, sys, traceback
+from Queue import Queue
+
+
+def rev_hex(s):
+    return s.decode('hex')[::-1].encode('hex')
+
+
+def int_to_hex(i, length=1):
+    s = hex(i)[2:].rstrip('L')
+    s = "0"*(2*length - len(s)) + s
+    return rev_hex(s)
+
+
+from processor import Processor, print_log
+
+
+class Blockchain2Processor(Processor):
+
+    def __init__(self, config):
+        Processor.__init__(self)
+
+        self.watched_addresses = []
+        self.history_cache = {}
+        self.chunk_cache = {}
+        self.cache_lock = threading.Lock()
+
+        self.mempool_hist = {}
+        self.known_mempool_hashes = []
+        self.address_queue = Queue()
+
+        self.dblock = threading.Lock()
+        try:
+            self.db = leveldb.LevelDB(config.get('leveldb', 'path'))
+        except:
+            traceback.print_exc(file=sys.stdout)
+            self.shared.stop()
+
+        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
+            config.get('bitcoind','user'),
+            config.get('bitcoind','password'),
+            config.get('bitcoind','host'),
+            config.get('bitcoind','port'))
+
+        self.height = 0
+        self.sent_height = 0
+        self.sent_header = None
+
+        # catch_up first
+        try:
+            hist = self.deserialize(self.db.Get('0'))
+            hh, self.height = hist[0]
+            self.block_hashes = [hh]
+            print_log( "hist", hist )
+        except:
+            traceback.print_exc(file=sys.stdout)
+            self.height = 0
+            self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
+
+        threading.Timer(10, self.main_iteration).start()
+
+
+    def bitcoind(self, method, params=[]):
+        postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'})
+        respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
+        r = loads(respdata)
+        if r['error'] != None:
+            raise BaseException(r['error'])
+        return r.get('result')
+
+
+    def serialize(self, h):
+        s = ''
+        for txid, height in h:
+            s += txid + int_to_hex(height, 4)
+        return s.decode('hex')
+
+    def deserialize(self, s):
+        h = []
+        while s:
+            txid = s[0:32].encode('hex')
+            height = s[32:36].encode('hex')
+            height = int( rev_hex( height ), 16 )
+            h.append( ( txid, height ) )
+            s = s[36:]
+        return h
+
+
+    def block2header(self, b):
+        return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'),
+                "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":b.get('bits'), "nonce":b.get('nonce')}
+
+    def get_header(self, height):
+        block_hash = self.bitcoind('getblockhash', [height])
+        b = self.bitcoind('getblock', [block_hash])
+        return self.block2header(b)
+
+
+    def get_chunk(self):
+        # store them on disk; store the current chunk in memory
+        pass
+
+
+    def get_transaction(self, txid, block_height=-1):
+        raw_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
+        vds = deserialize.BCDataStream()
+        vds.write(raw_tx.decode('hex'))
+        return
deserialize.parse_Transaction(vds) + + + def get_history(self, addr, cache_only=False): + with self.cache_lock: hist = self.history_cache.get( addr ) + if hist is not None: return hist + if cache_only: return -1 + + with self.dblock: + try: + hist = self.deserialize(self.db.Get(addr)) + except: + hist = [] + + # should not be necessary + hist.sort( key=lambda tup: tup[1]) + # check uniqueness too... + + # add memory pool + for txid in self.mempool_hist.get(addr,[]): + hist.append((txid, 0)) + + hist = map(lambda x: {'tx_hash':x[0], 'height':x[1]}, hist) + with self.cache_lock: self.history_cache[addr] = hist + return hist + + + def get_status(self, addr, cache_only=False): + tx_points = self.get_history(addr, cache_only) + if cache_only and tx_points == -1: return -1 + + if not tx_points: return None + status = '' + for tx in tx_points: + status += tx.get('tx_hash') + ':%d:' % tx.get('height') + return hashlib.sha256( status ).digest().encode('hex') + + + def get_merkle(self, target_hash, height): + + block_hash = self.bitcoind('getblockhash', [height]) + b = self.bitcoind('getblock', [block_hash]) + merkle = b.get('tx') + + s = [] + while len(merkle) != 1: + if len(merkle)%2: merkle.append( merkle[-1] ) + n = [] + while merkle: + new_hash = Hash( merkle[0] + merkle[1] ) + if merkle[0] == target_hash: + s.append( merkle[1]) + target_hash = new_hash + elif merkle[1] == target_hash: + s.append( merkle[0]) + target_hash = new_hash + n.append( new_hash ) + merkle = merkle[2:] + merkle = n + + return {"block_height":height, "merkle":s, "pos":tx_pos} + + + + + def import_block(self, block, block_hash, block_height): + #print "importing block", block_hash, block_height + + txlist = block.get('tx') + batch_list = {} + + for txid in txlist: + tx = self.get_transaction(txid, block_height) + for x in tx.get('inputs') + tx.get('outputs'): + addr = x.get('address') + serialized_hist = batch_list.get(addr) + if serialized_hist is None: + try: + serialized_hist = self.db.Get(addr) + except: + serialized_hist = '' + + s = (txid + int_to_hex(block_height, 4)).decode('hex') + + found = False + for i in range(len(serialized_hist)/36): + item = serialized_hist[-36*(1+i):] + item = item[0:36] + + h = int( rev_hex( item[32:36].encode('hex') ), 16 ) + if h > block_height: + txhash = item[0:32].encode('hex') + print_log('warning: non-chronological order at', addr, (txhash, h), (txid, block_height)) + hist = self.deserialize(serialized_hist) + print_log(hist) + hist.sort( key=lambda tup: tup[1]) + while hist: + last = hist[-1] + if last[1] > block_height: + hist = hist[0:-1] + else: + break + found = (txhash, h) in hist + print_log('new sorted hist', hist, found) + serialized_hist = self.serialize(hist) + break + elif h < block_height: + break + elif item == s: + found = True + break + + if not found: + serialized_hist += s + + batch_list[addr] = serialized_hist + + # batch write + batch = leveldb.WriteBatch() + for addr, hist in batch_list.items(): + batch.Put(addr, serialized_hist) + batch.Put('0', self.serialize( [(block_hash, block_height)] ) ) + self.db.Write(batch, sync = True) + + # invalidate cache + for addr in batch_list.keys(): self.update_history_cache(addr) + + return len(txlist) + + + + def revert_block(self, block, block_hash, block_height): + + txlist = block.get('tx') + batch_list = {} + + for txid in txlist: + tx = self.get_transaction(txid, block_height) + for x in tx.get('inputs') + tx.get('outputs'): + + addr = x.get('address') + + hist = batch_list.get(addr) + if hist is None: + try: + hist = 
self.deserialize(self.db.Get(addr)) + except: + hist = [] + + if (txid, block_height) in hist: + hist.remove( (txid, block_height) ) + else: + print "error: txid not found during block revert", txid, block_height + + batch_list[addr] = hist + + # batch write + batch = leveldb.WriteBatch() + for addr, hist in batch_list.items(): + batch.Put(addr, self.serialize(hist)) + batch.Put('0', self.serialize( [(block_hash, block_height)] ) ) + self.db.Write(batch, sync = True) + + # invalidate cache + for addr in batch_list.keys(): self.update_history_cache(addr) + + return len(txlist) + + + + def add_request(self, request): + # see if we can get if from cache. if not, add to queue + if self.process( request, cache_only = True) == -1: + self.queue.put(request) + + + + def process(self, request, cache_only = False): + #print "abe process", request + + message_id = request['id'] + method = request['method'] + params = request.get('params',[]) + result = None + error = None + + if method == 'blockchain2.numblocks.subscribe': + result = self.height + + elif method == 'blockchain2.headers.subscribe': + result = self.header + + elif method == 'blockchain2.address.subscribe': + try: + address = params[0] + result = self.get_status(address, cache_only) + self.watch_address(address) + except BaseException, e: + error = str(e) + ': ' + address + print_log( "error:", error ) + + elif method == 'blockchain2.address.subscribe2': + try: + address = params[0] + result = self.get_status2(address, cache_only) + self.watch_address(address) + except BaseException, e: + error = str(e) + ': ' + address + print_log( "error:", error ) + + elif method == 'blockchain2.address.get_history': + try: + address = params[0] + result = self.get_history( address, cache_only ) + except BaseException, e: + error = str(e) + ': ' + address + print_log( "error:", error ) + + elif method == 'blockchain2.block.get_header': + if cache_only: + result = -1 + else: + try: + height = params[0] + result = self.get_header( height ) + except BaseException, e: + error = str(e) + ': %d'% height + print_log( "error:", error ) + + elif method == 'blockchain2.block.get_chunk': + if cache_only: + result = -1 + else: + try: + index = params[0] + result = self.get_chunk( index ) + except BaseException, e: + error = str(e) + ': %d'% index + print_log( "error:", error) + + elif method == 'blockchain2.transaction.broadcast': + txo = self.bitcoind('sendrawtransaction', params[0]) + print_log( "sent tx:", txo ) + result = txo + + elif method == 'blockchain2.transaction.get_merkle': + if cache_only: + result = -1 + else: + try: + tx_hash = params[0] + tx_height = params[1] + result = self.get_merkle(tx_hash, tx_height) + except BaseException, e: + error = str(e) + ': ' + tx_hash + print_log( "error:", error ) + + elif method == 'blockchain2.transaction.get': + try: + tx_hash = params[0] + height = params[1] + result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] ) + except BaseException, e: + error = str(e) + ': ' + tx_hash + print_log( "error:", error ) + + else: + error = "unknown method:%s"%method + + if cache_only and result == -1: return -1 + + if error: + response = { 'id':message_id, 'error':error } + self.push_response(response) + elif result != '': + response = { 'id':message_id, 'result':result } + self.push_response(response) + + + def watch_address(self, addr): + if addr not in self.watched_addresses: + self.watched_addresses.append(addr) + + + + def last_hash(self): + return self.block_hashes[-1] + + + def catch_up(self): + + t1 = 
time.time() + + while not self.shared.stopped(): + + # are we done yet? + info = self.bitcoind('getinfo') + bitcoind_height = info.get('blocks') + bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height]) + if self.last_hash() == bitcoind_block_hash: break + + # not done.. + block_hash = self.bitcoind('getblockhash', [self.height+1]) + block = self.bitcoind('getblock', [block_hash]) + + if block.get('previousblockhash') == self.last_hash(): + + self.import_block(block, block_hash, self.height+1) + + if (self.height+1)%100 == 0: + t2 = time.time() + print_log( "bc2: block %d (%.3fs)"%( self.height+1, t2 - t1 ) ) + t1 = t2 + + self.height = self.height + 1 + self.block_hashes.append(block_hash) + self.block_hashes = self.block_hashes[-10:] + + else: + # revert current block + print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() ) + block_hash = self.last_hash() + block = self.bitcoind('getblock', [block_hash]) + self.height = self.height -1 + self.block_hashes.remove(block_hash) + self.revert_block(block, self.last_hash(), self.height) + + + self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()])) + + + + + def memorypool_update(self): + + mempool_hashes = self.bitcoind('getrawmempool') + + for tx_hash in mempool_hashes: + if tx_hash in self.known_mempool_hashes: continue + self.known_mempool_hashes.append(tx_hash) + + tx = self.get_transaction(tx_hash) + if not tx: continue + + for x in tx.get('inputs') + tx.get('outputs'): + addr = x.get('address') + hist = self.mempool_hist.get(addr, []) + if tx_hash not in hist: + hist.append( tx_hash ) + self.mempool_hist[addr] = hist + self.update_history_cache(addr) + + self.known_mempool_hashes = mempool_hashes + + + def update_history_cache(self, address): + with self.cache_lock: + if self.history_cache.has_key(address): + print_log( "cache: invalidating", address ) + self.history_cache.pop(address) + + + + def main_iteration(self): + + if self.shared.stopped(): + print_log( "bc2 terminating") + return + + with self.dblock: + t1 = time.time() + self.catch_up() + t2 = time.time() + print_log( "blockchain: %d (%.3fs)"%( self.height+1, t2 - t1 ) ) + self.memorypool_update() + + if self.sent_height != self.height: + self.sent_height = self.height + self.push_response({ 'id': None, 'method':'blockchain2.numblocks.subscribe', 'params':[self.height] }) + + if self.sent_header != self.header: + self.sent_header = self.header + self.push_response({ 'id': None, 'method':'blockchain2.headers.subscribe', 'params':[self.header] }) + + while True: + try: + addr = self.address_queue.get(False) + except: + break + if addr in self.watched_addresses: + status = self.get_status( addr ) + self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] }) + + + if not self.shared.stopped(): + threading.Timer(10, self.main_iteration).start() + else: + print_log( "bc2 terminating" ) + + + + diff --git a/backends/bitcoind/deserialize.py b/backends/bitcoind/deserialize.py new file mode 100644 index 0000000..d92a2b5 --- /dev/null +++ b/backends/bitcoind/deserialize.py @@ -0,0 +1,390 @@ +# this code comes from ABE. 
it can probably be simplified +# +# + +#from bitcoin import public_key_to_bc_address, hash_160_to_bc_address, hash_encode +#import socket +import time, hashlib +import struct +addrtype = 0 + + +Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest() +hash_encode = lambda x: x[::-1].encode('hex') +hash_decode = lambda x: x.decode('hex')[::-1] + +def hash_160(public_key): + md = hashlib.new('ripemd160') + md.update(hashlib.sha256(public_key).digest()) + return md.digest() + +def public_key_to_bc_address(public_key): + h160 = hash_160(public_key) + return hash_160_to_bc_address(h160) + +def hash_160_to_bc_address(h160): + vh160 = chr(addrtype) + h160 + h = Hash(vh160) + addr = vh160 + h[0:4] + return b58encode(addr) + +__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' +__b58base = len(__b58chars) + +def b58encode(v): + """ encode v, which is a string of bytes, to base58.""" + + long_value = 0L + for (i, c) in enumerate(v[::-1]): + long_value += (256**i) * ord(c) + + result = '' + while long_value >= __b58base: + div, mod = divmod(long_value, __b58base) + result = __b58chars[mod] + result + long_value = div + result = __b58chars[long_value] + result + + # Bitcoin does a little leading-zero-compression: + # leading 0-bytes in the input become leading-1s + nPad = 0 + for c in v: + if c == '\0': nPad += 1 + else: break + + return (__b58chars[0]*nPad) + result + +def b58decode(v, length): + """ decode v into a string of len bytes.""" + long_value = 0L + for (i, c) in enumerate(v[::-1]): + long_value += __b58chars.find(c) * (__b58base**i) + + result = '' + while long_value >= 256: + div, mod = divmod(long_value, 256) + result = chr(mod) + result + long_value = div + result = chr(long_value) + result + + nPad = 0 + for c in v: + if c == __b58chars[0]: nPad += 1 + else: break + + result = chr(0)*nPad + result + if length is not None and len(result) != length: + return None + + return result + + +# +# Workalike python implementation of Bitcoin's CDataStream class. +# +import struct +import StringIO +import mmap + +class SerializationError(Exception): + """ Thrown when there's a problem deserializing or serializing """ + +class BCDataStream(object): + def __init__(self): + self.input = None + self.read_cursor = 0 + + def clear(self): + self.input = None + self.read_cursor = 0 + + def write(self, bytes): # Initialize with string of bytes + if self.input is None: + self.input = bytes + else: + self.input += bytes + + def map_file(self, file, start): # Initialize with bytes from file + self.input = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) + self.read_cursor = start + def seek_file(self, position): + self.read_cursor = position + def close_file(self): + self.input.close() + + def read_string(self): + # Strings are encoded depending on length: + # 0 to 252 : 1-byte-length followed by bytes (if any) + # 253 to 65,535 : byte'253' 2-byte-length followed by bytes + # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes + # ... and the Bitcoin client is coded to understand: + # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string + # ... but I don't think it actually handles any strings that big. 
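+    # For illustration, a few concrete encodings under the rules above
+    # (multi-byte lengths are little-endian):
+    #     3 bytes     -> '\x03' + the 3 bytes
+    #     300 bytes   -> '\xfd\x2c\x01' + the 300 bytes          (300 == 0x012c)
+    #     70000 bytes -> '\xfe\x70\x11\x01\x00' + the 70000 bytes (70000 == 0x00011170)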
+ if self.input is None: + raise SerializationError("call write(bytes) before trying to deserialize") + + try: + length = self.read_compact_size() + except IndexError: + raise SerializationError("attempt to read past end of buffer") + + return self.read_bytes(length) + + def write_string(self, string): + # Length-encoded as with read-string + self.write_compact_size(len(string)) + self.write(string) + + def read_bytes(self, length): + try: + result = self.input[self.read_cursor:self.read_cursor+length] + self.read_cursor += length + return result + except IndexError: + raise SerializationError("attempt to read past end of buffer") + + return '' + + def read_boolean(self): return self.read_bytes(1)[0] != chr(0) + def read_int16(self): return self._read_num('= opcodes.OP_SINGLEBYTE_END: + opcode <<= 8 + opcode |= ord(bytes[i]) + i += 1 + + if opcode <= opcodes.OP_PUSHDATA4: + nSize = opcode + if opcode == opcodes.OP_PUSHDATA1: + nSize = ord(bytes[i]) + i += 1 + elif opcode == opcodes.OP_PUSHDATA2: + (nSize,) = struct.unpack_from(' 0: result += " " + if opcode <= opcodes.OP_PUSHDATA4: + result += "%d:"%(opcode,) + result += short_hex(vch) + else: + result += script_GetOpName(opcode) + return result + +def match_decoded(decoded, to_match): + if len(decoded) != len(to_match): + return False; + for i in range(len(decoded)): + if to_match[i] == opcodes.OP_PUSHDATA4 and decoded[i][0] <= opcodes.OP_PUSHDATA4: + continue # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent. + if to_match[i] != decoded[i][0]: + return False + return True + +def extract_public_key(bytes): + decoded = [ x for x in script_GetOp(bytes) ] + + # non-generated TxIn transactions push a signature + # (seventy-something bytes) and then their public key + # (65 bytes) onto the stack: + match = [ opcodes.OP_PUSHDATA4, opcodes.OP_PUSHDATA4 ] + if match_decoded(decoded, match): + return public_key_to_bc_address(decoded[1][1]) + + # The Genesis Block, self-payments, and pay-by-IP-address payments look like: + # 65 BYTES:... CHECKSIG + match = [ opcodes.OP_PUSHDATA4, opcodes.OP_CHECKSIG ] + if match_decoded(decoded, match): + return public_key_to_bc_address(decoded[0][1]) + + # Pay-by-Bitcoin-address TxOuts look like: + # DUP HASH160 20 BYTES:... 
EQUALVERIFY CHECKSIG
+  match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ]
+  if match_decoded(decoded, match):
+    return hash_160_to_bc_address(decoded[2][1])
+
+  return "(None)"
diff --git a/processor.py b/processor.py
index dfe6128..56cea3d 100644
--- a/processor.py
+++ b/processor.py
@@ -13,6 +13,15 @@ def timestr():
     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
 
 
+print_lock = threading.Lock()
+def print_log(*args):
+    args = [str(item) for item in args]
+    with print_lock:
+        sys.stderr.write(" ".join(args) + "\n")
+        sys.stderr.flush()
+
+
+
 class Shared:
 
     def __init__(self):
@@ -20,7 +29,7 @@ class Shared:
         self._stopped = False
 
     def stop(self):
-        print "Stopping Stratum"
+        print_log( "Stopping Stratum" )
         with self.lock:
             self._stopped = True
 
@@ -55,7 +64,7 @@ class Processor(threading.Thread):
             except:
                 traceback.print_exc(file=sys.stdout)
 
-        print "processor terminating"
+        print_log( "processor terminating")
 
 
 
@@ -162,7 +171,7 @@ class RequestDispatcher(threading.Thread):
         try:
             p = self.processors[prefix]
         except:
-            print "error: no processor for", prefix
+            print_log( "error: no processor for", prefix)
             return
 
         p.add_request(request)
@@ -221,8 +230,7 @@ class Session:
             addr = None
 
         if self.subscriptions:
-            print timestr(), self.name, self.address, addr,\
-                len(self.subscriptions), self.version
+            print_log( timestr(), self.name, self.address, addr, len(self.subscriptions), self.version )
 
     def stopped(self):
         with self.lock:
@@ -280,7 +288,7 @@ class ResponseDispatcher(threading.Thread):
         elif internal_id is not None: # and method is None and params is None:
             self.send_response(internal_id, response)
         else:
-            print "no method", response
+            print_log( "no method", response)
 
     def notification(self, method, params, response):
         subdesc = Session.build_subdesc(method, params)
@@ -298,5 +306,5 @@ class ResponseDispatcher(threading.Thread):
             response['id'] = message_id
             session.send_response(response)
         else:
-            print "send_response: no session", message_id, internal_id, response
+            print_log( "send_response: no session", message_id, internal_id, response )
 
diff --git a/server.py b/server.py
index c24dcfc..517299e 100755
--- a/server.py
+++ b/server.py
@@ -122,19 +122,19 @@ if __name__ == '__main__':
 
     print "Starting Electrum server on", host
 
-    from backends.bitcoind import Blockchain2Processor
-
     # Create hub
     dispatcher = Dispatcher()
     shared = dispatcher.shared
 
     # Create and register processors
+
+    # from backends.bitcoind import Blockchain2Processor
+    # chain2_proc = Blockchain2Processor(config)
+    # dispatcher.register('blockchain2', chain2_proc)
+
     chain_proc = backend.BlockchainProcessor(config)
     dispatcher.register('blockchain', chain_proc)
 
-    chain2_proc = Blockchain2Processor(config)
-    dispatcher.register('blockchain2', chain2_proc)
-
     server_proc = ServerProcessor(config)
     dispatcher.register('server', server_proc)
 
diff --git a/transports/stratum_http.py b/transports/stratum_http.py
index 51d5219..f60b320 100644
--- a/transports/stratum_http.py
+++ b/transports/stratum_http.py
@@ -49,7 +49,7 @@ from the processor point of view:
 
 """
 
-from processor import random_string
+from processor import random_string, print_log
 
 
 def get_version(request):
@@ -390,11 +390,11 @@ class HttpServer(threading.Thread):
         if self.use_ssl:
             class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer): pass
             self.server = StratumThreadedServer(( self.host, self.port), self.certfile, self.keyfile)
-            print "HTTPS server started."
+ print_log( "HTTPS server started.") else: class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer): pass self.server = StratumThreadedServer(( self.host, self.port)) - print "HTTP server started." + print_log( "HTTP server started.") self.server.dispatcher = self.dispatcher self.server.register_function(None, 'server.stop') diff --git a/transports/stratum_tcp.py b/transports/stratum_tcp.py index e1ebcd1..c843f77 100644 --- a/transports/stratum_tcp.py +++ b/transports/stratum_tcp.py @@ -4,7 +4,7 @@ import threading import time import Queue as queue -from processor import Session, Dispatcher, timestr +from processor import Session, Dispatcher, print_log class TcpSession(Session): @@ -131,7 +131,10 @@ class TcpServer(threading.Thread): self.ssl_certfile = ssl_certfile def run(self): - print "TCP server started.", self.use_ssl + if self.use_ssl: + print_log( "TCP/SSL server started.") + else: + print_log( "TCP server started.") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port))