# Untested: response_queue (formerly output_queue) now expects a JSON object which...
# [electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Daemon thread that repeatedly polls registered sessions for requests.

    ``shared`` must be assigned before the thread is started; only its
    ``stopped()`` method is used. Subclasses are expected to override
    ``process`` to actually serve requests.
    """

    # Seconds to sleep when no session is registered, so that the polling
    # loop does not spin at full CPU while the server is idle.
    IDLE_SLEEP = 0.01

    def __init__(self):
        threading.Thread.__init__(self)
        self.shared = None
        self.lock = threading.Lock()
        self.sessions = []
        self.daemon = True

    def add_session(self, session):
        """Register ``session`` so the next polling pass will process it."""
        with self.lock:
            self.sessions.append(session)

    def run(self):
        """Poll sessions until ``shared.stopped()`` returns true.

        Raises:
            TypeError: if ``self.shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            # Take the current register and replace it with an empty one
            # while holding the lock; the (possibly slow) processing below
            # then runs without the lock to minimise contention.
            with self.lock:
                sessions = self.sessions
                self.sessions = []
            if not sessions:
                time.sleep(self.IDLE_SLEEP)
                continue
            for session in sessions:
                if not session.stopped():
                    # If session is still alive then re-add it back
                    # to our internal register
                    self.add_session(session)
                    self.process(session)

    def process(self, session):
        """Handle one request from ``session`` (default: just print it).

        Overrides should execute the request and, when ready, call
        ``session.push_response(response)``.
        """
        request = session.pop_request()
        print("New request %s" % (request,))
41
class Session:
    """One client connection plus its request and response queues.

    ``connection`` only needs a ``close()`` method here; ``address`` is
    indexed with ``[0]`` when the termination message is printed.
    """

    def __init__(self, connection, address):
        self._connection = connection
        self.address = address
        self._stopped = False
        self.lock = threading.Lock()

        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()

    def stop(self):
        """Mark the session stopped and close its connection (idempotent)."""
        # Flip the flag first so that connection() refuses to hand out a
        # connection that is about to be closed, and so that a second
        # stop() call does not close and report again.
        with self.lock:
            if self._stopped:
                return
            self._stopped = True
        self._connection.close()
        print("Terminating connection: %s" % self.address[0])

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            return self._stopped

    def connection(self):
        """Return the underlying connection.

        Raises:
            Exception: if the session has already been stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def push_request(self, item):
        """Queue a request ``item`` for later retrieval by pop_request()."""
        self.request_queue.put(item)

    def pop_request(self):
        """Return the next queued request, blocking until one is available."""
        return self.request_queue.get()

    def push_response(self, item):
        """Queue a response ``item`` for later retrieval by pop_response()."""
        self.response_queue.put(item)

    def pop_response(self):
        """Return the next queued response, blocking until one is available."""
        return self.response_queue.get()
80
class TcpClientResponder(threading.Thread):
    """Thread that serialises queued responses and writes them to the client.

    Each response popped from the session is sent as one JSON document
    followed by a newline.
    """

    def __init__(self, shared, session):
        threading.Thread.__init__(self)
        self.shared = shared
        self.session = session
        # Daemon, like the server and processor threads, so a thread blocked
        # in pop_response() cannot keep the process alive after shutdown.
        self.daemon = True

    def run(self):
        """Send responses until the server or the session is stopped."""
        # Parenthesised on purpose: the loop must end as soon as EITHER the
        # server or this session has stopped.
        while not (self.shared.stopped() or self.session.stopped()):
            response = self.session.pop_response()
            raw_response = json.dumps(response)
            try:
                # connection() raises a plain Exception if the session was
                # stopped while we were blocked on the queue, so it has to
                # be inside the try together with the send.
                connection = self.session.connection()
                # sendall() keeps writing until the whole line is out;
                # send() may transmit only part of it.
                connection.sendall(raw_response + "\n")
            except Exception:
                self.session.stop()
100
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON requests from a client.

    Every complete line is decoded and pushed onto the session as a
    ``(id, method, params)`` tuple; the literal line ``quit`` ends the
    session.
    """

    def __init__(self, shared, session):
        threading.Thread.__init__(self)
        self.shared = shared
        # Bytes received so far that do not yet form a complete line.
        # NOTE(review): this buffer is unbounded if a client never sends a
        # newline -- consider capping it.
        self.message = ""
        self.session = session
        # Daemon, like the server and processor threads, so a thread blocked
        # in recv() cannot keep the process alive after shutdown.
        self.daemon = True

    def run(self):
        """Read from the client until the server stops or the client leaves."""
        while not self.shared.stopped():
            if not self.update():
                self.session.stop()
                return

    def update(self):
        """Receive one chunk and parse any complete lines.

        Returns:
            False when the session should be closed (receive error, peer
            closed the connection, or a ``quit`` command), True otherwise.
        """
        data = self.receive()
        if not data:
            # None means the receive failed; an empty string means the peer
            # closed the connection. The caller stops the session.
            return False

        self.message += data
        return self.parse()

    def receive(self):
        """Return up to 1024 bytes from the client, or None on failure."""
        try:
            return self.session.connection().recv(1024)
        except Exception:
            # Either a socket error, or the plain Exception that
            # session.connection() raises once the session is stopped.
            return None

    def parse(self):
        """Consume every complete line currently held in ``self.message``.

        Returns:
            False if a ``quit`` line was seen, True once only an incomplete
            line (or nothing) is left in the buffer.
        """
        while "\n" in self.message:
            line, _, self.message = self.message.partition("\n")
            command = line.strip()
            if command == 'quit':
                return False

            try:
                command = json.loads(command)
            except ValueError:
                print("json error %s" % repr(command))
                continue

            try:
                message_id = command.get('id')
                method = command.get('method')
                params = command.get('params')
            except AttributeError:
                # Valid JSON, but not an object (e.g. a list or a number).
                print("syntax error %s %s"
                      % (repr(command), self.session.address[0]))
                continue

            self.session.push_request((message_id, method, params))
            print("%s %s %s" % (message_id, method, params))
        return True
