From 2e3252aa7976d1ddd11b333a1c5c5fa4fb3bb338 Mon Sep 17 00:00:00 2001 From: genjix Date: Fri, 6 Apr 2012 19:43:15 +0100 Subject: [PATCH] Renamed: modules -> backends --- backends/abe/__init__.py | 436 +++++++++++++++++++++++++++++++++++++++ backends/irc/__init__.py | 120 +++++++++++ backends/libbitcoin/__init__.py | 184 ++++++++++++++++ backends/libbitcoin/composed.py | 194 +++++++++++++++++ modules/abe/__init__.py | 436 --------------------------------------- modules/irc/__init__.py | 120 ----------- modules/libbitcoin/__init__.py | 182 ---------------- modules/libbitcoin/composed.py | 194 ----------------- server.py | 6 +- 9 files changed, 937 insertions(+), 935 deletions(-) create mode 100644 backends/__init__.py create mode 100644 backends/abe/__init__.py create mode 100644 backends/irc/__init__.py create mode 100644 backends/libbitcoin/__init__.py create mode 100644 backends/libbitcoin/composed.py delete mode 100644 modules/__init__.py delete mode 100644 modules/abe/__init__.py delete mode 100644 modules/irc/__init__.py delete mode 100644 modules/libbitcoin/__init__.py delete mode 100644 modules/libbitcoin/composed.py diff --git a/backends/__init__.py b/backends/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backends/abe/__init__.py b/backends/abe/__init__.py new file mode 100644 index 0000000..d63847d --- /dev/null +++ b/backends/abe/__init__.py @@ -0,0 +1,436 @@ +from Abe.abe import hash_to_address, decode_check_address +from Abe.DataStore import DataStore as Datastore_class +from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58 + +import psycopg2, binascii + +import thread, traceback, sys, urllib, operator +from json import dumps, loads +from Queue import Queue +import time + +class AbeStore(Datastore_class): + + def __init__(self, config): + conf = DataStore.CONFIG_DEFAULTS + args, argv = readconf.parse_argv( [], conf) + args.dbtype = config.get('database','type') + if args.dbtype == 'sqlite3': + args.connect_args = { 'database' : config.get('database','database') } + elif args.dbtype == 'MySQLdb': + args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') } + elif args.dbtype == 'psycopg2': + args.connect_args = { 'database' : config.get('database','database') } + + Datastore_class.__init__(self,args) + + self.tx_cache = {} + self.mempool_keys = {} + self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port')) + + self.address_queue = Queue() + + self.dblock = thread.allocate_lock() + + + + def import_block(self, b, chain_ids=frozenset()): + #print "import block" + block_id = super(AbeStore, self).import_block(b, chain_ids) + for pos in xrange(len(b['transactions'])): + tx = b['transactions'][pos] + if 'hash' not in tx: + tx['hash'] = util.double_sha256(tx['tx']) + tx_id = self.tx_find_id_and_value(tx) + if tx_id: + self.update_tx_cache(tx_id) + else: + print "error: import_block: no tx_id" + return block_id + + + def update_tx_cache(self, txid): + inrows = self.get_tx_inputs(txid, False) + for row in inrows: + _hash = self.binout(row[6]) + address = hash_to_address(chr(0), _hash) + if self.tx_cache.has_key(address): + print "cache: invalidating", address + self.tx_cache.pop(address) + self.address_queue.put(address) + + outrows = self.get_tx_outputs(txid, False) + for row in outrows: + _hash = self.binout(row[6]) + address = hash_to_address(chr(0), _hash) + if self.tx_cache.has_key(address): + print "cache: invalidating", address + self.tx_cache.pop(address) + self.address_queue.put(address) + + def safe_sql(self,sql, params=(), lock=True): + try: + if lock: self.dblock.acquire() + ret = self.selectall(sql,params) + if lock: self.dblock.release() + return ret + except: + print "sql error", sql + return [] + + def get_tx_outputs(self, tx_id, lock=True): + return self.safe_sql("""SELECT + txout.txout_pos, + txout.txout_scriptPubKey, + txout.txout_value, + nexttx.tx_hash, + nexttx.tx_id, + txin.txin_pos, + pubkey.pubkey_hash + FROM txout + LEFT JOIN txin ON (txin.txout_id = txout.txout_id) + LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) + LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id) + WHERE txout.tx_id = %d + ORDER BY txout.txout_pos + """%(tx_id), (), lock) + + def get_tx_inputs(self, tx_id, lock=True): + return self.safe_sql(""" SELECT + txin.txin_pos, + txin.txin_scriptSig, + txout.txout_value, + COALESCE(prevtx.tx_hash, u.txout_tx_hash), + prevtx.tx_id, + COALESCE(txout.txout_pos, u.txout_pos), + pubkey.pubkey_hash + FROM txin + LEFT JOIN txout ON (txout.txout_id = txin.txout_id) + LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) + LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id) + LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id) + WHERE txin.tx_id = %d + ORDER BY txin.txin_pos + """%(tx_id,), (), lock) + + def get_address_out_rows(self, dbhash): + return self.safe_sql(""" SELECT + b.block_nTime, + cc.chain_id, + b.block_height, + 1, + b.block_hash, + tx.tx_hash, + tx.tx_id, + txin.txin_pos, + -prevout.txout_value + FROM chain_candidate cc + JOIN block b ON (b.block_id = cc.block_id) + JOIN block_tx ON (block_tx.block_id = b.block_id) + JOIN tx ON (tx.tx_id = block_tx.tx_id) + JOIN txin ON (txin.tx_id = tx.tx_id) + JOIN txout prevout ON (txin.txout_id = prevout.txout_id) + JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id) + WHERE pubkey.pubkey_hash = ? + AND cc.in_longest = 1""", (dbhash,)) + + def get_address_out_rows_memorypool(self, dbhash): + return self.safe_sql(""" SELECT + 1, + tx.tx_hash, + tx.tx_id, + txin.txin_pos, + -prevout.txout_value + FROM tx + JOIN txin ON (txin.tx_id = tx.tx_id) + JOIN txout prevout ON (txin.txout_id = prevout.txout_id) + JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id) + WHERE pubkey.pubkey_hash = ? """, (dbhash,)) + + def get_address_in_rows(self, dbhash): + return self.safe_sql(""" SELECT + b.block_nTime, + cc.chain_id, + b.block_height, + 0, + b.block_hash, + tx.tx_hash, + tx.tx_id, + txout.txout_pos, + txout.txout_value + FROM chain_candidate cc + JOIN block b ON (b.block_id = cc.block_id) + JOIN block_tx ON (block_tx.block_id = b.block_id) + JOIN tx ON (tx.tx_id = block_tx.tx_id) + JOIN txout ON (txout.tx_id = tx.tx_id) + JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) + WHERE pubkey.pubkey_hash = ? + AND cc.in_longest = 1""", (dbhash,)) + + def get_address_in_rows_memorypool(self, dbhash): + return self.safe_sql( """ SELECT + 0, + tx.tx_hash, + tx.tx_id, + txout.txout_pos, + txout.txout_value + FROM tx + JOIN txout ON (txout.tx_id = tx.tx_id) + JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) + WHERE pubkey.pubkey_hash = ? """, (dbhash,)) + + def get_history(self, addr): + + cached_version = self.tx_cache.get( addr ) + if cached_version is not None: + return cached_version + + version, binaddr = decode_check_address(addr) + if binaddr is None: + return None + + dbhash = self.binin(binaddr) + rows = [] + rows += self.get_address_out_rows( dbhash ) + rows += self.get_address_in_rows( dbhash ) + + txpoints = [] + known_tx = [] + + for row in rows: + try: + nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row + except: + print "cannot unpack row", row + break + tx_hash = self.hashout_hex(tx_hash) + txpoint = { + "timestamp": int(nTime), + "height": int(height), + "is_input": int(is_in), + "block_hash": self.hashout_hex(blk_hash), + "tx_hash": tx_hash, + "tx_id": int(tx_id), + "index": int(pos), + "value": int(value), + } + + txpoints.append(txpoint) + known_tx.append(self.hashout_hex(tx_hash)) + + + # todo: sort them really... + txpoints = sorted(txpoints, key=operator.itemgetter("timestamp")) + + # read memory pool + rows = [] + rows += self.get_address_in_rows_memorypool( dbhash ) + rows += self.get_address_out_rows_memorypool( dbhash ) + address_has_mempool = False + + for row in rows: + is_in, tx_hash, tx_id, pos, value = row + tx_hash = self.hashout_hex(tx_hash) + if tx_hash in known_tx: + continue + + # this means that pending transactions were added to the db, even if they are not returned by getmemorypool + address_has_mempool = True + + # this means pending transactions are returned by getmemorypool + if tx_hash not in self.mempool_keys: + continue + + #print "mempool", tx_hash + txpoint = { + "timestamp": 0, + "height": 0, + "is_input": int(is_in), + "block_hash": 'mempool', + "tx_hash": tx_hash, + "tx_id": int(tx_id), + "index": int(pos), + "value": int(value), + } + txpoints.append(txpoint) + + + for txpoint in txpoints: + tx_id = txpoint['tx_id'] + + txinputs = [] + inrows = self.get_tx_inputs(tx_id) + for row in inrows: + _hash = self.binout(row[6]) + address = hash_to_address(chr(0), _hash) + txinputs.append(address) + txpoint['inputs'] = txinputs + txoutputs = [] + outrows = self.get_tx_outputs(tx_id) + for row in outrows: + _hash = self.binout(row[6]) + address = hash_to_address(chr(0), _hash) + txoutputs.append(address) + txpoint['outputs'] = txoutputs + + # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address) + if not txpoint['is_input']: + # detect if already redeemed... + for row in outrows: + if row[6] == dbhash: break + else: + raise + #row = self.get_tx_output(tx_id,dbhash) + # pos, script, value, o_hash, o_id, o_pos, binaddr = row + # if not redeemed, we add the script + if row: + if not row[4]: txpoint['raw_output_script'] = row[1] + + # cache result + if not address_has_mempool: + self.tx_cache[addr] = txpoints + + return txpoints + + + def get_status(self,addr): + # get address status, i.e. the last block for that address. + tx_points = self.get_history(addr) + if not tx_points: + status = None + else: + lastpoint = tx_points[-1] + status = lastpoint['block_hash'] + # this is a temporary hack; move it up once old clients have disappeared + if status == 'mempool': # and session['version'] != "old": + status = status + ':%d'% len(tx_points) + return status + + + + def memorypool_update(store): + + ds = BCDataStream.BCDataStream() + previous_transactions = store.mempool_keys + store.mempool_keys = [] + + postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'}) + + respdata = urllib.urlopen(store.bitcoind_url, postdata).read() + r = loads(respdata) + if r['error'] != None: + return + + v = r['result'].get('transactions') + for hextx in v: + ds.clear() + ds.write(hextx.decode('hex')) + tx = deserialize.parse_Transaction(ds) + tx['hash'] = util.double_sha256(tx['tx']) + tx_hash = store.hashin(tx['hash']) + + store.mempool_keys.append(tx_hash) + if store.tx_find_id_and_value(tx): + pass + else: + tx_id = store.import_tx(tx, False) + store.update_tx_cache(tx_id) + + store.commit() + + + def send_tx(self,tx): + postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'}) + respdata = urllib.urlopen(self.bitcoind_url, postdata).read() + r = loads(respdata) + if r['error'] != None: + out = "error: transaction rejected by memorypool\n"+tx + else: + out = r['result'] + return out + + + def main_iteration(store): + try: + store.dblock.acquire() + store.catch_up() + store.memorypool_update() + block_number = store.get_block_number(1) + + except IOError: + print "IOError: cannot reach bitcoind" + block_number = 0 + except: + traceback.print_exc(file=sys.stdout) + block_number = 0 + finally: + store.dblock.release() + + return block_number + + +from processor import Processor + +class BlockchainProcessor(Processor): + + def __init__(self, config): + Processor.__init__(self) + self.store = AbeStore(config) + self.block_number = -1 + self.watched_addresses = [] + + def process(self, request): + message_id = request['id'] + method = request['method'] + params = request.get('params',[]) + result = '' + if method == 'blockchain.numblocks.subscribe': + result = self.block_number + elif method == 'blockchain.address.subscribe': + address = params[0] + self.watch_address(address) + status = self.store.get_status(address) + result = status + elif method == 'blockchain.address.get_history': + address = params[0] + result = self.store.get_history( address ) + elif method == 'blockchain.transaction.broadcast': + txo = self.store.send_tx(params[0]) + print "sent tx:", txo + result = txo + else: + print "unknown method", request + + if result != '': + response = { 'id':message_id, 'method':method, 'params':params, 'result':result } + self.push_response(response) + + + def watch_address(self, addr): + if addr not in self.watched_addresses: + self.watched_addresses.append(addr) + + + def run(self): + + old_block_number = None + while not self.shared.stopped(): + self.block_number = self.store.main_iteration() + + if self.block_number != old_block_number: + old_block_number = self.block_number + self.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number }) + + while True: + try: + addr = self.store.address_queue.get(False) + except: + break + if addr in self.watched_addresses: + status = self.store.get_status( addr ) + self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status }) + + time.sleep(10) + + + diff --git a/backends/irc/__init__.py b/backends/irc/__init__.py new file mode 100644 index 0000000..4d3c448 --- /dev/null +++ b/backends/irc/__init__.py @@ -0,0 +1,120 @@ +import threading, socket, traceback, time, sys + +def random_string(N): + import random, string + return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) + +from processor import Processor + +class ServerProcessor(Processor): + + def __init__(self, config): + Processor.__init__(self) + self.daemon = True + self.peers = {} + self.banner = config.get('server','banner') + self.host = config.get('server','host') + self.password = config.get('server','password') + + self.native_port = config.get('server','native_port') + self.stratum_tcp_port = config.get('server','stratum_tcp_port') + self.stratum_http_port = config.get('server','stratum_http_port') + + self.irc = config.get('server', 'irc') == 'yes' + self.nick = config.get('server', 'irc_nick') + if not self.nick: self.nick = random_string(10) + + + def get_peers(self): + return self.peers.values() + + + def getname(self): + s = '' + if self.native_port: + s+= 'n' + self.native_port + ' ' + if self.stratum_tcp_port: + s += 't' + self.stratum_tcp_port + ' ' + if self.stratum_http_port: + s += 'h' + self.stratum_http_port + ' ' + return s + + + def run(self): + if not self.irc: + return + + ircname = self.getname() + + while not self.shared.stopped(): + try: + s = socket.socket() + s.connect(('irc.freenode.net', 6667)) + s.send('USER electrum 0 * :' + self.host + ' ' + ircname + '\n') + s.send('NICK E_' + self.nick + '\n') + s.send('JOIN #electrum\n') + sf = s.makefile('r', 0) + t = 0 + while not self.shared.stopped(): + line = sf.readline() + line = line.rstrip('\r\n') + line = line.split() + if line[0]=='PING': + s.send('PONG '+line[1]+'\n') + elif '353' in line: # answer to /names + k = line.index('353') + for item in line[k+1:]: + if item[0:2] == 'E_': + s.send('WHO %s\n'%item) + elif '352' in line: # answer to /who + # warning: this is a horrible hack which apparently works + k = line.index('352') + ip = line[k+4] + ip = socket.gethostbyname(ip) + name = line[k+6] + host = line[k+9] + ports = line[k+10:] + self.peers[name] = (ip, host, ports) + if time.time() - t > 5*60: + self.push_response({'method':'server.peers', 'result':[self.get_peers()]}) + s.send('NAMES #electrum\n') + t = time.time() + self.peers = {} + except: + traceback.print_exc(file=sys.stdout) + finally: + sf.close() + s.close() + + + + def process(self, request): + method = request['method'] + params = request['params'] + result = None + + if method == 'server.banner': + result = self.banner.replace('\\n','\n') + + elif method == 'server.peers.subscribe': + result = self.get_peers() + + elif method == 'server.version': + pass + + elif method == 'server.stop': + print "stopping..." + try: + password = request['params'][0] + except: + password = None + if password == self.password: + self.shared.stop() + result = 'ok' + else: + print "unknown method", request + + if result!='': + response = { 'id':request['id'], 'method':method, 'params':params, 'result':result } + self.push_response(response) + diff --git a/backends/libbitcoin/__init__.py b/backends/libbitcoin/__init__.py new file mode 100644 index 0000000..e28132a --- /dev/null +++ b/backends/libbitcoin/__init__.py @@ -0,0 +1,184 @@ +import bitcoin +from processor import Processor +import threading +import time + +import composed + +class Backend: + + def __init__(self): + # Create 3 thread-pools each with 1 thread + self.network_service = bitcoin.async_service(1) + self.disk_service = bitcoin.async_service(1) + self.mempool_service = bitcoin.async_service(1) + + self.hosts = bitcoin.hosts(self.network_service) + self.handshake = bitcoin.handshake(self.network_service) + self.network = bitcoin.network(self.network_service) + self.protocol = bitcoin.protocol(self.network_service, self.hosts, + self.handshake, self.network) + + db_prefix = "/home/genjix/libbitcoin/database" + self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix) + self.poller = bitcoin.poller(self.blockchain) + self.transaction_pool = \ + bitcoin.transaction_pool(self.mempool_service, self.blockchain) + + self.protocol.subscribe_channel(self.monitor_tx) + self.session = \ + bitcoin.session(self.hosts, self.handshake, self.network, + self.protocol, self.blockchain, self.poller, + self.transaction_pool) + self.session.start(self.handle_start) + + def handle_start(self, ec): + if ec: + print "Error starting backend:", ec + + def stop(self): + self.session.stop(self.handle_stop) + + def handle_stop(self, ec): + if ec: + print "Error stopping backend:", ec + print "Stopped backend" + + def monitor_tx(self, node): + # We will be notified here when connected to new bitcoin nodes + # Here we subscribe to new transactions from them which we + # add to the transaction_pool. That way we can track which + # transactions we are interested in. + node.subscribe_transaction( + bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) + # Re-subscribe to next new node + self.protocol.subscribe_channel(self.monitor_tx) + + def recv_tx(self, ec, tx, node): + if ec: + print "Error with new transaction:", ec + return + tx_hash = bitcoin.hash_transaction(tx) + # If we want to ignore this transaction, we can set + # the 2 handlers to be null handlers that do nothing. + self.transaction_pool.store(tx, + bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash), + bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash)) + # Re-subscribe to new transactions from node + node.subscribe_transaction( + bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) + + def handle_mempool_store(self, ec, tx_hash): + if ec: + print "Error storing memory pool transaction", tx_hash, ec + else: + print "Accepted transaction", tx_hash + + def tx_confirmed(self, ec, tx_hash): + if ec: + print "Problem confirming transaction", tx_hash, ec + else: + print "Confirmed", tx_hash + +class GhostValue: + + def __init__(self): + self.event = threading.Event() + self.value = None + + def get(self): + self.event.wait() + return self.value + + def set(self, value): + self.value = value + self.event.set() + +class NumblocksSubscribe: + + def __init__(self, backend, processor): + self.backend = backend + self.processor = processor + self.lock = threading.Lock() + self.backend.blockchain.subscribe_reorganize(self.reorganize) + self.backend.blockchain.fetch_last_depth(self.set_last_depth) + self.latest = GhostValue() + + def set_last_depth(self, ec, last_depth): + if ec: + print "Error retrieving last depth", ec + else: + self.latest.set(last_depth) + + def reorganize(self, ec, fork_point, arrivals, replaced): + latest = fork_point + len(arrivals) + self.latest.set(latest) + response = {"method": "numblocks.subscribe", "result": latest} + self.processor.push_response(response) + self.backend.blockchain.subscribe_reorganize(self.reorganize) + + +class AddressGetHistory: + + def __init__(self, backend, processor): + self.backend = backend + self.processor = processor + + def get(self, request): + address = str(request["params"]) + composed.payment_history(self.backend.blockchain, address, + bitcoin.bind(self.respond, request, bitcoin._1)) + + def respond(self, request, result): + response = {"id": request["id"], "method": request["method"], + "params": request["params"], "result": result} + self.processor.push_response(response) + + +class BlockchainProcessor(Processor): + + def __init__(self, config): + Processor.__init__(self) + self.backend = Backend() + self.numblocks_subscribe = NumblocksSubscribe(self.backend, self) + self.address_get_history = AddressGetHistory(self.backend, self) + + def stop(self): + self.backend.stop() + + def process(self, request): + print "New request (lib)", request + if request["method"] == "numblocks.subscribe": + self.numblocks_subscribe.subscribe(session, request) + elif request["method"] == "address.get_history": + self.address_get_history.get(request) + elif request["method"] == "server.banner": + self.push_response({"id": request["id"], + "method": request["method"], "params": request["params"], + "result": "libbitcoin using python-bitcoin bindings"}) + elif request["method"] == "transaction.broadcast": + self.broadcast_transaction(request) + + def broadcast_transaction(self, request): + raw_tx = bitcoin.data_chunk(str(request["params"])) + exporter = bitcoin.satoshi_exporter() + try: + tx = exporter.load_transaction(raw_tx) + except RuntimeError: + response = {"id": request["id"], "method": request["method"], + "params": request["params"], "result": None, + "error": {"message": + "Exception while parsing the transaction data.", + "code": -4}} + else: + self.backend.protocol.broadcast_transaction(tx) + tx_hash = str(bitcoin.hash_transaction(tx)) + response = {"id": request["id"], "method": request["method"], + "params": request["params"], "result": tx_hash} + self.push_response(response) + + def run(self): + print "Warning: pre-alpha prototype. Full of bugs." + while not self.shared.stopped(): + time.sleep(1) + diff --git a/backends/libbitcoin/composed.py b/backends/libbitcoin/composed.py new file mode 100644 index 0000000..9df57a0 --- /dev/null +++ b/backends/libbitcoin/composed.py @@ -0,0 +1,194 @@ +import bitcoin +import threading +import time + +class ExpiryQueue(threading.Thread): + + def __init__(self): + self.lock = threading.Lock() + self.items = [] + threading.Thread.__init__(self) + self.daemon = True + + def run(self): + # Garbage collection + while True: + with self.lock: + self.items = [i for i in self.items if not i.stopped()] + time.sleep(0.1) + + def add(self, item): + with self.lock: + self.items.append(item) + +expiry_queue = ExpiryQueue() + +class StatementLine: + + def __init__(self, output_point): + self.lock = threading.Lock() + self.output_point = output_point + self.output_loaded = None + self.input_point = None + self.input_loaded = None + self.raw_output_script = None + + def is_loaded(self): + with self.lock: + if self.output_loaded is None: + return False + elif (self.input_point is not False and + self.input_loaded is None): + return False + return True + +class PaymentHistory: + + def __init__(self, chain): + self.chain = chain + self.lock = threading.Lock() + self.statement = [] + self._stopped = False + + def run(self, address, handle_finish): + self.address = address + self.handle_finish = handle_finish + + pubkey_hash = bitcoin.address_to_short_hash(address) + self.chain.fetch_outputs(pubkey_hash, self.start_loading) + + def start_loading(self, ec, output_points): + with self.lock: + for outpoint in output_points: + statement_line = StatementLine(outpoint) + self.statement.append(statement_line) + self.chain.fetch_spend(outpoint, + bitcoin.bind(self.load_spend, + bitcoin._1, bitcoin._2, statement_line)) + self.load_tx_info(outpoint, statement_line, False) + + def load_spend(self, ec, inpoint, statement_line): + with statement_line.lock: + if ec: + statement_line.input_point = False + else: + statement_line.input_point = inpoint + self.finish_if_done() + if not ec: + self.load_tx_info(inpoint, statement_line, True) + + def finish_if_done(self): + with self.lock: + if any(not line.is_loaded() for line in self.statement): + return + result = [] + for line in self.statement: + if line.input_point: + line.input_loaded["value"] = -line.output_loaded["value"] + result.append(line.input_loaded) + else: + line.output_loaded["raw_output_script"] = \ + line.raw_output_script + result.append(line.output_loaded) + self.handle_finish(result) + self.stop() + + def stop(self): + with self.lock: + self._stopped = True + + def stopped(self): + with self.lock: + return self._stopped + + def load_tx_info(self, point, statement_line, is_input): + info = {} + info["tx_hash"] = str(point.hash) + info["index"] = point.index + info["is_input"] = 1 if is_input else 0 + self.chain.fetch_transaction_index(point.hash, + bitcoin.bind(self.tx_index, bitcoin._1, bitcoin._2, bitcoin._3, + statement_line, info)) + + def tx_index(self, ec, block_depth, offset, statement_line, info): + info["height"] = block_depth + self.chain.fetch_block_header_by_depth(block_depth, + bitcoin.bind(self.block_header, bitcoin._1, bitcoin._2, + statement_line, info)) + + def block_header(self, ec, blk_head, statement_line, info): + info["timestamp"] = blk_head.timestamp + info["block_hash"] = str(bitcoin.hash_block_header(blk_head)) + tx_hash = bitcoin.hash_digest(info["tx_hash"]) + self.chain.fetch_transaction(tx_hash, + bitcoin.bind(self.load_tx, bitcoin._1, bitcoin._2, + statement_line, info)) + + def load_tx(self, ec, tx, statement_line, info): + outputs = [] + for tx_out in tx.outputs: + script = tx_out.output_script + if script.type() == bitcoin.payment_type.pubkey_hash: + pkh = bitcoin.short_hash(str(script.operations()[2].data)) + outputs.append(bitcoin.public_key_hash_to_address(pkh)) + else: + outputs.append("Unknown") + info["outputs"] = outputs + info["inputs"] = [None for i in range(len(tx.inputs))] + if info["is_input"] == 1: + info["inputs"][info["index"]] = self.address + else: + our_output = tx.outputs[info["index"]] + info["value"] = our_output.value + with statement_line.lock: + statement_line.raw_output_script = \ + str(bitcoin.save_script(our_output.output_script)) + if not [empty_in for empty_in in info["inputs"] if empty_in is None]: + # We have the sole input + assert(info["is_input"] == 1) + with statement_line.lock: + statement_line.input_loaded = info + self.finish_if_done() + for tx_idx, tx_in in enumerate(tx.inputs): + if info["is_input"] == 1 and info["index"] == tx_idx: + continue + self.chain.fetch_transaction(tx_in.previous_output.hash, + bitcoin.bind(self.load_input, bitcoin._1, bitcoin._2, + tx_in.previous_output.index, statement_line, info, tx_idx)) + + def load_input(self, ec, tx, index, statement_line, info, inputs_index): + script = tx.outputs[index].output_script + if script.type() == bitcoin.payment_type.pubkey_hash: + pkh = bitcoin.short_hash(str(script.operations()[2].data)) + info["inputs"][inputs_index] = \ + bitcoin.public_key_hash_to_address(pkh) + else: + info["inputs"][inputs_index] = "Unknown" + if not [empty_in for empty_in in info["inputs"] if empty_in is None]: + with statement_line.lock: + if info["is_input"] == 1: + statement_line.input_loaded = info + else: + statement_line.output_loaded = info + self.finish_if_done() + +def payment_history(chain, address, handle_finish): + ph = PaymentHistory(chain) + expiry_queue.add(ph) + ph.run(address, handle_finish) + +if __name__ == "__main__": + def finish(result): + print result + def last(ec, depth): + print "D:", depth + + service = bitcoin.async_service(1) + prefix = "/home/genjix/libbitcoin/database" + chain = bitcoin.bdb_blockchain(service, prefix) + chain.fetch_last_depth(last) + address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE" + print "Looking up", address + payment_history(chain, address, finish) + raw_input() + diff --git a/modules/__init__.py b/modules/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/modules/abe/__init__.py b/modules/abe/__init__.py deleted file mode 100644 index d63847d..0000000 --- a/modules/abe/__init__.py +++ /dev/null @@ -1,436 +0,0 @@ -from Abe.abe import hash_to_address, decode_check_address -from Abe.DataStore import DataStore as Datastore_class -from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58 - -import psycopg2, binascii - -import thread, traceback, sys, urllib, operator -from json import dumps, loads -from Queue import Queue -import time - -class AbeStore(Datastore_class): - - def __init__(self, config): - conf = DataStore.CONFIG_DEFAULTS - args, argv = readconf.parse_argv( [], conf) - args.dbtype = config.get('database','type') - if args.dbtype == 'sqlite3': - args.connect_args = { 'database' : config.get('database','database') } - elif args.dbtype == 'MySQLdb': - args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') } - elif args.dbtype == 'psycopg2': - args.connect_args = { 'database' : config.get('database','database') } - - Datastore_class.__init__(self,args) - - self.tx_cache = {} - self.mempool_keys = {} - self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port')) - - self.address_queue = Queue() - - self.dblock = thread.allocate_lock() - - - - def import_block(self, b, chain_ids=frozenset()): - #print "import block" - block_id = super(AbeStore, self).import_block(b, chain_ids) - for pos in xrange(len(b['transactions'])): - tx = b['transactions'][pos] - if 'hash' not in tx: - tx['hash'] = util.double_sha256(tx['tx']) - tx_id = self.tx_find_id_and_value(tx) - if tx_id: - self.update_tx_cache(tx_id) - else: - print "error: import_block: no tx_id" - return block_id - - - def update_tx_cache(self, txid): - inrows = self.get_tx_inputs(txid, False) - for row in inrows: - _hash = self.binout(row[6]) - address = hash_to_address(chr(0), _hash) - if self.tx_cache.has_key(address): - print "cache: invalidating", address - self.tx_cache.pop(address) - self.address_queue.put(address) - - outrows = self.get_tx_outputs(txid, False) - for row in outrows: - _hash = self.binout(row[6]) - address = hash_to_address(chr(0), _hash) - if self.tx_cache.has_key(address): - print "cache: invalidating", address - self.tx_cache.pop(address) - self.address_queue.put(address) - - def safe_sql(self,sql, params=(), lock=True): - try: - if lock: self.dblock.acquire() - ret = self.selectall(sql,params) - if lock: self.dblock.release() - return ret - except: - print "sql error", sql - return [] - - def get_tx_outputs(self, tx_id, lock=True): - return self.safe_sql("""SELECT - txout.txout_pos, - txout.txout_scriptPubKey, - txout.txout_value, - nexttx.tx_hash, - nexttx.tx_id, - txin.txin_pos, - pubkey.pubkey_hash - FROM txout - LEFT JOIN txin ON (txin.txout_id = txout.txout_id) - LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) - LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id) - WHERE txout.tx_id = %d - ORDER BY txout.txout_pos - """%(tx_id), (), lock) - - def get_tx_inputs(self, tx_id, lock=True): - return self.safe_sql(""" SELECT - txin.txin_pos, - txin.txin_scriptSig, - txout.txout_value, - COALESCE(prevtx.tx_hash, u.txout_tx_hash), - prevtx.tx_id, - COALESCE(txout.txout_pos, u.txout_pos), - pubkey.pubkey_hash - FROM txin - LEFT JOIN txout ON (txout.txout_id = txin.txout_id) - LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) - LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id) - LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id) - WHERE txin.tx_id = %d - ORDER BY txin.txin_pos - """%(tx_id,), (), lock) - - def get_address_out_rows(self, dbhash): - return self.safe_sql(""" SELECT - b.block_nTime, - cc.chain_id, - b.block_height, - 1, - b.block_hash, - tx.tx_hash, - tx.tx_id, - txin.txin_pos, - -prevout.txout_value - FROM chain_candidate cc - JOIN block b ON (b.block_id = cc.block_id) - JOIN block_tx ON (block_tx.block_id = b.block_id) - JOIN tx ON (tx.tx_id = block_tx.tx_id) - JOIN txin ON (txin.tx_id = tx.tx_id) - JOIN txout prevout ON (txin.txout_id = prevout.txout_id) - JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id) - WHERE pubkey.pubkey_hash = ? - AND cc.in_longest = 1""", (dbhash,)) - - def get_address_out_rows_memorypool(self, dbhash): - return self.safe_sql(""" SELECT - 1, - tx.tx_hash, - tx.tx_id, - txin.txin_pos, - -prevout.txout_value - FROM tx - JOIN txin ON (txin.tx_id = tx.tx_id) - JOIN txout prevout ON (txin.txout_id = prevout.txout_id) - JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id) - WHERE pubkey.pubkey_hash = ? """, (dbhash,)) - - def get_address_in_rows(self, dbhash): - return self.safe_sql(""" SELECT - b.block_nTime, - cc.chain_id, - b.block_height, - 0, - b.block_hash, - tx.tx_hash, - tx.tx_id, - txout.txout_pos, - txout.txout_value - FROM chain_candidate cc - JOIN block b ON (b.block_id = cc.block_id) - JOIN block_tx ON (block_tx.block_id = b.block_id) - JOIN tx ON (tx.tx_id = block_tx.tx_id) - JOIN txout ON (txout.tx_id = tx.tx_id) - JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) - WHERE pubkey.pubkey_hash = ? - AND cc.in_longest = 1""", (dbhash,)) - - def get_address_in_rows_memorypool(self, dbhash): - return self.safe_sql( """ SELECT - 0, - tx.tx_hash, - tx.tx_id, - txout.txout_pos, - txout.txout_value - FROM tx - JOIN txout ON (txout.tx_id = tx.tx_id) - JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id) - WHERE pubkey.pubkey_hash = ? """, (dbhash,)) - - def get_history(self, addr): - - cached_version = self.tx_cache.get( addr ) - if cached_version is not None: - return cached_version - - version, binaddr = decode_check_address(addr) - if binaddr is None: - return None - - dbhash = self.binin(binaddr) - rows = [] - rows += self.get_address_out_rows( dbhash ) - rows += self.get_address_in_rows( dbhash ) - - txpoints = [] - known_tx = [] - - for row in rows: - try: - nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row - except: - print "cannot unpack row", row - break - tx_hash = self.hashout_hex(tx_hash) - txpoint = { - "timestamp": int(nTime), - "height": int(height), - "is_input": int(is_in), - "block_hash": self.hashout_hex(blk_hash), - "tx_hash": tx_hash, - "tx_id": int(tx_id), - "index": int(pos), - "value": int(value), - } - - txpoints.append(txpoint) - known_tx.append(self.hashout_hex(tx_hash)) - - - # todo: sort them really... - txpoints = sorted(txpoints, key=operator.itemgetter("timestamp")) - - # read memory pool - rows = [] - rows += self.get_address_in_rows_memorypool( dbhash ) - rows += self.get_address_out_rows_memorypool( dbhash ) - address_has_mempool = False - - for row in rows: - is_in, tx_hash, tx_id, pos, value = row - tx_hash = self.hashout_hex(tx_hash) - if tx_hash in known_tx: - continue - - # this means that pending transactions were added to the db, even if they are not returned by getmemorypool - address_has_mempool = True - - # this means pending transactions are returned by getmemorypool - if tx_hash not in self.mempool_keys: - continue - - #print "mempool", tx_hash - txpoint = { - "timestamp": 0, - "height": 0, - "is_input": int(is_in), - "block_hash": 'mempool', - "tx_hash": tx_hash, - "tx_id": int(tx_id), - "index": int(pos), - "value": int(value), - } - txpoints.append(txpoint) - - - for txpoint in txpoints: - tx_id = txpoint['tx_id'] - - txinputs = [] - inrows = self.get_tx_inputs(tx_id) - for row in inrows: - _hash = self.binout(row[6]) - address = hash_to_address(chr(0), _hash) - txinputs.append(address) - txpoint['inputs'] = txinputs - txoutputs = [] - outrows = self.get_tx_outputs(tx_id) - for row in outrows: - _hash = self.binout(row[6]) - address = hash_to_address(chr(0), _hash) - txoutputs.append(address) - txpoint['outputs'] = txoutputs - - # for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address) - if not txpoint['is_input']: - # detect if already redeemed... - for row in outrows: - if row[6] == dbhash: break - else: - raise - #row = self.get_tx_output(tx_id,dbhash) - # pos, script, value, o_hash, o_id, o_pos, binaddr = row - # if not redeemed, we add the script - if row: - if not row[4]: txpoint['raw_output_script'] = row[1] - - # cache result - if not address_has_mempool: - self.tx_cache[addr] = txpoints - - return txpoints - - - def get_status(self,addr): - # get address status, i.e. the last block for that address. - tx_points = self.get_history(addr) - if not tx_points: - status = None - else: - lastpoint = tx_points[-1] - status = lastpoint['block_hash'] - # this is a temporary hack; move it up once old clients have disappeared - if status == 'mempool': # and session['version'] != "old": - status = status + ':%d'% len(tx_points) - return status - - - - def memorypool_update(store): - - ds = BCDataStream.BCDataStream() - previous_transactions = store.mempool_keys - store.mempool_keys = [] - - postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'}) - - respdata = urllib.urlopen(store.bitcoind_url, postdata).read() - r = loads(respdata) - if r['error'] != None: - return - - v = r['result'].get('transactions') - for hextx in v: - ds.clear() - ds.write(hextx.decode('hex')) - tx = deserialize.parse_Transaction(ds) - tx['hash'] = util.double_sha256(tx['tx']) - tx_hash = store.hashin(tx['hash']) - - store.mempool_keys.append(tx_hash) - if store.tx_find_id_and_value(tx): - pass - else: - tx_id = store.import_tx(tx, False) - store.update_tx_cache(tx_id) - - store.commit() - - - def send_tx(self,tx): - postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'}) - respdata = urllib.urlopen(self.bitcoind_url, postdata).read() - r = loads(respdata) - if r['error'] != None: - out = "error: transaction rejected by memorypool\n"+tx - else: - out = r['result'] - return out - - - def main_iteration(store): - try: - store.dblock.acquire() - store.catch_up() - store.memorypool_update() - block_number = store.get_block_number(1) - - except IOError: - print "IOError: cannot reach bitcoind" - block_number = 0 - except: - traceback.print_exc(file=sys.stdout) - block_number = 0 - finally: - store.dblock.release() - - return block_number - - -from processor import Processor - -class BlockchainProcessor(Processor): - - def __init__(self, config): - Processor.__init__(self) - self.store = AbeStore(config) - self.block_number = -1 - self.watched_addresses = [] - - def process(self, request): - message_id = request['id'] - method = request['method'] - params = request.get('params',[]) - result = '' - if method == 'blockchain.numblocks.subscribe': - result = self.block_number - elif method == 'blockchain.address.subscribe': - address = params[0] - self.watch_address(address) - status = self.store.get_status(address) - result = status - elif method == 'blockchain.address.get_history': - address = params[0] - result = self.store.get_history( address ) - elif method == 'blockchain.transaction.broadcast': - txo = self.store.send_tx(params[0]) - print "sent tx:", txo - result = txo - else: - print "unknown method", request - - if result != '': - response = { 'id':message_id, 'method':method, 'params':params, 'result':result } - self.push_response(response) - - - def watch_address(self, addr): - if addr not in self.watched_addresses: - self.watched_addresses.append(addr) - - - def run(self): - - old_block_number = None - while not self.shared.stopped(): - self.block_number = self.store.main_iteration() - - if self.block_number != old_block_number: - old_block_number = self.block_number - self.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number }) - - while True: - try: - addr = self.store.address_queue.get(False) - except: - break - if addr in self.watched_addresses: - status = self.store.get_status( addr ) - self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status }) - - time.sleep(10) - - - diff --git a/modules/irc/__init__.py b/modules/irc/__init__.py deleted file mode 100644 index 4d3c448..0000000 --- a/modules/irc/__init__.py +++ /dev/null @@ -1,120 +0,0 @@ -import threading, socket, traceback, time, sys - -def random_string(N): - import random, string - return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) - -from processor import Processor - -class ServerProcessor(Processor): - - def __init__(self, config): - Processor.__init__(self) - self.daemon = True - self.peers = {} - self.banner = config.get('server','banner') - self.host = config.get('server','host') - self.password = config.get('server','password') - - self.native_port = config.get('server','native_port') - self.stratum_tcp_port = config.get('server','stratum_tcp_port') - self.stratum_http_port = config.get('server','stratum_http_port') - - self.irc = config.get('server', 'irc') == 'yes' - self.nick = config.get('server', 'irc_nick') - if not self.nick: self.nick = random_string(10) - - - def get_peers(self): - return self.peers.values() - - - def getname(self): - s = '' - if self.native_port: - s+= 'n' + self.native_port + ' ' - if self.stratum_tcp_port: - s += 't' + self.stratum_tcp_port + ' ' - if self.stratum_http_port: - s += 'h' + self.stratum_http_port + ' ' - return s - - - def run(self): - if not self.irc: - return - - ircname = self.getname() - - while not self.shared.stopped(): - try: - s = socket.socket() - s.connect(('irc.freenode.net', 6667)) - s.send('USER electrum 0 * :' + self.host + ' ' + ircname + '\n') - s.send('NICK E_' + self.nick + '\n') - s.send('JOIN #electrum\n') - sf = s.makefile('r', 0) - t = 0 - while not self.shared.stopped(): - line = sf.readline() - line = line.rstrip('\r\n') - line = line.split() - if line[0]=='PING': - s.send('PONG '+line[1]+'\n') - elif '353' in line: # answer to /names - k = line.index('353') - for item in line[k+1:]: - if item[0:2] == 'E_': - s.send('WHO %s\n'%item) - elif '352' in line: # answer to /who - # warning: this is a horrible hack which apparently works - k = line.index('352') - ip = line[k+4] - ip = socket.gethostbyname(ip) - name = line[k+6] - host = line[k+9] - ports = line[k+10:] - self.peers[name] = (ip, host, ports) - if time.time() - t > 5*60: - self.push_response({'method':'server.peers', 'result':[self.get_peers()]}) - s.send('NAMES #electrum\n') - t = time.time() - self.peers = {} - except: - traceback.print_exc(file=sys.stdout) - finally: - sf.close() - s.close() - - - - def process(self, request): - method = request['method'] - params = request['params'] - result = None - - if method == 'server.banner': - result = self.banner.replace('\\n','\n') - - elif method == 'server.peers.subscribe': - result = self.get_peers() - - elif method == 'server.version': - pass - - elif method == 'server.stop': - print "stopping..." - try: - password = request['params'][0] - except: - password = None - if password == self.password: - self.shared.stop() - result = 'ok' - else: - print "unknown method", request - - if result!='': - response = { 'id':request['id'], 'method':method, 'params':params, 'result':result } - self.push_response(response) - diff --git a/modules/libbitcoin/__init__.py b/modules/libbitcoin/__init__.py deleted file mode 100644 index 97440ce..0000000 --- a/modules/libbitcoin/__init__.py +++ /dev/null @@ -1,182 +0,0 @@ -import bitcoin -from processor import Processor -import threading -import time - -import composed - -class Backend: - - def __init__(self): - # Create 3 thread-pools each with 1 thread - self.network_service = bitcoin.async_service(1) - self.disk_service = bitcoin.async_service(1) - self.mempool_service = bitcoin.async_service(1) - - self.hosts = bitcoin.hosts(self.network_service) - self.handshake = bitcoin.handshake(self.network_service) - self.network = bitcoin.network(self.network_service) - self.protocol = bitcoin.protocol(self.network_service, self.hosts, - self.handshake, self.network) - - db_prefix = "/home/genjix/libbitcoin/database" - self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix) - self.poller = bitcoin.poller(self.blockchain) - self.transaction_pool = \ - bitcoin.transaction_pool(self.mempool_service, self.blockchain) - - self.protocol.subscribe_channel(self.monitor_tx) - self.session = \ - bitcoin.session(self.hosts, self.handshake, self.network, - self.protocol, self.blockchain, self.poller, - self.transaction_pool) - self.session.start(self.handle_start) - - def handle_start(self, ec): - if ec: - print "Error starting backend:", ec - - def stop(self): - self.session.stop(self.handle_stop) - - def handle_stop(self, ec): - if ec: - print "Error stopping backend:", ec - print "Stopped backend" - - def monitor_tx(self, node): - # We will be notified here when connected to new bitcoin nodes - # Here we subscribe to new transactions from them which we - # add to the transaction_pool. That way we can track which - # transactions we are interested in. - node.subscribe_transaction( - bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) - # Re-subscribe to next new node - self.protocol.subscribe_channel(self.monitor_tx) - - def recv_tx(self, ec, tx, node): - if ec: - print "Error with new transaction:", ec - return - tx_hash = bitcoin.hash_transaction(tx) - # If we want to ignore this transaction, we can set - # the 2 handlers to be null handlers that do nothing. - self.transaction_pool.store(tx, - bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash), - bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash)) - # Re-subscribe to new transactions from node - node.subscribe_transaction( - bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) - - def handle_mempool_store(self, ec, tx_hash): - if ec: - print "Error storing memory pool transaction", tx_hash, ec - else: - print "Accepted transaction", tx_hash - - def tx_confirmed(self, ec, tx_hash): - if ec: - print "Problem confirming transaction", tx_hash, ec - else: - print "Confirmed", tx_hash - -class GhostValue: - - def __init__(self): - self.event = threading.Event() - self.value = None - - def get(self): - self.event.wait() - return self.value - - def set(self, value): - self.value = value - self.event.set() - -class NumblocksSubscribe: - - def __init__(self, backend, processor): - self.backend = backend - self.processor = processor - self.lock = threading.Lock() - self.backend.blockchain.subscribe_reorganize(self.reorganize) - self.backend.blockchain.fetch_last_depth(self.set_last_depth) - self.latest = GhostValue() - - def set_last_depth(self, ec, last_depth): - if ec: - print "Error retrieving last depth", ec - else: - self.latest.set(last_depth) - - def reorganize(self, ec, fork_point, arrivals, replaced): - latest = fork_point + len(arrivals) - self.latest.set(latest) - self.processor.push_response({"method":"numblocks.subscribe", "result": latest}) - self.backend.blockchain.subscribe_reorganize(self.reorganize) - - -class AddressGetHistory: - - def __init__(self, backend, processor): - self.backend = backend - self.processor = processor - - def get(self, request): - address = str(request["params"]) - composed.payment_history(self.backend.blockchain, address, - bitcoin.bind(self.respond, request, bitcoin._1)) - - def respond(self, request, result): - self.processor.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result}) - - -class BlockchainProcessor(Processor): - - def __init__(self, config): - Processor.__init__(self) - self.backend = Backend() - self.numblocks_subscribe = NumblocksSubscribe(self.backend, self) - self.address_get_history = AddressGetHistory(self.backend, self) - - def stop(self): - self.backend.stop() - - def process(self, request): - - print "New request (lib)", request - if request["method"] == "numblocks.subscribe": - self.numblocks_subscribe.subscribe(session, request) - elif request["method"] == "address.get_history": - self.address_get_history.get(request) - elif request["method"] == "server.banner": - self.push_response({"id": request["id"], "method": request["method"], "params":request["params"], - "result": "libbitcoin using python-bitcoin bindings"}) - elif request["method"] == "transaction.broadcast": - self.broadcast_transaction(request) - # Execute and when ready, you call - # self.push_response(response) - - def broadcast_transaction(self, request): - raw_tx = bitcoin.data_chunk(str(request["params"])) - exporter = bitcoin.satoshi_exporter() - try: - tx = exporter.load_transaction(raw_tx) - except RuntimeError: - response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": None, - "error": {"message": - "Exception while parsing the transaction data.", - "code": -4}} - else: - self.backend.protocol.broadcast_transaction(tx) - tx_hash = str(bitcoin.hash_transaction(tx)) - response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": tx_hash} - self.push_response(response) - - def run(self): - # this class is a thread. it does nothing in this example. - print "Warning: pre-alpha prototype. Full of bugs." - while not self.shared.stopped(): - time.sleep(1) - diff --git a/modules/libbitcoin/composed.py b/modules/libbitcoin/composed.py deleted file mode 100644 index 9df57a0..0000000 --- a/modules/libbitcoin/composed.py +++ /dev/null @@ -1,194 +0,0 @@ -import bitcoin -import threading -import time - -class ExpiryQueue(threading.Thread): - - def __init__(self): - self.lock = threading.Lock() - self.items = [] - threading.Thread.__init__(self) - self.daemon = True - - def run(self): - # Garbage collection - while True: - with self.lock: - self.items = [i for i in self.items if not i.stopped()] - time.sleep(0.1) - - def add(self, item): - with self.lock: - self.items.append(item) - -expiry_queue = ExpiryQueue() - -class StatementLine: - - def __init__(self, output_point): - self.lock = threading.Lock() - self.output_point = output_point - self.output_loaded = None - self.input_point = None - self.input_loaded = None - self.raw_output_script = None - - def is_loaded(self): - with self.lock: - if self.output_loaded is None: - return False - elif (self.input_point is not False and - self.input_loaded is None): - return False - return True - -class PaymentHistory: - - def __init__(self, chain): - self.chain = chain - self.lock = threading.Lock() - self.statement = [] - self._stopped = False - - def run(self, address, handle_finish): - self.address = address - self.handle_finish = handle_finish - - pubkey_hash = bitcoin.address_to_short_hash(address) - self.chain.fetch_outputs(pubkey_hash, self.start_loading) - - def start_loading(self, ec, output_points): - with self.lock: - for outpoint in output_points: - statement_line = StatementLine(outpoint) - self.statement.append(statement_line) - self.chain.fetch_spend(outpoint, - bitcoin.bind(self.load_spend, - bitcoin._1, bitcoin._2, statement_line)) - self.load_tx_info(outpoint, statement_line, False) - - def load_spend(self, ec, inpoint, statement_line): - with statement_line.lock: - if ec: - statement_line.input_point = False - else: - statement_line.input_point = inpoint - self.finish_if_done() - if not ec: - self.load_tx_info(inpoint, statement_line, True) - - def finish_if_done(self): - with self.lock: - if any(not line.is_loaded() for line in self.statement): - return - result = [] - for line in self.statement: - if line.input_point: - line.input_loaded["value"] = -line.output_loaded["value"] - result.append(line.input_loaded) - else: - line.output_loaded["raw_output_script"] = \ - line.raw_output_script - result.append(line.output_loaded) - self.handle_finish(result) - self.stop() - - def stop(self): - with self.lock: - self._stopped = True - - def stopped(self): - with self.lock: - return self._stopped - - def load_tx_info(self, point, statement_line, is_input): - info = {} - info["tx_hash"] = str(point.hash) - info["index"] = point.index - info["is_input"] = 1 if is_input else 0 - self.chain.fetch_transaction_index(point.hash, - bitcoin.bind(self.tx_index, bitcoin._1, bitcoin._2, bitcoin._3, - statement_line, info)) - - def tx_index(self, ec, block_depth, offset, statement_line, info): - info["height"] = block_depth - self.chain.fetch_block_header_by_depth(block_depth, - bitcoin.bind(self.block_header, bitcoin._1, bitcoin._2, - statement_line, info)) - - def block_header(self, ec, blk_head, statement_line, info): - info["timestamp"] = blk_head.timestamp - info["block_hash"] = str(bitcoin.hash_block_header(blk_head)) - tx_hash = bitcoin.hash_digest(info["tx_hash"]) - self.chain.fetch_transaction(tx_hash, - bitcoin.bind(self.load_tx, bitcoin._1, bitcoin._2, - statement_line, info)) - - def load_tx(self, ec, tx, statement_line, info): - outputs = [] - for tx_out in tx.outputs: - script = tx_out.output_script - if script.type() == bitcoin.payment_type.pubkey_hash: - pkh = bitcoin.short_hash(str(script.operations()[2].data)) - outputs.append(bitcoin.public_key_hash_to_address(pkh)) - else: - outputs.append("Unknown") - info["outputs"] = outputs - info["inputs"] = [None for i in range(len(tx.inputs))] - if info["is_input"] == 1: - info["inputs"][info["index"]] = self.address - else: - our_output = tx.outputs[info["index"]] - info["value"] = our_output.value - with statement_line.lock: - statement_line.raw_output_script = \ - str(bitcoin.save_script(our_output.output_script)) - if not [empty_in for empty_in in info["inputs"] if empty_in is None]: - # We have the sole input - assert(info["is_input"] == 1) - with statement_line.lock: - statement_line.input_loaded = info - self.finish_if_done() - for tx_idx, tx_in in enumerate(tx.inputs): - if info["is_input"] == 1 and info["index"] == tx_idx: - continue - self.chain.fetch_transaction(tx_in.previous_output.hash, - bitcoin.bind(self.load_input, bitcoin._1, bitcoin._2, - tx_in.previous_output.index, statement_line, info, tx_idx)) - - def load_input(self, ec, tx, index, statement_line, info, inputs_index): - script = tx.outputs[index].output_script - if script.type() == bitcoin.payment_type.pubkey_hash: - pkh = bitcoin.short_hash(str(script.operations()[2].data)) - info["inputs"][inputs_index] = \ - bitcoin.public_key_hash_to_address(pkh) - else: - info["inputs"][inputs_index] = "Unknown" - if not [empty_in for empty_in in info["inputs"] if empty_in is None]: - with statement_line.lock: - if info["is_input"] == 1: - statement_line.input_loaded = info - else: - statement_line.output_loaded = info - self.finish_if_done() - -def payment_history(chain, address, handle_finish): - ph = PaymentHistory(chain) - expiry_queue.add(ph) - ph.run(address, handle_finish) - -if __name__ == "__main__": - def finish(result): - print result - def last(ec, depth): - print "D:", depth - - service = bitcoin.async_service(1) - prefix = "/home/genjix/libbitcoin/database" - chain = bitcoin.bdb_blockchain(service, prefix) - chain.fetch_last_depth(last) - address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE" - print "Looking up", address - payment_history(chain, address, finish) - raw_input() - diff --git a/server.py b/server.py index f0b2b41..1697150 100755 --- a/server.py +++ b/server.py @@ -29,7 +29,7 @@ def create_config(): config = ConfigParser.ConfigParser() # set some defaults, which will be overwritten by the config file config.add_section('server') - config.set('server','banner', 'Welcome to Electrum!') + config.set('server', 'banner', 'Welcome to Electrum!') config.set('server', 'host', 'localhost') config.set('server', 'native_port', '50000') config.set('server', 'stratum_tcp_port', '50001') @@ -80,14 +80,14 @@ if __name__ == '__main__': from transports.stratum_tcp import TcpServer from transports.native import NativeServer - from modules.irc import ServerProcessor + from backends.irc import ServerProcessor backend_name = config.get('server', 'backend') if backend_name == "libbitcoin": # NativeServer cannot be used with libbitcoin native_port = None config.set('server', 'native_port', '') try: - backend = __import__("modules." + backend_name, + backend = __import__("backends." + backend_name, fromlist=["BlockchainProcessor"]) except ImportError: sys.stderr.write('Unknown backend specified\n') -- 1.7.1