e1ebcd1ba278b3cf5f96dc694f8bc1cb1a1aa274
[electrum-server.git] / transports / stratum_tcp.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 from processor import Session, Dispatcher, timestr
8
class TcpSession(Session):
    """Session backed by a single accepted TCP connection, optionally SSL-wrapped.

    NOTE(review): relies on ``self.lock``, ``self._stopped`` and
    ``self.stopped()``, which are presumably provided by ``Session`` (defined
    in ``processor``, not visible here) -- confirm against that class.
    """

    def __init__(self, connection, address, use_ssl, ssl_certfile, ssl_keyfile):
        """Bind the session to an accepted client socket.

        connection   -- the accepted socket object
        address      -- peer address sequence; only its first element is kept
        use_ssl      -- when true, wrap ``connection`` in a server-side SSL socket
        ssl_certfile -- certificate file handed to ``ssl.wrap_socket``
        ssl_keyfile  -- private key file handed to ``ssl.wrap_socket``

        Any exception raised while wrapping the socket is re-raised after the
        raw connection has been closed.
        """
        Session.__init__(self)
        if use_ssl:
            import ssl
            try:
                self._connection = ssl.wrap_socket(
                    connection,
                    server_side=True,
                    certfile=ssl_certfile,
                    keyfile=ssl_keyfile,
                    ssl_version=ssl.PROTOCOL_SSLv23)
            except Exception:
                # Wrapping failed, so nobody else holds a usable reference to
                # the raw socket: close it here instead of leaking the
                # descriptor, then let the caller see the original error.
                connection.close()
                raise
        else:
            self._connection = connection

        self.address = address[0]
        self.name = "TCP"

    def connection(self):
        """Return the underlying socket.

        Raises ``Exception("Session was stopped")`` once the session has been
        stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def stop(self):
        """Close the connection and mark the session as stopped."""
        try:
            self._connection.close()
        finally:
            # Record the stopped state even if close() raised, so later
            # connection() calls fail fast instead of handing out a broken
            # socket.
            with self.lock:
                self._stopped = True

    def send_response(self, response):
        """Serialize ``response`` as one JSON line and send it to the client.

        Any failure while obtaining the connection or sending stops the
        session; the error is not propagated to the caller.
        """
        data = json.dumps(response) + "\n"
        # Another thread may stop the session (and close the socket) while we
        # are sending; that surfaces here as an exception and is handled by
        # stopping the session.
        try:
            # sendall() keeps sending until everything is written or an error
            # occurs, replacing the manual send()/slice loop.
            self.connection().sendall(data)
        except Exception:
            self.stop()
51
52
53
class TcpClientRequestor(threading.Thread):
    """Reader thread for one client session.

    Receives raw data from the session's connection, splits it into
    newline-terminated commands, decodes each as JSON and hands it to the
    dispatcher.
    """

    def __init__(self, dispatcher, session):
        """Remember the dispatcher and session this thread serves.

        dispatcher -- object exposing ``shared``, ``push_request`` and
                      ``push_response``
        session    -- session whose connection is read
        """
        threading.Thread.__init__(self)
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher
        # Data received so far that has not yet been split into commands.
        self.message = ""
        self.session = session

    def run(self):
        """Read and dispatch commands until shutdown or until no data arrives."""
        while not self.shared.stopped():
            if not self.update():
                break

            # Timestamp of the most recent data received on this session.
            self.session.time = time.time()

            # Drain every complete line currently sitting in the buffer.
            while self.parse():
                pass

    def update(self):
        """Append newly received data to the buffer.

        Returns True when data was received. When nothing was received the
        session is stopped and False is returned.
        """
        data = self.receive()
        if not data:
            self.session.stop()
            return False

        self.message += data
        return True

    def receive(self):
        """Return up to 2048 bytes from the connection, or '' on any error."""
        try:
            return self.session.connection().recv(2048)
        except Exception:
            # Must stay this broad: connection() raises a plain Exception
            # once the session has been stopped, and that case has to be
            # treated like a closed connection.
            return ''

    def parse(self):
        """Consume one newline-terminated command from the buffer.

        Returns False when no complete line is buffered or when the client
        sent ``quit`` (the session is stopped in that case); returns True
        after a line has been handled, whether it was dispatched or answered
        with an error response.
        """
        newline_at = self.message.find('\n')
        if newline_at == -1:
            return False

        raw_command = self.message[:newline_at].strip()
        self.message = self.message[newline_at + 1:]
        if raw_command == 'quit':
            self.session.stop()
            return False

        try:
            command = json.loads(raw_command)
        except Exception:
            # Input comes straight from the network; any decoding failure is
            # reported back rather than allowed to kill the thread.
            self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
            return True

        # A request must be a JSON object carrying both vital fields. Valid
        # JSON of another type (list, number, string, null) used to raise
        # TypeError on subscripting and terminate this thread.
        is_request = (isinstance(command, dict)
                      and 'id' in command
                      and 'method' in command)
        if is_request:
            self.dispatcher.push_request(self.session, command)
        else:
            self.dispatcher.push_response({"error": "syntax error", "request": raw_command})

        return True
118
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients and starts a reader per client."""

    def __init__(self, dispatcher, host, port, use_ssl, ssl_certfile, ssl_keyfile):
        """Store the listening parameters; no socket is opened until run().

        dispatcher   -- object exposing ``shared`` and ``request_dispatcher``
        host, port   -- address the listening socket is bound to
        use_ssl      -- passed to every TcpSession to enable SSL wrapping
        ssl_certfile -- certificate file passed to every TcpSession
        ssl_keyfile  -- private key file passed to every TcpSession
        """
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher.request_dispatcher
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port
        self.lock = threading.Lock()
        self.use_ssl = use_ssl
        self.ssl_keyfile = ssl_keyfile
        self.ssl_certfile = ssl_certfile

    def run(self):
        """Listen on (host, port) and serve clients until shared.stopped().

        Each accepted connection becomes a TcpSession registered with the
        dispatcher and gets its own TcpClientRequestor thread. A client whose
        session cannot be set up is dropped without affecting the server.
        Errors from the listening socket itself still propagate.
        """
        # Single pre-formatted argument: prints the same text as the former
        # two-item print statement and parses on both Python 2 and 3.
        print("%s %s" % ("TCP server started.", self.use_ssl))
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            while not self.shared.stopped():
                connection, address = sock.accept()
                try:
                    session = TcpSession(
                        connection,
                        address,
                        use_ssl=self.use_ssl,
                        ssl_certfile=self.ssl_certfile,
                        ssl_keyfile=self.ssl_keyfile)
                except Exception:
                    # Session setup failed for this client only (for example
                    # the SSL wrap raised). Release its socket and keep
                    # accepting; previously the exception ended the accept
                    # loop for everyone.
                    connection.close()
                    continue
                self.dispatcher.add_session(session)
                self.dispatcher.collect_garbage()
                client_req = TcpClientRequestor(self.dispatcher, session)
                client_req.start()
        finally:
            # Release the listening socket however the loop ends.
            sock.close()
145