From c9c337a89a3f04d7a985a521ca552d421fd94fda Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 9 Jan 2013 19:20:02 -0800 Subject: [PATCH] pep8 and utility cleanup --- backends/abe/__init__.py | 479 +++++++++++------------ backends/bitcoind/blockchain_processor.py | 393 +++++++++---------- backends/bitcoind/deserialize.py | 601 ++++++++++++++--------------- backends/bitcoind/util.py | 183 --------- backends/irc/__init__.py | 120 +++--- backends/libbitcoin/__init__.py | 44 ++- backends/libbitcoin/composed.py | 48 ++- backends/libbitcoin/h1.py | 7 +- backends/libbitcoin/history.py | 57 ++-- backends/libbitcoin/history1/__init__.py | 8 +- backends/libbitcoin/multimap.py | 7 +- backends/libbitcoin/trace_test.py | 24 +- processor.py | 61 ++-- server.py | 51 ++- transports/stratum_http.py | 134 +++---- transports/stratum_tcp.py | 17 +- utils/__init__.py | 230 +++++++++++ 17 files changed, 1227 insertions(+), 1237 deletions(-) delete mode 100644 backends/bitcoind/util.py create mode 100644 utils/__init__.py diff --git a/backends/abe/__init__.py b/backends/abe/__init__.py index 7a2cb02..48dd27e 100644 --- a/backends/abe/__init__.py +++ b/backends/abe/__init__.py @@ -1,71 +1,54 @@ -from Abe.util import hash_to_address, decode_check_address -from Abe.DataStore import DataStore as Datastore_class -from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58 - import binascii - -import thread, traceback, sys, urllib, operator from json import dumps, loads +import operator from Queue import Queue -import time, threading - - -import hashlib -encode = lambda x: x[::-1].encode('hex') -decode = lambda x: x.decode('hex')[::-1] -Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest() - -def rev_hex(s): - return s.decode('hex')[::-1].encode('hex') - -def int_to_hex(i, length=1): - s = hex(i)[2:].rstrip('L') - s = "0"*(2*length - len(s)) + s - return rev_hex(s) +import sys +import thread +import threading +import time +import traceback +import urllib + +from Abe import DataStore, readconf, BCDataStream, deserialize +from Abe.util import hash_to_address, decode_check_address -def header_to_string(res): - s = int_to_hex(res.get('version'),4) \ - + rev_hex(res.get('prev_block_hash')) \ - + rev_hex(res.get('merkle_root')) \ - + int_to_hex(int(res.get('timestamp')),4) \ - + int_to_hex(int(res.get('bits')),4) \ - + int_to_hex(int(res.get('nonce')),4) - return s +from processor import Processor, print_log +from utils import * -class AbeStore(Datastore_class): +class AbeStore(Datastore.Datastore): def __init__(self, config): conf = DataStore.CONFIG_DEFAULTS - args, argv = readconf.parse_argv( [], conf) - args.dbtype = config.get('database','type') + args, argv = readconf.parse_argv([], conf) + args.dbtype = config.get('database', 'type') if args.dbtype == 'sqlite3': - args.connect_args = { 'database' : config.get('database','database') } + args.connect_args = {'database': config.get('database', 'database')} elif args.dbtype == 'MySQLdb': - args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') } + args.connect_args = {'db': config.get('database', 'database'), 'user': config.get('database', 'username'), 'passwd': config.get('database', 'password')} elif args.dbtype == 'psycopg2': - args.connect_args = { 'database' : config.get('database','database') } + args.connect_args = {'database': config.get('database', 'database')} coin = config.get('server', 'coin') self.addrtype = 0 if coin == 'litecoin': - print_log ('Litecoin settings:') - datadir = config.get('server','datadir') - print_log (' datadir = ' + datadir) - args.datadir = [{"dirname":datadir,"chain":"Litecoin","code3":"LTC","address_version":"\u0030"}] - print_log (' addrtype = 48') + print_log('Litecoin settings:') + datadir = config.get('server', 'datadir') + print_log(' datadir = ' + datadir) + args.datadir = [{"dirname": datadir, "chain": "Litecoin", "code3": "LTC", "address_version": "\u0030"}] + print_log(' addrtype = 48') self.addrtype = 48 - Datastore_class.__init__(self,args) + Datastore.Datastore.__init__(self, args) # Use 1 (Bitcoin) if chain_id is not sent self.chain_id = self.datadirs[0]["chain_id"] or 1 - print_log ('Coin chain_id = %d' % self.chain_id) + print_log('Coin chain_id = %d' % self.chain_id) - self.sql_limit = int( config.get('database','limit') ) + self.sql_limit = int(config.get('database', 'limit')) self.tx_cache = {} - self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port')) + self.bitcoind_url = 'http://%s:%s@%s:%s/' % (config.get('bitcoind', 'user'), config.get('bitcoind', 'password'), config.get('bitcoind', 'host'), config.get('bitcoind', 'port')) self.chunk_cache = {} @@ -76,43 +59,37 @@ class AbeStore(Datastore_class): self.last_tx_id = 0 self.known_mempool_hashes = [] - - def import_tx(self, tx, is_coinbase): tx_id = super(AbeStore, self).import_tx(tx, is_coinbase) self.last_tx_id = tx_id return tx_id - - - def import_block(self, b, chain_ids=frozenset()): - #print_log ("import block") + #print_log("import block") block_id = super(AbeStore, self).import_block(b, chain_ids) for pos in xrange(len(b['transactions'])): tx = b['transactions'][pos] if 'hash' not in tx: - tx['hash'] = util.double_sha256(tx['tx']) + tx['hash'] = Hash(tx['tx']) tx_id = self.tx_find_id_and_value(tx) if tx_id: self.update_tx_cache(tx_id) else: - print_log ("error: import_block: no tx_id") + print_log("error: import_block: no tx_id") return block_id - def update_tx_cache(self, txid): inrows = self.get_tx_inputs(txid, False) for row in inrows: _hash = self.binout(row[6]) if not _hash: - #print_log ("WARNING: missing tx_in for tx", txid) + #print_log("WARNING: missing tx_in for tx", txid) continue address = hash_to_address(chr(self.addrtype), _hash) with self.cache_lock: - if self.tx_cache.has_key(address): - print_log ("cache: invalidating", address) + if address in self.tx_cache: + print_log("cache: invalidating", address) self.tx_cache.pop(address) self.address_queue.put(address) @@ -121,34 +98,34 @@ class AbeStore(Datastore_class): for row in outrows: _hash = self.binout(row[6]) if not _hash: - #print_log ("WARNING: missing tx_out for tx", txid) + #print_log("WARNING: missing tx_out for tx", txid) continue address = hash_to_address(chr(self.addrtype), _hash) with self.cache_lock: - if self.tx_cache.has_key(address): - print_log ("cache: invalidating", address) + if address in self.tx_cache: + print_log("cache: invalidating", address) self.tx_cache.pop(address) self.address_queue.put(address) - def safe_sql(self,sql, params=(), lock=True): - + def safe_sql(self, sql, params=(), lock=True): error = False try: - if lock: self.lock.acquire() - ret = self.selectall(sql,params) + if lock: + self.lock.acquire() + ret = self.selectall(sql, params) except: error = True traceback.print_exc(file=sys.stdout) finally: - if lock: self.lock.release() + if lock: + self.lock.release() - if error: - raise BaseException('sql error') + if error: + raise Exception('sql error') return ret - def get_tx_outputs(self, tx_id, lock=True): return self.safe_sql("""SELECT @@ -163,9 +140,9 @@ class AbeStore(Datastore_class): LEFT JOIN txin ON (txin.txout_id = txout.txout_id) LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id) - WHERE txout.tx_id = %d + WHERE txout.tx_id = %d ORDER BY txout.txout_pos - """%(tx_id), (), lock) + """ % (tx_id), (), lock) def get_tx_inputs(self, tx_id, lock=True): return self.safe_sql(""" SELECT @@ -183,8 +160,7 @@ class AbeStore(Datastore_class): LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id) WHERE txin.tx_id = %d ORDER BY txin.txin_pos - """%(tx_id,), (), lock) - + """ % (tx_id,), (), lock) def get_address_out_rows(self, dbhash): out = self.safe_sql(""" SELECT @@ -209,8 +185,8 @@ class AbeStore(Datastore_class): AND cc.in_longest = 1 LIMIT ? """, (dbhash, self.chain_id, self.sql_limit)) - if len(out)==self.sql_limit: - raise BaseException('limit reached') + if len(out) == self.sql_limit: + raise Exception('limit reached') return out def get_address_out_rows_memorypool(self, dbhash): @@ -220,15 +196,15 @@ class AbeStore(Datastore_class): tx.tx_id, txin.txin_pos, -prevout.txout_value - FROM tx + FROM tx JOIN txin ON (txin.tx_id = tx.tx_id) JOIN txout prevout ON (txin.txout_id = prevout.txout_id) JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id) WHERE pubkey.pubkey_hash = ? - LIMIT ? """, (dbhash,self.sql_limit)) + LIMIT ? """, (dbhash, self.sql_limit)) - if len(out)==self.sql_limit: - raise BaseException('limit reached') + if len(out) == self.sql_limit: + raise Exception('limit reached') return out def get_address_in_rows(self, dbhash): @@ -253,12 +229,12 @@ class AbeStore(Datastore_class): AND cc.in_longest = 1 LIMIT ? """, (dbhash, self.chain_id, self.sql_limit)) - if len(out)==self.sql_limit: - raise BaseException('limit reached') + if len(out) == self.sql_limit: + raise Exception('limit reached') return out def get_address_in_rows_memorypool(self, dbhash): - out = self.safe_sql( """ SELECT + out = self.safe_sql(""" SELECT 0, tx.tx_hash, tx.tx_id, @@ -268,21 +244,21 @@ class AbeStore(Datastore_class): JOIN txout ON (txout.tx_id = tx.tx_id) JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) WHERE pubkey.pubkey_hash = ? - LIMIT ? """, (dbhash,self.sql_limit)) + LIMIT ? """, (dbhash, self.sql_limit)) - if len(out)==self.sql_limit: - raise BaseException('limit reached') + if len(out) == self.sql_limit: + raise Exception('limit reached') return out - - def get_history(self, addr, cache_only=False): + # todo: make this more efficient. it iterates over txpoints multiple times with self.cache_lock: - cached_version = self.tx_cache.get( addr ) + cached_version = self.tx_cache.get(addr) if cached_version is not None: return cached_version - if cache_only: return -1 + if cache_only: + return -1 version, binaddr = decode_check_address(addr) if binaddr is None: @@ -290,8 +266,8 @@ class AbeStore(Datastore_class): dbhash = self.binin(binaddr) rows = [] - rows += self.get_address_out_rows( dbhash ) - rows += self.get_address_in_rows( dbhash ) + rows += self.get_address_out_rows(dbhash) + rows += self.get_address_in_rows(dbhash) txpoints = [] known_tx = [] @@ -300,31 +276,29 @@ class AbeStore(Datastore_class): try: nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row except: - print_log ("cannot unpack row", row) + print_log("cannot unpack row", row) break tx_hash = self.hashout_hex(tx_hash) - txpoint = { - "timestamp": int(nTime), - "height": int(height), - "is_input": int(is_in), - "block_hash": self.hashout_hex(blk_hash), - "tx_hash": tx_hash, - "tx_id": int(tx_id), - "index": int(pos), - "value": int(value), - } - - txpoints.append(txpoint) - known_tx.append(self.hashout_hex(tx_hash)) + txpoints.append({ + "timestamp": int(nTime), + "height": int(height), + "is_input": int(is_in), + "block_hash": self.hashout_hex(blk_hash), + "tx_hash": tx_hash, + "tx_id": int(tx_id), + "index": int(pos), + "value": int(value), + }) + known_tx.append(self.hashout_hex(tx_hash)) # todo: sort them really... txpoints = sorted(txpoints, key=operator.itemgetter("timestamp")) # read memory pool rows = [] - rows += self.get_address_in_rows_memorypool( dbhash ) - rows += self.get_address_out_rows_memorypool( dbhash ) + rows += self.get_address_in_rows_memorypool(dbhash) + rows += self.get_address_out_rows_memorypool(dbhash) address_has_mempool = False for row in rows: @@ -335,35 +309,33 @@ class AbeStore(Datastore_class): # discard transactions that are too old if self.last_tx_id - tx_id > 50000: - print_log ("discarding tx id", tx_id) + print_log("discarding tx id", tx_id) continue # this means that pending transactions were added to the db, even if they are not returned by getmemorypool address_has_mempool = True - #print_log ("mempool", tx_hash) - txpoint = { - "timestamp": 0, - "height": 0, - "is_input": int(is_in), - "block_hash": 'mempool', - "tx_hash": tx_hash, - "tx_id": int(tx_id), - "index": int(pos), - "value": int(value), - } - txpoints.append(txpoint) - + #print_log("mempool", tx_hash) + txpoints.append({ + "timestamp": 0, + "height": 0, + "is_input": int(is_in), + "block_hash": 'mempool', + "tx_hash": tx_hash, + "tx_id": int(tx_id), + "index": int(pos), + "value": int(value), + }) for txpoint in txpoints: tx_id = txpoint['tx_id'] - + txinputs = [] inrows = self.get_tx_inputs(tx_id) for row in inrows: _hash = self.binout(row[6]) if not _hash: - #print_log ("WARNING: missing tx_in for tx", tx_id, addr) + #print_log("WARNING: missing tx_in for tx", tx_id, addr) continue address = hash_to_address(chr(self.addrtype), _hash) txinputs.append(address) @@ -373,7 +345,7 @@ class AbeStore(Datastore_class): for row in outrows: _hash = self.binout(row[6]) if not _hash: - #print_log ("WARNING: missing tx_out for tx", tx_id, addr) + #print_log("WARNING: missing tx_out for tx", tx_id, addr) continue address = hash_to_address(chr(self.addrtype), _hash) txoutputs.append(address) @@ -383,43 +355,45 @@ class AbeStore(Datastore_class): if not txpoint['is_input']: # detect if already redeemed... for row in outrows: - if row[6] == dbhash: break + if row[6] == dbhash: + break else: raise #row = self.get_tx_output(tx_id,dbhash) # pos, script, value, o_hash, o_id, o_pos, binaddr = row # if not redeemed, we add the script if row: - if not row[4]: txpoint['raw_output_script'] = row[1] + if not row[4]: + txpoint['raw_output_script'] = row[1] txpoint.pop('tx_id') - - txpoints = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, txpoints) + txpoints = map(lambda x: {'tx_hash': x['tx_hash'], 'height': x['height']}, txpoints) out = [] for item in txpoints: - if item not in out: out.append(item) + if item not in out: + out.append(item) # cache result ## do not cache mempool results because statuses are ambiguous #if not address_has_mempool: with self.cache_lock: self.tx_cache[addr] = out - - return out + return out def get_status(self, addr, cache_only=False): # for 0.5 clients tx_points = self.get_history(addr, cache_only) - if cache_only and tx_points == -1: return -1 + if cache_only and tx_points == -1: + return -1 - if not tx_points: return None + if not tx_points: + return None status = '' for tx in tx_points: status += tx.get('tx_hash') + ':%d:' % tx.get('height') - return hashlib.sha256( status ).digest().encode('hex') - + return hashlib.sha256(status).digest().encode('hex') def get_block_header(self, block_height): out = self.safe_sql(""" @@ -434,22 +408,29 @@ class AbeStore(Datastore_class): prev_block_hash, block_id FROM chain_summary - WHERE block_height = %d AND in_longest = 1"""%block_height) + WHERE block_height = %d AND in_longest = 1""" % block_height) - if not out: raise BaseException("block not found") + if not out: + raise Exception("block not found") row = out[0] - (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height,prev_block_hash, block_id) \ - = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) ) - - out = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, - "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce} - return out - + (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_id) \ + = (self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8])) + + return { + "block_height": block_height, + "version": block_version, + "prev_block_hash": prev_block_hash, + "merkle_root": hashMerkleRoot, + "timestamp": nTime, + "bits": nBits, + "nonce": nNonce, + } def get_chunk(self, index): with self.cache_lock: msg = self.chunk_cache.get(index) - if msg: return msg + if msg: + return msg sql = """ SELECT @@ -463,55 +444,60 @@ class AbeStore(Datastore_class): prev_block_hash, block_height FROM chain_summary - WHERE block_height >= %d AND block_height< %d AND in_longest = 1 ORDER BY block_height"""%(index*2016, (index+1)*2016) + WHERE block_height >= %d AND block_height< %d AND in_longest = 1 ORDER BY block_height""" % (index * 2016, (index+1) * 2016) out = self.safe_sql(sql) msg = '' for row in out: (block_hash, block_version, hashMerkleRoot, nTime, nBits, nNonce, height, prev_block_hash, block_height) \ - = ( self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8]) ) - h = {"block_height":block_height, "version":block_version, "prev_block_hash":prev_block_hash, - "merkle_root":hashMerkleRoot, "timestamp":nTime, "bits":nBits, "nonce":nNonce} - - if h.get('block_height')==0: h['prev_block_hash'] = "0"*64 + = (self.hashout_hex(row[0]), int(row[1]), self.hashout_hex(row[2]), int(row[3]), int(row[4]), int(row[5]), int(row[6]), self.hashout_hex(row[7]), int(row[8])) + h = { + "block_height": block_height, + "version": block_version, + "prev_block_hash": prev_block_hash, + "merkle_root": hashMerkleRoot, + "timestamp": nTime, + "bits": nBits, + "nonce": nNonce, + } + + if h.get('block_height') == 0: + h['prev_block_hash'] = "0" * 64 msg += header_to_string(h) - #print_log ("hash", encode(Hash(msg.decode('hex')))) + #print_log("hash", encode(Hash(msg.decode('hex')))) #if h.get('block_height')==1:break with self.cache_lock: self.chunk_cache[index] = msg - print_log ("get_chunk", index, len(msg)) + print_log("get_chunk", index, len(msg)) return msg - - def get_raw_tx(self, tx_hash, height): - postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id':'jsonrpc'}) + postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash, 0, height], 'id': 'jsonrpc'}) respdata = urllib.urlopen(self.bitcoind_url, postdata).read() r = loads(respdata) - if r['error'] != None: - raise BaseException(r['error']) - - hextx = r.get('result') - return hextx + if r['error'] is not None: + raise Exception(r['error']) + return r.get('result') def get_tx_merkle(self, tx_hash): - out = self.safe_sql(""" - SELECT block_tx.block_id FROM tx - JOIN block_tx on tx.tx_id = block_tx.tx_id + SELECT block_tx.block_id FROM tx + JOIN block_tx on tx.tx_id = block_tx.tx_id JOIN chain_summary on chain_summary.block_id = block_tx.block_id - WHERE tx_hash='%s' AND in_longest = 1"""%tx_hash) + WHERE tx_hash='%s' AND in_longest = 1""" % tx_hash) - if not out: raise BaseException("not in a block") + if not out: + raise Exception("not in a block") block_id = int(out[0][0]) # get block height - out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1"%block_id) + out = self.safe_sql("SELECT block_height FROM chain_summary WHERE block_id = %d AND in_longest = 1" % block_id) - if not out: raise BaseException("block not found") + if not out: + raise Exception("block not found") block_height = int(out[0][0]) merkle = [] @@ -525,7 +511,8 @@ class AbeStore(Datastore_class): ORDER BY tx_pos""", (block_id,)): _id, _pos, _hash = row merkle.append(_hash) - if _hash == tx_hash: tx_pos = int(_pos) + if _hash == tx_hash: + tx_pos = int(_pos) # find subset. # TODO: do not compute this on client request, better store the hash tree of each block in a database... @@ -535,81 +522,77 @@ class AbeStore(Datastore_class): s = [] while len(merkle) != 1: - if len(merkle)%2: merkle.append( merkle[-1] ) + if len(merkle) % 2: + merkle.append(merkle[-1]) n = [] while merkle: - new_hash = Hash( merkle[0] + merkle[1] ) + new_hash = Hash(merkle[0] + merkle[1]) if merkle[0] == target_hash: - s.append( encode(merkle[1])) + s.append(encode(merkle[1])) target_hash = new_hash elif merkle[1] == target_hash: - s.append( encode(merkle[0])) + s.append(encode(merkle[0])) target_hash = new_hash - n.append( new_hash ) + n.append(new_hash) merkle = merkle[2:] merkle = n # send result - return {"block_height":block_height, "merkle":s, "pos":tx_pos} - - - + return {"block_height": block_height, "merkle": s, "pos": tx_pos} def memorypool_update(store): - ds = BCDataStream.BCDataStream() - postdata = dumps({"method": 'getrawmempool', 'params': [], 'id':'jsonrpc'}) + postdata = dumps({"method": 'getrawmempool', 'params': [], 'id': 'jsonrpc'}) respdata = urllib.urlopen(store.bitcoind_url, postdata).read() r = loads(respdata) - if r['error'] != None: - print_log (r['error']) + if r['error'] is not None: + print_log(r['error']) return mempool_hashes = r.get('result') - num_new_tx = 0 + num_new_tx = 0 for tx_hash in mempool_hashes: - if tx_hash in store.known_mempool_hashes: continue + if tx_hash in store.known_mempool_hashes: + continue store.known_mempool_hashes.append(tx_hash) num_new_tx += 1 - postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id':'jsonrpc'}) + postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id': 'jsonrpc'}) respdata = urllib.urlopen(store.bitcoind_url, postdata).read() r = loads(respdata) - if r['error'] != None: + if r['error'] is not None: continue hextx = r.get('result') ds.clear() ds.write(hextx.decode('hex')) tx = deserialize.parse_Transaction(ds) - tx['hash'] = util.double_sha256(tx['tx']) - + tx['hash'] = Hash(tx['tx']) + if store.tx_find_id_and_value(tx): pass else: tx_id = store.import_tx(tx, False) store.update_tx_cache(tx_id) - #print_log (tx_hash) + #print_log(tx_hash) store.commit() store.known_mempool_hashes = mempool_hashes return num_new_tx - - def send_tx(self,tx): - postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id':'jsonrpc'}) + def send_tx(self, tx): + postdata = dumps({"method": 'sendrawtransaction', 'params': [tx], 'id': 'jsonrpc'}) respdata = urllib.urlopen(self.bitcoind_url, postdata).read() r = loads(respdata) - if r['error'] != None: + if r['error'] is not None: msg = r['error'].get('message') out = "error: transaction rejected by memorypool: " + msg + "\n" + tx else: out = r['result'] return out - def main_iteration(self): with self.lock: t1 = time.time() @@ -618,20 +601,17 @@ class AbeStore(Datastore_class): time_catch_up = t2 - t1 n = self.memorypool_update() time_mempool = time.time() - t2 - height = self.get_block_number( self.chain_id ) + height = self.get_block_number(self.chain_id) with self.cache_lock: - try: - self.chunk_cache.pop(height/2016) - except: + try: + self.chunk_cache.pop(height/2016) + except: pass - block_header = self.get_block_header( height ) + block_header = self.get_block_header(height) return block_header, time_catch_up, time_mempool, n - - - def catch_up(store): # if there is an exception, do rollback and then re-raise the exception for dircfg in store.datadirs: @@ -643,10 +623,6 @@ class AbeStore(Datastore_class): raise e - - -from processor import Processor, print_log - class BlockchainProcessor(Processor): def __init__(self, config, shared): @@ -658,23 +634,21 @@ class BlockchainProcessor(Processor): # catch_up first self.block_header, time_catch_up, time_mempool, n = self.store.main_iteration() self.block_number = self.block_header.get('block_height') - print_log ("blockchain: %d blocks"%self.block_number) + print_log("blockchain: %d blocks" % self.block_number) threading.Timer(10, self.run_store_iteration).start() - def add_request(self, request): # see if we can get if from cache. if not, add to queue - if self.process( request, cache_only = True) == -1: + if self.process(request, cache_only=True) == -1: self.queue.put(request) - - def process(self, request, cache_only = False): - #print_log ("abe process", request) + def process(self, request, cache_only=False): + #print_log("abe process", request) message_id = request['id'] method = request['method'] - params = request.get('params',[]) + params = request.get('params', []) result = None error = None @@ -689,44 +663,44 @@ class BlockchainProcessor(Processor): address = params[0] result = self.store.get_status(address, cache_only) self.watch_address(address) - except BaseException, e: + except Exception, e: error = str(e) + ': ' + address - print_log ("error:", error) + print_log("error:", error) elif method == 'blockchain.address.get_history': try: address = params[0] - result = self.store.get_history( address, cache_only ) - except BaseException, e: + result = self.store.get_history(address, cache_only) + except Exception, e: error = str(e) + ': ' + address - print_log ("error:", error) + print_log("error:", error) elif method == 'blockchain.block.get_header': - if cache_only: + if cache_only: result = -1 else: try: height = params[0] - result = self.store.get_block_header( height ) - except BaseException, e: - error = str(e) + ': %d'% height - print_log ("error:", error) - + result = self.store.get_block_header(height) + except Exception, e: + error = str(e) + ': %d' % height + print_log("error:", error) + elif method == 'blockchain.block.get_chunk': if cache_only: result = -1 else: try: index = params[0] - result = self.store.get_chunk( index ) - except BaseException, e: - error = str(e) + ': %d'% index - print_log ("error:", error) - + result = self.store.get_chunk(index) + except Exception, e: + error = str(e) + ': %d' % index + print_log("error:", error) + elif method == 'blockchain.transaction.broadcast': txo = self.store.send_tx(params[0]) - print_log ("sent tx:", txo) - result = txo + print_log("sent tx:", txo) + result = txo elif method == 'blockchain.transaction.get_merkle': if cache_only: @@ -734,62 +708,59 @@ class BlockchainProcessor(Processor): else: try: tx_hash = params[0] - result = self.store.get_tx_merkle(tx_hash ) - except BaseException, e: + result = self.store.get_tx_merkle(tx_hash) + except Exception, e: error = str(e) + ': ' + tx_hash - print_log ("error:", error) - + print_log("error:", error) + elif method == 'blockchain.transaction.get': try: tx_hash = params[0] height = params[1] - result = self.store.get_raw_tx(tx_hash, height ) - except BaseException, e: + result = self.store.get_raw_tx(tx_hash, height) + except Exception, e: error = str(e) + ': ' + tx_hash - print_log ("error:", error) + print_log("error:", error) else: - error = "unknown method:%s"%method + error = "unknown method:%s" % method - if cache_only and result == -1: return -1 + if cache_only and result == -1: + return -1 if error: - response = { 'id':message_id, 'error':error } + response = {'id': message_id, 'error': error} self.push_response(response) elif result != '': - response = { 'id':message_id, 'result':result } + response = {'id': message_id, 'result': result} self.push_response(response) - def watch_address(self, addr): if addr not in self.watched_addresses: self.watched_addresses.append(addr) - def run_store_iteration(self): - try: block_header, time_catch_up, time_mempool, n = self.store.main_iteration() except: traceback.print_exc(file=sys.stdout) - print_log ("terminating") + print_log("terminating") self.shared.stop() - if self.shared.stopped(): - print_log ("exit timer") + if self.shared.stopped(): + print_log("exit timer") return - #print_log ("block number: %d (%.3fs) mempool:%d (%.3fs)"%(self.block_number, time_catch_up, n, time_mempool)) + #print_log("block number: %d (%.3fs) mempool:%d (%.3fs)"%(self.block_number, time_catch_up, n, time_mempool)) if self.block_number != block_header.get('block_height'): self.block_number = block_header.get('block_height') - print_log ("block number: %d (%.3fs)"%(self.block_number, time_catch_up)) - self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] }) + print_log("block number: %d (%.3fs)" % (self.block_number, time_catch_up)) + self.push_response({'id': None, 'method': 'blockchain.numblocks.subscribe', 'params': [self.block_number]}) if self.block_header != block_header: self.block_header = block_header - self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.block_header] }) - + self.push_response({'id': None, 'method': 'blockchain.headers.subscribe', 'params': [self.block_header]}) while True: try: @@ -798,11 +769,9 @@ class BlockchainProcessor(Processor): break if addr in self.watched_addresses: try: - status = self.store.get_status( addr ) - self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] }) + status = self.store.get_status(addr) + self.push_response({'id': None, 'method': 'blockchain.address.subscribe', 'params': [addr, status]}) except: break threading.Timer(10, self.run_store_iteration).start() - - diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py index 5dfecb8..cb64178 100644 --- a/backends/bitcoind/blockchain_processor.py +++ b/backends/bitcoind/blockchain_processor.py @@ -1,14 +1,20 @@ +import ast +import hashlib from json import dumps, loads -import leveldb, urllib -import deserialize -import ast, time, threading, hashlib +import leveldb +import os from Queue import Queue -import traceback, sys, os, random - - -from util import Hash, hash_encode, hash_decode, rev_hex, int_to_hex -from util import bc_address_to_hash_160, hash_160_to_bc_address, header_to_string, header_from_string +import random +import sys +import time +import threading +import traceback +import urllib + +from backends.bitcoind import deserialize from processor import Processor, print_log +from utils import * + class BlockchainProcessor(Processor): @@ -40,21 +46,20 @@ class BlockchainProcessor(Processor): self.shared.stop() self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( - config.get('bitcoind','user'), - config.get('bitcoind','password'), - config.get('bitcoind','host'), - config.get('bitcoind','port')) + config.get('bitcoind', 'user'), + config.get('bitcoind', 'password'), + config.get('bitcoind', 'host'), + config.get('bitcoind', 'port')) self.height = 0 self.is_test = False self.sent_height = 0 self.sent_header = None - try: hist = self.deserialize(self.db.Get('height')) - self.last_hash, self.height, _ = hist[0] - print_log( "hist", hist ) + self.last_hash, self.height, _ = hist[0] + print_log("hist", hist) except: #traceback.print_exc(file=sys.stdout) print_log('initializing database') @@ -73,14 +78,12 @@ class BlockchainProcessor(Processor): shared.stop() sys.exit(0) - print_log( "blockchain is up to date." ) + print_log("blockchain is up to date.") threading.Timer(10, self.main_iteration).start() - - def bitcoind(self, method, params=[]): - postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'}) + postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'}) try: respdata = urllib.urlopen(self.bitcoind_url, postdata).read() except: @@ -88,10 +91,9 @@ class BlockchainProcessor(Processor): self.shared.stop() r = loads(respdata) - if r['error'] != None: + if r['error'] is not None: raise BaseException(r['error']) return r.get('result') - def serialize(self, h): s = '' @@ -99,32 +101,35 @@ class BlockchainProcessor(Processor): s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4) return s.decode('hex') - def deserialize(self, s): h = [] while s: txid = s[0:32].encode('hex') - txpos = int( rev_hex( s[32:36].encode('hex') ), 16 ) - height = int( rev_hex( s[36:40].encode('hex') ), 16 ) - h.append( ( txid, txpos, height ) ) + txpos = int(rev_hex(s[32:36].encode('hex')), 16) + height = int(rev_hex(s[36:40].encode('hex')), 16) + h.append((txid, txpos, height)) s = s[40:] return h - def block2header(self, b): - return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'), - "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":int(b.get('bits'),16), "nonce":b.get('nonce')} - + return { + "block_height": b.get('height'), + "version": b.get('version'), + "prev_block_hash": b.get('previousblockhash'), + "merkle_root": b.get('merkleroot'), + "timestamp": b.get('time'), + "bits": int(b.get('bits'), 16), + "nonce": b.get('nonce'), + } def get_header(self, height): block_hash = self.bitcoind('getblockhash', [height]) b = self.bitcoind('getblock', [block_hash]) return self.block2header(b) - def init_headers(self, db_height): self.chunk_cache = {} - self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers') + self.headers_filename = os.path.join(self.dbpath, 'blockchain_headers') if os.path.exists(self.headers_filename): height = os.path.getsize(self.headers_filename)/80 - 1 # the current height @@ -133,52 +138,47 @@ class BlockchainProcessor(Processor): else: prev_hash = None else: - open(self.headers_filename,'wb').close() + open(self.headers_filename, 'wb').close() prev_hash = None height = -1 if height < db_height: - print_log( "catching up missing headers:", height, db_height) + print_log("catching up missing headers:", height, db_height) try: while height < db_height: height = height + 1 header = self.get_header(height) - if height>1: + if height > 1: assert prev_hash == header.get('prev_block_hash') self.write_header(header, sync=False) prev_hash = self.hash_header(header) - if height%1000==0: print_log("headers file:",height) + if (height % 1000) == 0: + print_log("headers file:", height) except KeyboardInterrupt: self.flush_headers() sys.exit() self.flush_headers() - def hash_header(self, header): return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex')) - def read_header(self, block_height): if os.path.exists(self.headers_filename): - f = open(self.headers_filename,'rb') - f.seek(block_height*80) - h = f.read(80) - f.close() + with open(self.headers_filename, 'rb') as f: + f.seek(block_height * 80) + h = f.read(80) if len(h) == 80: h = header_from_string(h) return h - def read_chunk(self, index): - f = open(self.headers_filename,'rb') - f.seek(index*2016*80) - chunk = f.read(2016*80) - f.close() + with open(self.headers_filename, 'rb') as f: + f.seek(index*2016*80) + chunk = f.read(2016*80) return chunk.encode('hex') - def write_header(self, header, sync=True): if not self.headers_data: self.headers_offset = header.get('block_height') @@ -198,14 +198,13 @@ class BlockchainProcessor(Processor): self.headers_data = self.headers_data[:-40] def flush_headers(self): - if not self.headers_data: return - f = open(self.headers_filename,'rb+') - f.seek(self.headers_offset*80) - f.write(self.headers_data) - f.close() + if not self.headers_data: + return + with open(self.headers_filename, 'rb+') as f: + f.seek(self.headers_offset*80) + f.write(self.headers_data) self.headers_data = '' - def get_chunk(self, i): # store them on disk; store the current chunk in memory with self.cache_lock: @@ -216,7 +215,6 @@ class BlockchainProcessor(Processor): return chunk - def get_mempool_transaction(self, txid): try: raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1]) @@ -225,52 +223,57 @@ class BlockchainProcessor(Processor): vds = deserialize.BCDataStream() vds.write(raw_tx.decode('hex')) - out = deserialize.parse_Transaction(vds, is_coinbase = False) - return out + return deserialize.parse_Transaction(vds, is_coinbase=False) def get_history(self, addr, cache_only=False): - with self.cache_lock: hist = self.history_cache.get( addr ) - if hist is not None: return hist - if cache_only: return -1 + with self.cache_lock: + hist = self.history_cache.get(addr) + if hist is not None: + return hist + if cache_only: + return -1 with self.dblock: try: hash_160 = bc_address_to_hash_160(addr) hist = self.deserialize(self.db.Get(hash_160)) is_known = True - except: + except: hist = [] is_known = False # should not be necessary - hist.sort( key=lambda tup: tup[1]) + hist.sort(key=lambda tup: tup[1]) # check uniqueness too... # add memory pool with self.mempool_lock: - for txid in self.mempool_hist.get(addr,[]): + for txid in self.mempool_hist.get(addr, []): hist.append((txid, 0, 0)) - hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist) + hist = map(lambda x: {'tx_hash': x[0], 'height': x[2]}, hist) # add something to distinguish between unused and empty addresses - if hist == [] and is_known: hist = ['*'] + if hist == [] and is_known: + hist = ['*'] - with self.cache_lock: self.history_cache[addr] = hist + with self.cache_lock: + self.history_cache[addr] = hist return hist - def get_status(self, addr, cache_only=False): tx_points = self.get_history(addr, cache_only) - if cache_only and tx_points == -1: return -1 + if cache_only and tx_points == -1: + return -1 - if not tx_points: return None - if tx_points == ['*']: return '*' + if not tx_points: + return None + if tx_points == ['*']: + return '*' status = '' for tx in tx_points: status += tx.get('tx_hash') + ':%d:' % tx.get('height') - return hashlib.sha256( status ).digest().encode('hex') - + return hashlib.sha256(status).digest().encode('hex') def get_merkle(self, tx_hash, height): @@ -278,41 +281,38 @@ class BlockchainProcessor(Processor): b = self.bitcoind('getblock', [block_hash]) tx_list = b.get('tx') tx_pos = tx_list.index(tx_hash) - + merkle = map(hash_decode, tx_list) target_hash = hash_decode(tx_hash) s = [] while len(merkle) != 1: - if len(merkle)%2: merkle.append( merkle[-1] ) + if len(merkle) % 2: + merkle.append(merkle[-1]) n = [] while merkle: - new_hash = Hash( merkle[0] + merkle[1] ) + new_hash = Hash(merkle[0] + merkle[1]) if merkle[0] == target_hash: - s.append( hash_encode( merkle[1])) + s.append(hash_encode(merkle[1])) target_hash = new_hash elif merkle[1] == target_hash: - s.append( hash_encode( merkle[0])) + s.append(hash_encode(merkle[0])) target_hash = new_hash - n.append( new_hash ) + n.append(new_hash) merkle = merkle[2:] merkle = n - return {"block_height":height, "merkle":s, "pos":tx_pos} - - - + return {"block_height": height, "merkle": s, "pos": tx_pos} def add_to_history(self, addr, tx_hash, tx_pos, tx_height): - # keep it sorted s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex') - serialized_hist = self.batch_list[addr] + serialized_hist = self.batch_list[addr] l = len(serialized_hist)/40 for i in range(l-1, -1, -1): item = serialized_hist[40*i:40*(i+1)] - item_height = int( rev_hex( item[36:40].encode('hex') ), 16 ) + item_height = int(rev_hex(item[36:40].encode('hex')), 16) if item_height < tx_height: serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):] break @@ -325,9 +325,7 @@ class BlockchainProcessor(Processor): txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex') self.batch_txio[txo] = addr - def remove_from_history(self, addr, tx_hash, tx_pos): - txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex') if addr is None: @@ -335,14 +333,14 @@ class BlockchainProcessor(Processor): addr = self.batch_txio[txi] except: raise BaseException(tx_hash, tx_pos) - + serialized_hist = self.batch_list[addr] l = len(serialized_hist)/40 for i in range(l): item = serialized_hist[40*i:40*(i+1)] if item[0:36] == txi: - height = int( rev_hex( item[36:40].encode('hex') ), 16 ) + height = int(rev_hex(item[36:40].encode('hex')), 16) serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):] break else: @@ -352,7 +350,6 @@ class BlockchainProcessor(Processor): self.batch_list[addr] = serialized_hist return height, addr - def deserialize_block(self, block): txlist = block.get('tx') tx_hashes = [] # ordered txids @@ -369,13 +366,12 @@ class BlockchainProcessor(Processor): return tx_hashes, txdict def get_undo_info(self, height): - s = self.db.Get("undo%d"%(height%100)) + s = self.db.Get("undo%d" % (height % 100)) return eval(s) def write_undo_info(self, batch, height, undo_info): if self.is_test or height > self.bitcoind_height - 100: - batch.Put("undo%d"%(height%100), repr(undo_info)) - + batch.Put("undo%d" % (height % 100), repr(undo_info)) def import_block(self, block, block_hash, block_height, sync, revert=False): @@ -392,7 +388,6 @@ class BlockchainProcessor(Processor): t00 = time.time() - if not revert: # read addresses of tx inputs for tx in txdict.values(): @@ -415,7 +410,7 @@ class BlockchainProcessor(Processor): for x in tx.get('outputs'): txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex') block_outputs.append(txo) - + # read histories of addresses for txid, tx in txdict.items(): for x in tx.get('outputs'): @@ -426,37 +421,38 @@ class BlockchainProcessor(Processor): for addr in addr_to_read: try: self.batch_list[addr] = self.db.Get(addr) - except: + except: self.batch_list[addr] = '' - - if revert: + if revert: undo_info = self.get_undo_info(block_height) # print "undo", block_height, undo_info - else: undo_info = {} + else: + undo_info = {} # process t1 = time.time() - if revert: tx_hashes = tx_hashes[::-1] - for txid in tx_hashes: # must be ordered + if revert: + tx_hashes = tx_hashes[::-1] + for txid in tx_hashes: # must be ordered tx = txdict[txid] if not revert: undo = [] for x in tx.get('inputs'): - prevout_height, prevout_addr = self.remove_from_history( None, x.get('prevout_hash'), x.get('prevout_n')) - undo.append( (prevout_height, prevout_addr) ) + prevout_height, prevout_addr = self.remove_from_history(None, x.get('prevout_hash'), x.get('prevout_n')) + undo.append((prevout_height, prevout_addr)) undo_info[txid] = undo for x in tx.get('outputs'): hash_160 = bc_address_to_hash_160(x.get('address')) - self.add_to_history( hash_160, txid, x.get('index'), block_height) - + self.add_to_history(hash_160, txid, x.get('index'), block_height) + else: for x in tx.get('outputs'): hash_160 = bc_address_to_hash_160(x.get('address')) - self.remove_from_history( hash_160, txid, x.get('index')) + self.remove_from_history(hash_160, txid, x.get('index')) i = 0 for x in tx.get('inputs'): @@ -468,8 +464,8 @@ class BlockchainProcessor(Processor): self.batch_list[prevout_addr] = self.db.Get(prevout_addr) # re-add them to the history - self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height) - # print_log( "new hist for", hash_160_to_bc_address(prevout_addr), self.deserialize(self.batch_list[prevout_addr]) ) + self.add_to_history(prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height) + # print_log("new hist for", hash_160_to_bc_address(prevout_addr), self.deserialize(self.batch_list[prevout_addr]) ) # write max_len = 0 @@ -491,7 +487,7 @@ class BlockchainProcessor(Processor): # delete spent inputs for txi in block_inputs: batch.Delete(txi) - # add undo info + # add undo info self.write_undo_info(batch, block_height, undo_info) else: # restore spent inputs @@ -501,41 +497,36 @@ class BlockchainProcessor(Processor): for txo in block_outputs: batch.Delete(txo) - # add the max - batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) ) + batch.Put('height', self.serialize([(block_hash, block_height, 0)])) # actual write - self.db.Write(batch, sync = sync) + self.db.Write(batch, sync=sync) t3 = time.time() - if t3 - t0 > 10 and not sync: - print_log("block", block_height, - "parse:%0.2f "%(t00 - t0), - "read:%0.2f "%(t1 - t00), - "proc:%.2f "%(t2-t1), - "write:%.2f "%(t3-t2), + if t3 - t0 > 10 and not sync: + print_log("block", block_height, + "parse:%0.2f " % (t00 - t0), + "read:%0.2f " % (t1 - t00), + "proc:%.2f " % (t2-t1), + "write:%.2f " % (t3-t2), "max:", max_len, hash_160_to_bc_address(max_addr)) - for h160 in self.batch_list.keys(): + for h160 in self.batch_list.keys(): addr = hash_160_to_bc_address(h160) self.invalidate_cache(addr) - - def add_request(self, request): # see if we can get if from cache. if not, add to queue - if self.process( request, cache_only = True) == -1: + if self.process(request, cache_only=True) == -1: self.queue.put(request) - - - def process(self, request, cache_only = False): + def process(self, request, cache_only=False): #print "abe process", request message_id = request['id'] method = request['method'] - params = request.get('params',[]) + params = request.get('params', []) result = None error = None @@ -552,13 +543,13 @@ class BlockchainProcessor(Processor): self.watch_address(address) except BaseException, e: error = str(e) + ': ' + address - print_log( "error:", error ) + print_log("error:", error) elif method == 'blockchain.address.unsubscribe': try: password = params[0] address = params[1] - if password == self.config.get('server','password'): + if password == self.config.get('server', 'password'): self.watched_addresses.remove(address) print_log('unsubscribed', address) result = "ok" @@ -567,46 +558,46 @@ class BlockchainProcessor(Processor): result = "authentication error" except BaseException, e: error = str(e) + ': ' + address - print_log( "error:", error ) + print_log("error:", error) elif method == 'blockchain.address.get_history': try: address = params[0] - result = self.get_history( address, cache_only ) + result = self.get_history(address, cache_only) except BaseException, e: error = str(e) + ': ' + address - print_log( "error:", error ) + print_log("error:", error) elif method == 'blockchain.block.get_header': - if cache_only: + if cache_only: result = -1 else: try: height = params[0] - result = self.get_header( height ) + result = self.get_header(height) except BaseException, e: - error = str(e) + ': %d'% height - print_log( "error:", error ) - + error = str(e) + ': %d' % height + print_log("error:", error) + elif method == 'blockchain.block.get_chunk': if cache_only: result = -1 else: try: index = params[0] - result = self.get_chunk( index ) + result = self.get_chunk(index) except BaseException, e: - error = str(e) + ': %d'% index - print_log( "error:", error) + error = str(e) + ': %d' % index + print_log("error:", error) elif method == 'blockchain.transaction.broadcast': try: txo = self.bitcoind('sendrawtransaction', params) - print_log( "sent tx:", txo ) - result = txo + print_log("sent tx:", txo) + result = txo except BaseException, e: - result = str(e) # do not send an error - print_log( "error:", str(e), params ) + result = str(e) # do not send an error + print_log("error:", result, params) elif method == 'blockchain.transaction.get_merkle': if cache_only: @@ -615,60 +606,55 @@ class BlockchainProcessor(Processor): try: tx_hash = params[0] tx_height = params[1] - result = self.get_merkle(tx_hash, tx_height) + result = self.get_merkle(tx_hash, tx_height) except BaseException, e: error = str(e) + ': ' + tx_hash - print_log( "error:", error ) - + print_log("error:", error) + elif method == 'blockchain.transaction.get': try: tx_hash = params[0] height = params[1] - result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] ) + result = self.bitcoind('getrawtransaction', [tx_hash, 0, height]) except BaseException, e: error = str(e) + ': ' + tx_hash - print_log( "error:", error ) + print_log("error:", error) else: - error = "unknown method:%s"%method + error = "unknown method:%s" % method - if cache_only and result == -1: return -1 + if cache_only and result == -1: + return -1 if error: - response = { 'id':message_id, 'error':error } - self.push_response(response) + response = {'id': message_id, 'error': error} elif result != '': - response = { 'id':message_id, 'result':result } - self.push_response(response) - + response = {'id': message_id, 'result': result} + self.push_response(response) def watch_address(self, addr): if addr not in self.watched_addresses: self.watched_addresses.append(addr) - - - def catch_up(self, sync = True): - + def catch_up(self, sync=True): t1 = time.time() while not self.shared.stopped(): - # are we done yet? info = self.bitcoind('getinfo') self.bitcoind_height = info.get('blocks') bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height]) - if self.last_hash == bitcoind_block_hash: + if self.last_hash == bitcoind_block_hash: self.up_to_date = True break # not done.. self.up_to_date = False - next_block_hash = self.bitcoind('getblockhash', [self.height+1]) + next_block_hash = self.bitcoind('getblockhash', [self.height + 1]) next_block = self.bitcoind('getblock', [next_block_hash, 1]) - # fixme: this is unsafe, if we revert when the undo info is not yet written - revert = (random.randint(1, 100)==1) if self.is_test else False + # fixme: this is unsafe, if we revert when the undo info is not yet written + revert = (random.randint(1, 100) == 1) if self.is_test else False if (next_block.get('previousblockhash') == self.last_hash) and not revert: @@ -677,40 +663,37 @@ class BlockchainProcessor(Processor): self.write_header(self.block2header(next_block), sync) self.last_hash = next_block_hash - if (self.height)%100 == 0 and not sync: + if self.height % 100 == 0 and not sync: t2 = time.time() - print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) ) + print_log("catch_up: block %d (%.3fs)" % (self.height, t2 - t1)) t1 = t2 - + else: # revert current block block = self.bitcoind('getblock', [self.last_hash, 1]) - print_log( "blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash ) + print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash) self.import_block(block, self.last_hash, self.height, sync, revert=True) self.pop_header() self.flush_headers() - self.height = self.height -1 + self.height -= 1 # read previous header from disk self.header = self.read_header(self.height) self.last_hash = self.hash_header(self.header) - self.header = self.block2header(self.bitcoind('getblock', [self.last_hash])) - - - def memorypool_update(self): - mempool_hashes = self.bitcoind('getrawmempool') for tx_hash in mempool_hashes: - if tx_hash in self.mempool_hashes: continue + if tx_hash in self.mempool_hashes: + continue tx = self.get_mempool_transaction(tx_hash) - if not tx: continue + if not tx: + continue for x in tx.get('inputs'): txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex') @@ -720,15 +703,15 @@ class BlockchainProcessor(Processor): except: continue l = self.mempool_addresses.get(tx_hash, []) - if addr not in l: - l.append( addr ) + if addr not in l: + l.append(addr) self.mempool_addresses[tx_hash] = l for x in tx.get('outputs'): addr = x.get('address') l = self.mempool_addresses.get(tx_hash, []) - if addr not in l: - l.append( addr ) + if addr not in l: + l.append(addr) self.mempool_addresses[tx_hash] = l self.mempool_hashes.append(tx_hash) @@ -746,13 +729,13 @@ class BlockchainProcessor(Processor): for tx_hash, addresses in self.mempool_addresses.items(): for addr in addresses: h = new_mempool_hist.get(addr, []) - if tx_hash not in h: - h.append( tx_hash ) + if tx_hash not in h: + h.append(tx_hash) new_mempool_hist[addr] = h for addr in new_mempool_hist.keys(): if addr in self.mempool_hist.keys(): - if self.mempool_hist[addr] != new_mempool_hist[addr]: + if self.mempool_hist[addr] != new_mempool_hist[addr]: self.invalidate_cache(addr) else: self.invalidate_cache(addr) @@ -760,23 +743,18 @@ class BlockchainProcessor(Processor): with self.mempool_lock: self.mempool_hist = new_mempool_hist - - def invalidate_cache(self, address): with self.cache_lock: - if self.history_cache.has_key(address): - print_log( "cache: invalidating", address ) + if 'address' in self.history_cache: + print_log("cache: invalidating", address) self.history_cache.pop(address) if address in self.watched_addresses: self.address_queue.put(address) - - def main_iteration(self): - - if self.shared.stopped(): - print_log( "blockchain processor terminating" ) + if self.shared.stopped(): + print_log("blockchain processor terminating") return with self.dblock: @@ -788,15 +766,22 @@ class BlockchainProcessor(Processor): t3 = time.time() # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2) - if self.sent_height != self.height: self.sent_height = self.height - self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.height] }) + self.push_response({ + 'id': None, + 'method': 'blockchain.numblocks.subscribe', + 'params': [self.height], + }) if self.sent_header != self.header: - print_log( "blockchain: %d (%.3fs)"%( self.height, t2 - t1 ) ) + print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1)) self.sent_header = self.header - self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.header] }) + self.push_response({ + 'id': None, + 'method': 'blockchain.headers.subscribe', + 'params': [self.header], + }) while True: try: @@ -804,14 +789,14 @@ class BlockchainProcessor(Processor): except: break if addr in self.watched_addresses: - status = self.get_status( addr ) - self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] }) - - if not self.shared.stopped(): + status = self.get_status(addr) + self.push_response({ + 'id': None, + 'method': 'blockchain.address.subscribe', + 'params': [addr, status], + }) + + if not self.shared.stopped(): threading.Timer(10, self.main_iteration).start() else: - print_log( "blockchain processor terminating" ) - - - - + print_log("blockchain processor terminating") diff --git a/backends/bitcoind/deserialize.py b/backends/bitcoind/deserialize.py index 60af334..a1e873c 100644 --- a/backends/bitcoind/deserialize.py +++ b/backends/bitcoind/deserialize.py @@ -2,228 +2,183 @@ # # -#from bitcoin import public_key_to_bc_address, hash_160_to_bc_address, hash_encode -#import socket -import time, hashlib +import mmap +import string import struct -addrtype = 0 - - -Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest() -hash_encode = lambda x: x[::-1].encode('hex') -hash_decode = lambda x: x.decode('hex')[::-1] - -def hash_160(public_key): - md = hashlib.new('ripemd160') - md.update(hashlib.sha256(public_key).digest()) - return md.digest() - -def public_key_to_bc_address(public_key): - h160 = hash_160(public_key) - return hash_160_to_bc_address(h160) - -def hash_160_to_bc_address(h160): - vh160 = chr(addrtype) + h160 - h = Hash(vh160) - addr = vh160 + h[0:4] - return b58encode(addr) +import types -__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' -__b58base = len(__b58chars) +from utils import * -def b58encode(v): - """ encode v, which is a string of bytes, to base58.""" - - long_value = 0L - for (i, c) in enumerate(v[::-1]): - long_value += (256**i) * ord(c) - - result = '' - while long_value >= __b58base: - div, mod = divmod(long_value, __b58base) - result = __b58chars[mod] + result - long_value = div - result = __b58chars[long_value] + result - - # Bitcoin does a little leading-zero-compression: - # leading 0-bytes in the input become leading-1s - nPad = 0 - for c in v: - if c == '\0': nPad += 1 - else: break - - return (__b58chars[0]*nPad) + result - -def b58decode(v, length): - """ decode v into a string of len bytes.""" - long_value = 0L - for (i, c) in enumerate(v[::-1]): - long_value += __b58chars.find(c) * (__b58base**i) - - result = '' - while long_value >= 256: - div, mod = divmod(long_value, 256) - result = chr(mod) + result - long_value = div - result = chr(long_value) + result - nPad = 0 - for c in v: - if c == __b58chars[0]: nPad += 1 - else: break +class SerializationError(Exception): + """Thrown when there's a problem deserializing or serializing.""" - result = chr(0)*nPad + result - if length is not None and len(result) != length: - return None - return result +class BCDataStream(object): + """Workalike python implementation of Bitcoin's CDataStream class.""" + def __init__(self): + self.input = None + self.read_cursor = 0 + def clear(self): + self.input = None + self.read_cursor = 0 + + def write(self, bytes): # Initialize with string of bytes + if self.input is None: + self.input = bytes + else: + self.input += bytes + + def map_file(self, file, start): # Initialize with bytes from file + self.input = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) + self.read_cursor = start + + def seek_file(self, position): + self.read_cursor = position + + def close_file(self): + self.input.close() + + def read_string(self): + # Strings are encoded depending on length: + # 0 to 252 : 1-byte-length followed by bytes (if any) + # 253 to 65,535 : byte'253' 2-byte-length followed by bytes + # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes + # ... and the Bitcoin client is coded to understand: + # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string + # ... but I don't think it actually handles any strings that big. + if self.input is None: + raise SerializationError("call write(bytes) before trying to deserialize") + + try: + length = self.read_compact_size() + except IndexError: + raise SerializationError("attempt to read past end of buffer") + + return self.read_bytes(length) + + def write_string(self, string): + # Length-encoded as with read-string + self.write_compact_size(len(string)) + self.write(string) + + def read_bytes(self, length): + try: + result = self.input[self.read_cursor:self.read_cursor+length] + self.read_cursor += length + return result + except IndexError: + raise SerializationError("attempt to read past end of buffer") + + return '' + + def read_boolean(self): + return self.read_bytes(1)[0] != chr(0) + + def read_int16(self): + return self._read_num('= opcodes.OP_SINGLEBYTE_END: - opcode <<= 8 - opcode |= ord(bytes[i]) - i += 1 - - if opcode <= opcodes.OP_PUSHDATA4: - nSize = opcode - if opcode == opcodes.OP_PUSHDATA1: - nSize = ord(bytes[i]) + i = 0 + while i < len(bytes): + vch = None + opcode = ord(bytes[i]) i += 1 - elif opcode == opcodes.OP_PUSHDATA2: - (nSize,) = struct.unpack_from('= opcodes.OP_SINGLEBYTE_END: + opcode <<= 8 + opcode |= ord(bytes[i]) + i += 1 + + if opcode <= opcodes.OP_PUSHDATA4: + nSize = opcode + if opcode == opcodes.OP_PUSHDATA1: + nSize = ord(bytes[i]) + i += 1 + elif opcode == opcodes.OP_PUSHDATA2: + (nSize,) = struct.unpack_from(' 0: result += " " - if opcode <= opcodes.OP_PUSHDATA4: - result += "%d:"%(opcode,) - result += short_hex(vch) - else: - result += script_GetOpName(opcode) - return result + result = '' + for (opcode, vch, i) in script_GetOp(bytes): + if len(result) > 0: + result += " " + if opcode <= opcodes.OP_PUSHDATA4: + result += "%d:" % (opcode,) + result += short_hex(vch) + else: + result += script_GetOpName(opcode) + return result + def match_decoded(decoded, to_match): - if len(decoded) != len(to_match): - return False; - for i in range(len(decoded)): - if to_match[i] == opcodes.OP_PUSHDATA4 and decoded[i][0] <= opcodes.OP_PUSHDATA4: - continue # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent. - if to_match[i] != decoded[i][0]: - return False - return True + if len(decoded) != len(to_match): + return False + for i in range(len(decoded)): + if to_match[i] == opcodes.OP_PUSHDATA4 and decoded[i][0] <= opcodes.OP_PUSHDATA4: + continue # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent. + if to_match[i] != decoded[i][0]: + return False + return True + def extract_public_key(bytes): - decoded = [ x for x in script_GetOp(bytes) ] - - # non-generated TxIn transactions push a signature - # (seventy-something bytes) and then their public key - # (65 bytes) onto the stack: - match = [ opcodes.OP_PUSHDATA4, opcodes.OP_PUSHDATA4 ] - if match_decoded(decoded, match): - return public_key_to_bc_address(decoded[1][1]) - - # The Genesis Block, self-payments, and pay-by-IP-address payments look like: - # 65 BYTES:... CHECKSIG - match = [ opcodes.OP_PUSHDATA4, opcodes.OP_CHECKSIG ] - if match_decoded(decoded, match): - return public_key_to_bc_address(decoded[0][1]) - - # coins sent to black hole - # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG - match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_0, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ] - if match_decoded(decoded, match): + decoded = list(script_GetOp(bytes)) + + # non-generated TxIn transactions push a signature + # (seventy-something bytes) and then their public key + # (65 bytes) onto the stack: + match = [opcodes.OP_PUSHDATA4, opcodes.OP_PUSHDATA4] + if match_decoded(decoded, match): + return public_key_to_bc_address(decoded[1][1]) + + # The Genesis Block, self-payments, and pay-by-IP-address payments look like: + # 65 BYTES:... CHECKSIG + match = [opcodes.OP_PUSHDATA4, opcodes.OP_CHECKSIG] + if match_decoded(decoded, match): + return public_key_to_bc_address(decoded[0][1]) + + # coins sent to black hole + # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG + match = [opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_0, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG] + if match_decoded(decoded, match): + return "None" + + # Pay-by-Bitcoin-address TxOuts look like: + # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG + match = [opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG] + if match_decoded(decoded, match): + return hash_160_to_bc_address(decoded[2][1]) + + # strange tx + match = [opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG, opcodes.OP_NOP] + if match_decoded(decoded, match): + return hash_160_to_bc_address(decoded[2][1]) + + #raise BaseException("address not found in script") see ce35795fb64c268a52324b884793b3165233b1e6d678ccaadf760628ec34d76b return "None" - - # Pay-by-Bitcoin-address TxOuts look like: - # DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG - match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ] - if match_decoded(decoded, match): - return hash_160_to_bc_address(decoded[2][1]) - - # strange tx - match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG, opcodes.OP_NOP ] - if match_decoded(decoded, match): - return hash_160_to_bc_address(decoded[2][1]) - - #raise BaseException("address not found in script") see ce35795fb64c268a52324b884793b3165233b1e6d678ccaadf760628ec34d76b - return "None" diff --git a/backends/bitcoind/util.py b/backends/bitcoind/util.py deleted file mode 100644 index f9b9ddc..0000000 --- a/backends/bitcoind/util.py +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python -# -# Electrum - lightweight Bitcoin client -# Copyright (C) 2011 thomasv@gitorious -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - - -import hashlib, base64, re - - -def rev_hex(s): - return s.decode('hex')[::-1].encode('hex') - -def int_to_hex(i, length=1): - s = hex(i)[2:].rstrip('L') - s = "0"*(2*length - len(s)) + s - return rev_hex(s) - -def var_int(i): - if i<0xfd: - return int_to_hex(i) - elif i<=0xffff: - return "fd"+int_to_hex(i,2) - elif i<=0xffffffff: - return "fe"+int_to_hex(i,4) - else: - return "ff"+int_to_hex(i,8) - - -Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest() -hash_encode = lambda x: x[::-1].encode('hex') -hash_decode = lambda x: x.decode('hex')[::-1] - - -def header_to_string(res): - pbh = res.get('prev_block_hash') - if pbh is None: pbh = '0'*64 - s = int_to_hex(res.get('version'),4) \ - + rev_hex(pbh) \ - + rev_hex(res.get('merkle_root')) \ - + int_to_hex(int(res.get('timestamp')),4) \ - + int_to_hex(int(res.get('bits')),4) \ - + int_to_hex(int(res.get('nonce')),4) - return s - -def header_from_string( s): - hex_to_int = lambda s: eval('0x' + s[::-1].encode('hex')) - h = {} - h['version'] = hex_to_int(s[0:4]) - h['prev_block_hash'] = hash_encode(s[4:36]) - h['merkle_root'] = hash_encode(s[36:68]) - h['timestamp'] = hex_to_int(s[68:72]) - h['bits'] = hex_to_int(s[72:76]) - h['nonce'] = hex_to_int(s[76:80]) - return h - - -############ functions from pywallet ##################### - -addrtype = 0 - -def hash_160(public_key): - try: - md = hashlib.new('ripemd160') - md.update(hashlib.sha256(public_key).digest()) - return md.digest() - except: - import ripemd - md = ripemd.new(hashlib.sha256(public_key).digest()) - return md.digest() - - -def public_key_to_bc_address(public_key): - h160 = hash_160(public_key) - return hash_160_to_bc_address(h160) - -def hash_160_to_bc_address(h160): - if h160 == 'None': return 'None' - vh160 = chr(addrtype) + h160 - h = Hash(vh160) - addr = vh160 + h[0:4] - return b58encode(addr) - -def bc_address_to_hash_160(addr): - if addr == 'None': return 'None' - bytes = b58decode(addr, 25) - return bytes[1:21] - - -__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' -__b58base = len(__b58chars) - -def b58encode(v): - """ encode v, which is a string of bytes, to base58.""" - - long_value = 0L - for (i, c) in enumerate(v[::-1]): - long_value += (256**i) * ord(c) - - result = '' - while long_value >= __b58base: - div, mod = divmod(long_value, __b58base) - result = __b58chars[mod] + result - long_value = div - result = __b58chars[long_value] + result - - # Bitcoin does a little leading-zero-compression: - # leading 0-bytes in the input become leading-1s - nPad = 0 - for c in v: - if c == '\0': nPad += 1 - else: break - - return (__b58chars[0]*nPad) + result - -def b58decode(v, length): - """ decode v into a string of len bytes.""" - long_value = 0L - for (i, c) in enumerate(v[::-1]): - long_value += __b58chars.find(c) * (__b58base**i) - - result = '' - while long_value >= 256: - div, mod = divmod(long_value, 256) - result = chr(mod) + result - long_value = div - result = chr(long_value) + result - - nPad = 0 - for c in v: - if c == __b58chars[0]: nPad += 1 - else: break - - result = chr(0)*nPad + result - if length is not None and len(result) != length: - return None - - return result - - -def EncodeBase58Check(vchIn): - hash = Hash(vchIn) - return b58encode(vchIn + hash[0:4]) - -def DecodeBase58Check(psz): - vchRet = b58decode(psz, None) - key = vchRet[0:-4] - csum = vchRet[-4:] - hash = Hash(key) - cs32 = hash[0:4] - if cs32 != csum: - return None - else: - return key - -def PrivKeyToSecret(privkey): - return privkey[9:9+32] - -def SecretToASecret(secret): - vchIn = chr(addrtype+128) + secret - return EncodeBase58Check(vchIn) - -def ASecretToSecret(key): - vch = DecodeBase58Check(key) - if vch and vch[0] == chr(addrtype+128): - return vch[1:] - else: - return False - -########### end pywallet functions ####################### - diff --git a/backends/irc/__init__.py b/backends/irc/__init__.py index e13c6c6..0e7b757 100644 --- a/backends/irc/__init__.py +++ b/backends/irc/__init__.py @@ -1,36 +1,45 @@ -import threading, socket, traceback, time, sys - -def random_string(N): - import random, string - return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) +import socket +import sys +import threading +import time +import traceback from processor import Processor +from utils import Hash, print_log from version import VERSION + class IrcThread(threading.Thread): def __init__(self, processor, config): threading.Thread.__init__(self) + self.processor = processor self.daemon = True - self.stratum_tcp_port = config.get('server','stratum_tcp_port') - self.stratum_http_port = config.get('server','stratum_http_port') - self.stratum_tcp_ssl_port = config.get('server','stratum_tcp_ssl_port') - self.stratum_http_ssl_port = config.get('server','stratum_http_ssl_port') - self.report_stratum_tcp_port = config.get('server','report_stratum_tcp_port') - self.report_stratum_http_port = config.get('server','report_stratum_http_port') - self.report_stratum_tcp_ssl_port = config.get('server','report_stratum_tcp_ssl_port') - self.report_stratum_http_ssl_port = config.get('server','report_stratum_http_ssl_port') + self.stratum_tcp_port = config.get('server', 'stratum_tcp_port') + self.stratum_http_port = config.get('server', 'stratum_http_port') + self.stratum_tcp_ssl_port = config.get('server', 'stratum_tcp_ssl_port') + self.stratum_http_ssl_port = config.get('server', 'stratum_http_ssl_port') + self.report_stratum_tcp_port = config.get('server', 'report_stratum_tcp_port') + self.report_stratum_http_port = config.get('server', 'report_stratum_http_port') + self.report_stratum_tcp_ssl_port = config.get('server', 'report_stratum_tcp_ssl_port') + self.report_stratum_http_ssl_port = config.get('server', 'report_stratum_http_ssl_port') self.peers = {} - self.host = config.get('server','host') - self.report_host = config.get('server','report_host') + self.host = config.get('server', 'host') + self.report_host = config.get('server', 'report_host') self.nick = config.get('server', 'irc_nick') - if self.report_stratum_tcp_port: self.stratum_tcp_port = self.report_stratum_tcp_port - if self.report_stratum_http_port: self.stratum_http_port = self.report_stratum_http_port - if self.report_stratum_tcp_ssl_port: self.stratum_tcp_ssl_port = self.report_stratum_tcp_ssl_port - if self.report_stratum_http_ssl_port: self.stratum_http_ssl_port = self.report_stratum_http_ssl_port - if self.report_host: self.host = self.report_host - if not self.nick: self.nick = random_string(10) + if self.report_stratum_tcp_port: + self.stratum_tcp_port = self.report_stratum_tcp_port + if self.report_stratum_http_port: + self.stratum_http_port = self.report_stratum_http_port + if self.report_stratum_tcp_ssl_port: + self.stratum_tcp_ssl_port = self.report_stratum_tcp_ssl_port + if self.report_stratum_http_ssl_port: + self.stratum_http_ssl_port = self.report_stratum_http_ssl_port + if self.report_host: + self.host = self.report_host + if not self.nick: + self.nick = Hash(self.report_host)[:10] self.prepend = 'E_' if config.get('server', 'coin') == 'litecoin': self.prepend = 'EL_' @@ -40,21 +49,20 @@ class IrcThread(threading.Thread): def get_peers(self): return self.peers.values() - def getname(self): s = 'v' + VERSION + ' ' - if self.pruning: s += 'p ' + if self.pruning: + s += 'p ' if self.stratum_tcp_port: - s += 't' + self.stratum_tcp_port + ' ' + s += 't' + self.stratum_tcp_port + ' ' if self.stratum_http_port: s += 'h' + self.stratum_http_port + ' ' if self.stratum_tcp_port: - s += 's' + self.stratum_tcp_ssl_port + ' ' + s += 's' + self.stratum_tcp_ssl_port + ' ' if self.stratum_http_port: s += 'g' + self.stratum_http_ssl_port + ' ' return s - def run(self): ircname = self.getname() @@ -75,28 +83,26 @@ class IrcThread(threading.Thread): sf = s.makefile('r', 0) t = 0 while not self.processor.shared.stopped(): - line = sf.readline() - line = line.rstrip('\r\n') - line = line.split() - if not line: continue - if line[0]=='PING': - s.send('PONG '+line[1]+'\n') - elif '353' in line: # answer to /names + line = sf.readline().rstrip('\r\n').split() + if not line: + continue + if line[0] == 'PING': + s.send('PONG ' + line[1] + '\n') + elif '353' in line: # answer to /names k = line.index('353') for item in line[k+1:]: if item.startswith(self.prepend): - s.send('WHO %s\n'%item) - elif '352' in line: # answer to /who + s.send('WHO %s\n' % item) + elif '352' in line: # answer to /who # warning: this is a horrible hack which apparently works k = line.index('352') - ip = line[k+4] - ip = socket.gethostbyname(ip) + ip = socket.gethostbyname(line[k+4]) name = line[k+6] host = line[k+9] - ports = line[k+10:] + ports = line[k+10:] self.peers[name] = (ip, host, ports) if time.time() - t > 5*60: - self.processor.push_response({'method':'server.peers', 'params':[self.get_peers()]}) + self.processor.push_response({'method': 'server.peers', 'params': [self.get_peers()]}) s.send('NAMES #electrum\n') t = time.time() self.peers = {} @@ -106,8 +112,7 @@ class IrcThread(threading.Thread): sf.close() s.close() - print "quitting IRC" - + print_log("quitting IRC") class ServerProcessor(Processor): @@ -115,22 +120,20 @@ class ServerProcessor(Processor): def __init__(self, config): Processor.__init__(self) self.daemon = True - self.banner = config.get('server','banner') - self.password = config.get('server','password') + self.banner = config.get('server', 'banner') + self.password = config.get('server', 'password') if config.get('server', 'irc') == 'yes': self.irc = IrcThread(self, config) - else: + else: self.irc = None - def get_peers(self): if self.irc: return self.irc.get_peers() else: return [] - def run(self): if self.irc: self.irc.start() @@ -148,12 +151,13 @@ class ServerProcessor(Processor): password = None if password != self.password: - response = { 'id':request['id'], 'result':None, 'error':'incorrect password'} - self.push_response(response) + self.push_response({'id': request['id'], + 'result': None, + 'error': 'incorrect password'}) return if method == 'server.banner': - result = self.banner.replace('\\n','\n') + result = self.banner.replace('\\n', '\n') elif method == 'server.peers.subscribe': result = self.get_peers() @@ -166,11 +170,11 @@ class ServerProcessor(Processor): result = 'stopping, please wait until all threads terminate.' elif method == 'server.info': - result = map(lambda s: { "time":s.time, - "name":s.name, - "address":s.address, - "version":s.version, - "subscriptions":len(s.subscriptions)}, + result = map(lambda s: {"time": s.time, + "name": s.name, + "address": s.address, + "version": s.version, + "subscriptions": len(s.subscriptions)}, self.dispatcher.request_dispatcher.get_sessions()) elif method == 'server.cache': @@ -182,9 +186,7 @@ class ServerProcessor(Processor): result = p.queue.qsize() else: - print "unknown method", request - - if result!='': - response = { 'id':request['id'], 'result':result } - self.push_response(response) + print_log("unknown method", request) + if result != '': + self.push_response({'id': request['id'], 'result': result}) diff --git a/backends/libbitcoin/__init__.py b/backends/libbitcoin/__init__.py index e6092f7..ef2eb1d 100644 --- a/backends/libbitcoin/__init__.py +++ b/backends/libbitcoin/__init__.py @@ -1,12 +1,14 @@ -import bitcoin -from bitcoin import bind, _1, _2, _3 -from processor import Processor import threading import time +import bitcoin +from bitcoin import bind, _1, _2, _3 + +from processor import Processor import history1 as history import membuf + class HistoryCache: def __init__(self): @@ -27,9 +29,10 @@ class HistoryCache: def clear(self, addresses): with self.lock: for address in addresses: - if self.cache.has_key(address): + if address in self.cache: del self.cache[address] + class MonitorAddress: def __init__(self, processor, cache, backend): @@ -46,7 +49,7 @@ class MonitorAddress: def monitor(self, address, result): for info in result: - if not info.has_key("raw_output_script"): + if "raw_output_script" not in info: continue assert info["is_input"] == 0 tx_hash = info["tx_hash"] @@ -112,8 +115,8 @@ class MonitorAddress: response = {"id": None, "method": "blockchain.address.subscribe", "params": [str(address)]} - history.payment_history(service, chain, txpool, memory_buff, - address, bind(self.send_notify, _1, _2, response)) + history.payment_history(service, chain, txpool, memory_buff, address, + bind(self.send_notify, _1, _2, response)) def mempool_n(self, result): assert result is not None @@ -136,6 +139,7 @@ class MonitorAddress: response["params"].append(self.mempool_n(result)) self.processor.push_response(response) + class Backend: def __init__(self): @@ -208,6 +212,7 @@ class Backend: else: print "Accepted transaction", tx_hash + class GhostValue: def __init__(self): @@ -222,6 +227,7 @@ class GhostValue: self.value = value self.event.set() + class NumblocksSubscribe: def __init__(self, backend, processor): @@ -253,6 +259,7 @@ class NumblocksSubscribe: "error": None} self.processor.push_response(response) + class AddressGetHistory: def __init__(self, backend, processor): @@ -265,8 +272,8 @@ class AddressGetHistory: chain = self.backend.blockchain txpool = self.backend.transaction_pool memory_buff = self.backend.memory_buffer - history.payment_history(service, chain, txpool, memory_buff, - address, bind(self.respond, _1, _2, request)) + history.payment_history(service, chain, txpool, memory_buff, address, + bind(self.respond, _1, _2, request)) def respond(self, ec, result, request): if ec: @@ -276,6 +283,7 @@ class AddressGetHistory: response = {"id": request["id"], "result": result, "error": None} self.processor.push_response(response) + class AddressSubscribe: def __init__(self, backend, processor, cache, monitor): @@ -290,8 +298,8 @@ class AddressSubscribe: chain = self.backend.blockchain txpool = self.backend.transaction_pool memory_buff = self.backend.memory_buffer - history.payment_history(service, chain, txpool, memory_buff, - address, bind(self.construct, _1, _2, request)) + history.payment_history(service, chain, txpool, memory_buff, address, + bind(self.construct, _1, _2, request)) def construct(self, ec, result, request): if ec: @@ -316,6 +324,7 @@ class AddressSubscribe: self.processor.push_response(response) return True + class BlockchainProcessor(Processor): def __init__(self, config): @@ -349,13 +358,16 @@ class BlockchainProcessor(Processor): try: tx = exporter.load_transaction(raw_tx) except RuntimeError: - response = {"id": request["id"], "result": None, - "error": {"message": - "Exception while parsing the transaction data.", - "code": -4}} + response = { + "id": request["id"], + "result": None, + "error": { + "message": "Exception while parsing the transaction data.", + "code": -4, + } + } else: self.backend.protocol.broadcast_transaction(tx) tx_hash = str(bitcoin.hash_transaction(tx)) response = {"id": request["id"], "result": tx_hash, "error": None} self.push_response(response) - diff --git a/backends/libbitcoin/composed.py b/backends/libbitcoin/composed.py index 9df57a0..b16cee0 100644 --- a/backends/libbitcoin/composed.py +++ b/backends/libbitcoin/composed.py @@ -1,7 +1,9 @@ -import bitcoin import threading import time +import bitcoin + + class ExpiryQueue(threading.Thread): def __init__(self): @@ -23,6 +25,7 @@ class ExpiryQueue(threading.Thread): expiry_queue = ExpiryQueue() + class StatementLine: def __init__(self, output_point): @@ -42,6 +45,7 @@ class StatementLine: return False return True + class PaymentHistory: def __init__(self, chain): @@ -62,9 +66,10 @@ class PaymentHistory: for outpoint in output_points: statement_line = StatementLine(outpoint) self.statement.append(statement_line) - self.chain.fetch_spend(outpoint, - bitcoin.bind(self.load_spend, - bitcoin._1, bitcoin._2, statement_line)) + self.chain.fetch_spend( + outpoint, + bitcoin.bind(self.load_spend, bitcoin._1, bitcoin._2, statement_line) + ) self.load_tx_info(outpoint, statement_line, False) def load_spend(self, ec, inpoint, statement_line): @@ -87,8 +92,7 @@ class PaymentHistory: line.input_loaded["value"] = -line.output_loaded["value"] result.append(line.input_loaded) else: - line.output_loaded["raw_output_script"] = \ - line.raw_output_script + line.output_loaded["raw_output_script"] = line.raw_output_script result.append(line.output_loaded) self.handle_finish(result) self.stop() @@ -106,23 +110,26 @@ class PaymentHistory: info["tx_hash"] = str(point.hash) info["index"] = point.index info["is_input"] = 1 if is_input else 0 - self.chain.fetch_transaction_index(point.hash, - bitcoin.bind(self.tx_index, bitcoin._1, bitcoin._2, bitcoin._3, - statement_line, info)) + self.chain.fetch_transaction_index( + point.hash, + bitcoin.bind(self.tx_index, bitcoin._1, bitcoin._2, bitcoin._3, statement_line, info) + ) def tx_index(self, ec, block_depth, offset, statement_line, info): info["height"] = block_depth - self.chain.fetch_block_header_by_depth(block_depth, - bitcoin.bind(self.block_header, bitcoin._1, bitcoin._2, - statement_line, info)) + self.chain.fetch_block_header_by_depth( + block_depth, + bitcoin.bind(self.block_header, bitcoin._1, bitcoin._2, statement_line, info) + ) def block_header(self, ec, blk_head, statement_line, info): info["timestamp"] = blk_head.timestamp info["block_hash"] = str(bitcoin.hash_block_header(blk_head)) tx_hash = bitcoin.hash_digest(info["tx_hash"]) - self.chain.fetch_transaction(tx_hash, - bitcoin.bind(self.load_tx, bitcoin._1, bitcoin._2, - statement_line, info)) + self.chain.fetch_transaction( + tx_hash, + bitcoin.bind(self.load_tx, bitcoin._1, bitcoin._2, statement_line, info) + ) def load_tx(self, ec, tx, statement_line, info): outputs = [] @@ -152,9 +159,10 @@ class PaymentHistory: for tx_idx, tx_in in enumerate(tx.inputs): if info["is_input"] == 1 and info["index"] == tx_idx: continue - self.chain.fetch_transaction(tx_in.previous_output.hash, - bitcoin.bind(self.load_input, bitcoin._1, bitcoin._2, - tx_in.previous_output.index, statement_line, info, tx_idx)) + self.chain.fetch_transaction( + tx_in.previous_output.hash, + bitcoin.bind(self.load_input, bitcoin._1, bitcoin._2, tx_in.previous_output.index, statement_line, info, tx_idx) + ) def load_input(self, ec, tx, index, statement_line, info, inputs_index): script = tx.outputs[index].output_script @@ -172,14 +180,17 @@ class PaymentHistory: statement_line.output_loaded = info self.finish_if_done() + def payment_history(chain, address, handle_finish): ph = PaymentHistory(chain) expiry_queue.add(ph) ph.run(address, handle_finish) + if __name__ == "__main__": def finish(result): print result + def last(ec, depth): print "D:", depth @@ -191,4 +202,3 @@ if __name__ == "__main__": print "Looking up", address payment_history(chain, address, finish) raw_input() - diff --git a/backends/libbitcoin/h1.py b/backends/libbitcoin/h1.py index 71026ba..8f66963 100644 --- a/backends/libbitcoin/h1.py +++ b/backends/libbitcoin/h1.py @@ -2,9 +2,11 @@ import bitcoin import history1 as history import membuf + def blockchain_started(ec, chain): print "Blockchain initialisation:", ec + def finish(ec, result): print "Finish:", ec for line in result: @@ -13,6 +15,7 @@ def finish(ec, result): print begin, " " * (12 - len(begin)), v print + a = bitcoin.async_service(1) chain = bitcoin.bdb_blockchain(a, "/home/genjix/libbitcoin/database", blockchain_started) @@ -20,8 +23,11 @@ txpool = bitcoin.transaction_pool(a, chain) txdat = bitcoin.data_chunk("0100000001d6cad920a04acd6c0609cd91fe4dafa1f3b933ac90e032c78fdc19d98785f2bb010000008b483045022043f8ce02784bd7231cb362a602920f2566c18e1877320bf17d4eabdac1019b2f022100f1fd06c57330683dff50e1b4571fb0cdab9592f36e3d7e98d8ce3f94ce3f255b01410453aa8d5ddef56731177915b7b902336109326f883be759ec9da9c8f1212c6fa3387629d06e5bf5e6bcc62ec5a70d650c3b1266bb0bcc65ca900cff5311cb958bffffffff0280969800000000001976a9146025cabdbf823949f85595f3d1c54c54cd67058b88ac602d2d1d000000001976a914c55c43631ab14f7c4fd9c5f153f6b9123ec32c8888ac00000000") ex = bitcoin.satoshi_exporter() tx = ex.load_transaction(txdat) + + def stored(ec): print "mbuff", ec + mbuff = membuf.memory_buffer(a.internal_ptr, chain.internal_ptr, txpool.internal_ptr) mbuff.receive(tx, stored) @@ -29,4 +35,3 @@ address = "1AA6mgxqSrvJTxRrYrikSnLaAGupVzvx4f" raw_input() history.payment_history(a, chain, txpool, mbuff, address, finish) raw_input() - diff --git a/backends/libbitcoin/history.py b/backends/libbitcoin/history.py index c9e807d..cc2a7ad 100644 --- a/backends/libbitcoin/history.py +++ b/backends/libbitcoin/history.py @@ -1,8 +1,10 @@ +import threading +import time + import bitcoin from bitcoin import bind, _1, _2, _3 -import threading import multimap -import time + class ExpiryQueue(threading.Thread): @@ -23,8 +25,10 @@ class ExpiryQueue(threading.Thread): with self.lock: self.items.append(item) + expiry_queue = ExpiryQueue() + class MemoryPoolBuffer: def __init__(self, txpool, chain, monitor): @@ -48,9 +52,11 @@ class MemoryPoolBuffer: address = bitcoin.payment_address() if address.extract(output.output_script): desc[2].append((idx, str(address))) - self.txpool.store(tx, + self.txpool.store( + tx, bind(self.confirmed, _1, desc), - bind(self.mempool_stored, _1, desc, handle_store)) + bind(self.mempool_stored, _1, desc, handle_store) + ) def mempool_stored(self, ec, desc, handle_store): tx_hash, prevouts, addrs = desc @@ -95,7 +101,7 @@ class MemoryPoolBuffer: pass result = [] for outpoint in output_points: - if self.lookup_input.has_key(str(outpoint)): + if str(outpoint) in self.lookup_input: point = self.lookup_input[str(outpoint)] info = ExtendableDict() info["tx_hash"] = point[0] @@ -103,7 +109,7 @@ class MemoryPoolBuffer: info["is_input"] = 1 info["timestamp"] = self.timestamps[info["tx_hash"]] result.append(info) - if self.lookup_address.has_key(str(address)): + if str(address) in self.lookup_address: addr_points = self.lookup_address[str(address)] for point in addr_points: info = ExtendableDict() @@ -114,6 +120,7 @@ class MemoryPoolBuffer: result.append(info) handle(result) + class PaymentEntry: def __init__(self, output_point): @@ -135,6 +142,7 @@ class PaymentEntry: def has_input(self): return self.input_point is not False + class History: def __init__(self, chain, txpool, membuf): @@ -153,12 +161,11 @@ class History: address = bitcoin.payment_address(address) # To begin we fetch all the outputs (payments in) # associated with this address - self.chain.fetch_outputs(address, - bind(self.check_membuf, _1, _2)) + self.chain.fetch_outputs(address, bind(self.check_membuf, _1, _2)) def stop(self): with self.lock: - assert self._stopped == False + assert self._stopped is False self._stopped = True def stopped(self): @@ -174,8 +181,7 @@ class History: def check_membuf(self, ec, output_points): if self.stop_on_error(ec): return - self.membuf.check(output_points, self.address, - bind(self.start_loading, _1, output_points)) + self.membuf.check(output_points, self.address, bind(self.start_loading, _1, output_points)) def start_loading(self, membuf_result, output_points): if len(membuf_result) == 0 and len(output_points) == 0: @@ -188,15 +194,13 @@ class History: with self.lock: self.statement.append(entry) # Attempt to fetch the spend of this output - self.chain.fetch_spend(outpoint, - bind(self.load_spend, _1, _2, entry)) + self.chain.fetch_spend(outpoint, bind(self.load_spend, _1, _2, entry)) self.load_tx_info(outpoint, entry, False) # Load memory pool transactions with self.lock: self.membuf_result = membuf_result for info in self.membuf_result: - self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]), - bind(self.load_pool_tx, _1, _2, info)) + self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]), bind(self.load_pool_tx, _1, _2, info)) def load_spend(self, ec, inpoint, entry): # Need a custom self.stop_on_error(...) as a missing spend @@ -227,7 +231,7 @@ class History: if any(not entry.is_loaded() for entry in self.statement): return # Memory buffer transactions finished loading? - if any(not info.has_key("height") for info in self.membuf_result): + if any("height" not in info for info in self.membuf_result): return # Whole operation completed successfully! Finish up. result = [] @@ -269,16 +273,14 @@ class History: # Before loading the transaction, Stratum requires the hash # of the parent block, so we load the block depth and then # fetch the block header and hash it. - self.chain.fetch_transaction_index(point.hash, - bind(self.tx_index, _1, _2, _3, entry, info)) + self.chain.fetch_transaction_index(point.hash, bind(self.tx_index, _1, _2, _3, entry, info)) def tx_index(self, ec, block_depth, offset, entry, info): if self.stop_on_error(ec): return info["height"] = block_depth # And now for the block hash - self.chain.fetch_block_header_by_depth(block_depth, - bind(self.block_header, _1, _2, entry, info)) + self.chain.fetch_block_header_by_depth(block_depth, bind(self.block_header, _1, _2, entry, info)) def block_header(self, ec, blk_head, entry, info): if self.stop_on_error(ec): @@ -287,8 +289,7 @@ class History: info["block_hash"] = str(bitcoin.hash_block_header(blk_head)) tx_hash = bitcoin.hash_digest(info["tx_hash"]) # Now load the actual main transaction for this input or output - self.chain.fetch_transaction(tx_hash, - bind(self.load_chain_tx, _1, _2, entry, info)) + self.chain.fetch_transaction(tx_hash, bind(self.load_chain_tx, _1, _2, entry, info)) def load_pool_tx(self, ec, tx, info): if self.stop_on_error(ec): @@ -319,8 +320,7 @@ class History: info["block_hash"] = "mempool" self.finish_if_done() create_handler = lambda prevout_index, input_index: \ - bind(self.load_input_pool_tx, _1, _2, - prevout_index, info, input_index) + bind(self.load_input_pool_tx, _1, _2, prevout_index, info, input_index) self.fetch_input_txs(tx, info, create_handler) def load_tx(self, tx, info): @@ -374,8 +374,7 @@ class History: entry.input_loaded = info self.finish_if_done() create_handler = lambda prevout_index, input_index: \ - bind(self.load_input_chain_tx, _1, _2, - prevout_index, entry, info, input_index) + bind(self.load_input_chain_tx, _1, _2, prevout_index, entry, info, input_index) self.fetch_input_txs(tx, info, create_handler) def inputs_all_loaded(self, info_inputs): @@ -418,11 +417,13 @@ class History: info["block_hash"] = "mempool" self.finish_if_done() + def payment_history(chain, txpool, membuf, address, handle_finish): h = History(chain, txpool, membuf) expiry_queue.add(h) h.start(address, handle_finish) + if __name__ == "__main__": ex = bitcoin.satoshi_exporter() tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000") @@ -434,8 +435,10 @@ if __name__ == "__main__": def blockchain_started(ec, chain): print "Blockchain initialisation:", ec + def store_tx(ec): print "Tx", ec + def finish(result): print "Finish" if result is None: @@ -449,6 +452,7 @@ if __name__ == "__main__": class FakeMonitor: def tx_stored(self, tx): pass + def tx_confirmed(self, tx): pass @@ -475,4 +479,3 @@ if __name__ == "__main__": #payment_history(chain, txpool, membuf, address[1], finish) raw_input() print "Stopping..." - diff --git a/backends/libbitcoin/history1/__init__.py b/backends/libbitcoin/history1/__init__.py index 3bce9ab..be555b1 100644 --- a/backends/libbitcoin/history1/__init__.py +++ b/backends/libbitcoin/history1/__init__.py @@ -1,7 +1,9 @@ -import _history -from bitcoin import bind, _1, _2 import json +from bitcoin import bind, _1, _2 +import _history + + def wrap_finish(handle_finish, ec, result_json): try: result = json.loads(result_json) @@ -11,8 +13,8 @@ def wrap_finish(handle_finish, ec, result_json): else: handle_finish(ec, result) + def payment_history(service, chain, txpool, membuf, address, finish): _history.payment_history(service.internal_ptr, chain.internal_ptr, txpool.internal_ptr, membuf.internal_ptr, str(address), bind(wrap_finish, finish, _1, _2)) - diff --git a/backends/libbitcoin/multimap.py b/backends/libbitcoin/multimap.py index c0d4dca..9fb1892 100644 --- a/backends/libbitcoin/multimap.py +++ b/backends/libbitcoin/multimap.py @@ -7,7 +7,7 @@ class MultiMap: return self.multi[key] def __setitem__(self, key, value): - if not self.multi.has_key(key): + if key not in self.multi: self.multi[key] = [] self.multi[key].append(value) @@ -22,11 +22,13 @@ class MultiMap: def __repr__(self): return repr(self.multi) + def __str__(self): return str(self.multi) def has_key(self, key): - return self.multi.has_key(key) + return key in self.multi + if __name__ == "__main__": m = MultiMap() @@ -37,4 +39,3 @@ if __name__ == "__main__": m.delete("foo", 1) m.delete("bar", 2) print m.multi - diff --git a/backends/libbitcoin/trace_test.py b/backends/libbitcoin/trace_test.py index 6940962..7dfaa3d 100644 --- a/backends/libbitcoin/trace_test.py +++ b/backends/libbitcoin/trace_test.py @@ -1,22 +1,30 @@ -import bitcoin, trace_tx +import bitcoin + +import trace_tx + def blockchain_started(ec, chain): print "Blockchain initialisation:", ec + + def handle_tx(ec, tx): if ec: print ec trace_tx.trace_tx(service.internal_ptr, chain.internal_ptr, tx, finish) + def finish(ec, result): print ec print result -service = bitcoin.async_service(1) -chain = bitcoin.bdb_blockchain(service, "/home/genjix/libbitcoin/database", - blockchain_started) -chain.fetch_transaction( - bitcoin.hash_digest("16e3e3bfbaa072e33e6a9be1df7a13ecde5ad46a8d4d4893dbecaf0c0aeeb842"), - handle_tx) -raw_input() +if __name__ == '__main__': + service = bitcoin.async_service(1) + chain = bitcoin.bdb_blockchain(service, "/home/genjix/libbitcoin/database", + blockchain_started) + chain.fetch_transaction( + bitcoin.hash_digest("16e3e3bfbaa072e33e6a9be1df7a13ecde5ad46a8d4d4893dbecaf0c0aeeb842"), + handle_tx + ) + raw_input() diff --git a/processor.py b/processor.py index 314f4c5..baec5d0 100644 --- a/processor.py +++ b/processor.py @@ -1,25 +1,12 @@ import json +import Queue as queue import socket import threading import time -import traceback, sys -import Queue as queue - -def random_string(N): - import random, string - return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) - -def timestr(): - return time.strftime("[%d/%m/%Y-%H:%M:%S]") - - -print_lock = threading.Lock() -def print_log(*args): - args = [str(item) for item in args] - with print_lock: - sys.stderr.write(timestr() + " " + " ".join(args) + "\n") - sys.stderr.flush() +import traceback +import sys +from utils import random_string, timestr, print_log class Shared: @@ -30,7 +17,7 @@ class Shared: self.config = config def stop(self): - print_log( "Stopping Stratum" ) + print_log("Stopping Stratum") with self.lock: self._stopped = True @@ -65,8 +52,7 @@ class Processor(threading.Thread): except: traceback.print_exc(file=sys.stdout) - print_log( "processor terminating") - + print_log("processor terminating") class Dispatcher: @@ -86,7 +72,6 @@ class Dispatcher: self.request_dispatcher.processors[prefix] = processor - class RequestDispatcher(threading.Thread): def __init__(self, shared): @@ -108,7 +93,7 @@ class RequestDispatcher(threading.Thread): return self.response_queue.get() def push_request(self, session, item): - self.request_queue.put((session,item)) + self.request_queue.put((session, item)) def pop_request(self): return self.request_queue.get() @@ -138,7 +123,6 @@ class RequestDispatcher(threading.Thread): self.do_dispatch(session, request) except: traceback.print_exc(file=sys.stdout) - self.stop() @@ -149,7 +133,7 @@ class RequestDispatcher(threading.Thread): """ dispatch request to the relevant processor """ method = request['method'] - params = request.get('params',[]) + params = request.get('params', []) suffix = method.split('.')[-1] if session is not None: @@ -164,7 +148,7 @@ class RequestDispatcher(threading.Thread): try: p = self.processors[prefix] except: - print_log( "error: no processor for", prefix) + print_log("error: no processor for", prefix) return p.add_request(request) @@ -227,7 +211,11 @@ class Session: addr = None if self.subscriptions: - print_log( "%4s"%self.name, "%15s"%self.address, "%35s"%addr, "%3d"%len(self.subscriptions), self.version ) + print_log("%4s" % self.name, + "%15s" % self.address, + "%35s" % addr, + "%3d" % len(self.subscriptions), + self.version) def stopped(self): with self.lock: @@ -257,7 +245,7 @@ class Session: def contains_subscription(self, subdesc): with self.lock: return subdesc in self.subscriptions - + class ResponseDispatcher(threading.Thread): @@ -279,17 +267,21 @@ class ResponseDispatcher(threading.Thread): params = response.get('params') # A notification - if internal_id is None: # and method is not None and params is not None: + if internal_id is None: # and method is not None and params is not None: found = self.notification(method, params, response) if not found and method == 'blockchain.address.subscribe': - params2 = [self.shared.config.get('server','password')] + params - self.request_dispatcher.push_request(None,{'method':method.replace('.subscribe', '.unsubscribe'), 'params':params2, 'id':None}) + request = { + 'id': None, + 'method': method.replace('.subscribe', '.unsubscribe'), + 'params': [self.shared.config.get('server', 'password')] + params, + } + self.request_dispatcher.push_request(None, request) # A response - elif internal_id is not None: + elif internal_id is not None: self.send_response(internal_id, response) else: - print_log( "no method", response) + print_log("no method", response) def notification(self, method, params, response): subdesc = Session.build_subdesc(method, params) @@ -300,7 +292,7 @@ class ResponseDispatcher(threading.Thread): if session.contains_subscription(subdesc): session.send_response(response) found = True - # if not found: print_log( "no subscriber for", subdesc) + # if not found: print_log("no subscriber for", subdesc) return found def send_response(self, internal_id, response): @@ -309,5 +301,4 @@ class ResponseDispatcher(threading.Thread): response['id'] = message_id session.send_response(response) #else: - # print_log( "send_response: no session", message_id, internal_id, response ) - + # print_log("send_response: no session", message_id, internal_id, response ) diff --git a/server.py b/server.py index c3cb6fb..927e62d 100755 --- a/server.py +++ b/server.py @@ -15,10 +15,16 @@ # License along with this program. If not, see # . -import time, sys, traceback, threading import ConfigParser - import logging +import socket +import sys +import time +import threading +import traceback + +import json + logging.basicConfig() if sys.maxsize <= 2**32: @@ -32,6 +38,7 @@ def attempt_read_config(config, filename): except IOError: pass + def create_config(): config = ConfigParser.ConfigParser() # set some defaults, which will be overwritten by the config file @@ -64,44 +71,47 @@ def create_config(): try: with open('/etc/electrum.banner', 'r') as f: - config.set('server','banner', f.read()) + config.set('server', 'banner', f.read()) except IOError: pass return config + def run_rpc_command(command, stratum_tcp_port): - import socket, json try: - s = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) - s.connect(( host, int(stratum_tcp_port ))) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((host, int(stratum_tcp_port))) except: print "cannot connect to server." return method = 'server.' + command - request = json.dumps( { 'id':0, 'method':method, 'params':[password] } ) + request = json.dumps({'id': 0, 'method': method, 'params': [password]}) s.send(request + '\n') msg = '' while True: o = s.recv(1024) msg += o - if msg.find('\n') != -1: break + if msg.find('\n') != -1: + break s.close() r = json.loads(msg).get('result') - if command == 'info': + if command == 'info': now = time.time() - print 'type address sub version time' + print 'type address sub version time' for item in r: - print '%4s %15s %3s %7s %.2f'%( item.get('name'), - item.get('address'), - item.get('subscriptions'), - item.get('version'), - (now - item.get('time')) ) + print '%4s %15s %3s %7s %.2f' % (item.get('name'), + item.get('address'), + item.get('subscriptions'), + item.get('version'), + (now - item.get('time')), + ) else: print r + if __name__ == '__main__': config = create_config() password = config.get('server', 'password') @@ -113,7 +123,8 @@ if __name__ == '__main__': ssl_certfile = config.get('server', 'ssl_certfile') ssl_keyfile = config.get('server', 'ssl_keyfile') - if stratum_tcp_ssl_port or stratum_http_ssl_port: assert ssl_certfile and ssl_keyfile + if stratum_tcp_ssl_port or stratum_http_ssl_port: + assert ssl_certfile and ssl_keyfile if len(sys.argv) > 1: run_rpc_command(sys.argv[1], stratum_tcp_port) @@ -133,8 +144,9 @@ if __name__ == '__main__': print "Unknown backend '%s' specified\n" % backend_name sys.exit(1) - for i in range(5): print "" - print_log( "Starting Electrum server on", host) + for i in xrange(5): + print "" + print_log("Starting Electrum server on", host) # Create hub dispatcher = Dispatcher(config) @@ -178,5 +190,4 @@ if __name__ == '__main__': except: shared.stop() - print_log( "Electrum Server stopped") - + print_log("Electrum Server stopped") diff --git a/transports/stratum_http.py b/transports/stratum_http.py index b3e9a72..add17ea 100644 --- a/transports/stratum_http.py +++ b/transports/stratum_http.py @@ -14,19 +14,31 @@ # You should have received a copy of the GNU Affero General Public # License along with this program. If not, see # . +""" +sessions are identified with cookies + - each session has a buffer of responses to requests -import jsonrpclib -from jsonrpclib import Fault -from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS + +from the processor point of view: + - the user only defines process() ; the rest is session management. thus sessions should not belong to processor + +""" +import json +import logging +import os +import Queue import SimpleXMLRPCServer -import SocketServer import socket -import logging -import os, time -import types +import SocketServer +import sys +import time +import threading import traceback -import sys, threading +import types +import jsonrpclib +from jsonrpclib import Fault +from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS from OpenSSL import SSL try: @@ -35,21 +47,9 @@ except ImportError: # For Windows fcntl = None -import json - - -""" -sessions are identified with cookies - - each session has a buffer of responses to requests - -from the processor point of view: - - the user only defines process() ; the rest is session management. thus sessions should not belong to processor - -""" - - -from processor import random_string, print_log +from processor import Session +from utils import random_string, print_log def get_version(request): @@ -59,38 +59,31 @@ def get_version(request): if 'id' in request.keys(): return 1.0 return None - + + def validate_request(request): - if type(request) is not types.DictType: - fault = Fault( - -32600, 'Request must be {}, not %s.' % type(request) - ) - return fault + if not isinstance(request, types.DictType): + return Fault(-32600, 'Request must be {}, not %s.' % type(request)) rpcid = request.get('id', None) version = get_version(request) if not version: - fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid) - return fault + return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid) request.setdefault('params', []) method = request.get('method', None) params = request.get('params') param_types = (types.ListType, types.DictType, types.TupleType) - if not method or type(method) not in types.StringTypes or \ - type(params) not in param_types: - fault = Fault( - -32600, 'Invalid request parameters or method.', rpcid=rpcid - ) - return fault + if not method or type(method) not in types.StringTypes or type(params) not in param_types: + return Fault(-32600, 'Invalid request parameters or method.', rpcid=rpcid) return True + class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): def __init__(self, encoding=None): - SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, - allow_none=True, - encoding=encoding) + # todo: use super + SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, allow_none=True, encoding=encoding) - def _marshaled_dispatch(self, session_id, data, dispatch_method = None): + def _marshaled_dispatch(self, session_id, data, dispatch_method=None): response = None try: request = jsonrpclib.loads(data) @@ -105,8 +98,8 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): session.time = time.time() responses = [] - if type(request) is not types.ListType: - request = [ request ] + if not isinstance(request, types.ListType): + request = [request] for req_entry in request: result = validate_request(req_entry) @@ -115,14 +108,14 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): continue self.dispatcher.do_dispatch(session, req_entry) - + if req_entry['method'] == 'server.stop': - return json.dumps({'result':'ok'}) + return json.dumps({'result': 'ok'}) r = self.poll_session(session) for item in r: responses.append(json.dumps(item)) - + if len(responses) > 1: response = '[%s]' % ','.join(responses) elif len(responses) == 1: @@ -132,7 +125,6 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): return response - def create_session(self): session_id = random_string(10) session = HttpSession(session_id) @@ -149,11 +141,8 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): return responses +class StratumJSONRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): - -class StratumJSONRPCRequestHandler( - SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): - def do_OPTIONS(self): self.send_response(200) self.send_header('Allow', 'GET, POST, OPTIONS') @@ -161,7 +150,7 @@ class StratumJSONRPCRequestHandler( self.send_header('Access-Control-Allow-Headers', '*') self.send_header('Content-Length', '0') self.end_headers() - + def do_GET(self): if not self.is_rpc_path_valid(): self.report_404() @@ -170,7 +159,7 @@ class StratumJSONRPCRequestHandler( session_id = None c = self.headers.get('cookie') if c: - if c[0:8]=='SESSION=': + if c[0:8] == 'SESSION=': #print "found cookie", c[8:] session_id = c[8:] @@ -188,11 +177,11 @@ class StratumJSONRPCRequestHandler( fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) response = fault.response() print "500", trace_string - if response == None: + if response is None: response = '' if session_id: - self.send_header("Set-Cookie", "SESSION=%s"%session_id) + self.send_header("Set-Cookie", "SESSION=%s" % session_id) self.send_header("Content-type", "application/json-rpc") self.send_header("Access-Control-Allow-Origin", "*") @@ -202,7 +191,6 @@ class StratumJSONRPCRequestHandler( self.wfile.flush() self.shutdown_connection() - def do_POST(self): if not self.is_rpc_path_valid(): self.report_404() @@ -220,7 +208,7 @@ class StratumJSONRPCRequestHandler( session_id = None c = self.headers.get('cookie') if c: - if c[0:8]=='SESSION=': + if c[0:8] == 'SESSION=': #print "found cookie", c[8:] session_id = c[8:] @@ -237,11 +225,11 @@ class StratumJSONRPCRequestHandler( fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) response = fault.response() print "500", trace_string - if response == None: + if response is None: response = '' if session_id: - self.send_header("Set-Cookie", "SESSION=%s"%session_id) + self.send_header("Set-Cookie", "SESSION=%s" % session_id) self.send_header("Content-type", "application/json-rpc") self.send_header("Access-Control-Allow-Origin", "*") @@ -276,7 +264,7 @@ class SSLTCPServer(SocketServer.TCPServer): self.server_bind() self.server_activate() - def shutdown_request(self,request): + def shutdown_request(self, request): #request.shutdown() pass @@ -298,7 +286,7 @@ class StratumHTTPServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): # Unix sockets can't be bound if they already exist in the # filesystem. The convention of e.g. X11 is to unlink # before binding again. - if os.path.exists(addr): + if os.path.exists(addr): try: os.unlink(addr) except OSError: @@ -331,7 +319,7 @@ class StratumHTTPSSLServer(SSLTCPServer, StratumJSONRPCDispatcher): # Unix sockets can't be bound if they already exist in the # filesystem. The convention of e.g. X11 is to unlink # before binding again. - if os.path.exists(addr): + if os.path.exists(addr): try: os.unlink(addr) except OSError: @@ -345,13 +333,6 @@ class StratumHTTPSSLServer(SSLTCPServer, StratumJSONRPCDispatcher): fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) - - - - -from processor import Session -import Queue - class HttpSession(Session): def __init__(self, session_id): @@ -370,6 +351,7 @@ class HttpSession(Session): self._stopped = True return self._stopped + class HttpServer(threading.Thread): def __init__(self, dispatcher, host, port, use_ssl, certfile, keyfile): self.shared = dispatcher.shared @@ -383,22 +365,22 @@ class HttpServer(threading.Thread): self.keyfile = keyfile self.lock = threading.Lock() - def run(self): # see http://code.google.com/p/jsonrpclib/ from SocketServer import ThreadingMixIn if self.use_ssl: - class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer): pass - self.server = StratumThreadedServer(( self.host, self.port), self.certfile, self.keyfile) - print_log( "HTTPS server started.") + class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer): + pass + self.server = StratumThreadedServer((self.host, self.port), self.certfile, self.keyfile) + print_log("HTTPS server started.") else: - class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer): pass - self.server = StratumThreadedServer(( self.host, self.port)) - print_log( "HTTP server started.") + class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer): + pass + self.server = StratumThreadedServer((self.host, self.port)) + print_log("HTTP server started.") self.server.dispatcher = self.dispatcher self.server.register_function(None, 'server.stop') self.server.register_function(None, 'server.info') self.server.serve_forever() - diff --git a/transports/stratum_tcp.py b/transports/stratum_tcp.py index ffdd871..bc36a9e 100644 --- a/transports/stratum_tcp.py +++ b/transports/stratum_tcp.py @@ -1,10 +1,12 @@ import json +import Queue as queue import socket import threading import time -import Queue as queue -from processor import Session, Dispatcher, print_log +from processor import Session, Dispatcher +from utils import print_log + class TcpSession(Session): @@ -50,7 +52,6 @@ class TcpSession(Session): self.stop() - class TcpClientRequestor(threading.Thread): def __init__(self, dispatcher, session): @@ -93,7 +94,7 @@ class TcpClientRequestor(threading.Thread): raw_command = self.message[0:raw_buffer].strip() self.message = self.message[raw_buffer + 1:] - if raw_command == 'quit': + if raw_command == 'quit': self.session.stop() return False @@ -112,10 +113,11 @@ class TcpClientRequestor(threading.Thread): # Return an error JSON in response. self.dispatcher.push_response({"error": "syntax error", "request": raw_command}) else: - self.dispatcher.push_request(self.session,command) + self.dispatcher.push_request(self.session, command) return True + class TcpServer(threading.Thread): def __init__(self, dispatcher, host, port, use_ssl, ssl_certfile, ssl_keyfile): @@ -132,9 +134,9 @@ class TcpServer(threading.Thread): def run(self): if self.use_ssl: - print_log( "TCP/SSL server started.") + print_log("TCP/SSL server started.") else: - print_log( "TCP server started.") + print_log("TCP server started.") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) @@ -150,4 +152,3 @@ class TcpServer(threading.Thread): self.dispatcher.collect_garbage() client_req = TcpClientRequestor(self.dispatcher, session) client_req.start() - diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..95c6c17 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python +# +# Electrum - lightweight Bitcoin client +# Copyright (C) 2011 thomasv@gitorious +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import base64 +from functools import partial +from itertools import imap +import random +import string +import threading +import time +import hashlib +import re +import sys + +__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' +__b58base = len(__b58chars) + + +def rev_hex(s): + return s.decode('hex')[::-1].encode('hex') + + +def int_to_hex(i, length=1): + s = hex(i)[2:].rstrip('L') + s = "0"*(2*length - len(s)) + s + return rev_hex(s) + + +def var_int(i): + if i < 0xfd: + return int_to_hex(i) + elif i <= 0xffff: + return "fd" + int_to_hex(i, 2) + elif i <= 0xffffffff: + return "fe" + int_to_hex(i, 4) + else: + return "ff" + int_to_hex(i, 8) + + +Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest() + + +hash_encode = lambda x: x[::-1].encode('hex') + + +hash_decode = lambda x: x.decode('hex')[::-1] + + +def header_to_string(res): + pbh = res.get('prev_block_hash') + if pbh is None: + pbh = '0'*64 + + return int_to_hex(res.get('version'), 4) \ + + rev_hex(pbh) \ + + rev_hex(res.get('merkle_root')) \ + + int_to_hex(int(res.get('timestamp')), 4) \ + + int_to_hex(int(res.get('bits')), 4) \ + + int_to_hex(int(res.get('nonce')), 4) + + +def hex_to_int(s): + return eval('0x' + s[::-1].encode('hex')) + + +def header_from_string(s): + return { + 'version': hex_to_int(s[0:4]), + 'prev_block_hash': hash_encode(s[4:36]), + 'merkle_root': hash_encode(s[36:68]), + 'timestamp': hex_to_int(s[68:72]), + 'bits': hex_to_int(s[72:76]), + 'nonce': hex_to_int(s[76:80]), + } + + +############ functions from pywallet ##################### + +addrtype = 0 + + +def hash_160(public_key): + try: + md = hashlib.new('ripemd160') + md.update(hashlib.sha256(public_key).digest()) + return md.digest() + except: + import ripemd + md = ripemd.new(hashlib.sha256(public_key).digest()) + return md.digest() + + +def public_key_to_bc_address(public_key): + return hash_160_to_bc_address(hash_160(public_key)) + + +def hash_160_to_bc_address(h160): + if h160 == 'None': + return 'None' + vh160 = chr(addrtype) + h160 + h = Hash(vh160) + addr = vh160 + h[0:4] + return b58encode(addr) + + +def bc_address_to_hash_160(addr): + if addr == 'None': + return 'None' + bytes = b58decode(addr, 25) + return bytes[1:21] + + +def b58encode(v): + """encode v, which is a string of bytes, to base58.""" + + long_value = 0L + for (i, c) in enumerate(v[::-1]): + long_value += (256**i) * ord(c) + + result = '' + while long_value >= __b58base: + div, mod = divmod(long_value, __b58base) + result = __b58chars[mod] + result + long_value = div + result = __b58chars[long_value] + result + + # Bitcoin does a little leading-zero-compression: + # leading 0-bytes in the input become leading-1s + nPad = 0 + for c in v: + if c == '\0': + nPad += 1 + else: + break + + return (__b58chars[0]*nPad) + result + + +def b58decode(v, length): + """ decode v into a string of len bytes.""" + long_value = 0L + for (i, c) in enumerate(v[::-1]): + long_value += __b58chars.find(c) * (__b58base**i) + + result = '' + while long_value >= 256: + div, mod = divmod(long_value, 256) + result = chr(mod) + result + long_value = div + result = chr(long_value) + result + + nPad = 0 + for c in v: + if c == __b58chars[0]: + nPad += 1 + else: + break + + result = chr(0)*nPad + result + if length is not None and len(result) != length: + return None + + return result + + +def EncodeBase58Check(vchIn): + hash = Hash(vchIn) + return b58encode(vchIn + hash[0:4]) + + +def DecodeBase58Check(psz): + vchRet = b58decode(psz, None) + key = vchRet[0:-4] + csum = vchRet[-4:] + hash = Hash(key) + cs32 = hash[0:4] + if cs32 != csum: + return None + else: + return key + + +def PrivKeyToSecret(privkey): + return privkey[9:9+32] + + +def SecretToASecret(secret): + vchIn = chr(addrtype+128) + secret + return EncodeBase58Check(vchIn) + + +def ASecretToSecret(key): + vch = DecodeBase58Check(key) + if vch and vch[0] == chr(addrtype+128): + return vch[1:] + else: + return False + + +########### end pywallet functions ####################### + +def random_string(length): + return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in xrange(length)) + + +def timestr(): + return time.strftime("[%d/%m/%Y-%H:%M:%S]") + + +print_lock = threading.Lock() + + +def print_log(*args): + with print_lock: + sys.stderr.write(timestr() + " " + " ".join(imap(str, args)) + "\n") + sys.stderr.flush() -- 1.7.1