Return JSONs on error scenarios instead of polluting stdout :)
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 class Processor(threading.Thread):
8
9     def __init__(self):
10         self.shared = None
11         self.lock = threading.Lock()
12         self.sessions = []
13         threading.Thread.__init__(self)
14         self.daemon = True
15
16     def add_session(self, session):
17         with self.lock:
18             self.sessions.append(session)
19
20     def run(self):
21         if self.shared is None:
22             raise TypeError("self.shared not set in Processor")
23         while not self.shared.stopped():
24             # Deep copy entire sessions list and blank it
25             # This is done to minimise lock contention
26             with self.lock:
27                 sessions = self.sessions[:]
28                 self.sessions = []
29             for session in sessions:
30                 if not session.stopped():
31                     # If session is still alive then re-add it back
32                     # to our internal register
33                     self.add_session(session)
34                     self.process(session)
35
36     def process(self, session):
37         request = session.pop_request()
38         print "New request", request
39         # Execute and when ready, you call
40         # session.push_response(response)
41
42 class Session:
43
44     def __init__(self, connection, address):
45         self._connection = connection
46         self.address = address
47         self._stopped = False
48         self.lock = threading.Lock()
49
50         self.request_queue = queue.Queue()
51         self.response_queue = queue.Queue()
52
53     def stop(self):
54         self._connection.close()
55         print "Terminating connection:", self.address[0]
56         with self.lock:
57             self._stopped = True
58
59     def stopped(self):
60         with self.lock:
61             return self._stopped
62
63     def connection(self):
64         if self.stopped():
65             raise Exception("Session was stopped")
66         else:
67             return self._connection
68
69     def push_request(self, item):
70         self.request_queue.put(item)
71
72     def pop_request(self):
73         return self.request_queue.get()
74
75     def push_response(self, item):
76         self.response_queue.put(item)
77
78     def pop_response(self):
79         return self.response_queue.get()
80
81 class TcpClientResponder(threading.Thread):
82
83     def __init__(self, shared, session):
84         self.shared = shared
85         self.session = session
86         threading.Thread.__init__(self)
87
88     def run(self):
89         while not self.shared.stopped() or self.session.stopped():
90             response = self.session.pop_response()
91             raw_response = json.dumps(response)
92             # Possible race condition here by having session
93             # close connection?
94             # I assume Python connections are thread safe interfaces
95             connection = self.session.connection()
96             try:
97                 connection.send(raw_response + "\n")
98             except:
99                 self.session.stop()
100
101 class TcpClientRequestor(threading.Thread):
102
103     def __init__(self, shared, session):
104         self.shared = shared
105         self.message = ""
106         self.session = session
107         threading.Thread.__init__(self)
108
109     def run(self):
110         while not self.shared.stopped():
111             if not self.update():
112                 self.session.stop()
113                 return
114
115     def update(self):
116         data = self.receive()
117         if data is None:
118             # close_session
119             self.stop()
120             return False
121
122         self.message += data
123         if not self.parse():
124             return False
125         return True
126
127     def receive(self):
128         try:
129             return self.session.connection().recv(1024)
130         except socket.error:
131             return None
132
133     def parse(self):
134         while True:
135             raw_buffer = self.message.find('\n')
136             if raw_buffer == -1:
137                 return True
138
139             raw_command = self.message[0:raw_buffer].strip()
140             self.message = self.message[raw_buffer + 1:]
141             if raw_command == 'quit': 
142                 return False
143
144             try:
145                 command = json.loads(raw_command)
146             except:
147                 self.session.push_response(
148                     {"error": "bad JSON", "request": raw_command})
149                 return True
150
151             try:
152                 # Try to load vital fields, and return an error if
153                 # unsuccessful.
154                 message_id = command['id']
155                 method = command['method']
156             except KeyError:
157                 # Return an error JSON in response.
158                 self.session.push_response(
159                     {"error": "syntax error", "request": raw_command})
160             else:
161                 self.session.push_request(command)
162
163             return True
164
165 class TcpServer(threading.Thread):
166
167     def __init__(self, shared, processor):
168         self.shared = shared
169         self.processor = processor
170         self.clients = []
171         threading.Thread.__init__(self)
172         self.daemon = True
173
174     def run(self):
175         print "TCP server started."
176         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
177         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
178         sock.bind(("localhost", 50001))
179         sock.listen(1)
180         while not self.shared.stopped():
181             session = Session(*sock.accept())
182             client_req = TcpClientRequestor(self.shared, session)
183             client_req.start()
184             client_res = TcpClientResponder(self.shared, session)
185             client_res.start()
186             self.processor.add_session(session)
187
188 class Shared:
189
190     def __init__(self):
191         self.lock = threading.Lock()
192         self._stopped = False
193
194     def stop(self):
195         print "Stopping Stratum"
196         with self.lock:
197             self._stopped = True
198
199     def stopped(self):
200         with self.lock:
201             return self._stopped
202
203 class Stratum:
204
205     def start(self, processor):
206         shared = Shared()
207         # Bind shared to processor since constructor is user defined
208         processor.shared = shared
209         processor.start()
210         # Create various transports we need
211         transports = TcpServer(shared, processor),
212         for server in transports:
213             server.start()
214         while not shared.stopped():
215             if raw_input() == "quit":
216                 shared.stop()
217             time.sleep(1)
218
219 if __name__ == "__main__":
220     processor = Processor()
221     app = Stratum()
222     app.start(processor)
223