restructuring: each processor has its own queue
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import traceback, sys
6 import Queue as queue
7
def random_string(N):
    """Return a string of N characters picked at random from A-Z and 0-9."""
    import random
    import string
    alphabet = string.ascii_uppercase + string.digits
    picked = [random.choice(alphabet) for _ in range(N)]
    return ''.join(picked)
11
def timestr():
    """Return the current local time formatted as [dd/mm/YYYY-HH:MM:SS]."""
    stamp_format = "[%d/%m/%Y-%H:%M:%S]"
    return time.strftime(stamp_format)
14
15
class Shared:
    """Stop flag handed to the server's threads.

    The flag is only read or written while ``self.lock`` is held.
    """

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stopped(self):
        """Return True once stop() has been called, False before that."""
        with self.lock:
            flag = self._stopped
        return flag

    def stop(self):
        """Print the shutdown notice, then raise the stop flag."""
        print("Stopping Stratum")
        with self.lock:
            self._stopped = True
30
31
class Processor(threading.Thread):
    """Worker thread that handles the requests placed on its own queue.

    Subclasses override process().  Dispatcher.register() assigns the
    ``dispatcher`` and ``shared`` attributes before starting the thread;
    run() relies on ``shared`` being set.
    """

    # Seconds run() waits for a request before re-checking the stop flag.
    poll_interval = 1.0

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.dispatcher = None
        self.queue = queue.Queue()

    def process(self, request):
        """Handle one request.  The base implementation does nothing."""
        pass

    def push_response(self, response):
        """Pass a response on to the request dispatcher's response queue."""
        self.dispatcher.request_dispatcher.push_response(response)

    def run(self):
        """Process queued requests until the shared stop flag is set.

        An exception raised by process() is printed to stdout and the loop
        carries on with the next request.
        """
        while not self.shared.stopped():
            try:
                # The first positional argument of Queue.get() is ``block``,
                # not ``timeout``.  Wait with an explicit timeout so an idle
                # worker wakes up regularly and notices the stop flag.
                request = self.queue.get(timeout=self.poll_interval)
            except queue.Empty:
                continue
            try:
                self.process(request)
            except Exception:
                traceback.print_exc(file=sys.stdout)

        print("processor terminating")
56             
57
58
class Dispatcher:
    """Creates and starts the request and response dispatcher threads.

    Both threads receive the same Shared stop flag, which is also handed to
    every processor passed to register().
    """

    def __init__(self):
        shared = Shared()
        self.shared = shared

        requests = RequestDispatcher(shared)
        self.request_dispatcher = requests
        requests.start()

        responses = ResponseDispatcher(shared, requests)
        self.response_dispatcher = responses
        responses.start()

    def register(self, prefix, processor):
        """Start processor and route requests whose method begins with prefix to it."""
        processor.dispatcher, processor.shared = self, self.shared
        processor.start()
        routes = self.request_dispatcher.processors
        routes[prefix] = processor
