Removed redundant while loop - hooray for flatter code.
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Daemon thread that services the requests of every registered session.

    ``shared`` must be assigned before the thread is started (``run`` raises
    ``TypeError`` otherwise).  Subclasses are expected to override
    ``process`` to execute a request and eventually call
    ``session.push_response(response)``.
    """

    # Seconds to sleep between polls while no session is registered, so the
    # run loop does not spin at full CPU waiting for the first client.
    idle_sleep = 0.01

    def __init__(self):
        threading.Thread.__init__(self)
        self.shared = None
        self.lock = threading.Lock()
        self.sessions = []
        self.daemon = True

    def add_session(self, session):
        """Register ``session`` so the run loop will service it."""
        with self.lock:
            self.sessions.append(session)

    def run(self):
        """Service sessions until ``self.shared.stopped()`` becomes true.

        Raises:
            TypeError: if ``self.shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            # Take ownership of the current list and leave an empty one
            # behind, so the lock is held only for the swap itself.
            with self.lock:
                sessions, self.sessions = self.sessions, []
            if not sessions:
                time.sleep(self.idle_sleep)
                continue
            for session in sessions:
                if session.stopped():
                    # Dead sessions are dropped by simply not re-adding them.
                    continue
                # Re-register the live session before processing it.
                self.add_session(session)
                self.process(session)

    def process(self, session):
        """Handle one request taken from ``session``.

        This default implementation only logs the request.  Execute it and,
        when ready, call ``session.push_response(response)``.
        """
        request = session.pop_request()
        print("New request %s" % (request,))
41
class Session:
    """One client connection plus its request and response queues.

    Args:
        connection: the connected socket-like object for this client.
        address: the peer address; ``address[0]`` is logged on stop.
    """

    def __init__(self, connection, address):
        self._connection = connection
        self.address = address
        self._stopped = False
        self.lock = threading.Lock()

        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()

    def stop(self):
        """Mark the session stopped and close its connection.

        Safe to call more than once and from several threads: only the first
        call closes the connection and logs.
        """
        with self.lock:
            if self._stopped:
                return
            # Flip the flag before closing so connection() never hands out
            # a socket that is already closed.
            self._stopped = True
        self._connection.close()
        print("Terminating connection: %s" % (self.address[0],))

    def stopped(self):
        """Return True once ``stop`` has been called."""
        with self.lock:
            return self._stopped

    def connection(self):
        """Return the underlying connection.

        Raises:
            Exception: if the session has been stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def push_request(self, item):
        """Queue a parsed request for the processor."""
        self.request_queue.put(item)

    def pop_request(self):
        """Remove and return the next request, blocking until one exists."""
        return self.request_queue.get()

    def push_response(self, item):
        """Queue a response to be sent back to the client."""
        self.response_queue.put(item)

    def pop_response(self):
        """Remove and return the next response, blocking until one exists."""
        return self.response_queue.get()
80
class TcpClientResponder(threading.Thread):
    """Thread that writes queued responses of one session to its socket.

    Args:
        shared: object whose ``stopped()`` signals global shutdown.
        session: the session whose responses are sent.
    """

    def __init__(self, shared, session):
        threading.Thread.__init__(self)
        self.shared = shared
        self.session = session
        # pop_response() blocks; a daemon thread cannot keep the process
        # alive after shutdown (the processor and server threads in this
        # file are daemons as well).
        self.daemon = True

    def run(self):
        """Send responses as newline-terminated JSON until stopped.

        The loop ends when either the server or the session is stopped, or
        as soon as writing to the connection fails (the session is then
        stopped).
        """
        while not (self.shared.stopped() or self.session.stopped()):
            response = self.session.pop_response()
            raw_response = json.dumps(response)
            try:
                # connection() raises a plain Exception if the session was
                # stopped in the meantime, so it must sit inside the try.
                # sendall() is used because send() may write only a prefix.
                self.session.connection().sendall(raw_response + "\n")
            except Exception:
                self.session.stop()
                return
100
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON commands from one session.

    Args:
        shared: object whose ``stopped()`` signals global shutdown.
        session: the session whose connection is read.
    """

    def __init__(self, shared, session):
        threading.Thread.__init__(self)
        self.shared = shared
        # Bytes received so far that do not yet form a complete line.
        self.message = ""
        self.session = session
        # recv() blocks; a daemon thread cannot keep the process alive
        # after shutdown (the processor and server threads are daemons too).
        self.daemon = True

    def run(self):
        """Read and dispatch commands until stopped or disconnected."""
        while not (self.shared.stopped() or self.session.stopped()):
            if not self.update():
                self.session.stop()
                return

    def update(self):
        """Receive one chunk and parse the buffered lines.

        Returns:
            False when the session should be closed (read error, peer
            disconnect, or a ``quit`` command), True otherwise.
        """
        data = self.receive()
        if not data:
            # None signals a read error; an empty string means the peer
            # closed the connection.  Either way the session is over.
            return False
        self.message += data
        return self.parse()

    def receive(self):
        """Return up to 1024 bytes from the connection, or None on failure."""
        try:
            return self.session.connection().recv(1024)
        except Exception:
            # Covers socket.error as well as the plain Exception that
            # Session.connection() raises once the session is stopped.
            return None

    def parse(self):
        """Dispatch every complete line currently held in ``self.message``.

        Returns:
            False if a ``quit`` command was read, True otherwise.  Any
            trailing partial line stays buffered for the next call.
        """
        while True:
            line_end = self.message.find('\n')
            if line_end == -1:
                return True
            raw_command = self.message[:line_end].strip()
            self.message = self.message[line_end + 1:]
            if raw_command == 'quit':
                return False
            self._dispatch(raw_command)

    def _dispatch(self, raw_command):
        """Decode one line; queue it as a request or answer with an error."""
        try:
            command = json.loads(raw_command)
        except ValueError:
            self.session.push_response(
                {"error": "bad JSON", "request": raw_command})
            return

        # A request must be a JSON object carrying the vital fields; other
        # JSON values (lists, numbers, strings) are rejected as well.
        if (not isinstance(command, dict)
                or 'id' not in command or 'method' not in command):
            self.session.push_response(
                {"error": "syntax error", "request": raw_command})
            return

        self.session.push_request(command)
