major reorganisation, http works now
[electrum-server.git] / processor.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 class Shared:
8
9     def __init__(self):
10         self.lock = threading.Lock()
11         self._stopped = False
12
13     def stop(self):
14         print "Stopping Stratum"
15         with self.lock:
16             self._stopped = True
17
18     def stopped(self):
19         with self.lock:
20             return self._stopped
21
22
23 class Processor(threading.Thread):
24
25     def __init__(self):
26         self.shared = None
27         threading.Thread.__init__(self)
28         self.daemon = True
29         self.request_queue = queue.Queue()
30         self.response_queue = queue.Queue()
31         self.internal_ids = {}
32         self.internal_id = 1
33         self.lock = threading.Lock()
34         self.sessions = []
35
36     def push_response(self, item):
37         self.response_queue.put(item)
38
39     def pop_response(self):
40         return self.response_queue.get()
41
42     def push_request(self, session, item):
43         self.request_queue.put((session,item))
44
45     def pop_request(self):
46         return self.request_queue.get()
47
48     def get_session_id(self, internal_id):
49         with self.lock:
50             return self.internal_ids.pop(internal_id)
51
52     def store_session_id(self, session, msgid):
53         with self.lock:
54             self.internal_ids[self.internal_id] = session, msgid
55             r = self.internal_id
56             self.internal_id += 1
57             return r
58
59     def run(self):
60         if self.shared is None:
61             raise TypeError("self.shared not set in Processor")
62         while not self.shared.stopped():
63             session, request = self.pop_request()
64
65             method = request['method']
66             params = request.get('params',[])
67
68             if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']:
69                 session.subscribe_to_service(method, params)
70
71             # store session and id locally
72             request['id'] = self.store_session_id(session, request['id'])
73             self.process(request)
74
75         self.stop()
76
77     def stop(self):
78         pass
79
80     def process(self, request):
81         print "New request", request
82         # Do stuff...
83         # response = request
84         # When ready, you call
85         # self.push_response(response)
86
87     def add_session(self, session):
88         with self.lock:
89             self.sessions.append(session)
90
91     def collect_garbage(self):
92         # Deep copy entire sessions list and blank it
93         # This is done to minimise lock contention
94         with self.lock:
95             sessions = self.sessions[:]
96             self.sessions = []
97         for session in sessions:
98             if not session.stopped():
99                 # If session is still alive then re-add it back
100                 # to our internal register
101                 self.add_session(session)
102
103
104 class Session:
105
106     def __init__(self):
107         self._stopped = False
108         self.lock = threading.Lock()
109         self.subscriptions = []
110
111     def stopped(self):
112         with self.lock:
113             return self._stopped
114
115     def subscribe_to_service(self, method, params):
116         with self.lock:
117             self.subscriptions.append((method, params))
118     
119
120 class Dispatcher(threading.Thread):
121
122     def __init__(self, shared, processor):
123         self.shared = shared
124         self.processor = processor
125         threading.Thread.__init__(self)
126
127     def run(self):
128         while not self.shared.stopped():
129             response = self.processor.pop_response()
130             #print "pop response", response
131             internal_id = response.get('id')
132             params = response.get('params',[])
133             try:
134                 method = response['method']
135             except:
136                 print "no method", response
137                 continue
138
139             if internal_id:
140                 session, message_id = self.processor.get_session_id(internal_id)
141                 response['id'] = message_id
142                 session.send_response(response)
143
144             else:
145                 for session in self.processor.sessions:
146                     if not session.stopped():
147                         if (method,params) in session.subscriptions:
148                             session.send_response(response)
149