bugfix: restore method name
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import traceback, sys
6 import Queue as queue
7
8 def random_string(N):
9     import random, string
10     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
11
12 def timestr():
13     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
14
15
16 class Shared:
17
18     def __init__(self):
19         self.lock = threading.Lock()
20         self._stopped = False
21
22     def stop(self):
23         print "Stopping Stratum"
24         with self.lock:
25             self._stopped = True
26
27     def stopped(self):
28         with self.lock:
29             return self._stopped
30
31
32 class Processor(threading.Thread):
33
34     def __init__(self):
35         threading.Thread.__init__(self)
36         self.daemon = True
37         self.dispatcher = None
38         self.queue = queue.Queue()
39
40     def process(self, request):
41         pass
42
43     def push_response(self, response):
44         #print "response", response
45         self.dispatcher.request_dispatcher.push_response(response)
46
47     def run(self):
48         while not self.shared.stopped():
49             request = self.queue.get(10000000000)
50             try:
51                 self.process(request)
52             except:
53                 traceback.print_exc(file=sys.stdout)
54
55         print "processor terminating"
56             
57
58
59 class Dispatcher:
60
61     def __init__(self):
62         self.shared = Shared()
63         self.request_dispatcher = RequestDispatcher(self.shared)
64         self.request_dispatcher.start()
65         self.response_dispatcher = \
66             ResponseDispatcher(self.shared, self.request_dispatcher)
67         self.response_dispatcher.start()
68
69     def register(self, prefix, processor):
70         processor.dispatcher = self
71         processor.shared = self.shared
72         processor.start()
73         self.request_dispatcher.processors[prefix] = processor
74
75
76
77 class RequestDispatcher(threading.Thread):
78
79     def __init__(self, shared):
80         self.shared = shared
81         threading.Thread.__init__(self)
82         self.daemon = True
83         self.request_queue = queue.Queue()
84         self.response_queue = queue.Queue()
85         self.internal_ids = {}
86         self.internal_id = 1
87         self.lock = threading.Lock()
88         self.sessions = []
89         self.processors = {}
90
91     def push_response(self, item):
92         self.response_queue.put(item)
93
94     def pop_response(self):
95         return self.response_queue.get()
96
97     def push_request(self, session, item):
98         self.request_queue.put((session,item))
99
100     def pop_request(self):
101         return self.request_queue.get()
102
103     def get_session_by_address(self, address):
104         for x in self.sessions:
105             if x.address == address:
106                 return x
107
108     def get_session_id(self, internal_id):
109         with self.lock:
110             return self.internal_ids.pop(internal_id)
111
112     def store_session_id(self, session, msgid):
113         with self.lock:
114             self.internal_ids[self.internal_id] = session, msgid
115             r = self.internal_id
116             self.internal_id += 1
117             return r
118
119     def run(self):
120         if self.shared is None:
121             raise TypeError("self.shared not set in Processor")
122         while not self.shared.stopped():
123             session, request = self.pop_request()
124             try:
125                 self.do_dispatch(session, request)
126             except:
127                 traceback.print_exc(file=sys.stdout)
128                 
129
130         self.stop()
131
132     def stop(self):
133         pass
134
135     def do_dispatch(self, session, request):
136         """ dispatch request to the relevant processor """
137
138         method = request['method']
139         params = request.get('params',[])
140         suffix = method.split('.')[-1]
141
142         is_new = session.protocol_version >= 0.5
143
144         if is_new and method == 'blockchain.address.get_history': 
145             method = 'blockchain.address.get_history2'
146             request['method'] = method
147
148         if suffix == 'subscribe':
149             if is_new and method == 'blockchain.address.subscribe': 
150                 method = 'blockchain.address.subscribe2'
151                 request['method'] = method
152
153             session.subscribe_to_service(method, params)
154
155         # store session and id locally
156         request['id'] = self.store_session_id(session, request['id'])
157
158         prefix = request['method'].split('.')[0]
159         try:
160             p = self.processors[prefix]
161         except:
162             print "error: no processor for", prefix
163             return
164
165         p.queue.put(request)
166
167         if method in ['server.version']:
168             session.version = params[0]
169             try:
170                 session.protocol_version = float(params[1])
171             except:
172                 pass
173
174     def get_sessions(self):
175         with self.lock:
176             r = self.sessions[:]
177         return r
178
179     def add_session(self, session):
180         with self.lock:
181             self.sessions.append(session)
182
183     def collect_garbage(self):
184         # Deep copy entire sessions list and blank it
185         # This is done to minimise lock contention
186         with self.lock:
187             sessions = self.sessions[:]
188             self.sessions = []
189         for session in sessions:
190             if not session.stopped():
191                 # If session is still alive then re-add it back
192                 # to our internal register
193                 self.add_session(session)
194
195
196 class Session:
197
198     def __init__(self):
199         self._stopped = False
200         self.lock = threading.Lock()
201         self.subscriptions = []
202         self.address = ''
203         self.name = ''
204         self.version = 'unknown'
205         self.protocol_version = 0.
206         self.time = time.time()
207         threading.Timer(2, self.info).start()
208
209     # Debugging method. Doesn't need to be threadsafe.
210     def info(self):
211         for sub in self.subscriptions:
212             #print sub
213             method = sub[0]
214             if method == 'blockchain.address.subscribe':
215                 addr = sub[1]
216                 break
217         else:
218             addr = None
219
220         if self.subscriptions:
221             print timestr(), self.name, self.address, addr,\
222                 len(self.subscriptions), self.version
223
224     def stopped(self):
225         with self.lock:
226             return self._stopped
227
228     def subscribe_to_service(self, method, params):
229         subdesc = self.build_subdesc(method, params)
230         with self.lock:
231             if subdesc is not None:
232                 self.subscriptions.append(subdesc)
233
234     # subdesc = A subscription description
235     @staticmethod
236     def build_subdesc(method, params):
237         if method == "blockchain.numblocks.subscribe":
238             return method,
239         elif method == "blockchain.headers.subscribe":
240             return method,
241         elif method in ["blockchain.address.subscribe", "blockchain.address.subscribe2"]:
242             if not params:
243                 return None
244             else:
245                 return method, params[0]
246         else:
247             return None
248
249     def contains_subscription(self, subdesc):
250         with self.lock:
251             return subdesc in self.subscriptions
252     
253
254 class ResponseDispatcher(threading.Thread):
255
256     def __init__(self, shared, processor):
257         self.shared = shared
258         self.processor = processor
259         threading.Thread.__init__(self)
260         self.daemon = True
261
262     def run(self):
263         while not self.shared.stopped():
264             self.update()
265
266     def update(self):
267         response = self.processor.pop_response()
268         #print "pop response", response
269         internal_id = response.get('id')
270         method = response.get('method')
271         params = response.get('params')
272
273         # A notification
274         if internal_id is None: # and method is not None and params is not None:
275             self.notification(method, params, response)
276         # A response
277         elif internal_id is not None: # and method is None and params is None:
278             self.send_response(internal_id, response)
279         else:
280             print "no method", response
281
282     def notification(self, method, params, response):
283         subdesc = Session.build_subdesc(method, params)
284         for session in self.processor.sessions:
285             if session.stopped():
286                 continue
287             if session.contains_subscription(subdesc):
288                 if response.get('method') == "blockchain.address.subscribe2":
289                     response['method'] = "blockchain.address.subscribe"
290                 session.send_response(response)
291
292     def send_response(self, internal_id, response):
293         session, message_id = self.processor.get_session_id(internal_id)
294         if session:
295             response['id'] = message_id
296             session.send_response(response)
297         else:
298             print "send_response: no session", message_id, internal_id, response
299