7 class Processor(threading.Thread):
11 threading.Thread.__init__(self)
13 self.request_queue = queue.Queue()
14 self.response_queue = queue.Queue()
17 def push_response(self, item):
18 self.response_queue.put(item)
20 def pop_response(self):
21 return self.response_queue.get()
23 def push_request(self, session, item):
24 self.request_queue.put((session,item))
26 def pop_request(self):
27 return self.request_queue.get()
30 if self.shared is None:
31 raise TypeError("self.shared not set in Processor")
32 while not self.shared.stopped():
33 session, request = self.pop_request()
35 method = request['method']
36 params = request.get('params',[])
38 if method == 'numblocks.subscribe':
39 session.subscribe_to_numblocks()
41 elif method == 'address.subscribe':
43 session.subscribe_to_address(address)
45 elif method == 'server.peers':
46 session.subscribe_to_peers()
48 message_id = request['id']
49 self.id_session[message_id] = session
57 def process(self, request):
58 print "New request", request
61 # When ready, you call
62 # self.push_response(response)
68 def __init__(self, connection, address):
69 self._connection = connection
70 self.address = address
72 self.lock = threading.Lock()
73 self.numblocks_sub = None
74 self.addresses_sub = {}
75 print "new session", address
78 self._connection.close()
79 print "Terminating connection:", self.address[0]
89 raise Exception("Session was stopped")
91 return self._connection
93 def subscribe_to_numblocks(self):
95 self.numblocks_sub = True
97 def subscribe_to_peers(self):
100 def subscribe_to_address(self,address):
102 self.addresses_sub[address] = 'unknown'
105 class TcpResponder(threading.Thread):
107 def __init__(self, shared, processor, server):
109 self.processor = processor
111 threading.Thread.__init__(self)
115 while not self.shared.stopped():
116 response = self.processor.pop_response()
117 # if it is a subscription, find the list of sessions that subscribed
119 # if there is an id, there should be a session
120 # note: I must add and remove the session id to the message id..
122 message_id = response.get('id')
124 method = response['method']
126 print "no method", response
130 session = self.processor.id_session.pop(message_id)
131 self.send_response(response, session)
133 elif method == 'numblocks.subscribe':
134 for session in self.server.sessions:
135 if not session.stopped():
136 if session.numblocks_sub:
137 self.send_response(response, session)
139 elif method == 'address.subscribe':
140 for session in self.server.sessions:
141 if not session.stopped():
142 addr = response['params'][0]
143 last_status = session.addresses_sub.get(addr)
145 new_status = response.get('result')
146 if new_status != last_status:
147 session.addresses_sub[addr] = new_status
148 self.send_response(response, session)
150 print "error", response
153 def send_response(self, response, session):
154 raw_response = json.dumps(response)
155 # Possible race condition here by having session
157 # I assume Python connections are thread safe interfaces
159 connection = session.connection()
160 connection.send(raw_response + "\n")
164 class TcpClientRequestor(threading.Thread):
166 def __init__(self, shared, processor, session):
168 self.processor = processor
170 self.session = session
171 threading.Thread.__init__(self)
174 while not self.shared.stopped():
175 if not self.update():
182 data = self.receive()
193 return self.session.connection().recv(1024)
198 raw_buffer = self.message.find('\n')
202 raw_command = self.message[0:raw_buffer].strip()
203 self.message = self.message[raw_buffer + 1:]
204 if raw_command == 'quit':
209 command = json.loads(raw_command)
211 self.processor.push_response({"error": "bad JSON", "request": raw_command})
215 # Try to load vital fields, and return an error if
217 message_id = command['id']
218 method = command['method']
220 # Return an error JSON in response.
221 self.processor.push_response({"error": "syntax error", "request": raw_command})
223 self.processor.push_request(self.session,command)
227 class TcpServer(threading.Thread):
229 def __init__(self, shared, processor, host, port):
231 self.processor = processor
233 threading.Thread.__init__(self)
237 self.lock = threading.Lock()
240 print "TCP server started."
241 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
242 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
243 sock.bind((self.host, self.port))
245 responder = TcpResponder(self.shared, self.processor, self)
247 while not self.shared.stopped():
248 session = Session(*sock.accept())
249 client_req = TcpClientRequestor(self.shared, self.processor, session)
251 self.add_session(session)
252 self.collect_garbage()
254 def add_session(self, session):
256 self.sessions.append(session)
258 def collect_garbage(self):
259 # Copy the entire sessions list (a shallow slice copy) and blank it
260 # This is done to minimise lock contention
262 sessions = self.sessions[:]
264 for session in sessions:
265 if not session.stopped():
266 # If session is still alive then re-add it back
267 # to our internal register
268 self.add_session(session)
274 self.lock = threading.Lock()
275 self._stopped = False
278 print "Stopping Stratum"
288 def start(self, processor):
290 # Bind shared to processor since constructor is user defined
291 processor.shared = shared
293 # Create various transports we need
294 transports = TcpServer(shared, processor, "176.31.24.241", 50001),
295 for server in transports:
297 while not shared.stopped():
298 if raw_input() == "quit":
302 if __name__ == "__main__":
303 processor = Processor()