74
75
76
class RequestDispatcher(threading.Thread):
    """Thread that routes client requests to the registered processors.

    Requests arrive through push_request() and are forwarded to the
    processor registered under the first dotted component of the request's
    method.  Before forwarding, the client's message id is replaced by an
    internal id so that the response can be matched back to its session
    with get_session_id().  The object also owns the response queue and the
    list of sessions.
    """

    def __init__(self, shared):
        self.shared = shared
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        # internal id -> (session, client message id)
        self.internal_ids = {}
        # Next internal id to hand out.
        self.internal_id = 1
        # Taken by the id bookkeeping and session-list methods below.
        self.lock = threading.Lock()
        self.sessions = []
        # method prefix -> processor
        self.processors = {}

    def push_response(self, item):
        """Queue a response for delivery."""
        self.response_queue.put(item)

    def pop_response(self):
        """Block until a response is available and return it."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue a request together with the session it came from."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a request is available; return (session, request)."""
        return self.request_queue.get()

    def get_session_by_address(self, address):
        """Return the first session whose address matches, or None."""
        for x in self.sessions:
            if x.address == address:
                return x
        return None

    def get_session_id(self, internal_id):
        """Remove and return the (session, message id) stored for internal_id.

        Raises KeyError if internal_id is unknown or was already retrieved.
        """
        with self.lock:
            return self.internal_ids.pop(internal_id)

    def store_session_id(self, session, msgid):
        """Remember (session, msgid) and return the internal id assigned to it."""
        with self.lock:
            r = self.internal_id
            self.internal_ids[r] = session, msgid
            self.internal_id += 1
            return r

    def run(self):
        """Dispatch queued requests until the shared stop flag is set.

        pop_request() blocks without a timeout, so the stop flag is only
        re-checked after a request has been received.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in RequestDispatcher")
        while not self.shared.stopped():
            session, request = self.pop_request()
            try:
                self.do_dispatch(session, request)
            except Exception:
                # Requests come from clients; a malformed one must not
                # terminate the dispatcher thread.
                traceback.print_exc(file=sys.stdout)

        self.stop()

    def stop(self):
        """Hook called when run() leaves its loop; does nothing here."""
        pass

    def do_dispatch(self, session, request):
        """Dispatch request to the relevant processor.

        The request's 'id' is overwritten with an internal id before the
        request is queued.  A request whose method prefix has no registered
        processor is reported on stdout and dropped without side effects.

        Raises KeyError if the request lacks 'method' or 'id'.
        """
        method = request['method']
        params = request.get('params', [])

        # Resolve the processor first so that a request nobody can handle
        # does not leave an entry behind in internal_ids.
        prefix = method.split('.')[0]
        p = self.processors.get(prefix)
        if p is None:
            print("error: no processor for %s" % (prefix,))
            return

        suffix = method.split('.')[-1]
        if suffix == 'subscribe':
            session.subscribe_to_service(method, params)

        # store session and id locally
        request['id'] = self.store_session_id(session, request['id'])

        p.queue.put(request)

        # An empty parameter list carries no version to record.
        if method == 'server.version' and params:
            session.version = params[0]

    def get_sessions(self):
        """Return a copy of the session list taken under the lock."""
        with self.lock:
            return list(self.sessions)

    def add_session(self, session):
        """Append session to the session list."""
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self):
        """Drop sessions that report themselves as stopped."""
        # Take a (shallow) copy of the session list and blank the original,
        # so the lock is not held while each session is examined.
        with self.lock:
            sessions = self.sessions[:]
            self.sessions = []
        for session in sessions:
            if not session.stopped():
                # If session is still alive then re-add it back
                # to our internal register
                self.add_session(session)
175
176
class Session:
    """State kept for one client: address, name, version and subscriptions.

    NOTE(review): ResponseDispatcher calls ``send_response()`` on sessions,
    which is not defined here - presumably supplied by subclasses; confirm.
    """

    def __init__(self):
        self._stopped = False
        # Guards _stopped and subscriptions in the methods below.
        self.lock = threading.Lock()
        # Subscription descriptions as produced by build_subdesc().
        self.subscriptions = []
        self.address = ''
        self.name = ''
        self.version = 'unknown'
        self.time = time.time()
        # Print a one-off summary of the session two seconds from now.
        threading.Timer(2, self.info).start()

    # Debugging method. Doesn't need to be threadsafe.
    def info(self):
        """Print a one-line summary of the session if it has subscriptions.

        The summary includes the address of the first
        'blockchain.address.subscribe' subscription, or None if there is none.
        """
        addr = None
        for sub in self.subscriptions:
            if sub[0] == 'blockchain.address.subscribe':
                addr = sub[1]
                break

        if self.subscriptions:
            print("%s %s %s %s %s %s" % (
                timestr(), self.name, self.address, addr,
                len(self.subscriptions), self.version))

    def stopped(self):
        """Return the session's stopped flag."""
        with self.lock:
            return self._stopped

    def subscribe_to_service(self, method, params):
        """Record a subscription for (method, params) if it is a known kind.

        Methods for which build_subdesc() returns None are ignored.  A
        subscription that is already recorded is not added a second time, so
        repeated subscribe requests do not grow the list.
        """
        subdesc = self.build_subdesc(method, params)
        if subdesc is None:
            return
        with self.lock:
            if subdesc not in self.subscriptions:
                self.subscriptions.append(subdesc)

    # subdesc = A subscription description
    @staticmethod
    def build_subdesc(method, params):
        """Return the subscription description for method/params, or None.

        'blockchain.numblocks.subscribe' yields ``(method,)``;
        'blockchain.address.subscribe' yields ``(method, params[0])`` when
        params is non-empty.  Everything else yields None.
        """
        if method == "blockchain.numblocks.subscribe":
            return method,
        if method == "blockchain.address.subscribe" and params:
            return method, params[0]
        return None

    def contains_subscription(self, subdesc):
        """Return True if subdesc is among the recorded subscriptions."""
        with self.lock:
            return subdesc in self.subscriptions
230     
231
class ResponseDispatcher(threading.Thread):
    """Thread that delivers queued responses and notifications to sessions.

    ``processor`` is the object the responses are pulled from; Dispatcher
    passes its RequestDispatcher here.  It must provide pop_response(),
    get_session_id() and a ``sessions`` list.
    """

    def __init__(self, shared, processor):
        self.shared = shared
        self.processor = processor
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        """Deliver responses until the shared stop flag is set.

        An exception raised while delivering one response is printed to
        stdout and the loop continues, so a single failing session cannot
        stop delivery to all the others.
        """
        while not self.shared.stopped():
            try:
                self.update()
            except Exception:
                traceback.print_exc(file=sys.stdout)

    def update(self):
        """Take one response off the queue and deliver it.

        A response without an 'id' is treated as a notification and offered
        to every subscribed session; otherwise it is sent to the session
        that issued the matching request.
        """
        response = self.processor.pop_response()
        internal_id = response.get('id')

        if internal_id is None:
            # A notification
            method = response.get('method')
            params = response.get('params')
            self.notification(method, params, response)
        else:
            # A response
            self.send_response(internal_id, response)

    def notification(self, method, params, response):
        """Send response to every live session subscribed to method/params."""
        subdesc = Session.build_subdesc(method, params)
        for session in self.processor.sessions:
            if session.stopped():
                continue
            if session.contains_subscription(subdesc):
                session.send_response(response)

    def send_response(self, internal_id, response):
        """Restore the client's message id and send response to its session.

        If no session is recorded for internal_id, the response is reported
        on stdout and dropped.
        """
        try:
            session, message_id = self.processor.get_session_id(internal_id)
        except KeyError:
            # Unknown or already answered id: report it below rather than
            # letting the exception propagate.
            session, message_id = None, None
        if session:
            response['id'] = message_id
            session.send_response(response)
        else:
            print("send_response: no session %s %s %s"
                  % (message_id, internal_id, response))
275