use shared queues
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Worker thread that dispatches queued requests to ``process``.

    ``(session, request)`` pairs are queued with ``push_request`` and
    handed to ``process`` one at a time by ``run``. Results are queued as
    ``(session, response)`` pairs with ``push_response``. Subclasses
    override ``process`` and, optionally, ``stop``.

    ``shared`` must be assigned before the thread is started; it has to
    provide a ``stopped()`` method.
    """

    # Seconds ``run`` waits for a request before re-checking the stop flag.
    poll_interval = 1.0

    def __init__(self):
        self.shared = None
        # Guards ``self.sessions``.
        self.lock = threading.Lock()
        self.sessions = []
        threading.Thread.__init__(self)
        self.daemon = True
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()

    def add_session(self, session):
        """Register ``session`` in the list of tracked sessions."""
        with self.lock:
            self.sessions.append(session)

    def push_response(self, session, item):
        """Queue ``item`` as a response addressed to ``session``."""
        self.response_queue.put((session, item))

    def pop_response(self, timeout=None):
        """Return the next ``(session, response)`` pair.

        Blocks until one is available. When ``timeout`` (seconds) is
        given and expires first, the queue module's ``Empty`` is raised.
        """
        return self.response_queue.get(True, timeout)

    def push_request(self, session, item):
        """Queue ``item`` as a request received from ``session``."""
        self.request_queue.put((session, item))

    def pop_request(self, timeout=None):
        """Return the next ``(session, request)`` pair.

        Blocks until one is available. When ``timeout`` (seconds) is
        given and expires first, the queue module's ``Empty`` is raised.
        """
        return self.request_queue.get(True, timeout)

    def run(self):
        """Process requests until ``shared.stopped()``, then call ``stop``.

        Raises:
            TypeError: if ``shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            self._prune_sessions()
            try:
                # Wait with a timeout: an unbounded wait would never
                # re-check the stop flag while no requests arrive, and
                # the stop() hook below would never run.
                session, request = self.pop_request(
                    timeout=self.poll_interval)
            except queue.Empty:
                continue
            self.process(session, request)

        self.stop()

    def _prune_sessions(self):
        """Drop sessions whose ``stopped()`` is true from the register."""
        # Take a shallow snapshot and blank the list so the lock is not
        # held while each session is queried.
        with self.lock:
            snapshot = self.sessions[:]
            self.sessions = []
        for session in snapshot:
            if not session.stopped():
                # Still alive: re-add it to our internal register.
                self.add_session(session)

    def stop(self):
        """Hook called once when ``run`` leaves its loop; no-op here."""
        pass

    def process(self, session, request):
        """Handle one request; the default only logs it.

        Subclasses do the real work and, when a reply is ready, call
        ``self.push_response(session, response)``.
        """
        # Single-argument form: prints the same text as the two-item
        # print statement and is also valid syntax on Python 3.
        print("New request %s" % (request,))
