create dispatcher class; redefine processors as threads
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import traceback, sys
6 import Queue as queue
7
class Shared:
    """Lock-protected stop flag handed to every thread of the server."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Announce the shutdown on stdout, then raise the stop flag."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return True once stop() has been called, False before that."""
        with self.lock:
            flag = self._stopped
        return flag
22
23
class Processor(threading.Thread):
    """Base class for request processors; every instance is a daemon thread.

    ``dispatcher`` starts out as None and has to be assigned before
    push_response() can be used.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.dispatcher = None
        self.daemon = True

    def process(self, request):
        """Handle one request.  The base implementation does nothing."""
        return None

    def push_response(self, response):
        """Forward *response* to the dispatcher's request dispatcher."""
        target = self.dispatcher.request_dispatcher
        target.push_response(response)
36
37
38
class Dispatcher:
    """Creates the shared stop flag and the two dispatcher threads.

    Both the request dispatcher and the response dispatcher are started
    by the constructor.
    """

    def __init__(self):
        self.shared = Shared()

        requests = RequestDispatcher(self.shared)
        self.request_dispatcher = requests
        requests.start()

        responses = ResponseDispatcher(self.shared, requests)
        self.response_dispatcher = responses
        responses.start()

    def register(self, prefix, processor):
        """Wire up *processor*, start it, and file it under *prefix*.

        The processor receives a back-reference to this dispatcher and the
        shared stop flag before its thread is started.
        """
        processor.dispatcher = self
        processor.shared = self.shared
        processor.start()
        table = self.request_dispatcher.processors
        table[prefix] = processor
53
54
55
class RequestDispatcher(threading.Thread):
    """Daemon thread that routes queued requests to registered processors.

    Requests are queued together with the session they came from.  Before a
    request is handed to a processor, its own id is replaced by an internal
    id so that the response can later be matched back to the original
    (session, message id) pair.
    """

    def __init__(self, shared):
        self.shared = shared
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        # internal id -> (session, original message id)
        self.internal_ids = {}
        self.internal_id = 1
        # guards internal_ids, internal_id and sessions
        self.lock = threading.Lock()
        self.sessions = []
        # method prefix (text before the first '.') -> processor
        self.processors = {}

    def push_response(self, item):
        """Queue a response for delivery."""
        self.response_queue.put(item)

    def pop_response(self):
        """Block until a response is available and return it."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue request *item* together with the session that sent it."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a request is available; return (session, request)."""
        return self.request_queue.get()

    def get_session_id(self, internal_id):
        """Remove and return the (session, message id) for *internal_id*.

        Raises KeyError if the id is unknown or was already consumed.
        """
        with self.lock:
            return self.internal_ids.pop(internal_id)

    def store_session_id(self, session, msgid):
        """Remember (session, msgid) and return the internal id assigned."""
        with self.lock:
            self.internal_ids[self.internal_id] = session, msgid
            r = self.internal_id
            self.internal_id += 1
            return r

    def run(self):
        """Process queued requests until the shared stop flag is raised."""
        if self.shared is None:
            raise TypeError("self.shared not set in RequestDispatcher")
        while not self.shared.stopped():
            session, request = self.pop_request()
            try:
                self.process(session, request)
            except Exception:
                # A malformed request (e.g. no 'method' or 'id' key) must
                # not take the whole dispatcher thread down with it.
                traceback.print_exc(file=sys.stdout)

        self.stop()

    def stop(self):
        """Hook called once when run() leaves its loop; no-op here."""
        pass

    def process(self, session, request):
        """Route one request to the processor registered for its prefix.

        A method whose last dotted component is 'subscribe' is recorded on
        the session first.  If no processor is registered for the prefix an
        error is printed and nothing is stored or dispatched.  Exceptions
        raised by the processor are printed, not propagated.

        Raises KeyError if *request* has no 'method' or no 'id' key.
        """
        method = request['method']
        params = request.get('params', [])

        suffix = method.split('.')[-1]
        if suffix == 'subscribe':
            session.subscribe_to_service(method, params)

        # Resolve the processor *before* allocating an internal id, so an
        # unknown prefix cannot leave an orphaned entry in internal_ids.
        prefix = method.split('.')[0]
        try:
            p = self.processors[prefix]
        except KeyError:
            print("error: no processor for %s" % (prefix,))
            return

        # store session and id locally
        request['id'] = self.store_session_id(session, request['id'])

        try:
            p.process(request)
        except Exception:
            traceback.print_exc(file=sys.stdout)

    def add_session(self, session):
        """Register *session* so it can receive broadcast responses."""
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self):
        """Drop every session that reports itself as stopped.

        The list is only shallow-copied under the lock; sessions are polled
        without holding it to keep lock contention low.  The register is
        never emptied in the meantime, so a concurrent reader always sees
        the live sessions, and sessions added while polling are kept.
        """
        with self.lock:
            snapshot = self.sessions[:]
        # The snapshot keeps the objects alive, so their id() stays unique.
        dead = set(id(s) for s in snapshot if s.stopped())
        if not dead:
            return
        with self.lock:
            self.sessions = [s for s in self.sessions if id(s) not in dead]
144
145
class Session:
    """Client session state: a stop flag and a list of subscriptions."""

    def __init__(self):
        self.subscriptions = []
        self.lock = threading.Lock()
        self._stopped = False

    def stopped(self):
        """Return the current value of the stop flag, read under the lock."""
        with self.lock:
            flag = self._stopped
        return flag

    def subscribe_to_service(self, method, params):
        """Append the (method, params) pair to the subscription list."""
        entry = (method, params)
        with self.lock:
            self.subscriptions.append(entry)
160     
161
class ResponseDispatcher(threading.Thread):
    """Daemon thread that delivers queued responses to sessions.

    A response carrying a truthy 'id' goes to the one session recorded for
    that internal id.  Any other response is sent to every session that is
    not stopped and whose subscriptions contain its (method, params) pair.
    """

    def __init__(self, shared, processor):
        self.shared = shared
        # Source of responses, id lookups and the session list; Dispatcher
        # passes its RequestDispatcher here.
        self.processor = processor
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        """Deliver responses until the shared stop flag is raised."""
        while not self.shared.stopped():
            response = self.processor.pop_response()
            try:
                self._dispatch(response)
            except Exception:
                # An unknown internal id or a failing send must not kill
                # the thread: that would halt delivery for every session.
                traceback.print_exc(file=sys.stdout)

    def _dispatch(self, response):
        """Deliver a single response to its session(s)."""
        internal_id = response.get('id')
        params = response.get('params', [])
        try:
            method = response['method']
        except KeyError:
            print("no method %s" % (response,))
            return

        if internal_id:
            # Direct reply: restore the id the client originally sent.
            session, message_id = self.processor.get_session_id(internal_id)
            response['id'] = message_id
            session.send_response(response)
            return

        # Broadcast to matching subscribers.
        for session in self.processor.sessions:
            if session.stopped():
                continue
            if (method, params) in session.subscriptions:
                try:
                    session.send_response(response)
                except Exception:
                    # One broken subscriber must not starve the others.
                    traceback.print_exc(file=sys.stdout)
192