abstract services from processor; define server.peers as a service
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Worker thread that consumes client requests and produces responses.

    Requests arrive through push_request() as (session, request) pairs.
    Before a request is handed to process(), its client-chosen 'id' is
    replaced by a server-side internal id; the (session, client id) pair is
    remembered so that a response carrying the internal id can later be
    resolved with get_session_id().

    Subclasses override process() and call push_response() when a
    response dict is ready.  The `shared` attribute must be set to an
    object exposing stopped() before the thread is started.
    """

    # Methods that, besides being processed, register a subscription on the
    # requesting session.
    SUBSCRIPTION_METHODS = ('numblocks.subscribe', 'address.subscribe', 'server.peers')

    def __init__(self):
        self.shared = None
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        # internal id -> (session, client message id)
        self.internal_ids = {}
        # Start at 1 so that every internal id is truthy: the original
        # TcpResponder in this file tests `if internal_id:` to decide whether
        # a response is addressed to one session, so an id of 0 would be
        # misrouted as a broadcast.
        self.internal_id = 1
        self.lock = threading.Lock()

    def push_response(self, item):
        """Queue a response dict for delivery."""
        self.response_queue.put(item)

    def pop_response(self):
        """Block until a response is available and return it."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue a request dict together with the session that sent it."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a request is available; return (session, request)."""
        return self.request_queue.get()

    def get_session_id(self, internal_id):
        """Return and forget the (session, client id) pair for internal_id.

        Raises KeyError if internal_id is unknown or was already consumed.
        """
        with self.lock:
            # Fixed: the original referenced an undefined name `session_ids`.
            return self.internal_ids.pop(internal_id)

    def store_session_id(self, session, msgid):
        """Remember (session, msgid) and return the internal id assigned to it."""
        with self.lock:
            internal_id = self.internal_id
            self.internal_ids[internal_id] = session, msgid
            self.internal_id += 1
        # Fixed: the original returned None, so run() overwrote every
        # request's 'id' with None.
        return internal_id

    def run(self):
        """Process queued requests until the shared stop flag is set.

        Raises TypeError if `shared` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            session, request = self.pop_request()

            method = request['method']
            params = request.get('params', [])

            if method in self.SUBSCRIPTION_METHODS:
                session.subscribe_to_service(method, params)

            # store session and id locally
            request['id'] = self.store_session_id(session, request['id'])
            self.process(request)

        self.stop()

    def stop(self):
        """Hook called once when the run loop exits; no-op by default."""
        pass

    def process(self, request):
        """Handle one request; the default implementation only logs it.

        Overrides are expected to build a response and hand it to
        push_response().
        """
        print("%s %s" % ("New request", request))
        # Do stuff...
        # response = request
        # When ready, you call
        # self.push_response(response)
