rm junk
[electrum-server.git] / transports / stratum_http.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 import jsonrpclib
19 from jsonrpclib import Fault
20 from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
21 import SimpleXMLRPCServer
22 import SocketServer
23 import socket
24 import logging
25 import os, time
26 import types
27 import traceback
28 import sys, threading
29
30 from OpenSSL import SSL
31
32 try:
33     import fcntl
34 except ImportError:
35     # For Windows
36     fcntl = None
37
38 import json
39
40
41 """
Sessions are identified with cookies.
 - Each session has a buffer of responses to requests.


From the processor's point of view:
 - The user only defines process(); the rest is session management.  Thus sessions should not belong to the processor.
48
49 """
50
51
52 from processor import random_string
53
54
def get_version(request):
    """Return the JSON-RPC version implied by the keys of *request*.

    *request* must be a dict.  Returns 2.0 when it has a 'jsonrpc' key,
    1.0 when it only has an 'id' key, and None when it has neither.
    """
    # Test membership on the dict itself rather than on request.keys(),
    # which builds a throwaway list on Python 2.
    if 'jsonrpc' in request:
        return 2.0
    if 'id' in request:
        return 1.0
    return None
62     
def validate_request(request):
    """Check that *request* looks like a JSON-RPC request.

    Returns True when the request is acceptable, otherwise a Fault with
    code -32600 describing the problem (the Fault is returned, not raised).

    Side effect: a request without a 'params' entry gets 'params' set to
    an empty list.
    """
    # isinstance() instead of exact type comparison so that dict/list/str
    # subclasses are accepted as well.
    if not isinstance(request, dict):
        return Fault(
            -32600, 'Request must be {}, not %s.' % type(request)
        )
    rpcid = request.get('id', None)
    if not get_version(request):
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')
    method_ok = bool(method) and isinstance(method, types.StringTypes)
    params_ok = isinstance(params, (list, dict, tuple))
    if not (method_ok and params_ok):
        return Fault(
            -32600, 'Invalid request parameters or method.', rpcid=rpcid
        )
    return True
85
class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
    """JSON-RPC dispatcher that answers requests through per-session queues.

    The methods below use ``self.dispatcher``, which is not set here; it
    must be attached to the instance before any request is processed.  It
    is expected to offer get_session_by_address(), add_session() and
    do_dispatch().
    """

    def __init__(self, encoding=None):
        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self,
                                        allow_none=True,
                                        encoding=encoding)

    def _marshaled_dispatch(self, session_id, data, dispatch_method = None):
        """Dispatch the request(s) in *data* and return the reply as a string.

        session_id      -- address of the session the request belongs to
        data            -- raw request body, a JSON object or a JSON list
        dispatch_method -- accepted for interface compatibility; unused

        Returns a JSON-RPC fault string when *data* cannot be parsed, the
        plain text 'Error: session not found' for an unknown session, and
        otherwise the validation faults plus every response pending on the
        session: a JSON list when there are several, a single JSON value
        when there is one, '' when there are none.
        """
        try:
            request = jsonrpclib.loads(data)
        except Exception as e:
            fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e))
            return fault.response()

        session = self.dispatcher.get_session_by_address(session_id)
        if not session:
            return 'Error: session not found'
        # Refresh the activity timestamp of the session.
        session.time = time.time()

        responses = []
        if not isinstance(request, list):
            request = [ request ]

        for req_entry in request:
            result = validate_request(req_entry)
            if isinstance(result, Fault):
                responses.append(result.response())
                continue

            self.dispatcher.do_dispatch(session, req_entry)

            # server.stop is acknowledged immediately; anything collected
            # so far for this batch is not returned.
            if req_entry['method'] == 'server.stop':
                return json.dumps({'result':'ok'})

        for item in self.poll_session(session):
            responses.append(json.dumps(item))

        if len(responses) > 1:
            response = '[%s]' % ','.join(responses)
        elif len(responses) == 1:
            response = responses[0]
        else:
            response = ''

        return response


    def create_session(self):
        """Register a new HttpSession with a random id and return the id."""
        session_id = random_string(10)
        session = HttpSession(session_id)
        self.dispatcher.add_session(session)
        return session_id

    def poll_session(self, session):
        """Remove and return every response currently queued on *session*.

        Never blocks.  Checking empty() and then calling a blocking get()
        would hang forever if another thread emptied the queue in between,
        so items are taken with get_nowait() until Queue.Empty is raised.
        (The Queue module is imported at module level in this file.)
        """
        q = session.pending_responses
        responses = []
        while True:
            try:
                responses.append(q.get_nowait())
            except Queue.Empty:
                break
        return responses
