various fixes
authorThomasV <thomasv@gitorious>
Sat, 20 Apr 2013 19:07:02 +0000 (23:07 +0400)
committerThomasV <thomasv@gitorious>
Sat, 20 Apr 2013 19:07:02 +0000 (23:07 +0400)
backends/bitcoind/blockchain_processor.py
processor.py
server.py
transports/stratum_tcp.py

index 2af9534..219ffdc 100644 (file)
@@ -894,6 +894,7 @@ class BlockchainProcessor(Processor):
                 self.history_cache.pop(address)
 
         if address in self.watched_addresses:
+            # TODO: update cache here. if new value equals cached value, do not send notification
             self.address_queue.put(address)
 
     def main_iteration(self):
index 71b9d62..9fa9378 100644 (file)
@@ -83,6 +83,7 @@ class RequestDispatcher(threading.Thread):
         self.internal_ids = {}
         self.internal_id = 1
         self.lock = threading.Lock()
+        self.idlock = threading.Lock()
         self.sessions = []
         self.processors = {}
 
@@ -104,11 +105,11 @@ class RequestDispatcher(threading.Thread):
                 return x
 
     def get_session_id(self, internal_id):
-        with self.lock:
+        with self.idlock:
             return self.internal_ids.pop(internal_id)
 
     def store_session_id(self, session, msgid):
-        with self.lock:
+        with self.idlock:
             self.internal_ids[self.internal_id] = session, msgid
             r = self.internal_id
             self.internal_id += 1
@@ -137,7 +138,6 @@ class RequestDispatcher(threading.Thread):
         suffix = method.split('.')[-1]
 
         if session is not None:
-            is_new = session.protocol_version >= 0.5
             if suffix == 'subscribe':
                 session.subscribe_to_service(method, params)
 
@@ -160,9 +160,6 @@ class RequestDispatcher(threading.Thread):
             except:
                 pass
 
-            #if session.protocol_version < 0.6:
-            #    print_log("stopping session from old client", session.protocol_version)
-            #    session.stop()
 
     def get_sessions(self):
         with self.lock:
index c17f05e..4d347e7 100755 (executable)
--- a/server.py
+++ b/server.py
@@ -95,6 +95,7 @@ def run_rpc_command(command, stratum_tcp_port):
     msg = ''
     while True:
         o = s.recv(1024)
+        if not o: break
         msg += o
         if msg.find('\n') != -1:
             break
index b21ca96..88ec0cf 100644 (file)
@@ -3,6 +3,7 @@ import Queue as queue
 import socket
 import threading
 import time
+import traceback, sys
 
 from processor import Session, Dispatcher
 from utils import print_log
@@ -33,8 +34,17 @@ class TcpSession(Session):
             return self._connection
 
     def stop(self):
+        if self.stopped():
+            return
+
+        try:
+            self._connection.shutdown(socket.SHUT_RDWR)
+        except:
+            # print_log("problem shutting down", self.address)
+            # traceback.print_exc(file=sys.stdout)
+            pass
+
         self._connection.close()
-        #print "Terminating connection:", self.address
         with self.lock:
             self._stopped = True
 
@@ -140,16 +150,26 @@ class TcpServer(threading.Thread):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         sock.bind((self.host, self.port))
-        sock.listen(1)
+        sock.listen(5)
+
         while not self.shared.stopped():
+
             try:
                 connection, address = sock.accept()
+            except:
+                traceback.print_exc(file=sys.stdout)
+                time.sleep(0.1)
+                continue
+
+            try:
                 session = TcpSession(connection, address, use_ssl=self.use_ssl, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile)
             except BaseException, e:
                 error = str(e)
                 print_log("cannot start TCP session", error, address)
+                connection.close()
                 time.sleep(0.1)
                 continue
+
             self.dispatcher.add_session(session)
             self.dispatcher.collect_garbage()
             client_req = TcpClientRequestor(self.dispatcher, session)