9d063f259907faff76c8b08da2dce7fc643c1a3d
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 class Processor(threading.Thread):
8
9     def __init__(self):
10         self.shared = None
11         self.lock = threading.Lock()
12         self.sessions = []
13         threading.Thread.__init__(self)
14         self.daemon = True
15         self.request_queue = queue.Queue()
16         self.response_queue = queue.Queue()
17
18     def add_session(self, session):
19         with self.lock:
20             self.sessions.append(session)
21
22     def push_response(self, session, item):
23         self.response_queue.put((session,item))
24
25     def pop_response(self):
26         return self.response_queue.get()
27
28     def push_request(self, session, item):
29         self.request_queue.put((session,item))
30
31     def pop_request(self):
32         return self.request_queue.get()
33
34     def run(self):
35         if self.shared is None:
36             raise TypeError("self.shared not set in Processor")
37         while not self.shared.stopped():
38             # Deep copy entire sessions list and blank it
39             # This is done to minimise lock contention
40             with self.lock:
41                 sessions = self.sessions[:]
42                 self.sessions = []
43             for session in sessions:
44                 if not session.stopped():
45                     # If session is still alive then re-add it back
46                     # to our internal register
47                     self.add_session(session)
48
49             session, request = self.pop_request()
50             self.process(session, request)
51
52         self.stop()
53
54     def stop(self):
55         pass
56
57     def process(self, session, request):
58         print "New request", request
59         # Do stuff...
60         # response = request
61         # When ready, you call
62         # self.push_response(session,response)
63
64 class Session:
65
66     def __init__(self, connection, address):
67         self._connection = connection
68         self.address = address
69         self._stopped = False
70         self.lock = threading.Lock()
71         self.numblocks_sub = None
72         self.addresses_sub = {}
73
74     def stop(self):
75         self._connection.close()
76         print "Terminating connection:", self.address[0]
77         with self.lock:
78             self._stopped = True
79
80     def stopped(self):
81         with self.lock:
82             return self._stopped
83
84     def connection(self):
85         if self.stopped():
86             raise Exception("Session was stopped")
87         else:
88             return self._connection
89
90     def subscribe_to_numblocks(self,message_id):
91         with self.lock:
92             self.numblocks_sub = message_id
93     
94     def subscribe_to_address(self,address,message_id,status):
95         with self.lock:
96             self.addresses_sub[address] = message_id,status
97
98
99 class TcpResponder(threading.Thread):
100
101     def __init__(self, shared, processor):
102         self.shared = shared
103         self.processor = processor
104         threading.Thread.__init__(self)
105
106     def run(self):
107         while not self.shared.stopped():
108             session,response = self.processor.pop_response()
109             raw_response = json.dumps(response)
110             # Possible race condition here by having session
111             # close connection?
112             # I assume Python connections are thread safe interfaces
113             connection = session.connection()
114             try:
115                 connection.send(raw_response + "\n")
116             except:
117                 session.stop()
118
119 class TcpClientRequestor(threading.Thread):
120
121     def __init__(self, shared, processor, session):
122         self.shared = shared
123         self.processor = processor
124         self.message = ""
125         self.session = session
126         threading.Thread.__init__(self)
127
128     def run(self):
129         while not self.shared.stopped():
130             if not self.update():
131                 self.session.stop()
132                 break
133
134             while self.parse():
135                 pass
136
137     def update(self):
138         data = self.receive()
139         if not data:
140             # close_session
141             self.session.stop()
142             return False
143
144         self.message += data
145         return True
146
147     def receive(self):
148         try:
149             return self.session.connection().recv(1024)
150         except socket.error:
151             return ''
152
153     def parse(self):
154         raw_buffer = self.message.find('\n')
155         if raw_buffer == -1:
156             return False
157
158         raw_command = self.message[0:raw_buffer].strip()
159         self.message = self.message[raw_buffer + 1:]
160         if raw_command == 'quit': 
161             self.session.stop()
162             return False
163
164         try:
165             command = json.loads(raw_command)
166         except:
167             self.processor.push_response(self.session,
168                 {"error": "bad JSON", "request": raw_command})
169             return True
170
171         try:
172             # Try to load vital fields, and return an error if
173             # unsuccessful.
174             message_id = command['id']
175             method = command['method']
176         except KeyError:
177             # Return an error JSON in response.
178             self.processor.push_response(self.session,
179                 {"error": "syntax error", "request": raw_command})
180         else:
181             self.processor.push_request(self.session,command)
182
183         return True
184
185 class TcpServer(threading.Thread):
186
187     def __init__(self, shared, processor, host, port):
188         self.shared = shared
189         self.processor = processor
190         self.clients = []
191         threading.Thread.__init__(self)
192         self.daemon = True
193         self.host = host
194         self.port = port
195
196     def run(self):
197         print "TCP server started."
198         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
199         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
200         sock.bind((self.host, self.port))
201         sock.listen(1)
202         responder = TcpResponder(self.shared, self.processor)
203         responder.start()
204         while not self.shared.stopped():
205             session = Session(*sock.accept())
206             client_req = TcpClientRequestor(self.shared, self.processor, session)
207             client_req.start()
208             self.processor.add_session(session)
209
210 class Shared:
211
212     def __init__(self):
213         self.lock = threading.Lock()
214         self._stopped = False
215
216     def stop(self):
217         print "Stopping Stratum"
218         with self.lock:
219             self._stopped = True
220
221     def stopped(self):
222         with self.lock:
223             return self._stopped
224
225 class Stratum:
226
227     def start(self, processor):
228         shared = Shared()
229         # Bind shared to processor since constructor is user defined
230         processor.shared = shared
231         processor.start()
232         # Create various transports we need
233         transports = TcpServer(shared, processor, "176.31.24.241", 50001),
234         for server in transports:
235             server.start()
236         while not shared.stopped():
237             if raw_input() == "quit":
238                 shared.stop()
239             time.sleep(1)
240
241 if __name__ == "__main__":
242     processor = Processor()
243     app = Stratum()
244     app.start(processor)
245