add a sleep() to sessions, and remove some dead code
[electrum-server.git] / transports / stratum_tcp.py
1 import json
2 import Queue as queue
3 import socket
4 import threading
5 import time
6 import traceback, sys
7
8 from processor import Session, Dispatcher
9 from utils import print_log
10
11
class TcpSession(Session):
    """Session bound to one accepted TCP connection, optionally SSL-wrapped.

    Responses are not written to the socket by the session itself:
    ``send_response`` only places them on ``response_queue``.
    """

    def __init__(self, connection, address, use_ssl, ssl_certfile, ssl_keyfile):
        """Store the accepted socket, wrapping it in SSL when requested.

        connection   -- the accepted socket object
        address      -- peer address; only its first element is kept
        use_ssl      -- when true, wrap ``connection`` as a server-side SSL socket
        ssl_certfile -- certificate file handed to ``ssl.wrap_socket``
        ssl_keyfile  -- private key file handed to ``ssl.wrap_socket``
        """
        Session.__init__(self)
        self.use_ssl = use_ssl
        if use_ssl:
            import ssl
            # do_handshake_on_connect=False: the handshake is not performed
            # here; it only happens when do_handshake() is called.
            self._connection = ssl.wrap_socket(
                connection,
                server_side=True,
                certfile=ssl_certfile,
                keyfile=ssl_keyfile,
                ssl_version=ssl.PROTOCOL_SSLv23,
                do_handshake_on_connect=False)
        else:
            self._connection = connection

        self.address = address[0]
        self.name = "TCP " if not use_ssl else "SSL "
        self.response_queue = queue.Queue()

    def do_handshake(self):
        """Perform the deferred SSL handshake; a no-op for plain TCP."""
        if self.use_ssl:
            self._connection.do_handshake()

    def connection(self):
        """Return the underlying socket.

        Raises Exception("Session was stopped") once the session is stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        else:
            return self._connection

    def stop(self):
        """Shut down and close the socket, then mark the session stopped.

        Does nothing if the session is already stopped. The stopped flag is
        set even when closing the socket raises, so a failed close can no
        longer leave the session looking alive; the error still propagates.
        """
        if self.stopped():
            return

        try:
            try:
                self._connection.shutdown(socket.SHUT_RDWR)
            except Exception:
                # Best effort: shutdown() fails on a socket that is already
                # disconnected; close() below still releases it.
                pass

            self._connection.close()
        finally:
            with self.lock:
                self._stopped = True

    def send_response(self, response):
        """Queue ``response`` for delivery to the client."""
        self.response_queue.put(response)
60
61
class TcpClientResponder(threading.Thread):
    """Thread that writes a session's queued responses to its socket.

    Each response is serialised as one JSON document followed by a newline.
    """

    def __init__(self, session):
        """session -- object providing response_queue, connection(),
        stopped() and stop()."""
        self.session = session
        threading.Thread.__init__(self)

    def run(self):
        """Send queued responses until the session is stopped.

        Any failure to serialise or send a response stops the session, which
        in turn ends this loop.
        """
        while not self.session.stopped():
            try:
                # The timeout makes the loop re-check stopped() periodically
                # instead of blocking on an idle queue forever.
                response = self.session.response_queue.get(timeout=10)
            except queue.Empty:
                continue

            try:
                data = json.dumps(response) + "\n"
            except Exception:
                # An unserialisable response used to kill this thread and
                # leave the session open with nobody answering; report it
                # and close the session instead.
                traceback.print_exc(file=sys.stdout)
                self.session.stop()
                break

            try:
                # sendall() keeps writing until everything is sent or an
                # error occurs, replacing the manual partial-send loop.
                self.session.connection().sendall(data)
            except Exception:
                self.session.stop()
81
82
83
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON requests from one session.

    Received data is accumulated in ``self.message``; each complete line is
    parsed and either forwarded to the dispatcher as a request or answered
    with an error response.
    """

    def __init__(self, dispatcher, session):
        """dispatcher -- object providing shared, push_request() and
                         push_response()
        session    -- the session whose socket is read
        """
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher
        self.message = ""
        self.session = session
        threading.Thread.__init__(self)

    def run(self):
        """Handshake, then read and parse data until the peer disconnects,
        a read fails, or the shared state reports stopped."""
        try:
            self.session.do_handshake()
        except Exception:
            self.session.stop()
            return

        while not self.shared.stopped():

            data = self.receive()
            if not data:
                # Empty data means the peer closed the connection or the
                # read failed (see receive()).
                self.session.stop()
                break

            self.message += data
            # Record when data was last received on this session.
            self.session.time = time.time()

            # Consume every complete line currently buffered.
            while self.parse():
                pass

    def receive(self):
        """Return up to 2048 bytes from the socket, or '' on any error."""
        try:
            return self.session.connection().recv(2048)
        except Exception:
            return ''

    def parse(self):
        """Consume one buffered line and act on it.

        Returns False when no complete line is buffered or after a 'quit'
        command stopped the session; returns True when a line was consumed
        and parsing should be attempted again.
        """
        newline_pos = self.message.find('\n')
        if newline_pos == -1:
            return False

        raw_command = self.message[:newline_pos].strip()
        self.message = self.message[newline_pos + 1:]
        if raw_command == 'quit':
            self.session.stop()
            return False

        try:
            command = json.loads(raw_command)
        except Exception:
            # Deliberately broad: besides ValueError, hostile input can
            # trigger other errors (e.g. recursion depth on deep nesting).
            self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
            return True

        # A request must be a JSON object carrying both vital fields.
        # Valid JSON that is not an object (list, string, number, null)
        # previously raised TypeError here and killed this thread.
        if not (isinstance(command, dict) and 'id' in command and 'method' in command):
            self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
            return True

        self.dispatcher.push_request(self.session, command)
        # sleep a bit to prevent a single session from DOSing the queue
        time.sleep(0.01)

        return True
