fix: rename method subscribe2
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import traceback, sys
6 import Queue as queue
7
def random_string(N):
    """Return a string of N characters drawn from A-Z and 0-9.

    Each character is picked independently with ``random.choice``.
    """
    import random
    import string

    alphabet = string.ascii_uppercase + string.digits
    picked = [random.choice(alphabet) for _ in range(N)]
    return ''.join(picked)
11
def timestr():
    """Return the current local time formatted as ``[DD/MM/YYYY-HH:MM:SS]``."""
    stamp_format = "[%d/%m/%Y-%H:%M:%S]"
    return time.strftime(stamp_format)
14
15
class Shared:
    """Lock-protected stop flag handed to the dispatcher and processor threads."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Announce the shutdown on stdout, then raise the stop flag."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return True once stop() has been called."""
        self.lock.acquire()
        try:
            return self._stopped
        finally:
            self.lock.release()
30
31
class Processor(threading.Thread):
    """Daemon worker thread that handles the requests queued for it.

    Subclasses override process(). Dispatcher.register() sets ``dispatcher``
    and ``shared`` and then starts the thread.
    """

    # Class-level default so run() can detect an unregistered processor
    # without clobbering a value a subclass may already have assigned.
    shared = None

    # Seconds to wait for a request before re-checking the stop flag.
    poll_interval = 1.0

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.dispatcher = None
        self.queue = queue.Queue()

    def process(self, request):
        """Handle one request; the base implementation does nothing."""
        pass

    def push_response(self, response):
        """Queue a response on the dispatcher's request dispatcher."""
        #print "response", response
        self.dispatcher.request_dispatcher.push_response(response)

    def run(self):
        """Process queued requests until the shared stop flag is raised.

        Raises:
            TypeError: if ``shared`` was never set.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")

        while not self.shared.stopped():
            # The first positional argument of Queue.get() is ``block``, not
            # ``timeout``; use a real timeout so the stop flag is re-checked
            # even when no request arrives.
            try:
                request = self.queue.get(timeout=self.poll_interval)
            except queue.Empty:
                continue

            try:
                self.process(request)
            except Exception:
                # A failing request must not kill the worker thread.
                traceback.print_exc(file=sys.stdout)

        print("processor terminating")
56             
57
58
class Dispatcher:
    """Creates and starts the request and response dispatcher threads."""

    def __init__(self):
        shared_state = Shared()
        self.shared = shared_state

        requests = RequestDispatcher(shared_state)
        self.request_dispatcher = requests
        requests.start()

        responses = ResponseDispatcher(shared_state, requests)
        self.response_dispatcher = responses
        responses.start()

    def register(self, prefix, processor):
        """Start `processor` and route methods beginning with `prefix` to it."""
        processor.dispatcher, processor.shared = self, self.shared
        processor.start()
        routing = self.request_dispatcher.processors
        routing[prefix] = processor
74
75
76
class RequestDispatcher(threading.Thread):
    """Daemon thread that routes session requests to registered processors.

    It also owns the response queue, the list of known sessions and the
    mapping from internal request ids back to (session, client message id).
    """

    def __init__(self, shared):
        self.shared = shared
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        # internal id -> (session, client message id)
        self.internal_ids = {}
        self.internal_id = 1
        self.lock = threading.Lock()
        self.sessions = []
        # method prefix -> processor
        self.processors = {}

    def push_response(self, item):
        self.response_queue.put(item)

    def pop_response(self):
        """Block until a response is available and return it."""
        return self.response_queue.get()

    def push_request(self, session, item):
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a (session, request) pair is available and return it."""
        return self.request_queue.get()

    def get_session_by_address(self, address):
        """Return the first session whose address matches, or None."""
        # Iterate over a snapshot taken under the lock rather than the live
        # list, which collect_garbage()/add_session() may modify concurrently.
        for candidate in self.get_sessions():
            if candidate.address == address:
                return candidate
        return None

    def get_session_id(self, internal_id):
        """Remove and return the (session, msgid) pair stored for internal_id.

        Raises:
            KeyError: if internal_id is unknown (or was already consumed).
        """
        with self.lock:
            return self.internal_ids.pop(internal_id)

    def store_session_id(self, session, msgid):
        """Remember (session, msgid) under a fresh internal id and return it."""
        with self.lock:
            self.internal_ids[self.internal_id] = session, msgid
            r = self.internal_id
            self.internal_id += 1
            return r

    def run(self):
        """Dispatch queued requests until the shared stop flag is raised."""
        if self.shared is None:
            raise TypeError("self.shared not set in RequestDispatcher")
        while not self.shared.stopped():
            session, request = self.pop_request()
            try:
                self.do_dispatch(session, request)
            except Exception:
                # One malformed request must not kill the dispatcher thread.
                traceback.print_exc(file=sys.stdout)

        self.stop()

    def stop(self):
        pass

    def do_dispatch(self, session, request):
        """ dispatch request to the relevant processor """

        method = request['method']
        params = request.get('params', [])
        suffix = method.split('.')[-1]

        # session.version is client supplied; anything float() cannot
        # convert is treated as an old client.
        try:
            is_new = float(session.version) >= 1.3
        except Exception:
            is_new = False

        if is_new and method == 'blockchain.address.get_history':
            method = 'blockchain.address.get_history2'
            request['method'] = method

        if suffix == 'subscribe':
            if is_new and method == 'blockchain.address.subscribe':
                method = 'blockchain.address.subscribe2'
                request['method'] = method

            session.subscribe_to_service(method, params)

        # Resolve the processor *before* storing the session/id mapping:
        # an entry stored for a request that is never forwarded would never
        # be popped again and would leak in self.internal_ids.
        prefix = request['method'].split('.')[0]
        p = self.processors.get(prefix)
        if p is None:
            print("error: no processor for %s" % prefix)
            return

        # store session and id locally
        request['id'] = self.store_session_id(session, request['id'])

        p.queue.put(request)

        # Only record a version when the client actually sent one.
        if (method == 'server.version'
                and isinstance(params, (list, tuple)) and params):
            session.version = params[0]

    def get_sessions(self):
        """Return a copy of the session list taken under the lock."""
        with self.lock:
            r = self.sessions[:]
        return r

    def add_session(self, session):
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self):
        """Drop sessions that report themselves as stopped."""
        # Probe the sessions on a (shallow) snapshot so the dispatcher lock
        # is not held while each session's own lock is taken.
        snapshot = self.get_sessions()
        dead = set(id(s) for s in snapshot if s.stopped())
        if not dead:
            return
        # Filter the live list in one step: the register is never left empty
        # or partially rebuilt, and sessions added meanwhile are kept.
        with self.lock:
            self.sessions = [s for s in self.sessions if id(s) not in dead]
