deb58b940a1e99c6c67c0ef507d3b0ab0238bef7
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Worker thread that owns the request/response queues and the session list.

    Transports push parsed requests with push_request(); run() pops them and
    hands each one to process().  Replies and notifications are queued with
    push_response() and drained by the transport's responder thread.

    process() and get_status() here are placeholders (the first only prints
    the request, the second returns None); a real processor overrides them.
    ``shared`` must be assigned before start() is called -- Stratum.start()
    does this -- otherwise run() raises TypeError.
    """

    # Seconds run() waits for a request before re-checking the shutdown flag.
    poll_interval = 1.0

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = None
        # Guards self.sessions.
        self.lock = threading.Lock()
        self.sessions = []
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()

    def add_session(self, session):
        """Register a session so it can receive subscription notifications."""
        with self.lock:
            self.sessions.append(session)

    def push_response(self, session, item):
        """Queue ``item`` to be sent to ``session``."""
        self.response_queue.put((session, item))

    def pop_response(self):
        """Block until a response is available; return (session, item)."""
        return self.response_queue.get()

    def push_request(self, session, item):
        """Queue a request ``item`` received from ``session``."""
        self.request_queue.put((session, item))

    def pop_request(self, timeout=None):
        """Return the next (session, item) request.

        With the default ``timeout=None`` this blocks until a request
        arrives.  With a number it waits at most that many seconds and
        raises queue.Empty if nothing arrived.
        """
        return self.request_queue.get(True, timeout)

    def collect_garbage(self):
        """Drop stopped sessions from the register."""
        # Prune inside one critical section.  The register is replaced by
        # the filtered list in a single assignment, so it is never observed
        # empty or half-filled by a thread that is delivering notifications.
        with self.lock:
            self.sessions = [s for s in self.sessions if not s.stopped()]

    def _snapshot_sessions(self):
        """Return a copy of the session register taken under the lock."""
        with self.lock:
            return list(self.sessions)

    def run(self):
        """Process requests until the shared stop flag is set, then call stop().

        Raises TypeError if ``shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            self.collect_garbage()
            try:
                # Wait with a timeout so an idle processor still notices
                # shutdown and reaches self.stop() below.
                session, request = self.pop_request(timeout=self.poll_interval)
            except queue.Empty:
                continue
            self.process(session, request)

        self.stop()

    def stop(self):
        """Shutdown hook called once when run() leaves its loop; no-op here."""
        pass

    def process(self, session, request):
        """Handle one request.  Placeholder: only prints the request."""
        # Single pre-formatted argument: prints the same text under the
        # Python 2 print statement and the print() function.
        print("New request %s" % (request,))
        # Do stuff...
        # response = request
        # When ready, you call
        # self.push_response(session,response)

    def update_from_blocknum(self, block_number):
        """Queue ``block_number`` for every live session subscribed to numblocks."""
        for session in self._snapshot_sessions():
            if session.stopped():
                continue
            if session.numblocks_sub is not None:
                response = {'id': session.numblocks_sub, 'result': block_number}
                self.push_response(session, response)

    def update_from_address(self, addr):
        """Notify live sessions subscribed to ``addr`` whose stored status is stale."""
        for session in self._snapshot_sessions():
            if session.stopped():
                continue
            subscription = session.addresses_sub.get(addr)
            if not subscription:
                continue
            status = self.get_status(addr)
            message_id, last_status = subscription
            if status != last_status:
                # Remember the status just sent so the same value is not
                # pushed to this session again.
                session.subscribe_to_address(addr, message_id, status)
                response = {'id': message_id, 'result': status}
                self.push_response(session, response)

    def get_status(self, addr):
        """Return the status of ``addr``.  Placeholder: returns None."""
        # return store.get_status(addr)
        pass
89
90
class Session:
    """State kept for one client: its connection, peer address and subscriptions."""

    def __init__(self, connection, address):
        self._connection = connection
        self.address = address
        self._stopped = False
        # Guards _stopped and the subscription fields below.
        self.lock = threading.Lock()
        # Message id of the numblocks subscription, or None if not subscribed.
        self.numblocks_sub = None
        # Maps a subscribed address to its (message_id, status) pair.
        self.addresses_sub = {}
        # Single pre-formatted argument: prints the same text under the
        # Python 2 print statement and the print() function.
        print("new session %s" % (address,))

    def stop(self):
        """Mark the session stopped and close its connection.

        Safe to call more than once and from several threads: only the
        first call closes the connection and prints the message.
        """
        with self.lock:
            if self._stopped:
                return
            # Set the flag before closing so connection() stops handing
            # out the socket from this point on.
            self._stopped = True
        self._connection.close()
        print("Terminating connection: %s" % (self.address[0],))

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            return self._stopped

    def connection(self):
        """Return the underlying connection.

        Raises Exception("Session was stopped") if the session is stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        else:
            return self._connection

    def subscribe_to_numblocks(self, message_id):
        """Record ``message_id`` as the id used for numblocks notifications."""
        with self.lock:
            self.numblocks_sub = message_id

    def subscribe_to_address(self, address, message_id, status):
        """Store (message_id, status) as the subscription for ``address``."""
        with self.lock:
            self.addresses_sub[address] = message_id, status
125
126
class TcpResponder(threading.Thread):
    """Thread that drains the processor's response queue onto client sockets."""

    def __init__(self, shared, processor):
        threading.Thread.__init__(self)
        # run() blocks indefinitely in pop_response(); as a daemon thread
        # (like Processor and TcpServer) it cannot keep the process alive
        # after the main thread has finished.
        self.daemon = True
        self.shared = shared
        self.processor = processor

    def run(self):
        """Send each queued response as one newline-terminated JSON line."""
        while not self.shared.stopped():
            session, response = self.processor.pop_response()
            raw_response = json.dumps(response)
            # The session may be stopped by its reader thread at any time;
            # connection() then raises and the send is abandoned.
            try:
                connection = session.connection()
                # sendall() keeps writing until the whole line is out;
                # send() may stop after a partial write and truncate it.
                connection.sendall(raw_response + "\n")
            except Exception:
                # Any failure to deliver means the client is unusable.
                session.stop()
