new workflow
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Base worker thread that consumes client requests.

    Transports enqueue work with push_request().  run() records any
    subscription on the originating session, remembers which session
    owns each message id in ``id_session``, and then calls process().
    The default process() only prints the request; a real
    implementation is expected to build a response and hand it back
    with push_response(), from where a responder collects it through
    pop_response().

    ``shared`` must be set to an object exposing ``stopped()`` before
    the thread runs; run() raises TypeError otherwise.
    """

    def __init__(self):
        self.shared = None
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        # message id -> session that sent the request with that id
        self.id_session = {}

    def push_response(self, item):
        """Queue a response for delivery."""
        self.response_queue.put(item)

    def pop_response(self):
        """Block until a response is available and return it."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue a request together with the session it came from."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a request is available; return (session, request)."""
        return self.request_queue.get()

    def _subscribe(self, session, method, params):
        """Record the subscription implied by *method* on *session*.

        Raises IndexError/KeyError/TypeError when an address
        subscription carries no usable address parameter.
        """
        if method == 'numblocks.subscribe':
            session.subscribe_to_numblocks()
        elif method == 'address.subscribe':
            session.subscribe_to_address(params[0])
        elif method == 'server.peers':
            session.subscribe_to_peers()

    def run(self):
        """Drain the request queue until the shared stop flag is set.

        Raises:
            TypeError: if ``shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            session, request = self.pop_request()
            try:
                self._subscribe(session, request['method'],
                                request.get('params', []))
                self.id_session[request['id']] = session
            except (IndexError, KeyError, TypeError):
                # A malformed request (e.g. address.subscribe without an
                # address) must not take down the only processing
                # thread; report it and move on to the next request.
                self.push_response({"error": "bad request",
                                    "request": request})
                continue
            self.process(request)

        self.stop()

    def stop(self):
        """Hook called once when run() leaves its loop; no-op by default."""
        pass

    def process(self, request):
        """Handle one request; override in subclasses.

        An implementation does its work and, when the response is
        ready, calls self.push_response(response).
        """
        print("New request %s" % (request,))
63
64
65
class Session:
    """One connected client: its connection object and subscriptions.

    Attributes are guarded by ``lock`` where this class touches them.
    ``numblocks_sub`` becomes True once the client subscribes to block
    count updates; ``addresses_sub`` maps each subscribed address to
    the last status recorded for it ('unknown' on subscription).
    """

    def __init__(self, connection, address):
        """Wrap *connection*; *address* is the peer address sequence
        whose first element is printed when the session stops."""
        self._connection = connection
        self.address = address
        self._stopped = False
        self.lock = threading.Lock()
        self.numblocks_sub = None
        self.addresses_sub = {}
        print("new session %s" % (address,))

    def stop(self):
        """Mark the session stopped and close its connection.

        Safe to call more than once: only the first call closes the
        connection and prints the termination message.
        """
        with self.lock:
            if self._stopped:
                return
            # Flip the flag before closing so connection() stops handing
            # out a socket that is about to be closed.
            self._stopped = True
        self._connection.close()
        print("Terminating connection: %s" % (self.address[0],))

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            return self._stopped

    def connection(self):
        """Return the underlying connection.

        Raises:
            Exception: if the session has already been stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def subscribe_to_numblocks(self):
        """Flag this session as wanting block-count notifications."""
        with self.lock:
            self.numblocks_sub = True

    def subscribe_to_peers(self):
        """Placeholder: peer subscriptions are not tracked."""
        pass

    def subscribe_to_address(self, address):
        """Start tracking *address* with an initial status of 'unknown'."""
        with self.lock:
            self.addresses_sub[address] = 'unknown'
103
104
class TcpResponder(threading.Thread):
    """Thread that delivers processor responses to client sessions.

    A response carrying an ``id`` goes to the session registered for
    that id in ``processor.id_session``.  A response without an id is
    treated as a notification and broadcast to the sessions of
    ``server`` that subscribed to its method.
    """

    def __init__(self, shared, processor, server):
        self.shared = shared
        self.processor = processor
        self.server = server
        threading.Thread.__init__(self)
        # Daemon like the other long-lived threads in this module, so a
        # responder blocked on the queue cannot keep the process alive.
        self.daemon = True

    def run(self):
        """Dispatch responses until the shared stop flag is set."""
        while not self.shared.stopped():
            response = self.processor.pop_response()
            # If it is a subscription, find the list of sessions that
            # subscribed.  If there is an id, there should be a session.
            # note: I must add and remove the session id to the
            # message id..

            message_id = response.get('id')
            try:
                method = response['method']
            except KeyError:
                print("no method %s" % (response,))
                continue

            if message_id is not None:
                # "is not None" rather than truthiness: 0 is a valid id.
                session = self.processor.id_session.pop(message_id, None)
                if session is None:
                    # No session is waiting for this id; drop it instead
                    # of letting a KeyError kill the responder.
                    print("error %s" % (response,))
                else:
                    self.send_response(response, session)

            elif method == 'numblocks.subscribe':
                # Iterate over a snapshot; the server may rebuild its
                # session list concurrently.
                for session in list(self.server.sessions):
                    if not session.stopped() and session.numblocks_sub:
                        self.send_response(response, session)

            elif method == 'address.subscribe':
                try:
                    addr = response['params'][0]
                except (KeyError, IndexError, TypeError):
                    print("error %s" % (response,))
                    continue
                new_status = response.get('result')
                for session in list(self.server.sessions):
                    if session.stopped():
                        continue
                    last_status = session.addresses_sub.get(addr)
                    # Notify only subscribers whose recorded status
                    # differs from the new one.
                    if last_status and new_status != last_status:
                        session.addresses_sub[addr] = new_status
                        self.send_response(response, session)
            else:
                print("error %s" % (response,))

    def send_response(self, response, session):
        """Serialise *response* as one JSON line and send it to *session*.

        Any failure to obtain the connection or to send stops the
        session.
        """
        raw_response = json.dumps(response)
        # Possible race condition here by having session
        # close connection?
        # I assume Python connections are thread safe interfaces
        try:
            connection = session.connection()
            connection.send(raw_response + "\n")
        except Exception:
            session.stop()
163
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON requests from one session.

    Received data accumulates in ``message``; each complete line is
    parsed and either forwarded to the processor as a request or
    answered with an error entry on the processor's response queue.
    The literal line ``quit`` stops the session.
    """

    def __init__(self, shared, processor, session):
        self.shared = shared
        self.processor = processor
        self.message = ""
        self.session = session
        threading.Thread.__init__(self)
        # Daemon like the other long-lived threads in this module, so a
        # reader blocked in recv() cannot keep the process alive.
        self.daemon = True

    def run(self):
        """Read and parse until the connection ends or a stop is requested."""
        while not self.shared.stopped():
            if not self.update():
                break

            while self.parse():
                pass

            # parse() stops the session on "quit"; leave instead of
            # trying to read from a closed connection.
            if self.session.stopped():
                break

    def update(self):
        """Append newly received data to the buffer.

        Returns False (after stopping the session) when nothing could
        be read, True otherwise.
        """
        data = self.receive()
        if not data:
            # close_session
            self.session.stop()
            return False

        self.message += data
        return True

    def receive(self):
        """Return up to 1024 bytes from the connection, or '' on failure."""
        try:
            return self.session.connection().recv(1024)
        except Exception:
            # Covers socket.error as well as the plain Exception that
            # Session.connection() raises once the session is stopped.
            return ''

    def parse(self):
        """Consume one line from the buffer.

        Returns True when a line was handled and more may follow,
        False when no complete line is buffered or the client quit.
        """
        newline_pos = self.message.find('\n')
        if newline_pos == -1:
            return False

        raw_command = self.message[:newline_pos].strip()
        self.message = self.message[newline_pos + 1:]
        if raw_command == 'quit':
            self.session.stop()
            return False

        try:
            command = json.loads(raw_command)
        except ValueError:
            self.processor.push_response(
                {"error": "bad JSON", "request": raw_command})
            return True

        # A request must be a JSON object carrying both vital fields.
        # Valid JSON of another type (list, number, string) is rejected
        # here rather than raising TypeError on the key lookup.
        if (not isinstance(command, dict)
                or 'id' not in command or 'method' not in command):
            self.processor.push_response(
                {"error": "syntax error", "request": raw_command})
        else:
            self.processor.push_request(self.session, command)

        return True
