Front-end cannibalised from old server code.
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Daemon thread that polls registered sessions and processes requests.

    ``shared`` must be assigned (an object exposing ``stopped()``) before the
    thread is started; ``run`` raises ``TypeError`` otherwise.  Subclasses
    are expected to override ``process``.
    """

    # Seconds to wait before polling again when no session is registered.
    # Without this pause ``run`` would spin at full CPU while idle.
    idle_delay = 0.1

    def __init__(self):
        threading.Thread.__init__(self)
        self.shared = None
        # Guards ``self.sessions``, which is appended to from other threads.
        self.lock = threading.Lock()
        self.sessions = []
        self.daemon = True

    def add_session(self, session):
        """Register ``session`` so that ``run`` will poll it."""
        with self.lock:
            self.sessions.append(session)

    def run(self):
        """Poll sessions until ``self.shared.stopped()`` becomes true.

        Raises:
            TypeError: if ``self.shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            # Take the current list and install a fresh empty one while
            # holding the lock; the sessions are then processed without the
            # lock held, which keeps lock contention low.
            with self.lock:
                sessions = self.sessions
                self.sessions = []
            if not sessions:
                time.sleep(self.idle_delay)
                continue
            for session in sessions:
                if not session.stopped():
                    # If session is still alive then re-add it back
                    # to our internal register
                    self.add_session(session)
                    self.process(session)

    def process(self, session):
        """Handle one request from ``session`` (default: just print it).

        NOTE(review): ``session.pop_request()`` is called without a timeout,
        so with the ``Session`` class in this file a session that has no
        pending request holds up every other session -- confirm whether
        that is intended.
        """
        request = session.pop_request()
        print("New request %s" % (request,))
        # Execute and when ready, you call
        # session.push_response(response)
41
class Session:
    """State shared by the threads serving one client connection.

    Holds the connection object, the peer address, a stop flag and two
    queues: one for requests read from the client and one for responses
    waiting to be written back.
    """

    def __init__(self, connection, address):
        self._connection = connection
        self.address = address
        self._stopped = False
        # Guards ``self._stopped``.
        self.lock = threading.Lock()

        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()

    def stop(self):
        """Mark the session as stopped and close its connection.

        Safe to call more than once: only the first call closes the
        connection and prints the termination message.
        """
        with self.lock:
            if self._stopped:
                return
            # Set the flag before closing so that ``connection()`` stops
            # handing out a connection that is about to be closed.
            self._stopped = True
        self._connection.close()
        print("Terminating connection: %s" % (self.address[0],))

    def stopped(self):
        """Return True once ``stop`` has been called."""
        with self.lock:
            return self._stopped

    def connection(self):
        """Return the underlying connection.

        Raises:
            Exception: if the session has already been stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def push_request(self, item):
        """Queue a request received from the client."""
        self.request_queue.put(item)

    def pop_request(self):
        """Remove and return the next request, blocking until one exists."""
        return self.request_queue.get()

    def push_response(self, item):
        """Queue a response to be sent back to the client."""
        self.response_queue.put(item)

    def pop_response(self):
        """Remove and return the next response, blocking until one exists."""
        return self.response_queue.get()
80
class TcpClientResponder(threading.Thread):
    """Thread that writes a session's queued responses to its connection.

    Each response is sent followed by a newline.  The thread is a daemon,
    like the other threads in this file, so that one blocked waiting for a
    response does not keep the interpreter alive at shutdown.
    """

    def __init__(self, shared, session):
        threading.Thread.__init__(self)
        self.shared = shared
        self.session = session
        self.daemon = True

    def run(self):
        """Send responses until the server or the session is stopped."""
        # Keep going only while NEITHER the server nor the session has been
        # stopped.
        while not (self.shared.stopped() or self.session.stopped()):
            response = self.session.pop_response()
            try:
                # ``connection()`` raises a plain Exception when the session
                # was stopped while we were waiting for a response.
                connection = self.session.connection()
            except Exception:
                return
            try:
                # sendall() keeps writing until the whole line is out;
                # send() may transmit only part of it.
                connection.sendall(response + "\n")
            except Exception:
                # Any failure to deliver ends the session.
                self.session.stop()
                return
99
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON requests from a session.

    Each complete line is decoded as JSON and queued on the session as an
    ``(id, method, params)`` tuple.  A line consisting of ``quit`` ends the
    session.  The thread is a daemon, like the other threads in this file,
    so that one blocked in ``recv`` does not keep the interpreter alive.
    """

    def __init__(self, shared, session):
        threading.Thread.__init__(self)
        self.shared = shared
        # Bytes received so far that do not yet form a complete line.
        self.message = ""
        self.session = session
        self.daemon = True

    def run(self):
        """Read requests until the server stops or the session ends."""
        while not self.shared.stopped():
            if not self.update():
                self.session.stop()
                return

    def update(self):
        """Receive one chunk and parse any complete lines.

        Returns:
            False when the session should be closed (receive error, peer
            closed the connection, or a ``quit`` line), True otherwise.
            The caller (``run``) is responsible for stopping the session.
        """
        data = self.receive()
        # ``None`` signals an error; an empty read means the peer closed
        # the connection.  Either way there is nothing more to read.
        if not data:
            return False

        self.message += data
        return self.parse()

    def receive(self):
        """Return up to 1024 bytes from the connection, or None on failure."""
        try:
            # ``connection()`` raises a plain Exception once the session
            # has been stopped.
            connection = self.session.connection()
        except Exception:
            return None
        try:
            return connection.recv(1024)
        except socket.error:
            return None

    def parse(self):
        """Consume every complete line buffered in ``self.message``.

        Returns:
            True when the buffer holds no further complete line, False as
            soon as a ``quit`` line is seen.  Lines that are not valid JSON
            objects are reported and skipped.
        """
        while True:
            line, newline, remainder = self.message.partition('\n')
            if not newline:
                # Incomplete line: wait for more data.
                return True

            self.message = remainder
            command = line.strip()
            if command == 'quit':
                return False

            try:
                command = json.loads(command)
            except ValueError:
                print("json error %s" % (repr(command),))
                continue

            # Only a JSON object can carry id/method/params.
            if not isinstance(command, dict):
                print("syntax error %s %s"
                      % (repr(command), self.session.address[0]))
                continue

            message_id = command.get('id')
            method = command.get('method')
            params = command.get('params')

            self.session.push_request((message_id, method, params))
            print("%s %s %s" % (message_id, method, params))
