catch exceptions in dispatch thread
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import traceback, sys
6 import Queue as queue
7
def random_string(N):
    """Return N characters picked at random from A-Z and 0-9."""
    import random
    import string

    alphabet = string.ascii_uppercase + string.digits
    picks = [random.choice(alphabet) for _ in range(N)]
    return ''.join(picks)
11
def timestr():
    """Return the current local time as a bracketed stamp for log lines."""
    stamp_format = "[%d/%m/%Y-%H:%M:%S]"
    return time.strftime(stamp_format, time.localtime())
14
15
class Shared:
    """Lock-protected stop flag handed to the dispatcher and processor threads."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Print a shutdown notice, then raise the stop flag."""
        print("Stopping Stratum")
        with self.lock:
            self._stopped = True

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            flag = self._stopped
        return flag
30
31
class Processor(threading.Thread):
    """Base class for worker threads that handle requests taken from a queue.

    Subclasses override process().  The ``dispatcher`` and ``shared``
    attributes are assigned by Dispatcher.register() before the thread is
    started.
    """

    # Seconds to wait on an empty queue before re-checking the stop flag.
    poll_interval = 1.0

    # Stop flag object; a class-level default so that run() can detect a
    # processor that was started without being registered.
    shared = None

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.dispatcher = None
        self.queue = queue.Queue()

    def process(self, request):
        """Handle one request.  The base implementation does nothing."""
        pass

    def push_response(self, response):
        """Hand a response to the request dispatcher's response queue."""
        self.dispatcher.request_dispatcher.push_response(response)

    def run(self):
        """Process queued requests until the shared stop flag is raised.

        Raises TypeError if ``shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")

        while not self.shared.stopped():
            # Wait with a timeout: a plain blocking get() would never let
            # the loop condition be evaluated again while the queue is idle.
            try:
                request = self.queue.get(True, self.poll_interval)
            except queue.Empty:
                continue

            # A failing request must not take the worker thread down.
            try:
                self.process(request)
            except Exception:
                traceback.print_exc(file=sys.stdout)

        print("processor terminating")
56             
57
58
class Dispatcher:
    """Owns the shared stop flag and the request/response dispatcher threads."""

    def __init__(self):
        shared = Shared()
        self.shared = shared

        # Both dispatcher threads are started as soon as they are created.
        requests = RequestDispatcher(shared)
        self.request_dispatcher = requests
        requests.start()

        responses = ResponseDispatcher(shared, requests)
        self.response_dispatcher = responses
        responses.start()

    def register(self, prefix, processor):
        """Wire up `processor`, start it, and route `prefix` requests to it."""
        processor.dispatcher, processor.shared = self, self.shared
        processor.start()
        routes = self.request_dispatcher.processors
        routes[prefix] = processor
74
75
76
class RequestDispatcher(threading.Thread):
    """Thread that hands incoming requests to the processor for their prefix.

    It also holds the response queue, the list of sessions, and the table
    that maps internal request ids back to (session, client message id).
    """

    def __init__(self, shared):
        self.shared = shared
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        # internal id -> (session, client message id); guarded by self.lock
        self.internal_ids = {}
        self.internal_id = 1
        self.lock = threading.Lock()
        self.sessions = []
        # method prefix (text before the first '.') -> processor
        self.processors = {}

    def push_response(self, item):
        """Queue a response or notification for delivery."""
        self.response_queue.put(item)

    def pop_response(self):
        """Block until a response is available and return it."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue a request together with the session it came from."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a request is available; return (session, request)."""
        return self.request_queue.get()

    def get_session_by_address(self, address):
        """Return the first session whose address matches, or None."""
        for x in self.sessions:
            if x.address == address:
                return x
        return None

    def get_session_id(self, internal_id):
        """Remove and return the (session, message id) pair for internal_id.

        Raises KeyError if the id is unknown or was already consumed.
        """
        with self.lock:
            return self.internal_ids.pop(internal_id)

    def store_session_id(self, session, msgid):
        """Remember (session, msgid) and return the internal id assigned."""
        with self.lock:
            self.internal_ids[self.internal_id] = session, msgid
            r = self.internal_id
            self.internal_id += 1
            return r

    def run(self):
        """Dispatch queued requests until the shared stop flag is raised."""
        if self.shared is None:
            raise TypeError("self.shared not set in RequestDispatcher")
        while not self.shared.stopped():
            session, request = self.pop_request()
            # A malformed request must not take the dispatch thread down.
            try:
                self.do_dispatch(session, request)
            except Exception:
                traceback.print_exc(file=sys.stdout)

        self.stop()

    def stop(self):
        """Hook called when run() leaves its loop; does nothing here."""
        pass

    def do_dispatch(self, session, request):
        """ dispatch request to the relevant processor """

        method = request['method']
        params = request.get('params', [])
        suffix = method.split('.')[-1]
        if suffix == 'subscribe':
            session.subscribe_to_service(method, params)

        # Find the processor first: storing the id for a request that is
        # then dropped would leave an entry in internal_ids forever.
        prefix = method.split('.')[0]
        try:
            p = self.processors[prefix]
        except KeyError:
            print("error: no processor for %s" % prefix)
            return

        # store session and id locally
        request['id'] = self.store_session_id(session, request['id'])

        p.queue.put(request)

        # Record the client version; skipped when no params were supplied.
        if method == 'server.version' and params:
            session.version = params[0]

    def get_sessions(self):
        """Return a snapshot (shallow copy) of the session list."""
        with self.lock:
            r = self.sessions[:]
        return r

    def add_session(self, session):
        """Append a session to the register."""
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self):
        """Drop sessions that report themselves as stopped."""
        # Take a shallow copy of the sessions list and blank the original.
        # The liveness checks then run without holding our lock, which
        # minimises lock contention.
        with self.lock:
            sessions = self.sessions[:]
            self.sessions = []
        for session in sessions:
            if not session.stopped():
                # If session is still alive then re-add it back
                # to our internal register
                self.add_session(session)
179
180
class Session:
    """Per-client state: the subscription list plus identifying details."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()
        # List of subscription descriptions as built by build_subdesc().
        self.subscriptions = []
        self.address = ''
        self.name = ''
        self.version = 'unknown'
        self.time = time.time()
        # One-shot debug print two seconds after creation.  Run it as a
        # daemon, like every other thread in this module, so that a pending
        # timer can never hold up interpreter exit.
        timer = threading.Timer(2, self.info)
        timer.daemon = True
        timer.start()

    # Debugging method. Doesn't need to be threadsafe.
    def info(self):
        """Print a one-line summary if the session has any subscription."""
        # Report the first subscribed address, if there is one.
        addr = None
        for sub in self.subscriptions:
            if sub[0] == 'blockchain.address.subscribe':
                addr = sub[1]
                break

        if self.subscriptions:
            print("%s %s %s %s %s %s" % (
                timestr(), self.name, self.address, addr,
                len(self.subscriptions), self.version))

    def stopped(self):
        """Return the session's stopped flag."""
        with self.lock:
            return self._stopped

    def subscribe_to_service(self, method, params):
        """Record a subscription; requests build_subdesc() rejects are ignored."""
        subdesc = self.build_subdesc(method, params)
        if subdesc is None:
            return
        with self.lock:
            self.subscriptions.append(subdesc)

    # subdesc = A subscription description
    @staticmethod
    def build_subdesc(method, params):
        """Return the tuple describing a subscription, or None.

        ``blockchain.numblocks.subscribe`` gives ``(method,)``;
        ``blockchain.address.subscribe`` gives ``(method, params[0])``, or
        None when params is empty.  Any other method gives None.
        """
        if method == "blockchain.numblocks.subscribe":
            return method,
        if method == "blockchain.address.subscribe" and params:
            return method, params[0]
        return None

    def contains_subscription(self, subdesc):
        """Return True if subdesc is among this session's subscriptions."""
        with self.lock:
            return subdesc in self.subscriptions