226
class TcpServer(threading.Thread):
    """Thread that accepts TCP clients and tracks their sessions.

    run() starts a single TcpResponder, then creates a Session and a
    TcpClientRequestor for every accepted connection.  ``sessions``
    holds the sessions accepted so far; stopped ones are pruned after
    each accept.
    """

    def __init__(self, shared, processor, host, port):
        self.shared = shared
        self.processor = processor
        self.sessions = []
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port
        # Guards replacement of and appends to self.sessions
        self.lock = threading.Lock()

    def run(self):
        """Listen on (host, port) and accept clients until stopped."""
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            responder = TcpResponder(self.shared, self.processor, self)
            responder.start()
            while not self.shared.stopped():
                session = Session(*sock.accept())
                client_req = TcpClientRequestor(self.shared, self.processor,
                                                session)
                client_req.start()
                self.add_session(session)
                self.collect_garbage()
        finally:
            # Release the listening socket however the loop ends.
            sock.close()

    def add_session(self, session):
        """Register *session* so broadcasts can reach it."""
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self):
        """Drop stopped sessions from the register.

        The list is swapped for a filtered copy in one step, so a
        reader of ``sessions`` never sees it temporarily emptied.
        """
        with self.lock:
            self.sessions = [session for session in self.sessions
                             if not session.stopped()]
269
270
class Shared:
    """Lock-protected stop flag handed to every thread of the server."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Announce the shutdown and raise the stop flag."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return the current value of the stop flag."""
        self.lock.acquire()
        try:
            flag = self._stopped
        finally:
            self.lock.release()
        return flag
285
class Stratum:
    """Entry point that wires a processor to its transports."""

    def start(self, processor, host="176.31.24.241", port=50001):
        """Run the server until "quit" is entered on standard input.

        Args:
            processor: a Processor (or subclass) instance; its
                ``shared`` attribute is assigned here before it is
                started.
            host: address the TCP transport binds to.  Defaults to the
                address that used to be hard-coded.
            port: TCP port the transport listens on.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need
        transports = (TcpServer(shared, processor, host, port),)
        for server in transports:
            server.start()
        while not shared.stopped():
            try:
                command = raw_input()
            except (EOFError, KeyboardInterrupt):
                # Closed stdin or Ctrl-C: shut down cleanly instead of
                # dying with a traceback.
                command = "quit"
            if command == "quit":
                shared.stop()
            time.sleep(1)
301
if __name__ == "__main__":
    # Script entry point: serve with the default print-only Processor.
    processor, app = Processor(), Stratum()
    app.start(processor)
305     app.start(processor)
306