159
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients and creates their sessions.

    For every accepted connection a ``Session`` is created, a requestor and
    a responder thread are started for it, and the session is registered
    with the processor.

    Args:
        shared: object whose ``stopped()`` ends the accept loop.
        processor: object whose ``add_session(session)`` receives each
            new session.
        host: interface to bind to (default ``"localhost"``).
        port: TCP port to listen on (default ``50001``).
    """

    def __init__(self, shared, processor, host="localhost", port=50001):
        threading.Thread.__init__(self)
        self.shared = shared
        self.processor = processor
        self.host = host
        self.port = port
        self.clients = []
        self.daemon = True

    def run(self):
        """Listen on ``(host, port)`` and serve clients until stopped."""
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            # NOTE: accept() blocks, so the stop flag is only re-checked
            # after the next client connects.
            while not self.shared.stopped():
                session = Session(*sock.accept())
                client_req = TcpClientRequestor(self.shared, session)
                client_req.start()
                client_res = TcpClientResponder(self.shared, session)
                client_res.start()
                self.processor.add_session(session)
        finally:
            # Release the listening socket however the loop ends.
            sock.close()
182
class Shared:
    """Lock-protected stop flag handed to every thread of the server."""

    def __init__(self):
        self._stopped = False
        # Guards ``self._stopped``.
        self.lock = threading.Lock()

    def stop(self):
        """Announce the shutdown and raise the stop flag."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return the current value of the stop flag."""
        self.lock.acquire()
        try:
            flag = self._stopped
        finally:
            self.lock.release()
        return flag
197
class Stratum:
    """Application entry point: wires the processor to its transports."""

    def start(self, processor):
        """Start ``processor`` and the transports, then wait for "quit".

        Blocks reading lines from standard input until the line ``quit``
        is entered.  End of input or a keyboard interrupt is treated as a
        request to quit instead of escaping as an exception.

        Args:
            processor: a thread-like object with a ``shared`` attribute and
                a ``start()`` method; it is also handed to the TCP server.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need
        transports = (TcpServer(shared, processor),)
        for server in transports:
            server.start()
        while not shared.stopped():
            try:
                command = raw_input()
            except (EOFError, KeyboardInterrupt):
                # stdin closed or Ctrl-C: shut down cleanly.
                command = "quit"
            if command == "quit":
                shared.stop()
            time.sleep(1)
213
if __name__ == "__main__":
    # Run the server with the default processor, which only prints
    # each request it receives.
    default_processor = Processor()
    Stratum().start(default_processor)
218