From 3051f46df11fc6c9ebe321c9ef6d726e65310129 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Tue, 5 Jun 2012 20:52:13 +0400 Subject: [PATCH] add timeout for http sessions --- backends/irc/__init__.py | 6 +++++- processor.py | 1 + server.py | 3 ++- transports/stratum_http.py | 9 ++++++++- transports/stratum_tcp.py | 2 ++ 5 files changed, 18 insertions(+), 3 deletions(-) diff --git a/backends/irc/__init__.py b/backends/irc/__init__.py index 036d68d..9a53402 100644 --- a/backends/irc/__init__.py +++ b/backends/irc/__init__.py @@ -118,7 +118,11 @@ class ServerProcessor(Processor): result = 'ok' elif method == 'server.info': - result = map(lambda s: { "address":s.address, "version":s.version, "subscriptions":len(s.subscriptions)}, self.dispatcher.request_dispatcher.sessions) + result = map(lambda s: { "time":s.time, + "address":s.address, + "version":s.version, + "subscriptions":len(s.subscriptions)}, + self.dispatcher.request_dispatcher.sessions) else: print "unknown method", request diff --git a/processor.py b/processor.py index 5ba83ad..20fdbfd 100644 --- a/processor.py +++ b/processor.py @@ -170,6 +170,7 @@ class Session: self.address = '' self.name = '' self.version = 'unknown' + self.time = time.time() threading.Timer(2, self.info).start() # Debugging method. Doesn't need to be threadsafe. diff --git a/server.py b/server.py index 914a6bb..5846fba 100755 --- a/server.py +++ b/server.py @@ -72,8 +72,9 @@ def run_rpc_command(command, stratum_tcp_port): r = json.loads(msg).get('result') if command == 'stop': print r elif command == 'info': + now = time.time() for item in r: - print '%15s %3s %7s'%( item.get('address'), item.get('subscriptions'), item.get('version') ) + print '%15s %3s %7s %.2f'%( item.get('address'), item.get('subscriptions'), item.get('version'), (now - item.get('time')) ) if __name__ == '__main__': config = create_config() diff --git a/transports/stratum_http.py b/transports/stratum_http.py index ef738e3..8118478 100644 --- a/transports/stratum_http.py +++ b/transports/stratum_http.py @@ -22,7 +22,7 @@ import SimpleXMLRPCServer import SocketServer import socket import logging -import os +import os, time import types import traceback import sys, threading @@ -100,6 +100,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): session = self.dispatcher.get_session_by_address(session_id) if not session: return 'Error: session not found' + session.time = time.time() responses = [] if type(request) is not types.ListType: @@ -290,6 +291,12 @@ class HttpSession(Session): raw_response = json.dumps(response) self.pending_responses.put(response) + def stopped(self): + with self.lock: + if time.time() - self.time > 60: + self._stopped = True + return self._stopped + class HttpServer(threading.Thread): def __init__(self, dispatcher, host, port): self.shared = dispatcher.shared diff --git a/transports/stratum_tcp.py b/transports/stratum_tcp.py index 8f0c936..b7176b9 100644 --- a/transports/stratum_tcp.py +++ b/transports/stratum_tcp.py @@ -55,6 +55,8 @@ class TcpClientRequestor(threading.Thread): if not self.update(): break + self.session.time = time.time() + while self.parse(): pass -- 1.7.1