adding host, port and two subscribe methods
[electrum-server.git] / stratum.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
class Processor(threading.Thread):
    """Daemon thread that repeatedly hands every live session to process().

    ``shared`` must be assigned before the thread is started; it has to
    expose a ``stopped()`` method. ``run`` raises ``TypeError`` when it is
    still ``None``. ``process`` and ``stop`` are hooks meant to be
    overridden by subclasses.
    """

    # Seconds to sleep when no session is registered, so the polling loop
    # in run() does not spin at full CPU while the server is idle.
    idle_delay = 0.01

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        # Shutdown flag holder; bound by the caller after construction.
        self.shared = None
        # Guards self.sessions, which other threads append to.
        self.lock = threading.Lock()
        self.sessions = []

    def add_session(self, session):
        """Register ``session`` so that run() will pass it to process()."""
        with self.lock:
            self.sessions.append(session)

    def run(self):
        """Poll the registered sessions until ``shared`` reports a stop.

        Raises:
            TypeError: if ``self.shared`` was never assigned.
        """
        if self.shared is None:
            raise TypeError("self.shared not set in Processor")
        while not self.shared.stopped():
            # Swap the session list out while holding the lock only
            # briefly (the session objects themselves are not copied).
            # This is done to minimise lock contention.
            with self.lock:
                sessions = self.sessions
                self.sessions = []
            if not sessions:
                # Nothing to do yet: yield the CPU instead of busy-waiting.
                time.sleep(self.idle_delay)
                continue
            for session in sessions:
                if not session.stopped():
                    # If session is still alive then re-add it back
                    # to our internal register
                    self.add_session(session)
                    self.process(session)
        self.stop()

    def stop(self):
        """Hook called once when run() leaves its loop; does nothing here."""
        pass

    def process(self, session):
        """Take one request from ``session`` and handle it.

        This default implementation only prints the request.
        """
        # NOTE(review): Session.pop_request() in this file is a blocking
        # queue get, so an idle session stalls every other session here.
        request = session.pop_request()
        # Single-argument form: same output under Python 2 and Python 3.
        print("New request %s" % (request,))
        # Execute and when ready, you call
        # session.push_response(response)
45
class Session(object):
    """State for one client connection.

    Holds the connection object, the peer address, a request queue, a
    response queue and the client's subscriptions.
    """

    def __init__(self, connection, address):
        self._connection = connection
        self.address = address
        self._stopped = False
        # Guards _stopped and the subscription fields.
        self.lock = threading.Lock()

        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        # Message id of the numblocks subscription, or None.
        self.numblocks_sub = None
        # Maps address -> (message_id, status).
        self.addresses_sub = {}

    def stop(self):
        """Mark the session stopped and close its connection.

        Safe to call more than once and from several threads: only the
        first call closes the connection and prints the message.
        """
        # Flip the flag before closing so connection() stops handing out
        # a connection that is about to be closed.
        with self.lock:
            if self._stopped:
                return
            self._stopped = True
        self._connection.close()
        # Single-argument form: same output under Python 2 and Python 3.
        print("Terminating connection: %s" % (self.address[0],))

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            return self._stopped

    def connection(self):
        """Return the underlying connection.

        Raises:
            Exception: if the session has already been stopped.
        """
        if self.stopped():
            raise Exception("Session was stopped")
        return self._connection

    def push_request(self, item):
        """Append ``item`` to the request queue."""
        self.request_queue.put(item)

    def pop_request(self):
        """Remove and return the next request, blocking until one exists."""
        return self.request_queue.get()

    def push_response(self, item):
        """Append ``item`` to the response queue."""
        self.response_queue.put(item)

    def pop_response(self):
        """Remove and return the next response, blocking until one exists."""
        return self.response_queue.get()

    def subscribe_to_numblocks(self, message_id):
        """Record ``message_id`` as the numblocks subscription."""
        with self.lock:
            self.numblocks_sub = message_id

    def subscribe_to_address(self, address, message_id, status):
        """Record a subscription for ``address`` as (message_id, status)."""
        with self.lock:
            self.addresses_sub[address] = (message_id, status)
94
95
class TcpClientResponder(threading.Thread):
    """Thread that writes a session's queued responses to its connection.

    Each response is serialised with json.dumps and sent followed by a
    newline.
    """

    def __init__(self, shared, session):
        threading.Thread.__init__(self)
        # Daemon, like Processor and TcpServer, so a thread blocked on the
        # response queue cannot keep the interpreter alive at exit.
        self.daemon = True
        self.shared = shared
        self.session = session

    def run(self):
        """Send responses until the server or the session is stopped."""
        # Keep going only while neither the server nor this session has
        # been stopped.
        while not (self.shared.stopped() or self.session.stopped()):
            response = self.session.pop_response()
            raw_response = json.dumps(response)
            try:
                connection = self.session.connection()
            except Exception:
                # connection() raises a plain Exception once the session
                # has been stopped; there is nobody left to answer.
                return
            try:
                # sendall() keeps writing until the whole line is sent;
                # send() may transmit only part of the buffer.
                connection.sendall(raw_response + "\n")
            except socket.error:
                self.session.stop()
                return