160
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients and wires up their sessions.

    Args:
        shared: object whose ``stopped()`` ends the accept loop.
        processor: object whose ``add_session(session)`` receives each
            newly accepted session.
        host: interface to bind to (default ``"localhost"``).
        port: TCP port to listen on (default ``50001``).
    """

    def __init__(self, shared, processor, host="localhost", port=50001):
        threading.Thread.__init__(self)
        self.shared = shared
        self.processor = processor
        self.host = host
        self.port = port
        # Not used by this class; kept because it is a public attribute.
        self.clients = []
        self.daemon = True

    def run(self):
        """Accept connections until ``shared.stopped()`` returns true."""
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            while not self.shared.stopped():
                # accept() returns (connection, address), which is exactly
                # the Session constructor's argument list.
                session = Session(*sock.accept())
                client_req = TcpClientRequestor(self.shared, session)
                client_req.start()
                client_res = TcpClientResponder(self.shared, session)
                client_res.start()
                self.processor.add_session(session)
        finally:
            # Release the listening socket even if bind/accept fails.
            sock.close()
183
class Shared:
    """Lock-protected stop flag handed to every server thread."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            is_stopped = self._stopped
        return is_stopped

    def stop(self):
        """Announce the shutdown and raise the stop flag."""
        print("Stopping Stratum")
        with self.lock:
            self._stopped = True
198
class Stratum:
    """Application entry point: starts the processor and the transports."""

    def start(self, processor):
        """Run the server until ``quit`` is typed or stdin is closed.

        Args:
            processor: a ``Processor`` (or subclass) instance; its ``shared``
                attribute is assigned here before it is started.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need
        transports = (TcpServer(shared, processor),)
        for server in transports:
            server.start()

        # raw_input exists only on Python 2; fall back to input elsewhere.
        try:
            read_line = raw_input
        except NameError:
            read_line = input

        while not shared.stopped():
            try:
                command = read_line()
            except (EOFError, KeyboardInterrupt):
                # stdin closed or Ctrl-C: shut down cleanly instead of
                # leaving the loop with a traceback.
                command = "quit"
            if command == "quit":
                shared.stop()
            time.sleep(1)
214
if __name__ == "__main__":
    # Run the server with the base Processor, which only prints each request.
    processor = Processor()
    Stratum().start(processor)
219