"methods" and "params" shouldnt be in response json.
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import traceback, sys
6 import Queue as queue
7
8 def random_string(N):
9     import random, string
10     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
11
12 def timestr():
13     return time.strftime("[%d/%m/%Y-%H:%M:%S]")
14
15
16 class Shared:
17
18     def __init__(self):
19         self.lock = threading.Lock()
20         self._stopped = False
21
22     def stop(self):
23         print "Stopping Stratum"
24         with self.lock:
25             self._stopped = True
26
27     def stopped(self):
28         with self.lock:
29             return self._stopped
30
31
32 class Processor(threading.Thread):
33
34     def __init__(self):
35         threading.Thread.__init__(self)
36         self.daemon = True
37         self.dispatcher = None
38
39     def process(self, request):
40         pass
41
42     def push_response(self, response):
43         self.dispatcher.request_dispatcher.push_response(response)
44
45
46
47 class Dispatcher:
48
49     def __init__(self):
50         self.shared = Shared()
51         self.request_dispatcher = RequestDispatcher(self.shared)
52         self.request_dispatcher.start()
53         self.response_dispatcher = ResponseDispatcher(self.shared, self.request_dispatcher)
54         self.response_dispatcher.start()
55
56     def register(self, prefix, processor):
57         processor.dispatcher = self
58         processor.shared = self.shared
59         processor.start()
60         self.request_dispatcher.processors[prefix] = processor
61
62
63
64 class RequestDispatcher(threading.Thread):
65
66     def __init__(self, shared):
67         self.shared = shared
68         threading.Thread.__init__(self)
69         self.daemon = True
70         self.request_queue = queue.Queue()
71         self.response_queue = queue.Queue()
72         self.internal_ids = {}
73         self.internal_id = 1
74         self.lock = threading.Lock()
75         self.sessions = []
76         self.processors = {}
77
78     def push_response(self, item):
79         self.response_queue.put(item)
80
81     def pop_response(self):
82         return self.response_queue.get()
83
84     def push_request(self, session, item):
85         self.request_queue.put((session,item))
86
87     def pop_request(self):
88         return self.request_queue.get()
89
90     def get_session_id(self, internal_id):
91         with self.lock:
92             return self.internal_ids.pop(internal_id)
93
94     def store_session_id(self, session, msgid):
95         with self.lock:
96             self.internal_ids[self.internal_id] = session, msgid
97             r = self.internal_id
98             self.internal_id += 1
99             return r
100
101     def run(self):
102         if self.shared is None:
103             raise TypeError("self.shared not set in Processor")
104         while not self.shared.stopped():
105             session, request = self.pop_request()
106             self.process(session, request)
107
108         self.stop()
109
110     def stop(self):
111         pass
112
113     def process(self, session, request):
114         method = request['method']
115         params = request.get('params',[])
116
117         suffix = method.split('.')[-1]
118         if suffix == 'subscribe':
119             session.subscribe_to_service(method, params)
120
121         # store session and id locally
122         request['id'] = self.store_session_id(session, request['id'])
123
124         # dispatch request to the relevant module..
125         prefix = request['method'].split('.')[0]
126         try:
127             p = self.processors[prefix]
128         except:
129             print "error: no processor for", prefix
130             return
131         try:
132             p.process(request)
133         except:
134             traceback.print_exc(file=sys.stdout)
135
136         if method in ['server.version']:
137             session.version = params[0]
138
139
140     def add_session(self, session):
141         with self.lock:
142             self.sessions.append(session)
143
144     def collect_garbage(self):
145         # Deep copy entire sessions list and blank it
146         # This is done to minimise lock contention
147         with self.lock:
148             sessions = self.sessions[:]
149             self.sessions = []
150         for session in sessions:
151             if not session.stopped():
152                 # If session is still alive then re-add it back
153                 # to our internal register
154                 self.add_session(session)
155
156
157 class Session:
158
159     def __init__(self):
160         self._stopped = False
161         self.lock = threading.Lock()
162         self.subscriptions = []
163         self.address = ''
164         self.name = ''
165         threading.Timer(2, self.info).start()
166
167     def info(self):
168         for s in self.subscriptions:
169             m, p = s
170             if m == 'blockchain.address.subscribe':
171                 addr = p[0]
172                 break
173         else:
174             addr = None
175         print timestr(), self.name, self.address, addr, len(self.subscriptions), self.version
176
177     def stopped(self):
178         with self.lock:
179             return self._stopped
180
181     def subscribe_to_service(self, method, params):
182         with self.lock:
183             self.subscriptions.append((method, params))
184     
185
186 class ResponseDispatcher(threading.Thread):
187
188     def __init__(self, shared, processor):
189         self.shared = shared
190         self.processor = processor
191         threading.Thread.__init__(self)
192         self.daemon = True
193
194     def run(self):
195         while not self.shared.stopped():
196             response = self.processor.pop_response()
197             #print "pop response", response
198             internal_id = response.get('id')
199             params = response.get('params', [])
200
201             # This is wrong. "params" and "method" should never
202             # be in a response.
203             if internal_id is None:
204                 method = response.get('method')
205                 if method is None:
206                     print "no method", response
207                     continue
208                 for session in self.processor.sessions:
209                     if not session.stopped():
210                         if (method,params) in session.subscriptions:
211                             session.send_response(response)
212                 continue
213
214             session, message_id = self.processor.get_session_id(internal_id)
215             response['id'] = message_id
216             session.send_response(response)
217