server-side support for ssl
[electrum-server.git] / transports / stratum_tcp.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 from processor import Session, Dispatcher, timestr
8
class TcpSession(Session):
    """Session backed by one accepted TCP connection, optionally SSL-wrapped.

    Relies on the Session base class for ``self.lock``, ``self.stopped()``
    and the ``self._stopped`` flag (none of them are defined here).
    """

    def __init__(self, connection, address, use_ssl, ssl_certfile, ssl_keyfile):
        """Attach the session to an accepted socket.

        connection   -- socket object returned by ``accept()``
        address      -- peer address tuple; only its first element is stored
        use_ssl      -- when true, wrap ``connection`` as a server-side SSL socket
        ssl_certfile -- certificate file handed to ``ssl.wrap_socket``
        ssl_keyfile  -- private key file handed to ``ssl.wrap_socket``

        Any exception raised while wrapping the socket is re-raised after the
        raw socket has been closed.
        """
        Session.__init__(self)
        # Same output as the former print statement, but parses on 2.x and 3.x.
        print("%s %s %s" % (connection, address, use_ssl))
        if use_ssl:
            import ssl
            try:
                # NOTE(review): PROTOCOL_TLSv1 pins the protocol to TLS 1.0;
                # revisit if clients need newer versions.
                self._connection = ssl.wrap_socket(
                    connection,
                    server_side=True,
                    certfile=ssl_certfile,
                    keyfile=ssl_keyfile,
                    ssl_version=ssl.PROTOCOL_TLSv1)
            except Exception:
                # Wrapping failed (bad certificate files, failed handshake...):
                # do not leak the accepted socket.
                connection.close()
                raise
        else:
            self._connection = connection

        self.address = address[0]
        self.name = "TCP"

    def connection(self):
        """Return the underlying socket.

        Raises Exception("Session was stopped") once the session is stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def stop(self):
        """Close the socket and flag the session as stopped."""
        try:
            self._connection.close()
        finally:
            # Flag the session even if close() raised, so that connection()
            # never hands out the socket again.
            with self.lock:
                self._stopped = True

    def send_response(self, response):
        """Serialize ``response`` to JSON and send it as one newline-terminated line.

        Any failure while obtaining the socket or sending stops the session
        instead of propagating to the caller.
        """
        data = json.dumps(response) + "\n"
        # Another thread may stop the session (closing the socket) while we
        # send; that surfaces as an exception here and is handled like any
        # other send failure.
        try:
            # sendall() keeps writing until everything is sent or an error
            # occurs, replacing the manual send()/slice loop.
            self.connection().sendall(data)
        except Exception:
            self.stop()
52
53
54
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited commands from one session.

    Received data is accumulated in ``self.message``; every complete line is
    decoded as JSON and handed to the dispatcher.
    """

    def __init__(self, dispatcher, session):
        """Remember the dispatcher and session, starting with an empty buffer.

        dispatcher -- object exposing ``shared``, ``push_request`` and
                      ``push_response``
        session    -- session whose connection is read
        """
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher
        self.message = ""
        self.session = session
        threading.Thread.__init__(self)

    def run(self):
        """Read and dispatch commands until shutdown or until the peer is gone."""
        while not self.shared.stopped():
            if not self.update():
                break

            # Record when data was last received on this session.
            self.session.time = time.time()

            # Drain every complete line currently sitting in the buffer.
            while self.parse():
                pass

    def update(self):
        """Append newly received data to the buffer.

        Returns False (after stopping the session) when nothing was received,
        True otherwise.
        """
        data = self.receive()
        if not data:
            # close_session
            self.session.stop()
            return False

        self.message += data
        return True

    def receive(self):
        """Return up to 2048 bytes from the connection, or '' on any error."""
        try:
            return self.session.connection().recv(2048)
        except Exception:
            # A stopped session or a socket error is reported as "no data",
            # which makes update() close the session.
            return ''

    def parse(self):
        """Consume one line from the buffer and dispatch it.

        Returns False when no complete line is buffered or when the client
        sent ``quit`` (the session is then stopped); True after a line has
        been handled, whether it was valid or not.
        """
        newline_pos = self.message.find('\n')
        if newline_pos == -1:
            return False

        raw_command = self.message[0:newline_pos].strip()
        self.message = self.message[newline_pos + 1:]
        if raw_command == 'quit':
            self.session.stop()
            return False

        try:
            command = json.loads(raw_command)
        except Exception:
            self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
            return True

        # Valid JSON is not necessarily an object: a list or number used to
        # raise TypeError on command['id'] and kill this thread. Anything that
        # is not an object carrying both vital fields is a syntax error.
        if not isinstance(command, dict) or 'id' not in command or 'method' not in command:
            # Return an error JSON in response.
            self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
        else:
            self.dispatcher.push_request(self.session, command)

        return True
119
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients and starts a reader per client."""

    def __init__(self, dispatcher, host, port, use_ssl, ssl_certfile, ssl_keyfile):
        """Store the listening parameters.

        dispatcher   -- object exposing ``shared`` and ``request_dispatcher``
        host, port   -- address the listening socket is bound to
        use_ssl      -- when true, every accepted connection is SSL-wrapped
        ssl_certfile -- certificate file forwarded to each TcpSession
        ssl_keyfile  -- private key file forwarded to each TcpSession
        """
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher.request_dispatcher
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port
        self.lock = threading.Lock()
        self.use_ssl = use_ssl
        self.ssl_keyfile = ssl_keyfile
        self.ssl_certfile = ssl_certfile

    def run(self):
        """Accept connections until shutdown, one TcpClientRequestor per client.

        A client whose session cannot be set up (for example a failed SSL
        wrap) is dropped and the server keeps accepting. Errors from the
        listening socket itself still propagate, after the socket is closed.
        """
        # Same output as the former print statement, but parses on 2.x and 3.x.
        print("TCP server started. %s" % (self.use_ssl,))
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            while not self.shared.stopped():
                connection, address = sock.accept()
                try:
                    session = TcpSession(
                        connection,
                        address,
                        use_ssl=self.use_ssl,
                        ssl_certfile=self.ssl_certfile,
                        ssl_keyfile=self.ssl_keyfile)
                except Exception as e:
                    # One misbehaving client must not take the listener down.
                    print("TCP session setup failed for %s: %s" % (address, e))
                    connection.close()
                    continue
                self.dispatcher.add_session(session)
                self.dispatcher.collect_garbage()
                client_req = TcpClientRequestor(self.dispatcher, session)
                client_req.start()
        finally:
            # Release the listening port however the loop ends.
            sock.close()
146