From 880d9b9f27b9ff81ed8d65cf7cd5ee64d6997a1b Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 3 Dec 2011 19:33:15 +0300 Subject: [PATCH] Memory cache for addresses. Not activated for the moment, but it will log errors. --- server.py | 65 ++++++++++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 50 insertions(+), 15 deletions(-) diff --git a/server.py b/server.py index 500a17b..5292eb8 100755 --- a/server.py +++ b/server.py @@ -60,19 +60,41 @@ dblock = thread.allocate_lock() peer_list = {} + + class MyStore(Datastore_class): - def safe_sql(self,sql, params=()): + def import_tx(self, tx, is_coinbase): + tx_id = super(MyStore, self).import_tx(tx, is_coinbase) + self.update_tx_cache(tx_id) + + def update_tx_cache(self, txid): + inrows = self.get_tx_inputs(txid, False) + for row in inrows: + _hash = store.binout(row[6]) + address = hash_to_address(chr(0), _hash) + if self.tx_cache.has_key(address): + print "cache: popping", address + self.tx_cache.pop(address) + outrows = self.get_tx_outputs(txid, False) + for row in outrows: + _hash = store.binout(row[6]) + address = hash_to_address(chr(0), _hash) + if self.tx_cache.has_key(address): + print "cache: popping", address + self.tx_cache.pop(address) + + def safe_sql(self,sql, params=(), lock=True): try: - dblock.acquire() + if lock: dblock.acquire() ret = self.selectall(sql,params) - dblock.release() + if lock: dblock.release() return ret except: print "sql error", sql return [] - def get_tx_outputs(self, tx_id): + def get_tx_outputs(self, tx_id, lock=True): return self.safe_sql("""SELECT txout.txout_pos, txout.txout_scriptPubKey, @@ -87,9 +109,9 @@ class MyStore(Datastore_class): LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id) WHERE txout.tx_id = %d ORDER BY txout.txout_pos - """%(tx_id)) + """%(tx_id), (), lock) - def get_tx_inputs(self, tx_id): + def get_tx_inputs(self, tx_id, lock=True): return self.safe_sql(""" SELECT txin.txin_pos, txin.txin_scriptSig, @@ -105,7 +127,7 @@ class MyStore(Datastore_class): LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id) WHERE txin.tx_id = %d ORDER BY txin.txin_pos - """%(tx_id,)) + """%(tx_id,), (), lock) def get_address_out_rows(self, dbhash): return self.safe_sql(""" SELECT @@ -174,6 +196,9 @@ class MyStore(Datastore_class): WHERE pubkey.pubkey_hash = ? """, (dbhash,)) def get_txpoints(self, addr): + + cached_version = self.tx_cache.get( addr ) + version, binaddr = decode_check_address(addr) if binaddr is None: return "err" @@ -267,11 +292,18 @@ class MyStore(Datastore_class): if not row[4]: txpoint['raw_scriptPubKey'] = row[1] - return txpoints + if cached_version is None: + #print "cache: adding", addr + self.tx_cache[addr] = txpoints + return txpoints + else: + if cached_version != txpoints: + print "cache error: ", addr + return txpoints def get_status(self, addr): - # last block for an address + # last block for an address. tx_points = self.get_txpoints(addr) if not tx_points: return None @@ -360,9 +392,8 @@ def client_thread(ipaddr,conn): elif cmd == 'h': # history - addr = data - h = store.get_txpoints( addr ) - out = repr(h) + address = data + out = repr( store.get_txpoints( address ) ) elif cmd == 'load': if config.get('server','password') == data: @@ -401,26 +432,29 @@ def client_thread(ipaddr,conn): ds = BCDataStream.BCDataStream() -def memorypool_update(store): + +def memorypool_update(store): conn = bitcoinrpc.connect_to_local() try: v = conn.getmemorypool() except: - print "cannot contact bitcoin daemmon" + print "cannot contact bitcoin daemon" return v = v['transactions'] for hextx in v: ds.clear() ds.write(hextx.decode('hex')) tx = deserialize.parse_Transaction(ds) + #print "new tx",tx + tx['hash'] = util.double_sha256(tx['tx']) if store.tx_find_id_and_value(tx): pass else: store.import_tx(tx, False) - #print tx['hash'][::-1].encode('hex') + store.commit() @@ -517,6 +551,7 @@ if __name__ == '__main__': elif args.dbtype == 'psycopg2': args.connect_args = { 'database' : config.get('database','database') } store = MyStore(args) + store.tx_cache = {} thread.start_new_thread(listen_thread, (store,)) thread.start_new_thread(clean_session_thread, ()) -- 1.7.1