63
class Session:
    """State for one client connection.

    Holds the connection object, the peer address, a stopped flag and
    the client's subscriptions. ``lock`` guards the flag and the
    subscription fields.
    """

    def __init__(self, connection, address):
        self._connection = connection
        self.address = address
        self._stopped = False
        self.lock = threading.Lock()
        # Message id of the numblocks subscription, or None.
        self.numblocks_sub = None
        # Maps address -> (message_id, status).
        self.addresses_sub = {}

    def stop(self):
        """Mark the session stopped and close its connection.

        Safe to call more than once, from any thread: only the first
        call closes the connection and logs the termination.
        """
        with self.lock:
            if self._stopped:
                return
            # Flip the flag before closing so connection() stops handing
            # out the socket as soon as shutdown begins.
            self._stopped = True
        self._connection.close()
        # Single-argument form: same output as the two-item print
        # statement, and also valid syntax on Python 3.
        print("Terminating connection: %s" % (self.address[0],))

    def stopped(self):
        """Return True once ``stop`` has been called."""
        with self.lock:
            return self._stopped

    def connection(self):
        """Return the connection object.

        Raises:
            Exception: if the session has been stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        else:
            return self._connection

    def subscribe_to_numblocks(self, message_id):
        """Record ``message_id`` as the numblocks subscription."""
        with self.lock:
            self.numblocks_sub = message_id

    def subscribe_to_address(self, address, message_id, status):
        """Record ``(message_id, status)`` as the subscription for ``address``."""
        with self.lock:
            self.addresses_sub[address] = message_id, status
97
98
class TcpResponder(threading.Thread):
    """Thread that writes queued responses back to their sessions.

    Each response is serialised as one JSON document followed by a
    newline.
    """

    def __init__(self, shared, processor):
        self.shared = shared
        self.processor = processor
        threading.Thread.__init__(self)

    def run(self):
        """Deliver responses from the processor until ``shared.stopped()``."""
        while not self.shared.stopped():
            session, response = self.processor.pop_response()
            raw_response = json.dumps(response)
            try:
                connection = session.connection()
            except Exception:
                # The session was stopped after its response was queued.
                # Drop the response; letting the error escape would kill
                # this thread, which delivers responses for every session.
                continue
            try:
                # sendall keeps writing until the whole line is out;
                # send() may transmit only a prefix of it.
                connection.sendall(raw_response + "\n")
            except Exception:
                # Any delivery failure ends this session only.
                session.stop()
118
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON requests from one session.

    Received data accumulates in ``self.message``. Every complete line is
    either queued on the processor as a request or answered with an
    error response.
    """

    def __init__(self, shared, processor, session):
        self.shared = shared
        self.processor = processor
        # Received data not yet terminated by a newline.
        self.message = ""
        self.session = session
        threading.Thread.__init__(self)

    def run(self):
        """Read until shutdown; stop the session when ``update`` fails."""
        while not self.shared.stopped():
            if not self.update():
                self.session.stop()
                return

    def update(self):
        """Receive one chunk and parse it.

        Returns:
            False when the session should be closed (receive failure,
            peer disconnect or a ``quit`` command), True otherwise.
        """
        data = self.receive()
        if not data:
            # None signals a receive failure; an empty result means the
            # peer closed the connection. Either way the session is over
            # and run() stops it. (Looping on an empty result would spin
            # forever, because recv keeps returning immediately.)
            return False

        self.message += data
        return self.parse()

    def receive(self):
        """Return up to 1024 bytes from the session, or None on failure."""
        try:
            connection = self.session.connection()
        except Exception:
            # connection() raises once the session has been stopped,
            # e.g. by another thread after a failed send.
            return None
        try:
            return connection.recv(1024)
        except socket.error:
            return None

    def parse(self):
        """Handle every complete line currently buffered.

        An unterminated trailing fragment stays in ``self.message`` until
        more data arrives.

        Returns:
            False if a ``quit`` line was read, True otherwise.
        """
        # Loop rather than handle a single line: several requests can
        # arrive in one chunk, and the later ones must not wait for
        # further input before being processed.
        while True:
            newline_at = self.message.find('\n')
            if newline_at == -1:
                return True

            raw_command = self.message[0:newline_at].strip()
            self.message = self.message[newline_at + 1:]
            if raw_command == 'quit':
                return False

            self._dispatch(raw_command)

    def _dispatch(self, raw_command):
        """Queue one request line, or answer it with an error response."""
        try:
            command = json.loads(raw_command)
        except ValueError:
            self.processor.push_response(self.session,
                {"error": "bad JSON", "request": raw_command})
            return

        # A request must be a JSON object carrying the vital fields.
        # Valid JSON of another type (list, number, string) is rejected
        # here instead of raising TypeError on the key lookup.
        if (not isinstance(command, dict)
                or 'id' not in command or 'method' not in command):
            self.processor.push_response(self.session,
                {"error": "syntax error", "request": raw_command})
        else:
            self.processor.push_request(self.session, command)
182
class TcpServer(threading.Thread):
    """Thread that accepts TCP clients and wires each one to the processor.

    Args:
        shared: object whose ``stopped()`` ends the accept loop.
        processor: receives every new session via ``add_session``.
        host: interface or hostname to bind.
        port: TCP port to bind.
        backlog: value passed to ``listen``; defaults to 1.
    """

    def __init__(self, shared, processor, host, port, backlog=1):
        self.shared = shared
        self.processor = processor
        self.clients = []
        threading.Thread.__init__(self)
        self.daemon = True
        self.host = host
        self.port = port
        self.backlog = backlog

    def run(self):
        """Bind, start the responder, then accept clients until stopped."""
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(self.backlog)
            responder = TcpResponder(self.shared, self.processor)
            responder.start()
            while not self.shared.stopped():
                # accept() returns (connection, address), which is
                # exactly Session's constructor signature.
                session = Session(*sock.accept())
                client_req = TcpClientRequestor(
                    self.shared, self.processor, session)
                client_req.start()
                self.processor.add_session(session)
        finally:
            # Release the listening socket whether the loop ended
            # normally or bind/listen/accept raised.
            sock.close()
207
class Shared:
    """Lock-protected stop flag polled by the worker threads."""

    def __init__(self):
        # Guards ``_stopped``.
        self.lock = threading.Lock()
        self._stopped = False

    def stop(self):
        """Announce the shutdown and set the stop flag."""
        # Parenthesised single-argument print: identical output to the
        # print statement, and also valid syntax on Python 3.
        print("Stopping Stratum")
        with self.lock:
            self._stopped = True

    def stopped(self):
        """Return True once ``stop`` has been called."""
        with self.lock:
            return self._stopped
222
class Stratum:
    """Application entry point: starts the processor and its transports."""

    def start(self, processor, host="ecdsa.org", port=50002):
        """Run the server until ``quit`` is entered on standard input.

        Args:
            processor: a not-yet-started processor thread; its ``shared``
                attribute is assigned here before it is started.
            host: address the TCP transport binds to.
            port: TCP port the transport binds to.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need. This is deliberately a
        # tuple so further transports can be added alongside the TCP one.
        transports = (TcpServer(shared, processor, host, port),)
        for server in transports:
            server.start()
        while not shared.stopped():
            # raw_input() blocks until a full line has been entered.
            if raw_input() == "quit":
                shared.stop()
            time.sleep(1)
238
if __name__ == "__main__":
    # Script entry point: build the default processor and the
    # application object, then hand the processor to the application.
    processor, app = Processor(), Stratum()
    app.start(processor)
243