Abe module stub
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Daemon thread that repeatedly hands registered sessions to process().

    ``shared`` must be set to an object exposing ``stopped()`` before the
    thread is started (``Stratum.start`` assigns it).  Subclasses override
    ``process`` to handle requests and may override ``stop`` for cleanup.
    """

    # Seconds to sleep when no sessions are registered, so the polling loop
    # in run() does not spin at full CPU while the server is idle.
    IDLE_SLEEP = 0.01

    def __init__(self):
        self.shared = None
        self.lock = threading.Lock()
        self.sessions = []
        threading.Thread.__init__(self)
        self.daemon = True

    def add_session(self, session):
        """Register *session* so the next pass of run() will process it."""
        with self.lock:
            self.sessions.append(session)

    def run(self):
        """Process live sessions until ``shared.stopped()`` becomes true.

        Raises:
            TypeError: if ``self.shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            # Take the current list and replace it with an empty one while
            # holding the lock; the (possibly slow) processing below then
            # runs without blocking add_session() callers.
            with self.lock:
                sessions, self.sessions = self.sessions, []
            if not sessions:
                time.sleep(self.IDLE_SLEEP)
                continue
            for session in sessions:
                if not session.stopped():
                    # If session is still alive then re-add it back
                    # to our internal register
                    self.add_session(session)
                    self.process(session)
        self.stop()

    def stop(self):
        """Hook called once when run() leaves its loop; no-op by default."""
        pass

    def process(self, session):
        """Handle one request from *session* (stub: just prints it).

        NOTE: ``session.pop_request()`` is called without a timeout, so this
        default implementation waits until the session has a request.
        """
        request = session.pop_request()
        # Single-argument form: prints the same text under the Python 2
        # print statement and the Python 3 print function.
        print("%s %s" % ("New request", request))
        # Execute and when ready, you call
        # session.push_response(response)
45
class Session:
    """One client connection plus its request and response queues.

    ``connection`` and ``address`` are the pair returned by
    ``socket.accept()``.  Requests parsed from the client are queued with
    push_request(); responses to send back are queued with push_response().
    """

    def __init__(self, connection, address):
        self._connection = connection
        self.address = address
        self._stopped = False
        self.lock = threading.Lock()

        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()

    def stop(self):
        """Mark the session stopped and close its connection (idempotent).

        The flag is set before the socket is closed so that connection()
        never hands out a socket that is already closed, and repeated calls
        (both client threads may call stop()) close and log only once.
        """
        with self.lock:
            if self._stopped:
                return
            self._stopped = True
        self._connection.close()
        # Single-argument form: prints the same text under the Python 2
        # print statement and the Python 3 print function.
        print("%s %s" % ("Terminating connection:", self.address[0]))

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            return self._stopped

    def connection(self):
        """Return the underlying connection.

        Raises:
            Exception: if the session has already been stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def push_request(self, item):
        """Queue a request received from the client."""
        self.request_queue.put(item)

    def pop_request(self):
        """Remove and return the oldest queued request, waiting for one."""
        return self.request_queue.get()

    def push_response(self, item):
        """Queue a response to be sent to the client."""
        self.response_queue.put(item)

    def pop_response(self):
        """Remove and return the oldest queued response, waiting for one."""
        return self.response_queue.get()
84
class TcpClientResponder(threading.Thread):
    """Thread that sends a session's queued responses to its client.

    Each response is serialised with ``json.dumps`` and written to the
    session's connection followed by a newline.
    """

    def __init__(self, shared, session):
        self.shared = shared
        self.session = session
        threading.Thread.__init__(self)
        # Daemon, like the other worker threads in this module, so a thread
        # waiting in pop_response() cannot keep the process alive at exit.
        self.daemon = True

    def run(self):
        """Send responses until the server or this session is stopped."""
        # Loop only while NEITHER the server nor the session is stopped.
        while not (self.shared.stopped() or self.session.stopped()):
            # NOTE: pop_response() is called without a timeout, so the stop
            # flags are only re-checked after a response has been popped.
            response = self.session.pop_response()
            raw_response = json.dumps(response)
            try:
                connection = self.session.connection()
            except Exception:
                # Session.connection() raises a plain Exception once the
                # session has been stopped; nothing is left to send to.
                return
            try:
                # sendall() keeps writing until the whole message is out;
                # send() may transmit only part of it.
                connection.sendall(raw_response + "\n")
            except socket.error:
                self.session.stop()
104
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON requests from a client.

    Received data is accumulated in ``self.message``; every complete line is
    parsed and either queued on the session as a request or answered with an
    error response.  A line consisting of ``quit`` ends the session.
    """

    def __init__(self, shared, session):
        self.shared = shared
        self.message = ""
        self.session = session
        threading.Thread.__init__(self)
        # Daemon, like the other worker threads in this module, so a thread
        # blocked in recv() cannot keep the process alive at exit.
        self.daemon = True

    def run(self):
        """Read and parse until the server stops or the client goes away."""
        while not self.shared.stopped():
            if not self.update():
                self.session.stop()
                return

    def update(self):
        """Receive one chunk and parse it.

        Returns:
            False when the session should be closed (disconnect, socket
            error or ``quit``), True otherwise.
        """
        data = self.receive()
        if data is None:
            # run() stops the session when we return False.
            return False

        self.message += data
        return self.parse()

    def receive(self):
        """Return the next chunk of received data, or None on disconnect."""
        try:
            connection = self.session.connection()
        except Exception:
            # Session.connection() raises a plain Exception once the
            # session has been stopped.
            return None
        try:
            data = connection.recv(1024)
        except socket.error:
            return None
        # recv() returning an empty string means the peer closed the
        # connection; without this check the read loop would spin forever.
        if not data:
            return None
        return data

    def parse(self):
        """Consume every complete line currently buffered in self.message.

        Returns:
            False if a ``quit`` line was seen, True otherwise.  A trailing
            partial line is left in the buffer for the next call.
        """
        while True:
            line_end = self.message.find('\n')
            if line_end == -1:
                return True

            raw_command = self.message[:line_end].strip()
            self.message = self.message[line_end + 1:]
            if raw_command == 'quit':
                return False

            try:
                command = json.loads(raw_command)
            except (ValueError, RuntimeError):
                # ValueError: malformed JSON.  RuntimeError: recursion limit
                # hit on pathologically nested input from the client.
                self.session.push_response(
                    {"error": "bad JSON", "request": raw_command})
                continue

            # A request must be a JSON object carrying both vital fields;
            # anything else (list, number, missing key) is a syntax error.
            if (isinstance(command, dict)
                    and 'id' in command and 'method' in command):
                self.session.push_request(command)
            else:
                self.session.push_response(
                    {"error": "syntax error", "request": raw_command})
