add timeout for http sessions
[electrum-server.git] / transports / stratum_tcp.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 from processor import Session, Dispatcher, timestr
8
9 class TcpSession(Session):
10
11     def __init__(self, connection, address):
12         Session.__init__(self)
13         self._connection = connection
14         self.address = address[0]
15         self.name = "TCP session"
16
17     def connection(self):
18         if self.stopped():
19             raise Exception("Session was stopped")
20         else:
21             return self._connection
22
23     def stop(self):
24         self._connection.close()
25         #print "Terminating connection:", self.address
26         with self.lock:
27             self._stopped = True
28
29     def send_response(self, response):
30         data = json.dumps(response) + "\n"
31         # Possible race condition here by having session
32         # close connection?
33         # I assume Python connections are thread safe interfaces
34         try:
35             connection = self.connection()
36             while data:
37                 l = connection.send(data)
38                 data = data[l:]
39         except:
40             self.stop()
41
42
43
44 class TcpClientRequestor(threading.Thread):
45
46     def __init__(self, dispatcher, session):
47         self.shared = dispatcher.shared
48         self.dispatcher = dispatcher
49         self.message = ""
50         self.session = session
51         threading.Thread.__init__(self)
52
53     def run(self):
54         while not self.shared.stopped():
55             if not self.update():
56                 break
57
58             self.session.time = time.time()
59
60             while self.parse():
61                 pass
62
63     def update(self):
64         data = self.receive()
65         if not data:
66             # close_session
67             self.session.stop()
68             return False
69
70         self.message += data
71         return True
72
73     def receive(self):
74         try:
75             return self.session.connection().recv(2048)
76         except:
77             return ''
78
79     def parse(self):
80         raw_buffer = self.message.find('\n')
81         if raw_buffer == -1:
82             return False
83
84         raw_command = self.message[0:raw_buffer].strip()
85         self.message = self.message[raw_buffer + 1:]
86         if raw_command == 'quit': 
87             self.session.stop()
88             return False
89
90         try:
91             command = json.loads(raw_command)
92         except:
93             self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
94             return True
95
96         try:
97             # Try to load vital fields, and return an error if
98             # unsuccessful.
99             message_id = command['id']
100             method = command['method']
101         except KeyError:
102             # Return an error JSON in response.
103             self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
104         else:
105             self.dispatcher.push_request(self.session,command)
106
107         return True
108
109 class TcpServer(threading.Thread):
110
111     def __init__(self, dispatcher, host, port):
112         self.shared = dispatcher.shared
113         self.dispatcher = dispatcher.request_dispatcher
114         threading.Thread.__init__(self)
115         self.daemon = True
116         self.host = host
117         self.port = port
118         self.lock = threading.Lock()
119
120     def run(self):
121         print "TCP server started."
122         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
123         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
124         sock.bind((self.host, self.port))
125         sock.listen(1)
126         while not self.shared.stopped():
127             session = TcpSession(*sock.accept())
128             self.dispatcher.add_session(session)
129             self.dispatcher.collect_garbage()
130             client_req = TcpClientRequestor(self.dispatcher, session)
131             client_req.start()
132