From: ThomasV Date: Sat, 20 Apr 2013 19:07:02 +0000 (+0400) Subject: various fixes X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=247a7b2add3cdffe0f60553a2b2a07db9ad4b576 various fixes --- diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py index 2af9534..219ffdc 100644 --- a/backends/bitcoind/blockchain_processor.py +++ b/backends/bitcoind/blockchain_processor.py @@ -894,6 +894,7 @@ class BlockchainProcessor(Processor): self.history_cache.pop(address) if address in self.watched_addresses: + # TODO: update cache here. if new value equals cached value, do not send notification self.address_queue.put(address) def main_iteration(self): diff --git a/processor.py b/processor.py index 71b9d62..9fa9378 100644 --- a/processor.py +++ b/processor.py @@ -83,6 +83,7 @@ class RequestDispatcher(threading.Thread): self.internal_ids = {} self.internal_id = 1 self.lock = threading.Lock() + self.idlock = threading.Lock() self.sessions = [] self.processors = {} @@ -104,11 +105,11 @@ class RequestDispatcher(threading.Thread): return x def get_session_id(self, internal_id): - with self.lock: + with self.idlock: return self.internal_ids.pop(internal_id) def store_session_id(self, session, msgid): - with self.lock: + with self.idlock: self.internal_ids[self.internal_id] = session, msgid r = self.internal_id self.internal_id += 1 @@ -137,7 +138,6 @@ class RequestDispatcher(threading.Thread): suffix = method.split('.')[-1] if session is not None: - is_new = session.protocol_version >= 0.5 if suffix == 'subscribe': session.subscribe_to_service(method, params) @@ -160,9 +160,6 @@ class RequestDispatcher(threading.Thread): except: pass - #if session.protocol_version < 0.6: - # print_log("stopping session from old client", session.protocol_version) - # session.stop() def get_sessions(self): with self.lock: diff --git a/server.py b/server.py index c17f05e..4d347e7 100755 --- a/server.py +++ b/server.py @@ -95,6 +95,7 @@ def run_rpc_command(command, stratum_tcp_port): msg = '' while True: o = s.recv(1024) + if not o: break msg += o if msg.find('\n') != -1: break diff --git a/transports/stratum_tcp.py b/transports/stratum_tcp.py index b21ca96..88ec0cf 100644 --- a/transports/stratum_tcp.py +++ b/transports/stratum_tcp.py @@ -3,6 +3,7 @@ import Queue as queue import socket import threading import time +import traceback, sys from processor import Session, Dispatcher from utils import print_log @@ -33,8 +34,17 @@ class TcpSession(Session): return self._connection def stop(self): + if self.stopped(): + return + + try: + self._connection.shutdown(socket.SHUT_RDWR) + except: + # print_log("problem shutting down", self.address) + # traceback.print_exc(file=sys.stdout) + pass + self._connection.close() - #print "Terminating connection:", self.address with self.lock: self._stopped = True @@ -140,16 +150,26 @@ class TcpServer(threading.Thread): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) - sock.listen(1) + sock.listen(5) + while not self.shared.stopped(): + try: connection, address = sock.accept() + except: + traceback.print_exc(file=sys.stdout) + time.sleep(0.1) + continue + + try: session = TcpSession(connection, address, use_ssl=self.use_ssl, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile) except BaseException, e: error = str(e) print_log("cannot start TCP session", error, address) + connection.close() time.sleep(0.1) continue + self.dispatcher.add_session(session) self.dispatcher.collect_garbage() client_req = TcpClientRequestor(self.dispatcher, session)