25e07387de4c24a48dce7baed421eddc8858c1eb
[electrum-server.git] / transports / stratum_http.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 import jsonrpclib
19 from jsonrpclib import Fault
20 from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
21 import SimpleXMLRPCServer
22 import SocketServer
23 import socket
24 import logging
25 import os
26 import types
27 import traceback
28 import sys, threading
29
30 try:
31     import fcntl
32 except ImportError:
33     # For Windows
34     fcntl = None
35
36 import json
37
38
39 """
Sessions are identified by cookies.
 - Each session has a buffer of responses to requests.


From the processor's point of view:
 - The user only defines process(); the rest is session management.  Thus sessions should not belong to the processor.
46
47 """
48
49
def random_string(N):
    """Return a random string of N uppercase ASCII letters and digits.

    The result is used as the session identifier handed to clients in a
    cookie, so characters are drawn from the operating system's entropy
    source (random.SystemRandom) instead of the default, predictable
    Mersenne Twister generator.

    N -- number of characters to produce; 0 yields an empty string.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
53
54
55
56
def get_version(request):
    """Guess the JSON-RPC version of a request dict.

    Returns 2.0 when the request has a 'jsonrpc' key, 1.0 when it only has
    an 'id' key, and None when it has neither.  The argument must be a dict.
    """
    present = request.keys()
    if 'jsonrpc' in present:
        return 2.0
    return 1.0 if 'id' in present else None
64     
def validate_request(request):
    """Check that a decoded request is a well-formed JSON-RPC call.

    Returns True when the request is acceptable, otherwise a Fault with
    code -32600 describing the problem (the Fault is returned, not raised).

    Side effect: a request without 'params' gets an empty list stored under
    that key.
    """
    if type(request) is not types.DictType:
        return Fault(-32600, 'Request must be {}, not %s.' % type(request))

    rpcid = request.get('id', None)
    if not get_version(request):
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)

    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')

    # The method must be a non-empty string; params must be a plain
    # list, dict or tuple (exact types, subclasses are rejected).
    method_ok = bool(method) and type(method) in types.StringTypes
    params_ok = type(params) in (
        types.ListType, types.DictType, types.TupleType)
    if method_ok and params_ok:
        return True
    return Fault(-32600, 'Invalid request parameters or method.', rpcid=rpcid)
87
class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
    """Dispatcher that feeds JSON-RPC requests to the Stratum processor.

    The class relies on attributes supplied by whoever mixes it in or owns
    it: self.sessions (session id -> session), self.dispatcher (object with
    a process(session, request) method) and self.poll_session(session_id).
    """

    def __init__(self, encoding=None):
        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(
            self, allow_none=True, encoding=encoding)

    def _marshaled_dispatch(self, session_id, data, dispatch_method=None):
        """Process the JSON text `data` on behalf of session `session_id`.

        Returns a string:
          - a -32700 fault response when `data` cannot be decoded;
          - '{"result": "ok"}' as soon as a 'server.stop' request has been
            handed to the processor;
          - otherwise the validation faults followed by every response
            currently queued for the session: '' if there are none, the
            single item if there is one, a JSON array if there are several.

        `dispatch_method` is accepted but not used.
        """
        try:
            decoded = jsonrpclib.loads(data)
        except Exception:
            # sys.exc_info() keeps this block valid on every Python version.
            err = sys.exc_info()[1]
            return Fault(
                -32700, 'Request %s invalid. (%s)' % (data, err)).response()

        # A lone request is handled as a batch of one.
        batch = decoded if type(decoded) is types.ListType else [decoded]

        replies = []
        for entry in batch:
            verdict = validate_request(entry)
            if type(verdict) is Fault:
                replies.append(verdict.response())
                continue

            session = self.sessions.get(session_id)
            self.dispatcher.process(session, entry)

            if entry['method'] == 'server.stop':
                return json.dumps({'result':'ok'})

        # Results are delivered asynchronously through the session queue,
        # so drain whatever has accumulated there.
        replies.extend(
            json.dumps(item) for item in self.poll_session(session_id))

        if not replies:
            return ''
        if len(replies) == 1:
            return replies[0]
        return '[%s]' % ','.join(replies)
132
133
134
class StratumJSONRPCRequestHandler(
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    """HTTP handler carrying Stratum JSON-RPC over GET (poll) and POST.

    The client's session is taken from a 'SESSION=<id>' cookie.  A request
    without such a cookie, or naming a session the server does not know,
    gets a freshly created session whose id is returned via Set-Cookie.
    """

    def do_GET(self):
        """Poll: return the responses queued for the caller's session."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        self._handle_rpc(read_body=False)

    def do_POST(self):
        """Dispatch the JSON-RPC request(s) in the body and return replies."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        self._handle_rpc(read_body=True)

    def _handle_rpc(self, read_body):
        """Shared GET/POST path: resolve session, dispatch, write reply.

        Any exception is reported to the client as a 500 carrying a
        JSON-RPC -32603 fault, and echoed on stdout.
        """
        # Bound before the try block so the reply code below can always
        # read it, even when the failure happens before session lookup.
        session_id = None
        try:
            if read_body:
                data = self._read_request_body()
            else:
                # An empty batch dispatches nothing; it only drains the
                # session's response queue.
                data = json.dumps([])
            session_id = self._get_session_id()
            response = self.server._marshaled_dispatch(session_id, data)
            self.send_response(200)
        except Exception:
            self.send_response(500)
            err_lines = traceback.format_exc().splitlines()
            trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
            fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
            response = fault.response()
            print("500 %s" % trace_string)
        if response is None:
            response = ''

        if session_id:
            self.send_header("Set-Cookie", "SESSION=%s"%session_id)

        self.send_header("Content-type", "application/json-rpc")
        self.send_header("Content-length", str(len(response)))
        self.end_headers()
        self.wfile.write(response)
        self.wfile.flush()
        self.connection.shutdown(1)

    def _read_request_body(self):
        """Read the request body announced by the Content-Length header.

        Reads in chunks of at most 10 MiB.  Stops early if the peer closes
        the connection, returning what was received.  Raises if the header
        is missing or is not an integer.
        """
        max_chunk_size = 10*1024*1024
        size_remaining = int(self.headers["content-length"])
        chunks = []
        while size_remaining > 0:
            chunk = self.rfile.read(min(size_remaining, max_chunk_size))
            if not chunk:
                # EOF before the announced length: without this check the
                # loop would spin forever on empty reads.
                break
            chunks.append(chunk)
            size_remaining -= len(chunk)
        return ''.join(chunks)

    def _get_session_id(self):
        """Return the id of a live session for this request.

        Uses the id from the 'SESSION=' cookie when the server knows it;
        otherwise asks the server to create a new session.
        """
        session_id = None
        cookie = self.headers.get('cookie')
        if cookie and cookie[0:8] == 'SESSION=':
            session_id = cookie[8:]

        # A stale or forged id would make the dispatch fail on every
        # request, so it is replaced by a new session.
        if session_id is None or session_id not in self.server.sessions:
            session_id = self.server.create_session()
        return session_id
224
225
class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
    """TCP server speaking Stratum JSON-RPC over HTTP, with cookie sessions.

    self.sessions maps a session id to its HttpSession.
    """

    allow_reuse_address = True

    def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
                 logRequests=False, encoding=None, bind_and_activate=True,
                 address_family=socket.AF_INET):
        """Create the server socket for `addr`.

        addr              -- address passed to TCPServer (a filesystem path
                             when address_family is AF_UNIX).
        requestHandler    -- handler class instantiated per connection.
        logRequests       -- stored on the instance as self.logRequests.
        encoding          -- forwarded to the dispatcher base class.
        bind_and_activate -- forwarded to TCPServer on Python 2.6+.
        address_family    -- socket family used by the server.
        """
        self.logRequests = logRequests
        # Created before the socket is bound so the table exists by the
        # time any request can be handled.
        self.sessions = {}
        StratumJSONRPCDispatcher.__init__(self, encoding)
        # TCPServer.__init__ has an extra parameter on 2.6+, so
        # check Python version and decide on how to call it
        vi = sys.version_info
        self.address_family = address_family
        if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
            # Unix sockets can't be bound if they already exist in the
            # filesystem. The convention of e.g. X11 is to unlink
            # before binding again.
            if os.path.exists(addr):
                try:
                    os.unlink(addr)
                except OSError:
                    logging.warning("Could not unlink socket %s", addr)
        # if python 2.5 and lower
        if vi[0] < 3 and vi[1] < 6:
            SocketServer.TCPServer.__init__(self, addr, requestHandler)
        else:
            SocketServer.TCPServer.__init__(self, addr, requestHandler,
                bind_and_activate)
        # Set close-on-exec on the listening socket where fcntl supports it.
        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
            flags |= fcntl.FD_CLOEXEC
            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)

    def create_session(self):
        """Register a new HttpSession and return its id."""
        session_id = random_string(10)
        # Never overwrite a live session if the random id happens to
        # collide with an existing one.
        while session_id in self.sessions:
            session_id = random_string(10)
        self.sessions[session_id] = HttpSession(session_id)
        return session_id

    def poll_session(self, session_id):
        """Remove and return, in order, all responses queued for a session.

        Raises KeyError when `session_id` is not a known session.
        """
        q = self.sessions[session_id].pending_responses
        responses = []
        # get_nowait() instead of empty()+get(): with threaded request
        # handling two polls of the same session could both see a
        # non-empty queue, and the loser would block forever in get().
        while True:
            try:
                responses.append(q.get_nowait())
            except Queue.Empty:
                break
        return responses
276
277
278 from processor import Session
279 import Queue
280
class HttpSession(Session):
    """Session whose responses are buffered in a queue.

    Responses stay in self.pending_responses until something drains the
    queue (the HTTP server does so when the client makes a request).
    """

    def __init__(self, session_id):
        Session.__init__(self)
        self.pending_responses = Queue.Queue()
        print("new http session %s" % (session_id,))

    def send_response(self, response):
        """Queue `response` (the object itself, not its JSON text)."""
        # The serialised form is discarded; the call is kept so that a
        # response json cannot encode still raises here.
        json.dumps(response)
        self.pending_responses.put(response)
291
class HttpServer(threading.Thread):
    """Daemon thread that runs the threaded Stratum HTTP server.

    dispatcher -- object providing `shared` and `request_dispatcher`;
                  the latter is handed to the server for request processing.
    host, port -- address the server listens on.
    """

    def __init__(self, dispatcher, host, port):
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher.request_dispatcher
        self.host, self.port = host, port
        self.lock = threading.Lock()

    def run(self):
        """Create the server, attach the dispatcher and serve forever."""
        # see http://code.google.com/p/jsonrpclib/
        class StratumThreadedJSONRPCServer(SocketServer.ThreadingMixIn,
                                           StratumJSONRPCServer):
            pass

        self.server = server = StratumThreadedJSONRPCServer(
            (self.host, self.port))
        server.dispatcher = self.dispatcher
        server.register_function(None, 'server.stop')

        print("HTTP server started.")
        server.serve_forever()
313