163
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients and wires up their sessions.

    Args:
        shared: object whose ``stopped()`` signals global shutdown.
        processor: receives every new session via ``add_session``.
        host: interface to bind to (default ``"localhost"``).
        port: TCP port to listen on (default ``50001``).
        backlog: value passed to ``listen`` (default ``1``).
    """

    def __init__(self, shared, processor, host="localhost", port=50001,
                 backlog=1):
        threading.Thread.__init__(self)
        self.shared = shared
        self.processor = processor
        self.clients = []
        self.host = host
        self.port = port
        self.backlog = backlog
        self.daemon = True

    def run(self):
        """Accept connections until the server is stopped.

        Each accepted connection gets a Session, a requestor thread, a
        responder thread, and is registered with the processor.  Note that
        ``accept`` blocks, so the stop flag is only re-checked after a
        client connects.
        """
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(self.backlog)
            while not self.shared.stopped():
                session = Session(*sock.accept())
                TcpClientRequestor(self.shared, session).start()
                TcpClientResponder(self.shared, session).start()
                self.processor.add_session(session)
        finally:
            # Release the listening socket however the loop ends.
            sock.close()
186
class Shared:
    """Lock-protected shutdown flag handed to every thread of the server."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stopped(self):
        """Report whether ``stop`` has been called."""
        self.lock.acquire()
        try:
            flag = self._stopped
        finally:
            self.lock.release()
        return flag

    def stop(self):
        """Announce the shutdown, then raise the flag."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()
201
class Stratum:
    """Application entry point: starts the processor and the transports."""

    def start(self, processor):
        """Run the server until ``quit`` is entered on standard input.

        Args:
            processor: a not-yet-started Processor; its ``shared``
                attribute is assigned here before it is started.

        End-of-file on standard input and Ctrl-C while waiting for input
        are treated like ``quit`` instead of ending in a traceback.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need
        transports = [TcpServer(shared, processor)]
        for server in transports:
            server.start()
        while not shared.stopped():
            try:
                command = raw_input()
            except (EOFError, KeyboardInterrupt):
                command = "quit"
            if command == "quit":
                shared.stop()
            time.sleep(1)
217
if __name__ == "__main__":
    # Script entry point: build the application and the default processor,
    # then hand control to Stratum.start.
    app = Stratum()
    processor = Processor()
    app.start(processor)
222