add a sleep() to sessions, and remove some dead code
[electrum-server.git] / processor.py
1 import json
2 import Queue as queue
3 import socket
4 import threading
5 import time
6 import traceback
7 import sys
8
9 from utils import random_string, timestr, print_log
10
11
class Shared:
    """Holds the server configuration and a lock-protected stop flag."""

    def __init__(self, config):
        """Store *config* and start out in the "not stopped" state."""
        self.config = config
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Log the shutdown and raise the stop flag."""
        print_log("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return the current value of the stop flag, read under the lock."""
        with self.lock:
            flag = self._stopped
        return flag
27
28
class Processor(threading.Thread):
    """Daemon worker thread that handles the requests queued for it.

    Subclasses override process().  The ``dispatcher`` attribute starts as
    None; run() and push_response() also rely on ``shared`` and
    ``dispatcher`` having been assigned from outside before the thread is
    started.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.dispatcher = None
        self.queue = queue.Queue()

    def process(self, session, request):
        """Handle one request for *session*.  The base class does nothing."""
        pass

    def add_request(self, session, request):
        """Queue a (session, request) pair for the worker thread."""
        self.queue.put((session, request))

    def push_response(self, session, response):
        """Hand *response* for *session* to the dispatcher's response queue."""
        self.dispatcher.request_dispatcher.push_response(session, response)

    def _next_request(self, timeout=1.0):
        """Return the next queued (session, request) pair.

        Waits at most *timeout* seconds and returns None if nothing arrived.
        """
        try:
            # The timeout must be passed by keyword: the first positional
            # argument of Queue.get() is ``block``, not ``timeout``.
            return self.queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def run(self):
        """Process queued requests until the shared stop flag is set.

        The queue is polled with a timeout so that the stop flag is
        re-checked even while no requests arrive.  Exceptions raised by
        process() are printed to stdout and do not end the loop.
        """
        while not self.shared.stopped():
            item = self._next_request()
            if item is None:
                continue
            # Same order as add_request() enqueues and process() expects.
            session, request = item
            try:
                self.process(session, request)
            except Exception:
                traceback.print_exc(file=sys.stdout)

        print_log("processor terminating")
56
57
class Dispatcher:
    """Creates the shared state and the request/response dispatcher threads,
    and attaches processors to them."""

    def __init__(self, config):
        """Build the shared state from *config* and start both dispatchers."""
        shared = Shared(config)
        self.shared = shared

        requests = RequestDispatcher(shared)
        self.request_dispatcher = requests
        requests.start()

        responses = ResponseDispatcher(shared, requests)
        self.response_dispatcher = responses
        responses.start()

    def register(self, prefix, processor):
        """Wire *processor* to this dispatcher, start it, then register it
        under *prefix* in the request dispatcher's processor table."""
        processor.dispatcher, processor.shared = self, self.shared
        processor.start()
        registry = self.request_dispatcher.processors
        registry[prefix] = processor