150
151
152
153
class StratumJSONRPCRequestHandler(
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    """HTTP request handler that ties JSON-RPC requests to cookie sessions.

    The session id travels in a 'SESSION=<id>' cookie; a request without
    such a cookie gets a freshly created session.  GET dispatches an empty
    batch, POST dispatches the request body.
    """

    def do_GET(self):
        """Answer a GET by dispatching an empty batch for the session."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        self._serve(lambda: json.dumps([]))


    def do_POST(self):
        """Answer a POST by dispatching the JSON-RPC request in its body."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        self._serve(self._read_body)


    def _read_body(self):
        """Read and return the request body announced by Content-Length."""
        max_chunk_size = 10*1024*1024
        size_remaining = int(self.headers["content-length"])
        chunks = []
        # '> 0' guards against a negative Content-Length; the empty-chunk
        # check stops the loop when the peer closes before sending the
        # announced number of bytes (otherwise it would spin forever).
        while size_remaining > 0:
            chunk = self.rfile.read(min(size_remaining, max_chunk_size))
            if not chunk:
                break
            chunks.append(chunk)
            size_remaining -= len(chunk)
        return ''.join(chunks)

    def _session_id_from_cookie(self):
        """Return the id from a 'SESSION=' cookie header, or None."""
        c = self.headers.get('cookie')
        if c and c[0:8] == 'SESSION=':
            return c[8:]
        return None

    def _serve(self, get_data):
        """Obtain the request data, dispatch it and write the HTTP reply.

        get_data -- callable returning the raw JSON request string

        Any exception raised while reading or dispatching yields a 500
        reply carrying a JSON-RPC -32603 fault.
        """
        # Bound before the try block so the cookie check below cannot hit
        # an unbound local when the failure happens early (for example a
        # missing or non-numeric Content-Length header).
        session_id = None
        try:
            data = get_data()
            session_id = self._session_id_from_cookie()
            if session_id is None:
                session_id = self.server.create_session()

            response = self.server._marshaled_dispatch(session_id, data)
            self.send_response(200)
        except Exception:
            self.send_response(500)
            err_lines = traceback.format_exc().splitlines()
            trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
            fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
            response = fault.response()
            print("500 %s" % trace_string)
        if response is None:
            response = ''

        if session_id:
            self.send_header("Set-Cookie", "SESSION=%s"%session_id)

        self.send_header("Content-type", "application/json-rpc")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Content-length", str(len(response)))
        self.end_headers()
        self.wfile.write(response)
        self.wfile.flush()
        # NOTE(review): shutdown(1) is the plain-socket signature; confirm
        # the pyOpenSSL connection used by the SSL server accepts it.
        self.connection.shutdown(1)
245
246
247
248
class SSLTCPServer(SocketServer.TCPServer):
    """TCPServer whose listening socket is a pyOpenSSL SSL.Connection."""

    def __init__(self, server_address, certfile, keyfile, RequestHandlerClass, bind_and_activate=True):
        """Create the SSL listening socket and optionally bind/activate it.

        server_address     -- address passed on to BaseServer
        certfile           -- path of the certificate file
        keyfile            -- path of the private key file
        RequestHandlerClass -- handler class passed on to BaseServer
        bind_and_activate  -- when true, call server_bind() and
                              server_activate() right away
        """
        # BaseServer.__init__ is called directly instead of
        # TCPServer.__init__ because the socket is built here, wrapped in
        # an SSL connection.
        SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)
        # SSLv3_METHOD would restrict the server to SSLv3 only.
        # SSLv23_METHOD negotiates the best protocol both sides support,
        # so existing SSLv3 clients keep working and TLS clients can
        # connect too; SSLv2 is switched off.
        # NOTE(review): add SSL.OP_NO_SSLv3 once no SSLv3-only clients
        # need to be served.
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.set_options(SSL.OP_NO_SSLv2)
        ctx.use_privatekey_file(keyfile)
        ctx.use_certificate_file(certfile)
        self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type))
        if bind_and_activate:
            self.server_bind()
            self.server_activate()

    def shutdown_request(self,request):
        """Shut down *request* using the argument-less SSL shutdown()."""
        request.shutdown()