193
194
class Session:
    """Per-client state: address, name, protocol version and subscriptions."""

    # Subscription methods whose descriptor carries an address as params[0].
    _ADDRESS_METHODS = ("blockchain.address.subscribe",
                        "blockchain.address.subscribe2")

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()
        self.subscriptions = []
        self.address = ''
        self.name = ''
        self.version = 'unknown'
        self.time = time.time()
        # One-shot debug print two seconds after creation.  The timer is a
        # daemon, like every other thread in this file, so it cannot delay
        # interpreter shutdown.
        timer = threading.Timer(2, self.info)
        timer.daemon = True
        timer.start()

    def _first_subscribed_address(self):
        """Return the address of the first address subscription, or None."""
        for sub in self.subscriptions:
            # Sessions can be stored under either method name, so accept both.
            if sub[0] in Session._ADDRESS_METHODS:
                return sub[1]
        return None

    # Debugging method. Doesn't need to be threadsafe.
    def info(self):
        """Print a one-line summary if the session has any subscriptions."""
        addr = self._first_subscribed_address()

        if self.subscriptions:
            print("%s %s %s %s %s %s" % (
                timestr(), self.name, self.address, addr,
                len(self.subscriptions), self.version))

    def stopped(self):
        with self.lock:
            return self._stopped

    def subscribe_to_service(self, method, params):
        """Record a subscription for this session.

        Methods that build_subdesc() does not recognise are ignored, and a
        descriptor that is already present is not stored a second time, so
        repeated identical requests cannot grow the list without bound.
        """
        subdesc = self.build_subdesc(method, params)
        if subdesc is None:
            return
        with self.lock:
            if subdesc not in self.subscriptions:
                self.subscriptions.append(subdesc)

    # subdesc = A subscription description
    @staticmethod
    def build_subdesc(method, params):
        """Return the tuple identifying a subscription, or None.

        numblocks/headers subscriptions are identified by the method alone;
        address subscriptions by (method, params[0]).  Unknown methods and
        address subscriptions without params give None.
        """
        if method == "blockchain.numblocks.subscribe":
            return method,
        elif method == "blockchain.headers.subscribe":
            return method,
        elif method in Session._ADDRESS_METHODS:
            if not params:
                return None
            else:
                return method, params[0]
        else:
            return None

    def contains_subscription(self, subdesc):
        with self.lock:
            return subdesc in self.subscriptions
250     
251
class ResponseDispatcher(threading.Thread):
    """Daemon thread that delivers queued responses and notifications.

    ``processor`` is the object responses are popped from; it also provides
    the session list and the internal-id lookup (a RequestDispatcher when
    constructed by Dispatcher).
    """

    def __init__(self, shared, processor):
        self.shared = shared
        self.processor = processor
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        """Deliver responses until the shared stop flag is raised."""
        while not self.shared.stopped():
            try:
                self.update()
            except Exception:
                # An unknown id or a failing session must not kill the
                # thread, otherwise no client would get a response again.
                traceback.print_exc(file=sys.stdout)

    def update(self):
        """Pop one response and deliver it.

        A response without an 'id' is a notification for subscribers;
        otherwise it answers one specific request.
        """
        response = self.processor.pop_response()
        #print "pop response", response
        internal_id = response.get('id')

        if internal_id is None:
            self.notification(response.get('method'),
                              response.get('params'), response)
        else:
            self.send_response(internal_id, response)

    def notification(self, method, params, response):
        """Send `response` to every live session subscribed to it."""
        # Match on the method name the sessions were stored under: sessions
        # subscribed through 'subscribe2' are registered under that name.
        subdesc = Session.build_subdesc(method, params)

        # 'subscribe2' is an internal name; present the public one to clients.
        if response.get('method') == "blockchain.address.subscribe2":
            response['method'] = "blockchain.address.subscribe"

        for session in self.processor.get_sessions():
            if session.stopped():
                continue
            if session.contains_subscription(subdesc):
                session.send_response(response)

    def send_response(self, internal_id, response):
        """Restore the client's message id and send the response to it.

        Raises:
            KeyError: if internal_id has no stored session (propagated from
                the processor's get_session_id()).
        """
        session, message_id = self.processor.get_session_id(internal_id)
        if session:
            response['id'] = message_id
            session.send_response(response)
        else:
            print("send_response: no session %s %s %s"
                  % (message_id, internal_id, response))
298