67
68
69
class Session:
    """State of one client connection: socket, peer address, subscriptions.

    The stopped flag and the subscription list are guarded by `lock`.
    """

    def __init__(self, connection, address):
        self.address = address
        self.subscriptions = []
        self.lock = threading.Lock()
        self._stopped = False
        self._connection = connection
        print("%s %s" % ("new session", address))

    def stop(self):
        """Close the connection, log it, then mark the session as stopped."""
        self._connection.close()
        print("%s %s" % ("Terminating connection:", self.address[0]))
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return True once stop() has marked the session as stopped."""
        self.lock.acquire()
        try:
            return self._stopped
        finally:
            self.lock.release()

    def connection(self):
        """Return the underlying connection.

        Raises Exception if the session has been stopped.
        """
        if not self.stopped():
            return self._connection
        raise Exception("Session was stopped")

    def subscribe_to_service(self, method, params):
        """Record a (method, params) subscription for this session."""
        entry = (method, params)
        self.lock.acquire()
        try:
            self.subscriptions.append(entry)
        finally:
            self.lock.release()
99     
100
101
class TcpResponder(threading.Thread):
    """Thread that delivers processor responses to TCP sessions.

    A response whose 'id' is set is sent to the single session that issued
    the matching request; a response without an id is broadcast to every
    live session subscribed to its (method, params) pair.
    """

    def __init__(self, shared, processor, server):
        self.shared = shared
        self.processor = processor
        self.server = server
        threading.Thread.__init__(self)
        # This thread blocks indefinitely in pop_response(); as a non-daemon
        # thread it would keep the process alive after shutdown.  Processor
        # and TcpServer in this file are daemon threads as well.
        self.daemon = True

    def run(self):
        """Deliver responses until the shared stop flag is set."""
        while not self.shared.stopped():
            self._dispatch(self.processor.pop_response())

    def _dispatch(self, response):
        """Route one response dict to its target session(s)."""
        internal_id = response.get('id')
        params = response.get('params', [])
        try:
            method = response['method']
        except KeyError:
            print("%s %s" % ("no method", response))
            return

        # Compare against None: an id of 0 is a valid id, not "no id".
        if internal_id is not None:
            try:
                session, message_id = self.processor.get_session_id(internal_id)
            except KeyError:
                # Unknown or already-answered id: drop the response instead
                # of letting the exception kill this thread.
                print("%s %s" % ("unknown response id", internal_id))
                return
            # Always restore the client's own id (even 0 or null) so the
            # server-side internal id is never sent to the client.
            response['id'] = message_id
            self.send_response(response, session)
            return

        # Iterate over a snapshot so that concurrent changes to the server's
        # session list cannot affect this loop.
        for session in list(self.server.sessions):
            if session.stopped():
                continue
            if (method, params) in session.subscriptions:
                self.send_response(response, session)

    def send_response(self, response, session):
        """Serialise response as one JSON line and send it on the session.

        Any failure while sending stops the session (best effort).
        """
        raw_response = json.dumps(response)
        # Possible race condition here by having session
        # close connection?
        # I assume Python connections are thread safe interfaces
        try:
            connection = session.connection()
            connection.send(raw_response + "\n")
        except Exception:
            session.stop()
147
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON requests from one session.

    Received data is accumulated in `message`; each complete line is parsed
    and either queued on the processor as a request or answered with an
    error response.
    """

    def __init__(self, shared, processor, session):
        self.shared = shared
        self.processor = processor
        self.message = ""
        self.session = session
        threading.Thread.__init__(self)
        # This thread blocks in recv(); make it a daemon, like Processor and
        # TcpServer in this file, so it cannot keep the process alive.
        self.daemon = True

    def run(self):
        """Read and parse input until shutdown or until the session ends."""
        # Also stop once the session itself is stopped (e.g. after "quit"):
        # Session.connection() raises on a stopped session, which previously
        # ended this thread with an uncaught exception.
        while not self.shared.stopped() and not self.session.stopped():
            if not self.update():
                break

            while self.parse():
                pass

    def update(self):
        """Append newly received data to the buffer.

        Returns False, after stopping the session, when nothing was
        received; True otherwise.
        """
        data = self.receive()
        if not data:
            # close_session
            self.session.stop()
            return False

        self.message += data
        return True

    def receive(self):
        """Return up to 1024 bytes from the connection, or '' on socket error."""
        try:
            return self.session.connection().recv(1024)
        except socket.error:
            return ''

    def parse(self):
        """Consume one complete line from the buffer and act on it.

        Returns False when no complete line is buffered or the client sent
        "quit" (which stops the session); True when a line was consumed and
        more may follow.
        """
        newline_pos = self.message.find('\n')
        if newline_pos == -1:
            return False

        raw_command = self.message[:newline_pos].strip()
        self.message = self.message[newline_pos + 1:]
        if raw_command == 'quit':
            self.session.stop()
            return False

        try:
            command = json.loads(raw_command)
        except ValueError:
            self.processor.push_response({"error": "bad JSON", "request": raw_command})
            return True

        # A request must be a JSON object carrying both 'id' and 'method'.
        # Checking the type first keeps valid non-object JSON such as `[1]`
        # or `5` from raising TypeError and killing this thread.
        if isinstance(command, dict) and 'id' in command and 'method' in command:
            self.processor.push_request(self.session, command)
        else:
            # Return an error JSON in response.
            self.processor.push_response({"error": "syntax error", "request": raw_command})

        return True
210
class TcpServer(threading.Thread):
    """Thread that accepts TCP clients and keeps the register of sessions.

    Every accepted connection gets a Session and a TcpClientRequestor
    reader thread; a single TcpResponder delivers responses to sessions.
    """

    def __init__(self, shared, processor, host, port):
        self.shared = shared
        self.processor = processor
        self.sessions = []
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port
        self.lock = threading.Lock()

    def run(self):
        """Listen on (host, port) and accept clients until shutdown."""
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            responder = TcpResponder(self.shared, self.processor, self)
            responder.start()
            while not self.shared.stopped():
                session = Session(*sock.accept())
                client_req = TcpClientRequestor(self.shared, self.processor, session)
                client_req.start()
                self.add_session(session)
                self.collect_garbage()
        finally:
            # Release the listening socket however the loop ends
            # (shutdown, or a failure in bind/listen/accept).
            sock.close()

    def add_session(self, session):
        """Register a session."""
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self):
        """Drop stopped sessions from the register."""
        # Filter in a single step under the lock.  The previous version
        # emptied the register and re-added live sessions one by one, so a
        # reader could observe an empty or partial list in between.  A new
        # list is bound rather than mutating the old one, so a thread that
        # is iterating the previous list is unaffected.
        with self.lock:
            self.sessions = [s for s in self.sessions if not s.stopped()]
253
254
class Shared:
    """Lock-guarded stop flag shared by the server's threads."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Log the shutdown and set the stop flag."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return the current value of the stop flag."""
        self.lock.acquire()
        try:
            return self._stopped
        finally:
            self.lock.release()
269
class Stratum:
    """Application entry point: wires a processor to its transports."""

    def start(self, processor, host="176.31.24.241", port=50001):
        """Start the processor and the TCP transport, then wait for "quit".

        processor -- a Processor instance; its `shared` attribute is set here.
        host, port -- address the TCP server binds to.  The defaults are the
                      values that were previously hard-coded.

        Blocks until a line reading "quit" is entered on standard input.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need
        transports = (TcpServer(shared, processor, host, port),)
        for server in transports:
            server.start()
        while not shared.stopped():
            try:
                command = raw_input()
            except EOFError:
                # Standard input is closed (e.g. redirected from /dev/null).
                # Keep serving rather than letting the exception end the
                # main thread; the sleep below paces the retries.
                command = None
            if command == "quit":
                shared.stop()
            time.sleep(1)
285
if __name__ == "__main__":
    # Script entry point: run the Stratum application with a default
    # Processor.
    Stratum().start(Processor())
289     app.start(processor)
290