234     
235
class ResponseDispatcher(threading.Thread):
    """Thread that delivers queued responses and notifications to sessions.

    ``processor`` is the object responses are popped from; Dispatcher passes
    its RequestDispatcher here.
    """

    def __init__(self, shared, processor):
        self.shared = shared
        self.processor = processor
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        """Deliver responses until the shared stop flag is raised."""
        while not self.shared.stopped():
            # One bad response or one failing session must not end the
            # thread: nothing would be delivered to any client afterwards.
            try:
                self.update()
            except Exception:
                traceback.print_exc(file=sys.stdout)

    def update(self):
        """Pop one item and route it by whether it carries an id."""
        response = self.processor.pop_response()
        internal_id = response.get('id')

        if internal_id is None:
            # A notification
            method = response.get('method')
            params = response.get('params')
            self.notification(method, params, response)
        else:
            # A response
            self.send_response(internal_id, response)

    def notification(self, method, params, response):
        """Send response to every live session subscribed to method/params."""
        subdesc = Session.build_subdesc(method, params)
        for session in self.processor.sessions:
            if session.stopped():
                continue
            if session.contains_subscription(subdesc):
                session.send_response(response)

    def send_response(self, internal_id, response):
        """Restore the client's message id and send response to its session."""
        try:
            session, message_id = self.processor.get_session_id(internal_id)
        except KeyError:
            # Unknown or already-answered internal id: nothing to deliver to.
            print("send_response: unknown internal id %s %s"
                  % (internal_id, response))
            return

        if session:
            response['id'] = message_id
            session.send_response(response)
        else:
            print("send_response: no session %s %s %s"
                  % (message_id, internal_id, response))
279