add delay if tcp session cannot be started
[electrum-server.git] / transports / stratum_tcp.py
1 import json
2 import Queue as queue
3 import socket
4 import threading
5 import time
6
7 from processor import Session, Dispatcher
8 from utils import print_log
9
10
11 class TcpSession(Session):
12
13     def __init__(self, connection, address, use_ssl, ssl_certfile, ssl_keyfile):
14         Session.__init__(self)
15         if use_ssl:
16             import ssl
17             self._connection = ssl.wrap_socket(
18                 connection,
19                 server_side=True,
20                 certfile=ssl_certfile,
21                 keyfile=ssl_keyfile,
22                 ssl_version=ssl.PROTOCOL_SSLv23)
23         else:
24             self._connection = connection
25
26         self.address = address[0]
27         self.name = "TCP " if not use_ssl else "SSL "
28
29     def connection(self):
30         if self.stopped():
31             raise Exception("Session was stopped")
32         else:
33             return self._connection
34
35     def stop(self):
36         self._connection.close()
37         #print "Terminating connection:", self.address
38         with self.lock:
39             self._stopped = True
40
41     def send_response(self, response):
42         data = json.dumps(response) + "\n"
43         # Possible race condition here by having session
44         # close connection?
45         # I assume Python connections are thread safe interfaces
46         try:
47             connection = self.connection()
48             while data:
49                 l = connection.send(data)
50                 data = data[l:]
51         except:
52             self.stop()
53
54
55 class TcpClientRequestor(threading.Thread):
56
57     def __init__(self, dispatcher, session):
58         self.shared = dispatcher.shared
59         self.dispatcher = dispatcher
60         self.message = ""
61         self.session = session
62         threading.Thread.__init__(self)
63
64     def run(self):
65         while not self.shared.stopped():
66             if not self.update():
67                 break
68
69             self.session.time = time.time()
70
71             while self.parse():
72                 pass
73
74     def update(self):
75         data = self.receive()
76         if not data:
77             # close_session
78             self.session.stop()
79             return False
80
81         self.message += data
82         return True
83
84     def receive(self):
85         try:
86             return self.session.connection().recv(2048)
87         except:
88             return ''
89
90     def parse(self):
91         raw_buffer = self.message.find('\n')
92         if raw_buffer == -1:
93             return False
94
95         raw_command = self.message[0:raw_buffer].strip()
96         self.message = self.message[raw_buffer + 1:]
97         if raw_command == 'quit':
98             self.session.stop()
99             return False
100
101         try:
102             command = json.loads(raw_command)
103         except:
104             self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
105             return True
106
107         try:
108             # Try to load vital fields, and return an error if
109             # unsuccessful.
110             message_id = command['id']
111             method = command['method']
112         except KeyError:
113             # Return an error JSON in response.
114             self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
115         else:
116             self.dispatcher.push_request(self.session, command)
117
118         return True
119
120
121 class TcpServer(threading.Thread):
122
123     def __init__(self, dispatcher, host, port, use_ssl, ssl_certfile, ssl_keyfile):
124         self.shared = dispatcher.shared
125         self.dispatcher = dispatcher.request_dispatcher
126         threading.Thread.__init__(self)
127         self.daemon = True
128         self.host = host
129         self.port = port
130         self.lock = threading.Lock()
131         self.use_ssl = use_ssl
132         self.ssl_keyfile = ssl_keyfile
133         self.ssl_certfile = ssl_certfile
134
135     def run(self):
136         if self.use_ssl:
137             print_log("TCP/SSL server started.")
138         else:
139             print_log("TCP server started.")
140         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
141         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
142         sock.bind((self.host, self.port))
143         sock.listen(1)
144         while not self.shared.stopped():
145             try:
146                 session = TcpSession(*sock.accept(), use_ssl=self.use_ssl, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile)
147             except BaseException, e:
148                 error = str(e)
149                 print_log("cannot start TCP session", error)
150                 time.sleep(0.1)
151                 continue
152             self.dispatcher.add_session(session)
153             self.dispatcher.collect_garbage()
154             client_req = TcpClientRequestor(self.dispatcher, session)
155             client_req.start()