649987ab4a9c4b36f9d9db155baa7dd76570206b
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 class Processor(threading.Thread):
8
9     def __init__(self):
10         self.shared = None
11         threading.Thread.__init__(self)
12         self.daemon = True
13         self.request_queue = queue.Queue()
14         self.response_queue = queue.Queue()
15         self.internal_ids = {}
16         self.internal_id = 1
17         self.lock = threading.Lock()
18
19     def push_response(self, item):
20         self.response_queue.put(item)
21
22     def pop_response(self):
23         return self.response_queue.get()
24
25     def push_request(self, session, item):
26         self.request_queue.put((session,item))
27
28     def pop_request(self):
29         return self.request_queue.get()
30
31     def get_session_id(self, internal_id):
32         with self.lock:
33             return self.internal_ids.pop(internal_id)
34
35     def store_session_id(self, session, msgid):
36         with self.lock:
37             self.internal_ids[self.internal_id] = session, msgid
38             r = self.internal_id
39             self.internal_id += 1
40             return r
41
42     def run(self):
43         if self.shared is None:
44             raise TypeError("self.shared not set in Processor")
45         while not self.shared.stopped():
46             session, request = self.pop_request()
47
48             method = request['method']
49             params = request.get('params',[])
50
51             if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']:
52                 session.subscribe_to_service(method, params)
53
54             # store session and id locally
55             request['id'] = self.store_session_id(session, request['id'])
56             self.process(request)
57
58         self.stop()
59
60     def stop(self):
61         pass
62
63     def process(self, request):
64         print "New request", request
65         # Do stuff...
66         # response = request
67         # When ready, you call
68         # self.push_response(response)
69
70
71
72 class Session:
73
74     def __init__(self, connection, address):
75         self._connection = connection
76         self.address = address
77         self._stopped = False
78         self.lock = threading.Lock()
79         self.subscriptions = []
80         print "new session", address
81
82     def stop(self):
83         self._connection.close()
84         print "Terminating connection:", self.address[0]
85         with self.lock:
86             self._stopped = True
87
88     def stopped(self):
89         with self.lock:
90             return self._stopped
91
92     def connection(self):
93         if self.stopped():
94             raise Exception("Session was stopped")
95         else:
96             return self._connection
97
98     def subscribe_to_service(self, method, params):
99         with self.lock:
100             self.subscriptions.append((method, params))
101     
102
103
104 class TcpResponder(threading.Thread):
105
106     def __init__(self, shared, processor, server):
107         self.shared = shared
108         self.processor = processor
109         self.server = server
110         threading.Thread.__init__(self)
111
112
113     def run(self):
114         while not self.shared.stopped():
115             response = self.processor.pop_response()
116             internal_id = response.get('id')
117             params = response.get('params',[])
118             try:
119                 method = response['method']
120             except:
121                 print "no method", response
122                 continue
123
124             if internal_id:
125                 session, message_id = self.processor.get_session_id(internal_id)
126                 response['id'] = message_id
127                 self.send_response(response, session)
128
129             else:
130                 for session in self.server.sessions:
131                     if not session.stopped():
132                         if (method,params) in session.subscriptions:
133                             self.send_response(response, session)
134
135
136
137     def send_response(self, response, session):
138         raw_response = json.dumps(response)
139         # Possible race condition here by having session
140         # close connection?
141         # I assume Python connections are thread safe interfaces
142         try:
143             connection = session.connection()
144             connection.send(raw_response + "\n")
145         except:
146             session.stop()
147
148 class TcpClientRequestor(threading.Thread):
149
150     def __init__(self, shared, processor, session):
151         self.shared = shared
152         self.processor = processor
153         self.message = ""
154         self.session = session
155         threading.Thread.__init__(self)
156
157     def run(self):
158         while not self.shared.stopped():
159             if not self.update():
160                 break
161
162             while self.parse():
163                 pass
164
165     def update(self):
166         data = self.receive()
167         if not data:
168             # close_session
169             self.session.stop()
170             return False
171
172         self.message += data
173         return True
174
175     def receive(self):
176         try:
177             return self.session.connection().recv(1024)
178         except:
179             return ''
180
181     def parse(self):
182         raw_buffer = self.message.find('\n')
183         if raw_buffer == -1:
184             return False
185
186         raw_command = self.message[0:raw_buffer].strip()
187         self.message = self.message[raw_buffer + 1:]
188         if raw_command == 'quit': 
189             self.session.stop()
190             return False
191
192         try:
193             command = json.loads(raw_command)
194         except:
195             self.processor.push_response({"error": "bad JSON", "request": raw_command})
196             return True
197
198         try:
199             # Try to load vital fields, and return an error if
200             # unsuccessful.
201             message_id = command['id']
202             method = command['method']
203         except KeyError:
204             # Return an error JSON in response.
205             self.processor.push_response({"error": "syntax error", "request": raw_command})
206         else:
207             self.processor.push_request(self.session,command)
208
209         return True
210
211 class TcpServer(threading.Thread):
212
213     def __init__(self, shared, processor, host, port):
214         self.shared = shared
215         self.processor = processor
216         self.sessions = []
217         threading.Thread.__init__(self)
218         self.daemon = True
219         self.host = host
220         self.port = port
221         self.lock = threading.Lock()
222
223     def run(self):
224         print "TCP server started."
225         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
226         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
227         sock.bind((self.host, self.port))
228         sock.listen(1)
229         responder = TcpResponder(self.shared, self.processor, self)
230         responder.start()
231         while not self.shared.stopped():
232             session = Session(*sock.accept())
233             client_req = TcpClientRequestor(self.shared, self.processor, session)
234             client_req.start()
235             self.add_session(session)
236             self.collect_garbage()
237
238     def add_session(self, session):
239         with self.lock:
240             self.sessions.append(session)
241
242     def collect_garbage(self):
243         # Deep copy entire sessions list and blank it
244         # This is done to minimise lock contention
245         with self.lock:
246             sessions = self.sessions[:]
247             self.sessions = []
248         for session in sessions:
249             if not session.stopped():
250                 # If session is still alive then re-add it back
251                 # to our internal register
252                 self.add_session(session)
253
254
255 class Shared:
256
257     def __init__(self):
258         self.lock = threading.Lock()
259         self._stopped = False
260
261     def stop(self):
262         print "Stopping Stratum"
263         with self.lock:
264             self._stopped = True
265
266     def stopped(self):
267         with self.lock:
268             return self._stopped
269
270 class Stratum:
271
272     def start(self, processor):
273         shared = Shared()
274         # Bind shared to processor since constructor is user defined
275         processor.shared = shared
276         processor.start()
277         # Create various transports we need
278         transports = TcpServer(shared, processor, "176.31.24.241", 50001),
279         for server in transports:
280             server.start()
281         while not shared.stopped():
282             if raw_input() == "quit":
283                 shared.stop()
284             time.sleep(1)
285
286 if __name__ == "__main__":
287     processor = Processor()
288     app = Stratum()
289     app.start(processor)
290