improved logging
[electrum-server.git] / transports / stratum_tcp.py
1 import json
2 import socket
3 import threading
4 import time
5 import Queue as queue
6
7 from processor import Session, Dispatcher, timestr
8
9 class TcpSession(Session):
10
11     def __init__(self, connection, address):
12         Session.__init__(self)
13         self._connection = connection
14         self.address = address[0]
15         self.version = 'unknown'
16         self.name = "TCP session"
17
18     def connection(self):
19         if self.stopped():
20             raise Exception("Session was stopped")
21         else:
22             return self._connection
23
24     def stop(self):
25         self._connection.close()
26         #print "Terminating connection:", self.address
27         with self.lock:
28             self._stopped = True
29
30     def send_response(self, response):
31         raw_response = json.dumps(response)
32         # Possible race condition here by having session
33         # close connection?
34         # I assume Python connections are thread safe interfaces
35         try:
36             connection = self.connection()
37             connection.send(raw_response + "\n")
38         except:
39             self.stop()
40
41
42
43 class TcpClientRequestor(threading.Thread):
44
45     def __init__(self, dispatcher, session):
46         self.shared = dispatcher.shared
47         self.dispatcher = dispatcher
48         self.message = ""
49         self.session = session
50         threading.Thread.__init__(self)
51
52     def run(self):
53         while not self.shared.stopped():
54             if not self.update():
55                 break
56
57             while self.parse():
58                 pass
59
60     def update(self):
61         data = self.receive()
62         if not data:
63             # close_session
64             self.session.stop()
65             return False
66
67         self.message += data
68         return True
69
70     def receive(self):
71         try:
72             return self.session.connection().recv(1024)
73         except:
74             return ''
75
76     def parse(self):
77         raw_buffer = self.message.find('\n')
78         if raw_buffer == -1:
79             return False
80
81         raw_command = self.message[0:raw_buffer].strip()
82         self.message = self.message[raw_buffer + 1:]
83         if raw_command == 'quit': 
84             self.session.stop()
85             return False
86
87         try:
88             command = json.loads(raw_command)
89         except:
90             self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
91             return True
92
93         try:
94             # Try to load vital fields, and return an error if
95             # unsuccessful.
96             message_id = command['id']
97             method = command['method']
98         except KeyError:
99             # Return an error JSON in response.
100             self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
101         else:
102             self.dispatcher.push_request(self.session,command)
103
104         return True
105
106 class TcpServer(threading.Thread):
107
108     def __init__(self, dispatcher, host, port):
109         self.shared = dispatcher.shared
110         self.dispatcher = dispatcher.request_dispatcher
111         threading.Thread.__init__(self)
112         self.daemon = True
113         self.host = host
114         self.port = port
115         self.lock = threading.Lock()
116
117     def run(self):
118         print "TCP server started."
119         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
120         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
121         sock.bind((self.host, self.port))
122         sock.listen(1)
123         while not self.shared.stopped():
124             session = TcpSession(*sock.accept())
125             client_req = TcpClientRequestor(self.dispatcher, session)
126             client_req.start()
127             self.dispatcher.add_session(session)
128             self.dispatcher.collect_garbage()
129
130
131
132