various fixes
[electrum-server.git] / transports / stratum_tcp.py
1 import json
2 import Queue as queue
3 import socket
4 import threading
5 import time
6 import traceback, sys
7
8 from processor import Session, Dispatcher
9 from utils import print_log
10
11
class TcpSession(Session):
    """Session backed by one accepted client socket, optionally SSL-wrapped.

    Relies on the base ``Session`` for ``stopped()``, ``lock`` and the
    ``_stopped`` flag (defined outside this file; behaviour assumed from
    how they are used here).
    """

    def __init__(self, connection, address, use_ssl, ssl_certfile, ssl_keyfile):
        """Store the client socket, wrapping it in server-side SSL if asked.

        connection   -- the accepted client socket
        address      -- peer address; only its first element is kept
        use_ssl      -- when true, wrap ``connection`` with ssl.wrap_socket
        ssl_certfile -- certificate file handed to ssl.wrap_socket
        ssl_keyfile  -- key file handed to ssl.wrap_socket

        Any exception raised by ssl.wrap_socket propagates to the caller.
        """
        Session.__init__(self)
        if use_ssl:
            import ssl
            self._connection = ssl.wrap_socket(
                connection,
                server_side=True,
                certfile=ssl_certfile,
                keyfile=ssl_keyfile,
                ssl_version=ssl.PROTOCOL_SSLv23)
        else:
            self._connection = connection

        self.address = address[0]
        self.name = "SSL " if use_ssl else "TCP "

    def connection(self):
        """Return the underlying socket.

        Raises Exception("Session was stopped") once the session is stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def stop(self):
        """Shut down and close the socket, then flag the session as stopped.

        Does nothing when the session is already stopped. An error raised
        by close() still propagates, but the stopped flag is set first.
        """
        if self.stopped():
            return

        try:
            try:
                self._connection.shutdown(socket.SHUT_RDWR)
            except Exception:
                # Best effort only: shutdown fails if the peer is already
                # gone, and the socket is closed right below anyway.
                pass
            self._connection.close()
        finally:
            # Always record the stop, even when close() raised; otherwise
            # the session would keep reporting itself as alive.
            with self.lock:
                self._stopped = True

    def send_response(self, response):
        """Send ``response`` to the client as a single JSON line.

        Failures are not reported to the caller: if the session is already
        stopped or the send fails, the session is stopped instead.
        """
        data = json.dumps(response) + "\n"
        # NOTE(review): stop() may run concurrently from another thread;
        # a send on a socket closed that way raises and lands in the
        # handler below.
        try:
            # sendall keeps sending until everything is written, replacing
            # the manual loop that could spin forever if send() returned 0.
            self.connection().sendall(data)
        except Exception:
            self.stop()
63
64
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON requests from one session
    and hands them to the dispatcher."""

    def __init__(self, dispatcher, session):
        """Remember the dispatcher and session; the thread is not started.

        dispatcher -- object providing ``shared``, ``push_request`` and
                      ``push_response``
        session    -- object providing ``connection()`` and ``stop()``
        """
        threading.Thread.__init__(self)
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher
        self.message = ""   # received data not yet split into commands
        self.session = session

    def run(self):
        """Read and dispatch commands until shutdown or the client is gone."""
        while not self.shared.stopped():
            if not self.update():
                break

            # Record when data was last received from this client.
            self.session.time = time.time()

            # One recv() may carry several complete lines; drain them all.
            while self.parse():
                pass

    def update(self):
        """Append newly received data to the buffer.

        Returns False, after stopping the session, when nothing could be
        read; True otherwise.
        """
        data = self.receive()
        if not data:
            self.session.stop()
            return False

        self.message += data
        return True

    def receive(self):
        """Return up to 2048 bytes from the socket, or '' on any error."""
        try:
            return self.session.connection().recv(2048)
        except Exception:
            # An error (including an already stopped session) is treated
            # like an empty read so that update() closes the session.
            return ''

    def parse(self):
        """Consume one line from the buffer and act on it.

        Returns False when no complete line is buffered or when the client
        sent ``quit`` (the session is then stopped); True when a line was
        consumed, so the caller should try again.
        """
        newline_at = self.message.find('\n')
        if newline_at == -1:
            return False

        raw_command = self.message[:newline_at].strip()
        self.message = self.message[newline_at + 1:]
        if raw_command == 'quit':
            self.session.stop()
            return False

        try:
            command = json.loads(raw_command)
        except Exception:
            # Kept broad on purpose: besides ValueError, deeply nested
            # input can raise a recursion error, which must not kill the
            # thread.
            self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
            return True

        # A request must be a JSON object carrying both vital fields.
        # Valid JSON that is not an object (list, number, string, null)
        # is rejected here instead of raising TypeError on lookup.
        if isinstance(command, dict) and 'id' in command and 'method' in command:
            self.dispatcher.push_request(self.session, command)
        else:
            self.dispatcher.push_response({"error": "syntax error", "request": raw_command})

        return True
129
130
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP (optionally SSL) clients and starts
    one TcpClientRequestor thread per accepted session."""

    def __init__(self, dispatcher, host, port, use_ssl, ssl_certfile, ssl_keyfile):
        """Store the listening parameters; nothing is bound until run().

        dispatcher   -- object providing ``shared`` and ``request_dispatcher``
        host, port   -- address to bind the listening socket to
        use_ssl      -- when true, each client socket is SSL-wrapped
        ssl_certfile -- certificate file passed on to TcpSession
        ssl_keyfile  -- key file passed on to TcpSession
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = dispatcher.shared
        # Sessions and requests go to the request dispatcher, not to the
        # top-level dispatcher object.
        self.dispatcher = dispatcher.request_dispatcher
        self.host = host
        self.port = port
        self.lock = threading.Lock()
        self.use_ssl = use_ssl
        self.ssl_keyfile = ssl_keyfile
        self.ssl_certfile = ssl_certfile

    def run(self):
        """Bind, listen and accept clients until the shared state is stopped.

        Errors from bind()/listen() propagate and end the thread; the
        listening socket is closed whenever this method exits.
        """
        if self.use_ssl:
            print_log("TCP/SSL server started.")
        else:
            print_log("TCP server started.")

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(5)

            while not self.shared.stopped():
                try:
                    connection, address = sock.accept()
                except Exception:
                    traceback.print_exc(file=sys.stdout)
                    # Short pause so a persistent accept() failure does
                    # not turn into a busy loop.
                    time.sleep(0.1)
                    continue

                # NOTE(review): with SSL the session is wrapped here, in
                # the accept thread - confirm whether a slow client can
                # delay other accepts.
                try:
                    session = TcpSession(
                        connection,
                        address,
                        use_ssl=self.use_ssl,
                        ssl_certfile=self.ssl_certfile,
                        ssl_keyfile=self.ssl_keyfile)
                except Exception as e:
                    print_log("cannot start TCP session", str(e), address)
                    connection.close()
                    time.sleep(0.1)
                    continue

                self.dispatcher.add_session(session)
                self.dispatcher.collect_garbage()
                TcpClientRequestor(self.dispatcher, session).start()
        finally:
            # Release the listening socket on every exit path.
            sock.close()