263
264
class StratumHTTPServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
    """Plain TCP server combined with the Stratum JSON-RPC dispatcher."""

    allow_reuse_address = True

    def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
                 logRequests=False, encoding=None, bind_and_activate=True,
                 address_family=socket.AF_INET):
        """Set up the dispatcher, then create the listening socket.

        addr              -- address handed to TCPServer (a filesystem path
                             when a Unix socket is requested)
        requestHandler    -- request handler class
        logRequests       -- stored on the instance as ``logRequests``
        encoding          -- passed to the dispatcher
        bind_and_activate -- passed to TCPServer
        address_family    -- socket address family to use
        """
        self.logRequests = logRequests
        StratumJSONRPCDispatcher.__init__(self, encoding)
        self.address_family = address_family
        if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
            # Unix sockets can't be bound if they already exist in the
            # filesystem. The convention of e.g. X11 is to unlink
            # before binding again.
            if os.path.exists(addr):
                try:
                    os.unlink(addr)
                except OSError:
                    logging.warning("Could not unlink socket %s", addr)

        SocketServer.TCPServer.__init__(self, addr, requestHandler, bind_and_activate)

        # Set close-on-exec on the listening socket where fcntl supports it.
        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
            flags |= fcntl.FD_CLOEXEC
            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
294
295
class StratumHTTPSSLServer(SSLTCPServer, StratumJSONRPCDispatcher):
    """SSL TCP server combined with the Stratum JSON-RPC dispatcher."""

    allow_reuse_address = True

    def __init__(self, addr, certfile, keyfile,
                 requestHandler=StratumJSONRPCRequestHandler,
                 logRequests=False, encoding=None, bind_and_activate=True,
                 address_family=socket.AF_INET):
        """Set up the dispatcher, then create the SSL listening socket.

        addr              -- address handed to SSLTCPServer (a filesystem
                             path when a Unix socket is requested)
        certfile          -- path of the certificate file
        keyfile           -- path of the private key file
        requestHandler    -- request handler class
        logRequests       -- stored on the instance as ``logRequests``
        encoding          -- passed to the dispatcher
        bind_and_activate -- passed to SSLTCPServer
        address_family    -- socket address family to use
        """
        self.logRequests = logRequests
        StratumJSONRPCDispatcher.__init__(self, encoding)
        self.address_family = address_family
        if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
            # Unix sockets can't be bound if they already exist in the
            # filesystem. The convention of e.g. X11 is to unlink
            # before binding again.
            if os.path.exists(addr):
                try:
                    os.unlink(addr)
                except OSError:
                    logging.warning("Could not unlink socket %s", addr)

        SSLTCPServer.__init__(self, addr, certfile, keyfile, requestHandler, bind_and_activate)

        # Set close-on-exec on the listening socket where fcntl supports it.
        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
            flags |= fcntl.FD_CLOEXEC
            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
327
328
329
330
331
332
333 from processor import Session
334 import Queue
335
class HttpSession(Session):
    """Session that stores outgoing responses in a queue.

    Responses are kept in ``pending_responses`` instead of being written
    to a connection.
    """

    def __init__(self, session_id):
        Session.__init__(self)
        self.name = "HTTP"
        self.address = session_id
        self.pending_responses = Queue.Queue()

    def send_response(self, response):
        """Queue *response* (the object itself, not its JSON text)."""
        # The serialised form is discarded; the call is kept because it
        # raises right here when the payload cannot be encoded as JSON.
        json.dumps(response)
        self.pending_responses.put(response)

    def stopped(self):
        """Return the stopped flag, setting it after 60 s without activity.

        Inactivity is measured from ``self.time``.
        """
        with self.lock:
            idle = time.time() - self.time
            if idle > 60:
                self._stopped = True
            return self._stopped
353
class HttpServer(threading.Thread):
    """Daemon thread that runs the Stratum HTTP or HTTPS server."""

    def __init__(self, dispatcher, host, port, use_ssl, certfile, keyfile):
        """Store the configuration; the server itself is built in run().

        dispatcher -- object providing ``shared`` and ``request_dispatcher``
        host, port -- address to listen on
        use_ssl    -- serve HTTPS when true, plain HTTP otherwise
        certfile   -- certificate file path (used with SSL only)
        keyfile    -- private key file path (used with SSL only)
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher.request_dispatcher
        self.host, self.port = host, port
        self.use_ssl = use_ssl
        self.certfile, self.keyfile = certfile, keyfile
        self.lock = threading.Lock()


    def run(self):
        """Create the threaded server and serve requests until it stops."""
        # see http://code.google.com/p/jsonrpclib/
        from SocketServer import ThreadingMixIn
        endpoint = (self.host, self.port)
        if not self.use_ssl:
            class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer):
                pass
            self.server = StratumThreadedServer(endpoint)
            print("HTTP server started.")
        else:
            class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer):
                pass
            self.server = StratumThreadedServer(endpoint, self.certfile, self.keyfile)
            print("HTTPS server started.")

        server = self.server
        # Attach the request dispatcher the server's dispatch code uses.
        server.dispatcher = self.dispatcher
        for method_name in ('server.stop', 'server.info'):
            server.register_function(None, method_name)

        server.serve_forever()
385