11 self.lock = threading.Lock()
15 print "Stopping Stratum"
24 class Processor(threading.Thread):
28 threading.Thread.__init__(self)
30 self.request_queue = queue.Queue()
31 self.response_queue = queue.Queue()
32 self.internal_ids = {}
34 self.lock = threading.Lock()
38 def push_response(self, item):
39 self.response_queue.put(item)
41 def pop_response(self):
42 return self.response_queue.get()
44 def push_request(self, session, item):
45 self.request_queue.put((session,item))
47 def pop_request(self):
48 return self.request_queue.get()
50 def get_session_id(self, internal_id):
52 return self.internal_ids.pop(internal_id)
54 def store_session_id(self, session, msgid):
56 self.internal_ids[self.internal_id] = session, msgid
61 def register(self, prefix, function):
62 self.processors[prefix] = function
65 if self.shared is None:
66 raise TypeError("self.shared not set in Processor")
67 while not self.shared.stopped():
68 session, request = self.pop_request()
69 self.process(session, request)
76 def process(self, session, request):
77 method = request['method']
78 params = request.get('params',[])
80 suffix = method.split('.')[-1]
81 if suffix == 'subscribe':
82 session.subscribe_to_service(method, params)
84 # store session and id locally
85 request['id'] = self.store_session_id(session, request['id'])
87 # dispatch request to the relevant module..
88 prefix = request['method'].split('.')[0]
90 func = self.processors[prefix]
92 print "error: no processor for", prefix
95 func(request,self.response_queue)
97 traceback.print_exc(file=sys.stdout)
100 def add_session(self, session):
102 self.sessions.append(session)
104 def collect_garbage(self):
105 # Deep copy entire sessions list and blank it
106 # This is done to minimise lock contention
108 sessions = self.sessions[:]
110 for session in sessions:
111 if not session.stopped():
112 # If session is still alive then re-add it back
113 # to our internal register
114 self.add_session(session)
120 self._stopped = False
121 self.lock = threading.Lock()
122 self.subscriptions = []
128 def subscribe_to_service(self, method, params):
130 self.subscriptions.append((method, params))
133 class Dispatcher(threading.Thread):
135 def __init__(self, shared, processor):
137 self.processor = processor
138 threading.Thread.__init__(self)
142 while not self.shared.stopped():
143 response = self.processor.pop_response()
144 #print "pop response", response
145 internal_id = response.get('id')
146 params = response.get('params',[])
148 method = response['method']
150 print "no method", response
154 session, message_id = self.processor.get_session_id(internal_id)
155 response['id'] = message_id
156 session.send_response(response)
159 for session in self.processor.sessions:
160 if not session.stopped():
161 if (method,params) in session.subscriptions:
162 session.send_response(response)