ab6c5c3bf948f8ee7d3af7fbd47d0b3480ae0c69
[electrum-server.git] / transports / stratum_tcp.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 from processor import Session, Dispatcher
8
9 class TcpSession(Session):
10
11     def __init__(self, connection, address):
12         self._connection = connection
13         self.address = address
14         Session.__init__(self)
15         print "New session", address
16
17     def connection(self):
18         if self.stopped():
19             raise Exception("Session was stopped")
20         else:
21             return self._connection
22
23     def stop(self):
24         self._connection.close()
25         print "Terminating connection:", self.address[0]
26         with self.lock:
27             self._stopped = True
28
29     def send_response(self, response):
30         raw_response = json.dumps(response)
31         # Possible race condition here by having session
32         # close connection?
33         # I assume Python connections are thread safe interfaces
34         try:
35             connection = self.connection()
36             connection.send(raw_response + "\n")
37         except:
38             self.stop()
39
40
41
42 class TcpClientRequestor(threading.Thread):
43
44     def __init__(self, dispatcher, session):
45         self.shared = dispatcher.shared
46         self.dispatcher = dispatcher
47         self.message = ""
48         self.session = session
49         threading.Thread.__init__(self)
50
51     def run(self):
52         while not self.shared.stopped():
53             if not self.update():
54                 break
55
56             while self.parse():
57                 pass
58
59     def update(self):
60         data = self.receive()
61         if not data:
62             # close_session
63             self.session.stop()
64             return False
65
66         self.message += data
67         return True
68
69     def receive(self):
70         try:
71             return self.session.connection().recv(1024)
72         except:
73             return ''
74
75     def parse(self):
76         raw_buffer = self.message.find('\n')
77         if raw_buffer == -1:
78             return False
79
80         raw_command = self.message[0:raw_buffer].strip()
81         self.message = self.message[raw_buffer + 1:]
82         if raw_command == 'quit': 
83             self.session.stop()
84             return False
85
86         try:
87             command = json.loads(raw_command)
88         except:
89             self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
90             return True
91
92         try:
93             # Try to load vital fields, and return an error if
94             # unsuccessful.
95             message_id = command['id']
96             method = command['method']
97         except KeyError:
98             # Return an error JSON in response.
99             self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
100         else:
101             self.dispatcher.push_request(self.session,command)
102
103         return True
104
105 class TcpServer(threading.Thread):
106
107     def __init__(self, dispatcher, host, port):
108         self.shared = dispatcher.shared
109         self.dispatcher = dispatcher.request_dispatcher
110         threading.Thread.__init__(self)
111         self.daemon = True
112         self.host = host
113         self.port = port
114         self.lock = threading.Lock()
115
116     def run(self):
117         print "TCP server started."
118         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
119         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
120         sock.bind((self.host, self.port))
121         sock.listen(1)
122         while not self.shared.stopped():
123             session = TcpSession(*sock.accept())
124             client_req = TcpClientRequestor(self.dispatcher, session)
125             client_req.start()
126             self.dispatcher.add_session(session)
127             self.dispatcher.collect_garbage()
128
129
130
131