a18594d890424f29a6e2640b1b7192f7e6d2476b
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import traceback, sys
6 import Queue as queue
7
class Shared:
    """Lock-protected stop flag.

    The Processor and Dispatcher run loops in this file poll ``stopped()``
    on their ``shared`` attribute (presumably an instance of this class)
    to decide when to leave their loops.
    """

    def __init__(self):
        self._stopped = False
        # Guards every read and write of _stopped.
        self.lock = threading.Lock()

    def stopped(self):
        """Return True once stop() has been called, False before that."""
        with self.lock:
            flag = self._stopped
        return flag

    def stop(self):
        """Print a shutdown notice to stdout, then raise the stop flag."""
        # Single-argument form: prints the same text whether print is a
        # statement or a function.
        print("Stopping Stratum")
        with self.lock:
            self._stopped = True
22
23
class Processor(threading.Thread):
    """Worker thread that routes queued requests to registered handlers.

    ``(session, request)`` pairs are queued with push_request() and consumed
    by run().  Each request is handed to the handler registered for the first
    dotted component of its ``'method'``; the handler is called with the
    request and the response queue.

    Before a request is handed over, its ``'id'`` is replaced by an internal
    id that maps back to ``(session, original id)``; get_session_id() resolves
    (and forgets) that mapping.

    ``self.shared`` must be set to an object with a ``stopped()`` method
    before the thread is started.
    """

    def __init__(self):
        self.shared = None
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        # internal id -> (session, original message id)
        self.internal_ids = {}
        # Next internal id to hand out; starts at 1 so ids are always truthy.
        self.internal_id = 1
        # Guards internal_ids, internal_id and sessions.
        self.lock = threading.Lock()
        self.sessions = []
        # method prefix -> handler callable(request, response_queue)
        self.processors = {}

    def push_response(self, item):
        """Queue a response item."""
        self.response_queue.put(item)

    def pop_response(self):
        """Block until a response is available and return it."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue the request ``item`` together with its ``session``."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a request is available; return ``(session, item)``."""
        return self.request_queue.get()

    def get_session_id(self, internal_id):
        """Remove and return the ``(session, msgid)`` stored for an id.

        Raises KeyError if ``internal_id`` is unknown or already consumed.
        """
        with self.lock:
            return self.internal_ids.pop(internal_id)

    def store_session_id(self, session, msgid):
        """Remember ``(session, msgid)`` and return its new internal id."""
        with self.lock:
            assigned = self.internal_id
            self.internal_ids[assigned] = session, msgid
            self.internal_id = assigned + 1
            return assigned

    def register(self, prefix, function):
        """Register ``function`` as the handler for methods under ``prefix``."""
        self.processors[prefix] = function

    def run(self):
        """Process queued requests until ``self.shared`` reports stopped.

        Raises TypeError if ``self.shared`` was never set.  The stop flag is
        only re-checked after a request has been handled, because
        pop_request() blocks.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            session, request = self.pop_request()
            try:
                self.process(session, request)
            except Exception:
                # A single bad request (e.g. one without 'method' or 'id')
                # must not take the whole worker thread down.
                traceback.print_exc(file=sys.stdout)

        self.stop()

    def stop(self):
        """Hook called once when run() leaves its loop; does nothing here."""
        pass

    def process(self, session, request):
        """Dispatch one request to the handler registered for its prefix.

        ``request`` must contain ``'method'`` and ``'id'`` (KeyError
        otherwise); ``'params'`` defaults to ``[]``.  Methods whose last
        dotted component is ``subscribe`` are recorded on the session first.
        If no handler is registered for the prefix, an error line is printed
        and the request is dropped.  Exceptions raised by the handler are
        printed to stdout and swallowed.
        """
        method = request['method']
        params = request.get('params', [])

        parts = method.split('.')
        if parts[-1] == 'subscribe':
            session.subscribe_to_service(method, params)

        # Resolve the handler BEFORE allocating an internal id: a request
        # that is dropped here never gets a response, so an id stored for it
        # would stay in internal_ids forever.
        prefix = parts[0]
        try:
            func = self.processors[prefix]
        except KeyError:
            print("error: no processor for %s" % prefix)
            return

        # store session and id locally
        request['id'] = self.store_session_id(session, request['id'])

        # dispatch request to the relevant module..
        try:
            func(request, self.response_queue)
        except Exception:
            # The internal id is deliberately kept: the handler may already
            # have queued a response that still needs it.
            traceback.print_exc(file=sys.stdout)

    def add_session(self, session):
        """Add ``session`` to the list of known sessions."""
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self):
        """Drop every session whose ``stopped()`` returns true.

        The liveness checks run on a snapshot, outside the lock, to keep
        lock hold times short.  ``self.sessions`` is then replaced in a
        single assignment, so readers never observe an empty or
        half-rebuilt list and sessions added during the sweep are kept.
        """
        with self.lock:
            snapshot = list(self.sessions)

        # Compare by identity so a session's own __eq__ cannot matter; the
        # snapshot keeps the objects alive, so the ids stay unique.
        dead_ids = set(id(s) for s in snapshot if s.stopped())
        if not dead_ids:
            return

        with self.lock:
            self.sessions = [s for s in self.sessions
                             if id(s) not in dead_ids]
115
116
class Session:
    """Per-session state: a stop flag and the list of subscriptions.

    Nothing in this class ever sets the stop flag, and ``send_response``
    (called on sessions by Dispatcher) is not defined here; presumably
    both are supplied by subclasses -- confirm against the transports.
    """

    def __init__(self):
        # (method, params) tuples, in the order they were subscribed.
        self.subscriptions = []
        # Guards _stopped and subscriptions.
        self.lock = threading.Lock()
        self._stopped = False

    def subscribe_to_service(self, method, params):
        """Append ``(method, params)`` to this session's subscriptions."""
        entry = (method, params)
        with self.lock:
            self.subscriptions.append(entry)

    def stopped(self):
        """Return the current value of the stop flag."""
        with self.lock:
            flag = self._stopped
        return flag
