do not queue requests that can be answered using the cache
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import traceback, sys
6 import Queue as queue
7
8 def random_string(N):
9     import random, string
10     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
11
12 def timestr():
13     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
14
15
16 class Shared:
17
18     def __init__(self):
19         self.lock = threading.Lock()
20         self._stopped = False
21
22     def stop(self):
23         print "Stopping Stratum"
24         with self.lock:
25             self._stopped = True
26
27     def stopped(self):
28         with self.lock:
29             return self._stopped
30
31
32 class Processor(threading.Thread):
33
34     def __init__(self):
35         threading.Thread.__init__(self)
36         self.daemon = True
37         self.dispatcher = None
38         self.queue = queue.Queue()
39
40     def process(self, request):
41         pass
42
43     def add_request(self, request):
44         self.queue.put(request)
45
46     def push_response(self, response):
47         #print "response", response
48         self.dispatcher.request_dispatcher.push_response(response)
49
50     def run(self):
51         while not self.shared.stopped():
52             request = self.queue.get(10000000000)
53             try:
54                 self.process(request)
55             except:
56                 traceback.print_exc(file=sys.stdout)
57
58         print "processor terminating"
59             
60
61
62 class Dispatcher:
63
64     def __init__(self):
65         self.shared = Shared()
66         self.request_dispatcher = RequestDispatcher(self.shared)
67         self.request_dispatcher.start()
68         self.response_dispatcher = \
69             ResponseDispatcher(self.shared, self.request_dispatcher)
70         self.response_dispatcher.start()
71
72     def register(self, prefix, processor):
73         processor.dispatcher = self
74         processor.shared = self.shared
75         processor.start()
76         self.request_dispatcher.processors[prefix] = processor
77
78
79
80 class RequestDispatcher(threading.Thread):
81
82     def __init__(self, shared):
83         self.shared = shared
84         threading.Thread.__init__(self)
85         self.daemon = True
86         self.request_queue = queue.Queue()
87         self.response_queue = queue.Queue()
88         self.internal_ids = {}
89         self.internal_id = 1
90         self.lock = threading.Lock()
91         self.sessions = []
92         self.processors = {}
93
94     def push_response(self, item):
95         self.response_queue.put(item)
96
97     def pop_response(self):
98         return self.response_queue.get()
99
100     def push_request(self, session, item):
101         self.request_queue.put((session,item))
102
103     def pop_request(self):
104         return self.request_queue.get()
105
106     def get_session_by_address(self, address):
107         for x in self.sessions:
108             if x.address == address:
109                 return x
110
111     def get_session_id(self, internal_id):
112         with self.lock:
113             return self.internal_ids.pop(internal_id)
114
115     def store_session_id(self, session, msgid):
116         with self.lock:
117             self.internal_ids[self.internal_id] = session, msgid
118             r = self.internal_id
119             self.internal_id += 1
120             return r
121
122     def run(self):
123         if self.shared is None:
124             raise TypeError("self.shared not set in Processor")
125         while not self.shared.stopped():
126             session, request = self.pop_request()
127             try:
128                 self.do_dispatch(session, request)
129             except:
130                 traceback.print_exc(file=sys.stdout)
131                 
132
133         self.stop()
134
135     def stop(self):
136         pass
137
138     def do_dispatch(self, session, request):
139         """ dispatch request to the relevant processor """
140
141         method = request['method']
142         params = request.get('params',[])
143         suffix = method.split('.')[-1]
144
145         is_new = session.protocol_version >= 0.5
146
147         if is_new and method == 'blockchain.address.get_history': 
148             method = 'blockchain.address.get_history2'
149             request['method'] = method
150
151         if suffix == 'subscribe':
152             if is_new and method == 'blockchain.address.subscribe': 
153                 method = 'blockchain.address.subscribe2'
154                 request['method'] = method
155
156             session.subscribe_to_service(method, params)
157
158         # store session and id locally
159         request['id'] = self.store_session_id(session, request['id'])
160
161         prefix = request['method'].split('.')[0]
162         try:
163             p = self.processors[prefix]
164         except:
165             print "error: no processor for", prefix
166             return
167
168         p.add_request(request)
169
170         if method in ['server.version']:
171             session.version = params[0]
172             try:
173                 session.protocol_version = float(params[1])
174             except:
175                 pass
176
177     def get_sessions(self):
178         with self.lock:
179             r = self.sessions[:]
180         return r
181
182     def add_session(self, session):
183         with self.lock:
184             self.sessions.append(session)
185
186     def collect_garbage(self):
187         # Deep copy entire sessions list and blank it
188         # This is done to minimise lock contention
189         with self.lock:
190             sessions = self.sessions[:]
191             self.sessions = []
192         for session in sessions:
193             if not session.stopped():
194                 # If session is still alive then re-add it back
195                 # to our internal register
196                 self.add_session(session)
197
198
199 class Session:
200
201     def __init__(self):
202         self._stopped = False
203         self.lock = threading.Lock()
204         self.subscriptions = []
205         self.address = ''
206         self.name = ''
207         self.version = 'unknown'
208         self.protocol_version = 0.
209         self.time = time.time()
210         threading.Timer(2, self.info).start()
211
212     # Debugging method. Doesn't need to be threadsafe.
213     def info(self):
214         for sub in self.subscriptions:
215             #print sub
216             method = sub[0]
217             if method == 'blockchain.address.subscribe':
218                 addr = sub[1]
219                 break
220         else:
221             addr = None
222
223         if self.subscriptions:
224             print timestr(), self.name, self.address, addr,\
225                 len(self.subscriptions), self.version
226
227     def stopped(self):
228         with self.lock:
229             return self._stopped
230
231     def subscribe_to_service(self, method, params):
232         subdesc = self.build_subdesc(method, params)
233         with self.lock:
234             if subdesc is not None:
235                 self.subscriptions.append(subdesc)
236
237     # subdesc = A subscription description
238     @staticmethod
239     def build_subdesc(method, params):
240         if method == "blockchain.numblocks.subscribe":
241             return method,
242         elif method == "blockchain.headers.subscribe":
243             return method,
244         elif method in ["blockchain.address.subscribe", "blockchain.address.subscribe2"]:
245             if not params:
246                 return None
247             else:
248                 return method, params[0]
249         else:
250             return None
251
252     def contains_subscription(self, subdesc):
253         with self.lock:
254             return subdesc in self.subscriptions
255     
256
257 class ResponseDispatcher(threading.Thread):
258
259     def __init__(self, shared, processor):
260         self.shared = shared
261         self.processor = processor
262         threading.Thread.__init__(self)
263         self.daemon = True
264
265     def run(self):
266         while not self.shared.stopped():
267             self.update()
268
269     def update(self):
270         response = self.processor.pop_response()
271         #print "pop response", response
272         internal_id = response.get('id')
273         method = response.get('method')
274         params = response.get('params')
275
276         # A notification
277         if internal_id is None: # and method is not None and params is not None:
278             self.notification(method, params, response)
279         # A response
280         elif internal_id is not None: # and method is None and params is None:
281             self.send_response(internal_id, response)
282         else:
283             print "no method", response
284
285     def notification(self, method, params, response):
286         subdesc = Session.build_subdesc(method, params)
287         for session in self.processor.sessions:
288             if session.stopped():
289                 continue
290             if session.contains_subscription(subdesc):
291                 if response.get('method') == "blockchain.address.subscribe2":
292                     response['method'] = "blockchain.address.subscribe"
293                 session.send_response(response)
294
295     def send_response(self, internal_id, response):
296         session, message_id = self.processor.get_session_id(internal_id)
297         if session:
298             response['id'] = message_id
299             session.send_response(response)
300         else:
301             print "send_response: no session", message_id, internal_id, response
302