add two update methods for processor
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Worker thread that turns queued requests into queued responses.

    Transports register sessions with add_session() and feed work in
    through push_request(). run() pops one request at a time and hands it
    to process(); results are published with push_response() and drained
    by a transport through pop_response().

    ``shared`` must be assigned before the thread is started; run() raises
    TypeError otherwise.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        # Stop flag object; assigned by whoever starts the processor.
        self.shared = None
        # Guards every read and write of self.sessions.
        self.lock = threading.Lock()
        self.sessions = []
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()

    def add_session(self, session):
        """Register *session* so update notifications can reach it."""
        with self.lock:
            self.sessions.append(session)

    def push_response(self, session, item):
        """Queue *item* as a response addressed to *session*."""
        self.response_queue.put((session, item))

    def pop_response(self):
        """Block until a response is available; return (session, item)."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue *item* as a request received from *session*."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a request is available; return (session, item)."""
        return self.request_queue.get()

    def _snapshot_sessions(self):
        """Return a copy of the session list taken under the lock."""
        with self.lock:
            return self.sessions[:]

    def _prune_sessions(self):
        """Drop stopped sessions without ever emptying the live list.

        Liveness is checked outside the lock to keep contention low, and
        the list is then filtered in place. Sessions added concurrently
        are kept, and readers never observe a half-rebuilt list.
        """
        dead = [s for s in self._snapshot_sessions() if s.stopped()]
        if dead:
            with self.lock:
                self.sessions = [s for s in self.sessions if s not in dead]

    def run(self):
        """Process requests until the shared stop flag is raised.

        Raises:
            TypeError: if ``self.shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            self._prune_sessions()
            session, request = self.pop_request()
            self.process(session, request)

        self.stop()

    def stop(self):
        """Hook called once when run() leaves its loop; does nothing here."""
        pass

    def process(self, session, request):
        """Handle one request; this base implementation only logs it.

        An implementation is expected to build a response and publish it
        with ``self.push_response(session, response)``.
        """
        # Single-argument print() gives the same output on Python 2 and 3.
        print("%s %s" % ("New request", request))

    def update_from_blocknum(self, block_number):
        """Send *block_number* to every session subscribed to block counts."""
        for session in self._snapshot_sessions():
            if session.numblocks_sub is not None:
                response = {'id': session.numblocks_sub, 'result': block_number}
                self.push_response(session, response)

    def update_from_address(self, addr):
        """Notify sessions subscribed to *addr* whose stored status is stale.

        For each subscribed session the current status is compared with
        the one stored in the subscription. When they differ, the
        subscription is refreshed and a response is queued.
        """
        for session in self._snapshot_sessions():
            subscription = session.addresses_sub.get(addr)
            if not subscription:
                continue
            message_id, last_status = subscription
            status = self.get_status(addr)
            if status != last_status:
                # The address is the first argument of subscribe_to_address;
                # omitting it raised TypeError on every status change.
                session.subscribe_to_address(addr, message_id, status)
                response = {'id': message_id, 'result': status}
                self.push_response(session, response)

    def get_status(self, addr):
        """Return the status of *addr*; stub that always returns None.

        A real implementation would look the status up in a store, e.g.
        ``store.get_status(addr)``.
        """
        pass
85
86
class Session:
    """State kept for one client: its connection and its subscriptions.

    Args:
        connection: object exposing ``close()``; handed back unchanged by
            connection() while the session is alive.
        address: peer address; ``address[0]`` is logged when the session
            is stopped.
    """

    def __init__(self, connection, address):
        self._connection = connection
        self.address = address
        self._stopped = False
        # Guards _stopped and the subscription fields below.
        self.lock = threading.Lock()
        # Message id of the block-count subscription, or None.
        self.numblocks_sub = None
        # Maps address -> (message_id, status).
        self.addresses_sub = {}
        # Single-argument print() gives the same output on Python 2 and 3.
        print("%s %s" % ("new session", address))

    def stop(self):
        """Close the connection and mark the session stopped.

        Safe to call more than once and from several threads: only the
        first call closes the connection and logs. The flag is raised
        before closing so connection() stops handing out the connection
        as soon as shutdown begins.
        """
        with self.lock:
            if self._stopped:
                return
            self._stopped = True
        self._connection.close()
        print("%s %s" % ("Terminating connection:", self.address[0]))

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            return self._stopped

    def connection(self):
        """Return the underlying connection.

        Raises:
            Exception: if the session has already been stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def subscribe_to_numblocks(self, message_id):
        """Record *message_id* as the id to use for block-count updates."""
        with self.lock:
            self.numblocks_sub = message_id

    def subscribe_to_address(self, address, message_id, status):
        """Store (message_id, status) as the subscription for *address*."""
        with self.lock:
            self.addresses_sub[address] = message_id, status