73
74
class RequestDispatcher(threading.Thread):
    """Daemon thread that routes queued requests to processors.

    A request is routed by the first dotted component of its ``method``
    (e.g. ``blockchain`` for ``blockchain.address.subscribe``), looked up in
    ``self.processors``.  The object also holds the response queue and the
    list of registered sessions.
    """

    def __init__(self, shared):
        self.shared = shared
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        self.lock = threading.Lock()      # guards self.sessions
        self.idlock = threading.Lock()
        self.sessions = []
        self.processors = {}              # method prefix -> processor

    def push_response(self, session, item):
        """Queue a response *item* addressed to *session*."""
        self.response_queue.put((session, item))

    def pop_response(self):
        """Block until a (session, response) pair is available; return it."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue a request *item* received from *session*."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a (session, request) pair is available; return it."""
        return self.request_queue.get()

    def run(self):
        """Dispatch queued requests until the shared stop flag is set.

        Exceptions raised while dispatching a request are printed to stdout
        and do not end the loop.

        Raises:
            TypeError: if ``self.shared`` is None.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in RequestDispatcher")
        while not self.shared.stopped():
            session, request = self.pop_request()
            try:
                self.do_dispatch(session, request)
            except Exception:
                traceback.print_exc(file=sys.stdout)

        self.stop()

    def stop(self):
        """Hook called once the dispatch loop ends.  Does nothing here."""
        pass

    def do_dispatch(self, session, request):
        """Dispatch *request* to the processor registered for its prefix.

        ``*.subscribe`` methods are first recorded on the session (when
        there is one).  If no processor is registered for the prefix, an
        error is logged and the request is dropped.  For ``server.version``
        the client's version information is stored on the session after the
        request has been queued.
        """
        method = request['method']
        params = request.get('params', [])
        parts = method.split('.')
        prefix, suffix = parts[0], parts[-1]

        if session is not None and suffix == 'subscribe':
            session.subscribe_to_service(method, params)

        try:
            p = self.processors[prefix]
        except KeyError:
            print_log("error: no processor for", prefix)
            return

        p.add_request(session, request)

        # Requests may arrive without a session; there is nothing to
        # annotate in that case.
        if method == 'server.version' and session is not None:
            self._record_client_version(session, params)

    def _record_client_version(self, session, params):
        """Copy the client version and protocol version from *params*.

        Best effort: missing or malformed entries leave the session's
        current values untouched.
        """
        try:
            session.version = params[0]
        except (IndexError, KeyError, TypeError):
            return
        try:
            session.protocol_version = float(params[1])
        except (IndexError, KeyError, TypeError, ValueError):
            pass

    def get_sessions(self):
        """Return a snapshot (shallow copy) of the registered sessions."""
        with self.lock:
            r = self.sessions[:]
        return r

    def add_session(self, session):
        """Register *session*."""
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self, max_idle=1000):
        """Stop idle sessions and unregister every stopped session.

        A session is idle when ``session.time`` is more than *max_idle*
        seconds in the past.  Stopped sessions have their subscriptions
        removed from the 'blockchain' processor.

        Raises:
            KeyError: if no 'blockchain' processor is registered.
        """
        # Work on a snapshot so the lock is not held while sessions are
        # being stopped and unsubscribed.
        with self.lock:
            sessions = self.sessions[:]

        now = time.time()
        for session in sessions:
            if (now - session.time) > max_idle:
                session.stop()

        bp = self.processors['blockchain']

        dead = set()
        for session in sessions:
            if session.stopped():
                session.stop_subscriptions(bp)
                dead.add(id(session))

        # Filter the *current* list rather than replacing it with one built
        # from the snapshot, so sessions registered in the meantime are kept.
        with self.lock:
            self.sessions = [s for s in self.sessions if id(s) not in dead]
177
178
179
class Session:
    """Per-client state: identity fields, client version and subscriptions.

    ``subscriptions`` is a list of ``(method, params)`` pairs.

    NOTE(review): other code in this file calls ``session.stop()`` and
    ``session.send_response()``, which are not defined here -- presumably
    they are supplied by subclasses; confirm.
    """

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()
        self.subscriptions = []
        self.address = ''
        self.name = ''
        self.version = 'unknown'
        self.protocol_version = 0.
        self.time = time.time()
        # Run info() once, two seconds from now.
        threading.Timer(2, self.info).start()

    def _watched_address(self):
        """Return the address of the first address subscription, or None."""
        for method, params in self.subscriptions:
            if method == 'blockchain.address.subscribe':
                return params[0] if params else None
        return None

    # Debugging method. Doesn't need to be threadsafe.
    def info(self):
        """Log a one-line summary of the session if it has subscriptions."""
        if not self.subscriptions:
            return
        print_log("%4s" % self.name,
                  "%15s" % self.address,
                  # One-tuple so the value is always formatted as a single
                  # argument, whatever its type.
                  "%35s" % (self._watched_address(),),
                  "%3d" % len(self.subscriptions),
                  self.version)

    def stopped(self):
        """Return the stop flag, read under the session lock."""
        with self.lock:
            return self._stopped

    def subscribe_to_service(self, method, params):
        """Record the ``(method, params)`` subscription once."""
        with self.lock:
            if (method, params) not in self.subscriptions:
                self.subscriptions.append((method, params))

    def stop_subscriptions(self, bp):
        """Remove this session from *bp*'s watch lists and forget all
        subscriptions.

        *bp* must provide ``watch_lock``, ``watch_blocks``,
        ``watch_headers`` and ``watched_addresses``.
        """
        with self.lock:
            s = self.subscriptions[:]

        for method, params in s:
            with bp.watch_lock:
                if method == 'blockchain.numblocks.subscribe':
                    if self in bp.watch_blocks:
                        bp.watch_blocks.remove(self)
                elif method == 'blockchain.headers.subscribe':
                    if self in bp.watch_headers:
                        bp.watch_headers.remove(self)
                elif method == "blockchain.address.subscribe":
                    if not params:
                        # Malformed subscription without an address: skip it
                        # instead of aborting the whole cleanup.
                        continue
                    addr = params[0]
                    watchers = bp.watched_addresses.get(addr)
                    if not watchers:
                        continue
                    if self in watchers:
                        watchers.remove(self)
                    if not watchers:
                        # Last watcher gone: drop the address entry.
                        bp.watched_addresses.pop(addr)

        with self.lock:
            self.subscriptions = []
247
248
class ResponseDispatcher(threading.Thread):
    """Daemon thread that delivers queued responses to their sessions."""

    def __init__(self, shared, request_dispatcher):
        self.shared = shared
        self.request_dispatcher = request_dispatcher
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        """Send queued responses until the shared stop flag is set.

        Each (session, response) pair is taken from the request
        dispatcher's response queue and passed to
        ``session.send_response``.  A failure while sending is printed to
        stdout and the loop continues, so one broken session cannot stop
        delivery to the others.
        """
        while not self.shared.stopped():
            session, response = self.request_dispatcher.pop_response()
            try:
                session.send_response(response)
            except Exception:
                traceback.print_exc(file=sys.stdout)