167
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients and starts their sessions.

    Args:
        shared: object exposing ``stopped()``, checked between accepts.
        processor: object whose ``add_session(session)`` registers each
            new session.
        host: address to bind to (defaults to the previously hard-coded
            value).
        port: TCP port to bind to (defaults to the previously hard-coded
            value).
    """

    def __init__(self, shared, processor, host="176.31.24.241", port=50001):
        self.shared = shared
        self.processor = processor
        self.host = host
        self.port = port
        self.clients = []
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        """Bind, listen and spawn a reader/writer thread pair per client."""
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            # NOTE: accept() blocks, so the stop flag is only re-checked
            # after a client has connected.
            while not self.shared.stopped():
                session = Session(*sock.accept())
                client_req = TcpClientRequestor(self.shared, session)
                client_req.start()
                client_res = TcpClientResponder(self.shared, session)
                client_res.start()
                self.processor.add_session(session)
        finally:
            # Release the listening socket on normal exit and on errors.
            sock.close()
190
class Shared:
    """Lock-protected stop flag handed to every thread of the server."""

    def __init__(self):
        self._stopped = False
        self.lock = threading.Lock()

    def stop(self):
        """Announce the shutdown and raise the stop flag."""
        print("Stopping Stratum")
        self.lock.acquire()
        try:
            self._stopped = True
        finally:
            self.lock.release()

    def stopped(self):
        """Return True once stop() has been called."""
        self.lock.acquire()
        try:
            return self._stopped
        finally:
            self.lock.release()
205
class Stratum:
    """Application entry point: wires the processor to the transports."""

    def start(self, processor):
        """Start *processor* and the TCP server, then wait for ``quit``.

        Reads lines from standard input until one equals ``quit``.  End of
        input (closed stdin) and Ctrl-C are treated as ``quit`` too, so the
        stop flag is set instead of the main thread dying on an uncaught
        exception.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need
        transports = (TcpServer(shared, processor),)
        for server in transports:
            server.start()
        while not shared.stopped():
            try:
                command = raw_input()
            except (EOFError, KeyboardInterrupt):
                command = "quit"
            if command == "quit":
                shared.stop()
            time.sleep(1)
221
if __name__ == "__main__":
    # Script entry point: run the server with the stub Processor, which
    # only prints each request it receives.
    Stratum().start(Processor())
226