fix: restore method get_session_by_address
[electrum-server.git] / processor.py
1 import json
2 import Queue as queue
3 import socket
4 import threading
5 import time
6 import traceback
7 import sys
8
9 from utils import random_string, timestr, print_log
10
11
class Shared:
    """Shutdown flag and configuration shared between server threads.

    The flag is only ever read or written while holding ``lock``.
    """

    def __init__(self, config):
        # The configuration is stored as given; this class never reads it.
        self.config = config
        self._stopped = False
        # Guards ``_stopped``.
        self.lock = threading.Lock()

    def stop(self):
        """Log the shutdown request and raise the stop flag."""
        print_log("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return True once ``stop()`` has been called, False before."""
        with self.lock:
            flag = self._stopped
        return flag
27
28
class Processor(threading.Thread):
    """Base class for worker threads that handle dispatched requests.

    Subclasses override ``process``.  ``Dispatcher.register`` assigns the
    ``dispatcher`` and ``shared`` attributes before starting the thread;
    ``run`` and ``push_response`` rely on them being set.
    """

    # Seconds to wait for a request before re-checking the stop flag, so
    # that the thread terminates soon after ``shared.stop()`` is called.
    poll_interval = 1.0

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        # Assigned by Dispatcher.register(); used by push_response().
        self.dispatcher = None
        # Pending (session, request) pairs, filled by add_request().
        self.queue = queue.Queue()

    def process(self, session, request):
        """Handle one request.  The base implementation does nothing."""
        pass

    def add_request(self, session, request):
        """Queue a request for processing on this thread."""
        self.queue.put((session, request))

    def push_response(self, session, response):
        """Hand a response for ``session`` to the request dispatcher."""
        self.dispatcher.request_dispatcher.push_response(session, response)

    def run(self):
        """Process queued requests until the shared stop flag is raised.

        An exception raised by ``process`` is printed to stdout and the
        loop continues with the next request.
        """
        while not self.shared.stopped():
            # Wait with a real timeout: the first positional argument of
            # Queue.get() is ``block``, so passing a number there blocks
            # forever and the stop flag would never be re-checked.
            try:
                session, request = self.queue.get(timeout=self.poll_interval)
            except queue.Empty:
                continue
            try:
                self.process(session, request)
            except Exception:
                traceback.print_exc(file=sys.stdout)

        print_log("processor terminating")
56
57
class Dispatcher:
    """Creates the shared state and starts both dispatcher threads."""

    def __init__(self, config):
        """Build the shared state and start the two dispatcher threads.

        ``config`` is passed straight through to ``Shared``.
        """
        shared = Shared(config)
        self.shared = shared

        requests = RequestDispatcher(shared)
        self.request_dispatcher = requests
        requests.start()

        responses = ResponseDispatcher(shared, requests)
        self.response_dispatcher = responses
        responses.start()

    def register(self, prefix, processor):
        """Attach ``processor`` to this dispatcher and start its thread.

        The processor is entered under ``prefix`` in the request
        dispatcher's ``processors`` table only after it has been started.
        """
        processor.dispatcher = self
        processor.shared = self.shared
        processor.start()
        table = self.request_dispatcher.processors
        table[prefix] = processor
73
74
class RequestDispatcher(threading.Thread):
    """Thread that routes queued requests to the registered processors.

    It also owns the response queue and the list of known sessions.
    """

    # Age in seconds (current time minus ``session.time``) beyond which
    # collect_garbage() stops a session.
    SESSION_TIMEOUT = 1000

    def __init__(self, shared):
        self.shared = shared
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        # Guards ``sessions``.
        self.lock = threading.Lock()
        # Not used anywhere in this class; kept for external users.
        self.idlock = threading.Lock()
        self.sessions = []
        # Maps the first dotted component of a method name to a processor.
        self.processors = {}

    def push_response(self, session, item):
        """Queue a response ``item`` addressed to ``session``."""
        self.response_queue.put((session, item))

    def pop_response(self):
        """Block until a response is queued; return ``(session, item)``."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue a request ``item`` received from ``session``."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a request is queued; return ``(session, item)``."""
        return self.request_queue.get()

    def get_session_by_address(self, address):
        """Return the first session whose ``address`` matches, else None."""
        # Search a snapshot taken under the lock, like the other accessors,
        # instead of iterating the live list while it may be replaced.
        for session in self.get_sessions():
            if session.address == address:
                return session
        return None

    def run(self):
        """Dispatch queued requests until the shared stop flag is raised.

        Raises:
            TypeError: if ``self.shared`` is None.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in RequestDispatcher")
        while not self.shared.stopped():
            session, request = self.pop_request()
            try:
                self.do_dispatch(session, request)
            except Exception:
                # One malformed request must not kill the dispatcher.
                traceback.print_exc(file=sys.stdout)

        self.stop()

    def stop(self):
        """Hook called when ``run`` leaves its loop; does nothing here."""
        pass

    def do_dispatch(self, session, request):
        """ dispatch request to the relevant processor """

        method = request['method']
        params = request.get('params', [])
        parts = method.split('.')
        prefix, suffix = parts[0], parts[-1]

        # Requests may arrive without a session; only real sessions can
        # record subscriptions.
        if session is not None and suffix == 'subscribe':
            session.subscribe_to_service(method, params)

        try:
            p = self.processors[prefix]
        except KeyError:
            print_log("error: no processor for", prefix)
            return

        p.add_request(session, request)

        # Remember what the client reported about itself.  This applies the
        # same None guard as above, and params that are missing or not
        # indexable are ignored: the request has already been handed to
        # the processor.
        if method == 'server.version' and session is not None:
            try:
                session.version = params[0]
            except (IndexError, KeyError, TypeError):
                return
            try:
                session.protocol_version = float(params[1])
            except (IndexError, KeyError, TypeError, ValueError,
                    OverflowError):
                # The protocol version is optional / best effort.
                pass

    def get_sessions(self):
        """Return a copy of the session list, taken under the lock."""
        with self.lock:
            r = self.sessions[:]
        return r

    def add_session(self, session):
        """Append ``session`` to the list of known sessions."""
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self):
        """Stop timed-out sessions and drop stopped ones from the list.

        Stopped sessions have their subscriptions removed from the
        'blockchain' processor.

        Raises:
            KeyError: if no processor is registered under 'blockchain'.
        """
        # Work on a snapshot so the lock is not held while sessions are
        # being stopped and unsubscribed.
        sessions = self.get_sessions()

        now = time.time()
        for session in sessions:
            if (now - session.time) > self.SESSION_TIMEOUT:
                session.stop()

        bp = self.processors['blockchain']

        # The snapshot keeps every session alive, so ids stay unique here.
        dead = set()
        for session in sessions:
            if session.stopped():
                session.stop_subscriptions(bp)
                dead.add(id(session))

        # Filter the live list rather than overwriting it with the
        # snapshot, so sessions added while this method was running are
        # not lost.
        with self.lock:
            self.sessions = [s for s in self.sessions if id(s) not in dead]
182
183
184
class Session:
    """State kept for one client: identity, version and subscriptions.

    This class does not define ``stop()`` or ``send_response()``, which
    other code in this file calls on sessions -- presumably subclasses
    provide them (TODO confirm).
    """

    def __init__(self):
        self._stopped = False
        # Guards ``_stopped`` and ``subscriptions``.
        self.lock = threading.Lock()
        # List of (method, params) tuples, see subscribe_to_service().
        self.subscriptions = []
        self.address = ''
        self.name = ''
        self.version = 'unknown'
        self.protocol_version = 0.
        # Creation time; the garbage collector compares it to the clock.
        self.time = time.time()
        # Log a one-line summary two seconds after the session is created.
        threading.Timer(2, self.info).start()

    # Debugging method. Doesn't need to be threadsafe.
    def info(self):
        """Log a summary line if the session has any subscription.

        The third column shows the params of the first
        'blockchain.address.subscribe' subscription, or None.
        """
        for sub in self.subscriptions:
            method = sub[0]
            if method == 'blockchain.address.subscribe':
                addr = sub[1]
                break
        else:
            addr = None

        if self.subscriptions:
            print_log("%4s" % self.name,
                      "%15s" % self.address,
                      "%35s" % addr,
                      "%3d" % len(self.subscriptions),
                      self.version)

    def stopped(self):
        """Return the stop flag, read under the lock."""
        with self.lock:
            return self._stopped

    def subscribe_to_service(self, method, params):
        """Record a (method, params) subscription, ignoring duplicates."""
        with self.lock:
            if (method, params) not in self.subscriptions:
                self.subscriptions.append((method, params))

    def stop_subscriptions(self, bp):
        """Remove this session from the watch lists of ``bp``.

        ``bp`` must provide ``watch_lock``, ``watch_blocks``,
        ``watch_headers`` and ``watched_addresses``.  Afterwards the
        session's own subscription list is emptied.
        """
        with self.lock:
            subscriptions = self.subscriptions[:]

        for method, params in subscriptions:
            with bp.watch_lock:
                if method == 'blockchain.numblocks.subscribe':
                    if self in bp.watch_blocks:
                        bp.watch_blocks.remove(self)
                elif method == 'blockchain.headers.subscribe':
                    if self in bp.watch_headers:
                        bp.watch_headers.remove(self)
                elif method == "blockchain.address.subscribe":
                    # Params are stored exactly as the client sent them,
                    # so they may be empty, not indexable, or hold an
                    # unhashable value.  Skip such entries instead of
                    # aborting the whole clean-up.
                    try:
                        addr = params[0]
                        watchers = bp.watched_addresses.get(addr)
                    except (IndexError, KeyError, TypeError):
                        continue
                    if not watchers:
                        continue
                    if self in watchers:
                        watchers.remove(self)
                    if not watchers:
                        # Last watcher gone: forget the address entirely.
                        bp.watched_addresses.pop(addr)

        with self.lock:
            self.subscriptions = []
252
253
class ResponseDispatcher(threading.Thread):
    """Thread that delivers queued responses to their sessions."""

    def __init__(self, shared, request_dispatcher):
        self.shared = shared
        # Source of (session, response) pairs via pop_response().
        self.request_dispatcher = request_dispatcher
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        """Send queued responses until the shared stop flag is raised.

        A failure while sending one response is printed to stdout and the
        loop continues, so a single bad session cannot stop delivery of
        responses to every other session.
        """
        while not self.shared.stopped():
            session, response = self.request_dispatcher.pop_response()
            try:
                session.send_response(response)
            except Exception:
                traceback.print_exc(file=sys.stdout)