121
122
class TcpResponder(threading.Thread):
    """Thread that writes queued responses back to their sessions.

    Args:
        shared: object whose ``stopped()`` ends the loop when it is true.
        processor: object whose ``pop_response()`` blocks and returns a
            ``(session, response)`` pair.
    """

    def __init__(self, shared, processor):
        threading.Thread.__init__(self)
        # pop_response() blocks indefinitely, so a non-daemon responder
        # would keep the process alive after shutdown was requested.
        self.daemon = True
        self.shared = shared
        self.processor = processor

    def run(self):
        """Send each response as one JSON document followed by a newline."""
        while not self.shared.stopped():
            session, response = self.processor.pop_response()
            raw_response = json.dumps(response)
            # The session may be stopped by another thread at any point;
            # any failure to obtain or write to the connection ends it.
            try:
                connection = session.connection()
                # sendall() keeps writing until the whole line is out;
                # send() may transmit only part of it.
                connection.sendall(raw_response + "\n")
            except Exception:
                session.stop()
142
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON requests from one session.

    Args:
        shared: object whose ``stopped()`` ends the loop when it is true.
        processor: receives parsed requests via ``push_request`` and error
            replies via ``push_response``.
        session: the session whose connection is read.
    """

    def __init__(self, shared, processor, session):
        threading.Thread.__init__(self)
        # recv() blocks, so a non-daemon reader would keep the process
        # alive after shutdown was requested.
        self.daemon = True
        self.shared = shared
        self.processor = processor
        # Bytes received so far that do not yet form a complete line.
        self.message = ""
        self.session = session

    def run(self):
        """Read and dispatch lines until shutdown or the session ends."""
        while not self.shared.stopped():
            if not self.update():
                break

            while self.parse():
                pass

            # parse() stops the session on "quit"; reading again would
            # make session.connection() raise.
            if self.session.stopped():
                break

    def update(self):
        """Append newly received data to the buffer.

        Returns:
            False, after stopping the session, when nothing was received;
            True otherwise.
        """
        data = self.receive()
        if not data:
            self.session.stop()
            return False

        self.message += data
        return True

    def receive(self):
        """Return up to 1024 bytes from the session, or '' on failure."""
        try:
            connection = self.session.connection()
        except Exception:
            # connection() raises a plain Exception once the session has
            # been stopped, possibly by another thread.
            return ''
        try:
            return connection.recv(1024)
        except socket.error:
            return ''

    def parse(self):
        """Consume one complete line from the buffer and dispatch it.

        The line ``quit`` stops the session. A line that is not valid
        JSON gets a "bad JSON" error reply. Valid JSON that is not an
        object carrying both ``id`` and ``method`` gets a "syntax error"
        reply. Anything else is queued as a request.

        Returns:
            True if a line was consumed and more may follow; False if the
            buffer holds no complete line or the session was stopped.
        """
        newline_at = self.message.find('\n')
        if newline_at == -1:
            return False

        raw_command = self.message[:newline_at].strip()
        self.message = self.message[newline_at + 1:]
        if raw_command == 'quit':
            self.session.stop()
            return False

        try:
            command = json.loads(raw_command)
        except Exception:
            # Kept broad on purpose: the input is untrusted and must not
            # be able to kill this thread, whatever the decoder raises.
            self.processor.push_response(self.session,
                {"error": "bad JSON", "request": raw_command})
            return True

        # Valid JSON is not necessarily an object: "5" or "[1]" decode
        # fine but cannot carry the vital fields.
        is_request = (isinstance(command, dict)
                      and 'id' in command and 'method' in command)
        if is_request:
            self.processor.push_request(self.session, command)
        else:
            self.processor.push_response(self.session,
                {"error": "syntax error", "request": raw_command})

        return True
207
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients and wires them up.

    Args:
        shared: object whose ``stopped()`` ends the accept loop.
        processor: receives every new session via ``add_session`` and is
            passed on to the responder and the per-client readers.
        host: interface address to bind to.
        port: TCP port to listen on.
    """

    def __init__(self, shared, processor, host, port):
        self.shared = shared
        self.processor = processor
        self.clients = []
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port

    def run(self):
        """Listen on (host, port) and start a reader per accepted client.

        One TcpResponder is started for the whole server. Each accepted
        connection becomes a Session with its own TcpClientRequestor and
        is registered with the processor.
        """
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            responder = TcpResponder(self.shared, self.processor)
            responder.start()
            while not self.shared.stopped():
                session = Session(*sock.accept())
                client_req = TcpClientRequestor(self.shared, self.processor,
                                                session)
                client_req.start()
                self.processor.add_session(session)
        finally:
            # Release the listening socket however the loop ends,
            # including on a failed bind or accept.
            sock.close()
232
class Shared:
    """Lock-protected stop flag handed to every thread."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Log the shutdown and raise the stop flag."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return True once stop() has been called."""
        self.lock.acquire()
        try:
            flag = self._stopped
        finally:
            self.lock.release()
        return flag
247
class Stratum:
    """Application entry point: starts the processor and its transports."""

    def start(self, processor, host="176.31.24.241", port=50001):
        """Run the server until "quit" is typed on standard input.

        Args:
            processor: the Processor to run; its ``shared`` attribute is
                set here because its constructor is user defined.
            host: address the TCP transport binds to. The default is the
                address that used to be hard-coded here.
            port: TCP port the transport listens on.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need
        transports = [TcpServer(shared, processor, host, port)]
        for server in transports:
            server.start()
        while not shared.stopped():
            if raw_input() == "quit":
                shared.stop()
            time.sleep(1)
263
if __name__ == "__main__":
    # Script entry point: run Stratum with the base (log-only) Processor.
    app = Stratum()
    processor = Processor()
    app.start(processor)
268