Memory cache for addresses. Not activated for the moment, but it will log errors.
authorThomasV <thomasv@gitorious>
Sat, 3 Dec 2011 16:33:15 +0000 (19:33 +0300)
committerThomasV <thomasv@gitorious>
Sat, 3 Dec 2011 16:33:15 +0000 (19:33 +0300)
server.py

index 500a17b..5292eb8 100755 (executable)
--- a/server.py
+++ b/server.py
@@ -60,19 +60,41 @@ dblock = thread.allocate_lock()
 
 peer_list = {}
 
+
+
 class MyStore(Datastore_class):
 
-    def safe_sql(self,sql, params=()):
+    def import_tx(self, tx, is_coinbase):
+        tx_id = super(MyStore, self).import_tx(tx, is_coinbase)
+        self.update_tx_cache(tx_id)
+
+    def update_tx_cache(self, txid):
+        inrows = self.get_tx_inputs(txid, False)
+        for row in inrows:
+            _hash = store.binout(row[6])
+            address = hash_to_address(chr(0), _hash)
+            if self.tx_cache.has_key(address):
+                print "cache: popping", address
+                self.tx_cache.pop(address)
+        outrows = self.get_tx_outputs(txid, False)
+        for row in outrows:
+            _hash = store.binout(row[6])
+            address = hash_to_address(chr(0), _hash)
+            if self.tx_cache.has_key(address):
+                print "cache: popping", address
+                self.tx_cache.pop(address)
+
+    def safe_sql(self,sql, params=(), lock=True):
         try:
-            dblock.acquire()
+            if lock: dblock.acquire()
             ret = self.selectall(sql,params)
-            dblock.release()
+            if lock: dblock.release()
             return ret
         except:
             print "sql error", sql
             return []
 
-    def get_tx_outputs(self, tx_id):
+    def get_tx_outputs(self, tx_id, lock=True):
         return self.safe_sql("""SELECT
                 txout.txout_pos,
                 txout.txout_scriptPubKey,
@@ -87,9 +109,9 @@ class MyStore(Datastore_class):
               LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
              WHERE txout.tx_id = %d 
              ORDER BY txout.txout_pos
-        """%(tx_id))
+        """%(tx_id), (), lock)
 
-    def get_tx_inputs(self, tx_id):
+    def get_tx_inputs(self, tx_id, lock=True):
         return self.safe_sql(""" SELECT
                 txin.txin_pos,
                 txin.txin_scriptSig,
@@ -105,7 +127,7 @@ class MyStore(Datastore_class):
               LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
              WHERE txin.tx_id = %d
              ORDER BY txin.txin_pos
-             """%(tx_id,))
+             """%(tx_id,), (), lock)
 
     def get_address_out_rows(self, dbhash):
         return self.safe_sql(""" SELECT
@@ -174,6 +196,9 @@ class MyStore(Datastore_class):
              WHERE pubkey.pubkey_hash = ? """, (dbhash,))
 
     def get_txpoints(self, addr):
+
+        cached_version = self.tx_cache.get( addr ) 
+
         version, binaddr = decode_check_address(addr)
         if binaddr is None:
             return "err"
@@ -267,11 +292,18 @@ class MyStore(Datastore_class):
                     if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
 
 
-        return txpoints
+        if cached_version is None:
+            #print "cache: adding", addr
+            self.tx_cache[addr] = txpoints
+            return txpoints
+        else:
+            if cached_version != txpoints: 
+                print "cache error: ", addr
+            return txpoints
 
 
     def get_status(self, addr):
-        # last block for an address
+        # last block for an address.
         tx_points = self.get_txpoints(addr)
         if not tx_points:
             return None
@@ -360,9 +392,8 @@ def client_thread(ipaddr,conn):
 
         elif cmd == 'h': 
             # history
-            addr = data
-            h = store.get_txpoints( addr )
-            out = repr(h)
+            address = data
+            out = repr( store.get_txpoints( address ) )
 
         elif cmd == 'load': 
             if config.get('server','password') == data:
@@ -401,26 +432,29 @@ def client_thread(ipaddr,conn):
 ds = BCDataStream.BCDataStream()
 
 
-def memorypool_update(store):
 
+
+def memorypool_update(store):
     conn = bitcoinrpc.connect_to_local()
     try:
         v = conn.getmemorypool()
     except:
-        print "cannot contact bitcoin daemmon"
+        print "cannot contact bitcoin daemon"
         return
     v = v['transactions']
     for hextx in v:
         ds.clear()
         ds.write(hextx.decode('hex'))
         tx = deserialize.parse_Transaction(ds)
+        #print "new tx",tx
+
         tx['hash'] = util.double_sha256(tx['tx'])
             
         if store.tx_find_id_and_value(tx):
             pass
         else:
             store.import_tx(tx, False)
-            #print tx['hash'][::-1].encode('hex')
+
     store.commit()
 
 
@@ -517,6 +551,7 @@ if __name__ == '__main__':
     elif args.dbtype == 'psycopg2':
        args.connect_args = { 'database' : config.get('database','database') }
     store = MyStore(args)
+    store.tx_cache = {}
 
     thread.start_new_thread(listen_thread, (store,))
     thread.start_new_thread(clean_session_thread, ())