151
152
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP (optionally SSL) client connections.

    For every accepted connection it creates a TcpSession, registers it with
    the request dispatcher and starts one TcpClientRequestor and one
    TcpClientResponder thread for it.
    """

    def __init__(self, dispatcher, host, port, use_ssl, ssl_certfile, ssl_keyfile):
        """dispatcher   -- object providing ``shared`` and ``request_dispatcher``
        host, port   -- address the listening socket binds to
        use_ssl      -- when true, sessions are created with SSL enabled
        ssl_certfile -- certificate file passed on to each TcpSession
        ssl_keyfile  -- private key file passed on to each TcpSession
        """
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher.request_dispatcher
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port
        self.lock = threading.Lock()
        self.use_ssl = use_ssl
        self.ssl_keyfile = ssl_keyfile
        self.ssl_certfile = ssl_certfile

    def run(self):
        """Listen on (host, port) and spawn per-client threads until the
        shared state reports stopped.

        The listening socket is closed when the loop ends or when setting
        it up (bind/listen) fails.
        """
        print_log(("SSL" if self.use_ssl else "TCP") + " server started on port %d" % self.port)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(5)

            while not self.shared.stopped():

                try:
                    connection, address = sock.accept()
                except Exception:
                    traceback.print_exc(file=sys.stdout)
                    # Pause briefly so a persistent accept() failure does
                    # not turn into a busy loop.
                    time.sleep(0.1)
                    continue

                try:
                    session = TcpSession(
                        connection, address,
                        use_ssl=self.use_ssl,
                        ssl_certfile=self.ssl_certfile,
                        ssl_keyfile=self.ssl_keyfile)
                except Exception as e:
                    error = str(e)
                    print_log("cannot start TCP session", error, address)
                    connection.close()
                    time.sleep(0.1)
                    continue

                self.dispatcher.add_session(session)
                self.dispatcher.collect_garbage()
                client_req = TcpClientRequestor(self.dispatcher, session)
                client_req.start()
                responder = TcpClientResponder(session)
                responder.start()
        finally:
            # Release the listening socket however the loop ends.
            sock.close()