115
class TcpClientRequestor(threading.Thread):
    """Thread that reads newline-delimited JSON commands from a session.

    Commands carrying both ``id`` and ``method`` are pushed onto the
    session's request queue; malformed lines produce an error response.
    A line consisting of ``quit`` ends the session.
    """

    def __init__(self, shared, session):
        threading.Thread.__init__(self)
        # Daemon, like Processor and TcpServer, so a thread blocked in
        # recv() cannot keep the interpreter alive at exit.
        self.daemon = True
        self.shared = shared
        # Received data that has not yet formed a complete line.
        self.message = ""
        self.session = session

    def run(self):
        """Read and dispatch commands until shutdown or disconnect."""
        while not self.shared.stopped():
            if not self.update():
                self.session.stop()
                return

    def update(self):
        """Receive one chunk and parse every complete line in the buffer.

        Returns:
            False when the session should end (disconnect, receive error
            or a ``quit`` command), True otherwise.
        """
        data = self.receive()
        if not data:
            # None means a receive error; an empty result from recv()
            # means the peer closed the connection. run() stops the
            # session in both cases.
            return False

        self.message += data
        return self.parse()

    def receive(self):
        """Return up to 1024 bytes from the connection, or None on error."""
        try:
            connection = self.session.connection()
        except Exception:
            # connection() raises a plain Exception once the session has
            # been stopped (for instance by the responder thread).
            return None
        try:
            return connection.recv(1024)
        except socket.error:
            return None

    def parse(self):
        """Consume every complete line currently held in the buffer.

        Returns:
            False if a ``quit`` command was read, True otherwise.
        """
        while True:
            newline = self.message.find('\n')
            if newline == -1:
                # Incomplete line: keep it buffered until more data comes.
                return True

            raw_command = self.message[:newline].strip()
            self.message = self.message[newline + 1:]
            if raw_command == 'quit':
                return False
            self._dispatch(raw_command)

    def _dispatch(self, raw_command):
        """Decode one line and queue it as a request or an error reply."""
        try:
            command = json.loads(raw_command)
        except ValueError:
            self.session.push_response(
                {"error": "bad JSON", "request": raw_command})
            return

        try:
            # Try to load vital fields, and return an error if
            # unsuccessful. TypeError covers valid JSON that is not an
            # object (a list or a number, for example).
            command['id']
            command['method']
        except (KeyError, TypeError):
            # Return an error JSON in response.
            self.session.push_response(
                {"error": "syntax error", "request": raw_command})
        else:
            self.session.push_request(command)
178
class TcpServer(threading.Thread):
    """Daemon thread that accepts TCP clients on (host, port).

    Every accepted connection becomes a Session with its own requestor
    and responder threads, and is registered with the processor.
    """

    def __init__(self, shared, processor, host, port):
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = shared
        self.processor = processor
        # Not used anywhere in this class; kept for compatibility.
        self.clients = []
        self.host = host
        self.port = port

    def run(self):
        """Bind, listen and accept clients until ``shared`` is stopped."""
        # Single-argument form: same output under Python 2 and Python 3.
        print("TCP server started.")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.host, self.port))
            sock.listen(1)
            while not self.shared.stopped():
                # accept() returns (connection, address), which are
                # exactly Session's constructor arguments.
                session = Session(*sock.accept())
                client_req = TcpClientRequestor(self.shared, session)
                client_req.start()
                client_res = TcpClientResponder(self.shared, session)
                client_res.start()
                self.processor.add_session(session)
        finally:
            # Release the listening socket on shutdown or on any error.
            sock.close()
203
class Shared(object):
    """Lock-protected stop flag shared by all server threads."""

    def __init__(self):
        # Guards _stopped.
        self.lock = threading.Lock()
        self._stopped = False

    def stop(self):
        """Announce the shutdown and set the stop flag."""
        # Call form with one argument: same output on Python 2 and 3.
        print("Stopping Stratum")
        with self.lock:
            self._stopped = True

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            return self._stopped
218
class Stratum(object):
    """Application entry point: starts the processor and the transports."""

    def start(self, processor, host="176.31.24.241", port=50001):
        """Run the server until ``quit`` is read from standard input.

        Args:
            processor: thread-like object with a ``shared`` attribute, a
                ``start()`` method and ``add_session()``.
            host: address the TCP transport binds to.
            port: TCP port the transport listens on.
        """
        shared = Shared()
        # Bind shared to processor since constructor is user defined
        processor.shared = shared
        processor.start()
        # Create various transports we need
        transports = (TcpServer(shared, processor, host, port),)
        for server in transports:
            server.start()
        while not shared.stopped():
            try:
                line = raw_input()
            except EOFError:
                # Standard input was closed: shut down cleanly instead of
                # dying with a traceback.
                line = "quit"
            if line == "quit":
                shared.stop()
            time.sleep(1)
234
if __name__ == "__main__":
    # Script entry point: run the server with the default Processor.
    processor, app = Processor(), Stratum()
    app.start(processor)
239