generic processor; register backends
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import traceback, sys
6 import Queue as queue
7
class Shared:
    """Lock-protected stop flag handed to the worker threads in this file."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Print the shutdown notice, then set the stop flag under the lock."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return the current value of the stop flag (read under the lock)."""
        with self.lock:
            flag = self._stopped
        return flag
22
23
class Processor(threading.Thread):
    """Daemon thread that routes queued requests to registered backends.

    Requests arrive as ``(session, request_dict)`` pairs on
    ``request_queue``.  Each request is handed to the callable registered
    for the first dotted component of its ``method``; that callable receives
    the request (with ``id`` rewritten to an internal id) and the shared
    ``response_queue``.  ``shared`` must be assigned before the thread runs.
    """

    def __init__(self):
        self.shared = None  # object exposing stopped(); set by the owner
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        self.internal_ids = {}   # internal id -> (session, original id)
        self.internal_id = 1     # next internal id to hand out
        self.lock = threading.Lock()  # guards internal_ids and sessions
        self.sessions = []
        self.processors = {}     # method prefix -> backend callable

    def push_response(self, item):
        """Queue a response for later delivery."""
        self.response_queue.put(item)

    def pop_response(self):
        """Block until a response is available and return it."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue request ``item`` on behalf of ``session``."""
        self.request_queue.put((session, item))

    def pop_request(self):
        """Block until a ``(session, request)`` pair is available."""
        return self.request_queue.get()

    def get_session_id(self, internal_id):
        """Remove and return the ``(session, msgid)`` stored for an id.

        Raises KeyError if ``internal_id`` is unknown or already consumed.
        """
        with self.lock:
            return self.internal_ids.pop(internal_id)

    def store_session_id(self, session, msgid):
        """Remember ``(session, msgid)`` and return the new internal id."""
        with self.lock:
            assigned = self.internal_id
            self.internal_ids[assigned] = session, msgid
            self.internal_id += 1
            return assigned

    def register(self, prefix, function):
        """Register ``function(request, response_queue)`` for ``prefix``."""
        self.processors[prefix] = function

    def run(self):
        """Dispatch queued requests until ``shared`` reports stopped.

        Raises TypeError if ``shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            session, request = self.pop_request()

            # A request without 'method' or 'id' used to raise out of the
            # loop and kill the thread; report it and keep serving instead.
            try:
                method = request['method']
                message_id = request['id']
            except (KeyError, TypeError):
                print("error: malformed request %r" % (request,))
                continue
            params = request.get('params', [])

            parts = method.split('.')
            if parts[-1] == 'subscribe':
                session.subscribe_to_service(method, params)

            # Resolve the backend BEFORE allocating an internal id, so a
            # request for an unknown prefix leaves nothing in internal_ids.
            prefix = parts[0]
            try:
                func = self.processors[prefix]
            except KeyError:
                print("error: no processor for %s" % prefix)
                continue

            # store session and id locally
            request['id'] = self.store_session_id(session, message_id)

            # dispatch request to the relevant module..
            # The mapping is deliberately kept if the backend raises: it may
            # already have queued a response that refers to this id.
            try:
                func(request, self.response_queue)
            except Exception:
                traceback.print_exc(file=sys.stdout)

        self.stop()

    def stop(self):
        """Hook called once when run() leaves its loop; no-op here."""
        pass

    def process(self, request):
        """Placeholder handler: just prints the request."""
        print("New request %s" % (request,))
        # Do stuff...
        # response = request
        # When ready, you call
        # self.push_response(response)

    def add_session(self, session):
        """Append ``session`` to the tracked session list."""
        with self.lock:
            self.sessions.append(session)

    def collect_garbage(self):
        """Drop sessions whose ``stopped()`` returns true.

        Sessions are polled outside the lock to keep contention low, and the
        list is then replaced in a single locked step.  Live sessions are
        never removed temporarily, and sessions added while polling are kept.
        """
        with self.lock:
            snapshot = self.sessions[:]  # shallow copy of the list
        dead = [s for s in snapshot if s.stopped()]
        if not dead:
            return
        # `dead` keeps the objects alive, so their id() values stay unique.
        dead_ids = set(id(s) for s in dead)
        with self.lock:
            self.sessions = [s for s in self.sessions
                             if id(s) not in dead_ids]
121
122
class Session:
    """Per-session state: a stop flag and a list of subscriptions."""

    def __init__(self):
        self.lock = threading.Lock()
        self.subscriptions = []
        self._stopped = False

    def stopped(self):
        """Return the stop flag, read while holding the session lock."""
        self.lock.acquire()
        try:
            return self._stopped
        finally:
            self.lock.release()

    def subscribe_to_service(self, method, params):
        """Record a ``(method, params)`` subscription under the lock."""
        entry = (method, params)
        with self.lock:
            self.subscriptions.append(entry)
137     
138
class Dispatcher(threading.Thread):
    """Daemon thread that delivers queued responses to sessions.

    A response with a truthy ``id`` is sent to the single session recorded
    for that internal id.  Any other response is sent to every non-stopped
    session whose ``subscriptions`` contain ``(method, params)``.
    """

    def __init__(self, shared, processor):
        self.shared = shared
        self.processor = processor
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        """Deliver responses until ``shared`` reports stopped."""
        while not self.shared.stopped():
            response = self.processor.pop_response()
            internal_id = response.get('id')
            params = response.get('params', [])
            try:
                method = response['method']
            except KeyError:
                print("no method %s" % (response,))
                continue

            if internal_id:
                # An unknown or already-consumed id used to raise KeyError
                # and end the thread; report it and keep dispatching.
                try:
                    session, message_id = \
                        self.processor.get_session_id(internal_id)
                except KeyError:
                    print("error: unknown internal id %s" % (internal_id,))
                    continue
                response['id'] = message_id
                session.send_response(response)
            else:
                # Copy the list under the processor's lock, which guards
                # every write to it, then deliver outside the lock.
                with self.processor.lock:
                    sessions = list(self.processor.sessions)
                for session in sessions:
                    if session.stopped():
                        continue
                    if (method, params) in session.subscriptions:
                        session.send_response(response)
169