11 self.lock = threading.Lock()
15 print "Stopping Stratum"
# Thread subclass that owns a request queue, a response queue and a table of
# internal ids.
# NOTE(review): this listing is sparse -- the `def __init__` line and some
# initializer statements (e.g. original lines 29, 33) are not visible here.
24 class Processor(threading.Thread):
28 threading.Thread.__init__(self)
# Holds (session, item) pairs added by push_request().
30 self.request_queue = queue.Queue()
# Holds response items added by push_response(); also handed to the
# registered handler functions in the request loop.
31 self.response_queue = queue.Queue()
# Maps an internal id to the (session, msgid) pair saved by store_session_id().
32 self.internal_ids = {}
# NOTE(review): presumably guards internal_ids / the id counter; the statements
# that acquire this lock are not visible in this listing -- confirm.
34 self.lock = threading.Lock()
# Enqueue `item` on the response queue; it is later retrieved by pop_response().
38 def push_response(self, item):
39 self.response_queue.put(item)
# Remove and return the next item from the response queue.
# Assuming `queue` is the standard-library queue module, get() with no
# arguments blocks until an item is available -- TODO confirm the import.
41 def pop_response(self):
42 return self.response_queue.get()
# Enqueue a request together with the session it came from, as a
# (session, item) tuple; pop_request() returns tuples of this shape.
44 def push_request(self, session, item):
45 self.request_queue.put((session,item))
# Remove and return the next (session, item) tuple from the request queue.
# Assuming `queue` is the standard-library queue module, get() with no
# arguments blocks until an item is available -- TODO confirm the import.
47 def pop_request(self):
48 return self.request_queue.get()
# Look up AND remove the (session, msgid) pair saved under `internal_id`.
# Because the entry is popped, each internal id can be resolved only once;
# dict.pop without a default raises KeyError for an unknown id.
# NOTE(review): original line 51 is not visible in this listing (possibly a
# lock acquisition) -- confirm.
50 def get_session_id(self, internal_id):
52 return self.internal_ids.pop(internal_id)
# Save the (session, msgid) pair under the current value of self.internal_id
# so that get_session_id() can recover it later.
# NOTE(review): original lines 55 and 57-58 are not visible in this listing.
# The request loop assigns this method's result to request['id'], so it
# presumably returns the internal id and advances the counter -- confirm.
54 def store_session_id(self, session, msgid):
56 self.internal_ids[self.internal_id] = session, msgid
# Register `function` as the handler for methods whose first dotted component
# equals `prefix`. The request loop looks handlers up in self.processors and
# calls them as function(request, response_queue).
61 def register(self, prefix, function):
62 self.processors[prefix] = function
# Main request loop. NOTE(review): the enclosing `def` line (presumably the
# thread's run method) and several control-flow lines (original 69, 84, 86-88,
# 90 -- likely try/except/continue) are not visible in this listing.
# Fail fast if no shared state object has been attached.
65 if self.shared is None:
66 raise TypeError("self.shared not set in Processor")
# Keep serving requests until the shared object reports it was stopped.
67 while not self.shared.stopped():
68 session, request = self.pop_request()
# 'method' is required (KeyError if absent); 'params' defaults to [].
70 method = request['method']
71 params = request.get('params',[])
# A method whose last dotted component is 'subscribe' is also recorded on the
# session as a subscription.
73 suffix = method.split('.')[-1]
74 if suffix == 'subscribe':
75 session.subscribe_to_service(method, params)
# The request's own id is replaced by the value store_session_id() returns,
# after the (session, original id) pair has been saved.
77 # store session and id locally
78 request['id'] = self.store_session_id(session, request['id'])
80 # dispatch request to the relevant module..
# Handlers are keyed by the first dotted component of the method name.
81 prefix = method.split('.')[0]
83 func = self.processors[prefix]
# NOTE(review): presumably the handler for a failed lookup above -- the
# `except` line is not visible.
85 print "error: no processor for", prefix
# The handler receives the request and the response queue itself.
89 func(request,self.response_queue)
# NOTE(review): presumably inside an `except` around the handler call; prints
# the traceback to stdout.
91 traceback.print_exc(file=sys.stdout)
# Handle a single request. The visible body only prints the request; the
# trailing comment tells implementers to call self.push_response(response)
# once a response is ready.
# NOTE(review): original lines 101-102 are not visible in this listing.
99 def process(self, request):
100 print "New request", request
103 # When ready, you call
104 # self.push_response(response)
# Append `session` to self.sessions.
# NOTE(review): original line 107 is not visible in this listing (possibly a
# lock acquisition) -- confirm.
106 def add_session(self, session):
108 self.sessions.append(session)
# Re-register every session from a snapshot of self.sessions that has not
# stopped, via add_session().
# NOTE(review): original lines 113 and 115 are not visible in this listing;
# per the comment below, the list is presumably reset between taking the
# snapshot and re-adding live sessions -- confirm.
110 def collect_garbage(self):
# NOTE: `self.sessions[:]` is a shallow copy (a new list holding the same
# session objects), despite the "deep copy" wording below.
111 # Deep copy entire sessions list and blank it
112 # This is done to minimise lock contention
114 sessions = self.sessions[:]
116 for session in sessions:
117 if not session.stopped():
118 # If session is still alive then re-add it back
119 # to our internal register
120 self.add_session(session)
126 self._stopped = False
127 self.lock = threading.Lock()
128 self.subscriptions = []
# Record a subscription as a (method, params) tuple in self.subscriptions.
# NOTE(review): original line 135 is not visible in this listing (possibly a
# lock acquisition) -- confirm.
134 def subscribe_to_service(self, method, params):
136 self.subscriptions.append((method, params))
# Thread subclass that takes responses from a processor and sends them to
# sessions.
# NOTE(review): this listing is sparse -- the `def` line of the loop below and
# the branching around it (original lines 142, 145-147, 153, 155, 157-159,
# 163-164) are not visible, so the exact conditions are unconfirmed.
139 class Dispatcher(threading.Thread):
141 def __init__(self, shared, processor):
143 self.processor = processor
144 threading.Thread.__init__(self)
# Loop until the shared object reports it was stopped.
148 while not self.shared.stopped():
149 response = self.processor.pop_response()
150 #print "pop response", response
# 'id' and 'params' are optional here (.get); 'method' is read with [] and
# raises KeyError when absent.
151 internal_id = response.get('id')
152 params = response.get('params',[])
154 method = response['method']
# NOTE(review): presumably the handler for a missing 'method' key -- the
# `except` line is not visible.
156 print "no method", response
# Reply path: recover the originating session and its original message id from
# the internal id, restore that id on the response, and send it.
160 session, message_id = self.processor.get_session_id(internal_id)
161 response['id'] = message_id
162 session.send_response(response)
# Broadcast path: send the response to every non-stopped session whose
# subscriptions contain this exact (method, params) pair.
165 for session in self.processor.sessions:
166 if not session.stopped():
167 if (method,params) in session.subscriptions:
168 session.send_response(response)