146
class TcpClientRequestor(threading.Thread):
    """Per-client reader thread: turns newline-delimited JSON into requests."""

    def __init__(self, shared, processor, session):
        threading.Thread.__init__(self)
        # run() blocks in recv(); as a daemon thread (like Processor and
        # TcpServer) it cannot keep the process alive after shutdown.
        self.daemon = True
        self.shared = shared
        self.processor = processor
        # Bytes received so far that do not yet form a complete line.
        self.message = ""
        self.session = session

    def run(self):
        """Read and dispatch commands until the client leaves or shutdown."""
        while not self.shared.stopped():
            if not self.update():
                break

            while self.parse():
                pass

            # parse() stops the session on "quit"; leave instead of trying
            # to read from a connection that has just been closed.
            if self.session.stopped():
                break

    def update(self):
        """Append newly received data to the buffer.

        Returns False, after stopping the session, when nothing could be
        read; True otherwise.
        """
        data = self.receive()
        if not data:
            # close_session
            self.session.stop()
            return False

        self.message += data
        return True

    def receive(self):
        """Return up to 1024 bytes from the client, or '' if none can be read."""
        try:
            connection = self.session.connection()
        except Exception:
            # Session.connection() raises a plain Exception once the
            # session has been stopped.
            return ''
        try:
            return connection.recv(1024)
        except socket.error:
            return ''

    def parse(self):
        """Consume one complete line from the buffer and dispatch it.

        Returns False when no complete line is buffered or the line was
        "quit" (which stops the session); True when a line was handled and
        more may follow.
        """
        newline_at = self.message.find('\n')
        if newline_at == -1:
            return False

        raw_command = self.message[:newline_at].strip()
        self.message = self.message[newline_at + 1:]
        if raw_command == 'quit':
            self.session.stop()
            return False

        try:
            command = json.loads(raw_command)
        except Exception:
            # Client input is untrusted: whatever the decoder raises is
            # reported back as bad JSON instead of killing this thread.
            self.processor.push_response(self.session,
                {"error": "bad JSON", "request": raw_command})
            return True

        # A request must be a JSON object carrying both vital fields.
        # Valid JSON of another type (list, number, string) is rejected
        # here rather than raising TypeError on the key lookup.
        if (not isinstance(command, dict)
                or 'id' not in command or 'method' not in command):
            self.processor.push_response(self.session,
                {"error": "syntax error", "request": raw_command})
        else:
            self.processor.push_request(self.session, command)

        return True
211
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients and starts a reader for each."""

    def __init__(self, shared, processor, host, port):
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = shared
        self.processor = processor
        self.clients = []
        self.host = host
        self.port = port

    def run(self):
        """Listen on (host, port) and serve clients until shutdown.

        Starts one TcpResponder for outgoing messages.  Each accepted
        connection gets a Session, which is registered with the processor,
        and a TcpClientRequestor thread to read from it.
        """
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            responder = TcpResponder(self.shared, self.processor)
            responder.start()
            while not self.shared.stopped():
                session = Session(*sock.accept())
                client_req = TcpClientRequestor(self.shared, self.processor, session)
                client_req.start()
                self.processor.add_session(session)
        finally:
            # Release the listening socket whether the loop ended normally
            # or bind/listen/accept raised.
            sock.close()
236
class Shared:
    """Lock-protected stop flag shared between all the server threads."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Announce the shutdown and raise the stop flag."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return True once stop() has been called."""
        self.lock.acquire()
        try:
            flag = self._stopped
        finally:
            self.lock.release()
        return flag
251
class Stratum:
    """Application entry point: wires a processor to its transports."""

    def start(self, processor, host="176.31.24.241", port=50001):
        """Start ``processor`` and a TCP transport, then wait for "quit".

        ``host`` and ``port`` are the address the TCP server binds to; the
        defaults are the values that used to be hard-coded here.  Blocks
        reading lines from standard input until one equals "quit", which
        sets the shared stop flag.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need
        transports = (TcpServer(shared, processor, host, port),)
        for server in transports:
            server.start()
        while not shared.stopped():
            if raw_input() == "quit":
                shared.stop()
            time.sleep(1)
267
if __name__ == "__main__":
    # Script entry point: run the server with the placeholder Processor.
    app, processor = Stratum(), Processor()
    app.start(processor)
272