131     
132
class Dispatcher(threading.Thread):
    """Thread that delivers queued responses to sessions.

    Responses are taken from ``processor.pop_response()``.  A response with
    a truthy ``'id'`` is a reply: the id is resolved through
    ``processor.get_session_id()`` and the response is sent to that one
    session with its original message id restored.  A response without an
    id is a broadcast: it is sent to every live session in
    ``processor.sessions`` whose subscriptions contain
    ``(method, params)``.

    Session objects must provide ``send_response(response)``.
    """

    def __init__(self, shared, processor):
        self.shared = shared
        self.processor = processor
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        """Deliver responses until ``shared.stopped()`` returns true.

        The stop flag is only re-checked after a response has been handled,
        because pop_response() blocks.
        """
        while not self.shared.stopped():
            response = self.processor.pop_response()
            try:
                self._dispatch(response)
            except Exception:
                # One undeliverable response must not end delivery for
                # every other session.
                traceback.print_exc(file=sys.stdout)

    def _dispatch(self, response):
        """Route a single response to its session(s)."""
        internal_id = response.get('id')
        params = response.get('params', [])
        try:
            method = response['method']
        except KeyError:
            print("no method %s" % (response,))
            return

        # Internal ids handed out by Processor.store_session_id start at 1,
        # so a falsy id means "no reply target".
        if internal_id:
            try:
                session, message_id = \
                    self.processor.get_session_id(internal_id)
            except KeyError:
                # Unknown or already answered id (e.g. a handler replied
                # twice): nothing to deliver to.
                print("error: unknown internal id %s" % (internal_id,))
                return
            response['id'] = message_id
            session.send_response(response)
            return

        subscription = (method, params)
        for session in self.processor.sessions:
            if session.stopped():
                continue
            if subscription not in session.subscriptions:
                continue
            try:
                session.send_response(response)
            except Exception:
                # Keep notifying the remaining subscribers.
                traceback.print_exc(file=sys.stdout)
163