8f0c9364ecd40a2449a879037aa6bf3ba8253271
[electrum-server.git] / transports / stratum_tcp.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 from processor import Session, Dispatcher, timestr
8
class TcpSession(Session):
    """Session bound to a single accepted TCP client connection.

    Responses are written to the socket as JSON text, one object per line.
    NOTE(review): ``self.lock``, ``self._stopped`` and ``self.stopped()`` are
    not defined here; presumably they come from ``Session`` -- confirm.
    """

    def __init__(self, connection, address):
        """Wrap an accepted socket.

        connection -- the connected socket object
        address    -- the peer address; only its first element is kept
        """
        Session.__init__(self)
        self._connection = connection
        self.address = address[0]
        self.name = "TCP session"

    def connection(self):
        """Return the underlying socket.

        Raises Exception if the session has already been stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def stop(self):
        """Close the socket and flag the session as stopped."""
        try:
            self._connection.close()
        finally:
            # Flag the session even if close() raised, so that later
            # connection() calls fail instead of handing out a dead socket.
            with self.lock:
                self._stopped = True

    def send_response(self, response):
        """Serialize *response* as one JSON line and send it to the client.

        Any failure while sending (including the session already being
        stopped) stops the session instead of propagating.
        """
        data = json.dumps(response) + "\n"
        # Another thread may close the connection while we are sending;
        # that surfaces here as an exception and is handled by stopping.
        try:
            # sendall() keeps sending until the whole buffer is written.
            self.connection().sendall(data)
        except Exception:
            # Deliberately not a bare ``except:`` so that SystemExit and
            # KeyboardInterrupt are not swallowed.
            self.stop()
41
42
43
class TcpClientRequestor(threading.Thread):
    """Reader thread for one client session.

    Receives data from the session's connection, splits it into
    newline-terminated commands, decodes each command as JSON and passes it
    to the dispatcher.
    """

    def __init__(self, dispatcher, session):
        """Create the reader.

        dispatcher -- object exposing ``shared``, ``push_request`` and
                      ``push_response``
        session    -- object exposing ``connection()`` and ``stop()``
        """
        threading.Thread.__init__(self)
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher
        # Received data that has not yet been consumed by parse().
        self.message = ""
        self.session = session

    def run(self):
        """Alternate between receiving data and parsing buffered commands."""
        while not self.shared.stopped():
            if not self.update():
                break

            # Drain every complete command currently in the buffer.
            while self.parse():
                pass

    def update(self):
        """Receive one chunk and append it to the buffer.

        Returns False (after stopping the session) when nothing was
        received, True otherwise.
        """
        data = self.receive()
        if not data:
            self.session.stop()
            return False

        self.message += data
        return True

    def receive(self):
        """Return up to 2048 bytes from the connection, or '' on any error."""
        try:
            return self.session.connection().recv(2048)
        except Exception:
            # Covers both socket errors and connection() refusing because
            # the session was stopped; the caller treats '' as end of stream.
            return ''

    def parse(self):
        """Consume one newline-terminated command from the buffer.

        Returns False when no complete command is buffered or when the
        client sent ``quit`` (which also stops the session); returns True
        when a command was consumed, whether it was valid or not.
        """
        newline = self.message.find('\n')
        if newline == -1:
            return False

        raw_command = self.message[:newline].strip()
        self.message = self.message[newline + 1:]
        if raw_command == 'quit':
            self.session.stop()
            return False

        try:
            command = json.loads(raw_command)
        except Exception:
            # Broad on purpose: undecodable input must never kill the
            # reader thread, whatever the decoder raises.
            self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
            return True

        # A request must be a JSON object carrying both vital fields.
        # Valid JSON that is not an object (list, number, string, null)
        # is reported as a syntax error instead of raising TypeError.
        if (not isinstance(command, dict)
                or 'id' not in command
                or 'method' not in command):
            self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
        else:
            self.dispatcher.push_request(self.session, command)

        return True
106
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients and starts a reader for each."""

    def __init__(self, dispatcher, host, port):
        """Create the server thread.

        dispatcher -- object exposing ``shared`` and ``request_dispatcher``
        host, port -- address the listening socket is bound to
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = dispatcher.shared
        # Sessions and requests go to the request dispatcher specifically.
        self.dispatcher = dispatcher.request_dispatcher
        self.host = host
        self.port = port
        # Not used by the code visible in this class.
        self.lock = threading.Lock()

    def run(self):
        """Listen on (host, port) and serve clients until shared is stopped.

        Each accepted connection becomes a TcpSession registered with the
        dispatcher and gets its own TcpClientRequestor thread.
        """
        # Single-argument parenthesized form prints the same text whether
        # ``print`` is a statement or a function.
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            while not self.shared.stopped():
                # accept() blocks, so the stop flag is only re-checked
                # after a client connects.
                session = TcpSession(*sock.accept())
                self.dispatcher.add_session(session)
                self.dispatcher.collect_garbage()
                client_req = TcpClientRequestor(self.dispatcher, session)
                client_req.start()
        finally:
            # Release the listening socket whether the loop ended normally
            # or an error escaped from bind/listen/accept.
            sock.close()
130