10 return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
13 return time.strftime("[%d/%m/%Y-%H:%M:%S]")
19 self.lock = threading.Lock()
23 print "Stopping Stratum"
32 class Processor(threading.Thread):
35 threading.Thread.__init__(self)
37 self.dispatcher = None
39 def process(self, request):
42 def push_response(self, response):
43 print "response", response
44 self.dispatcher.request_dispatcher.push_response(response)
51 self.shared = Shared()
52 self.request_dispatcher = RequestDispatcher(self.shared)
53 self.request_dispatcher.start()
54 self.response_dispatcher = ResponseDispatcher(self.shared, self.request_dispatcher)
55 self.response_dispatcher.start()
57 def register(self, prefix, processor):
58 processor.dispatcher = self
59 processor.shared = self.shared
61 self.request_dispatcher.processors[prefix] = processor
65 class RequestDispatcher(threading.Thread):
67 def __init__(self, shared):
69 threading.Thread.__init__(self)
71 self.request_queue = queue.Queue()
72 self.response_queue = queue.Queue()
73 self.internal_ids = {}
75 self.lock = threading.Lock()
79 def push_response(self, item):
80 self.response_queue.put(item)
82 def pop_response(self):
83 return self.response_queue.get()
85 def push_request(self, session, item):
86 self.request_queue.put((session,item))
88 def pop_request(self):
89 return self.request_queue.get()
91 def get_session_id(self, internal_id):
93 return self.internal_ids.pop(internal_id)
95 def store_session_id(self, session, msgid):
97 self.internal_ids[self.internal_id] = session, msgid
103 if self.shared is None:
104 raise TypeError("self.shared not set in Processor")
105 while not self.shared.stopped():
106 session, request = self.pop_request()
107 self.process(session, request)
114 def process(self, session, request):
115 method = request['method']
116 params = request.get('params',[])
118 suffix = method.split('.')[-1]
119 if suffix == 'subscribe':
120 session.subscribe_to_service(method, params)
122 # store session and id locally
123 request['id'] = self.store_session_id(session, request['id'])
125 # dispatch request to the relevant module..
126 prefix = request['method'].split('.')[0]
128 p = self.processors[prefix]
130 print "error: no processor for", prefix
135 traceback.print_exc(file=sys.stdout)
137 if method in ['server.version']:
138 session.version = params[0]
141 def add_session(self, session):
143 self.sessions.append(session)
145 def collect_garbage(self):
146 # Deep copy entire sessions list and blank it
147 # This is done to minimise lock contention
149 sessions = self.sessions[:]
151 for session in sessions:
152 if not session.stopped():
153 # If session is still alive then re-add it back
154 # to our internal register
155 self.add_session(session)
161 self._stopped = False
162 self.lock = threading.Lock()
163 self.subscriptions = []
166 threading.Timer(2, self.info).start()
169 for s in self.subscriptions:
171 if m == 'blockchain.address.subscribe':
176 print timestr(), self.name, self.address, addr, len(self.subscriptions), self.version
182 def subscribe_to_service(self, method, params):
184 self.subscriptions.append((method, params))
187 class ResponseDispatcher(threading.Thread):
189 def __init__(self, shared, processor):
191 self.processor = processor
192 threading.Thread.__init__(self)
196 while not self.shared.stopped():
197 response = self.processor.pop_response()
198 #print "pop response", response
199 internal_id = response.get('id')
200 params = response.get('params', [])
202 # This is wrong. "params" and "method" should never
204 if internal_id is None:
205 method = response.get('method')
207 print "no method", response
209 for session in self.processor.sessions:
210 if not session.stopped():
211 if (method,params) in session.subscriptions:
212 session.send_response(response)
215 session, message_id = self.processor.get_session_id(internal_id)
216 response['id'] = message_id
217 session.send_response(response)