major reorganisation, http works now
[electrum-server.git] / stratum_http.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 import jsonrpclib
19 from jsonrpclib import Fault
20 from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
21 import SimpleXMLRPCServer
22 import SocketServer
23 import socket
24 import logging
25 import os
26 import types
27 import traceback
28 import sys, threading
29
30 try:
31     import fcntl
32 except ImportError:
33     # For Windows
34     fcntl = None
35
36 import json
37
38
39 """
40 Sessions are identified with cookies:
41  - each session has a buffer of responses to requests.
42
43
44 From the processor's point of view:
45  - the user only defines process(); the rest is session management. Sessions should therefore not belong to the processor.
46
47 """
48
49
def random_string(N):
    """Return a random string of N uppercase ASCII letters and digits.

    The result is used as a session identifier that is handed to clients
    in a cookie, so characters are drawn with random.SystemRandom (backed
    by the operating system's entropy source) instead of the module-level
    generator, whose output is predictable.

    N -- number of characters to produce; 0 yields the empty string.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
53
54
55
56
def get_version(request):
    """Guess the JSON-RPC version of an already decoded request.

    request must be a dict. The result is 2.0 when it has a 'jsonrpc'
    key, 1.0 when it has an 'id' key but no 'jsonrpc' key, and None when
    it has neither.
    """
    # Checked in priority order: 'jsonrpc' wins over 'id'.
    for marker, version in (('jsonrpc', 2.0), ('id', 1.0)):
        if marker in request:
            return version
    return None
64     
def validate_request(request):
    """Check that a decoded JSON-RPC request is well formed.

    Returns True when the request is acceptable, otherwise a Fault with
    code -32600 describing what is wrong. As a side effect, a request
    without a 'params' entry gets an empty list stored under that key.
    """
    if type(request) is not types.DictType:
        return Fault(
            -32600, 'Request must be {}, not %s.' % type(request)
        )

    rpcid = request.get('id', None)
    if not get_version(request):
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)

    # setdefault stores the default when needed and returns the value in use.
    params = request.setdefault('params', [])
    method = request.get('method', None)

    method_ok = bool(method) and type(method) in types.StringTypes
    params_ok = type(params) in (
        types.ListType, types.DictType, types.TupleType)
    if not (method_ok and params_ok):
        return Fault(
            -32600, 'Invalid request parameters or method.', rpcid=rpcid
        )
    return True
87
class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
    """JSON-RPC dispatcher whose registered functions receive a session id.

    Registered functions are called as func(session_id, request), where
    request is the whole decoded JSON-RPC request dict.

    The class calls self.poll_session(session_id), which is not defined
    here; it has to be provided by the class this dispatcher is combined
    with.
    """

    def __init__(self, encoding=None):
        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self,
                                        allow_none=True,
                                        encoding=encoding)

    def _marshaled_dispatch(self, session_id, data, dispatch_method = None):
        """Decode data, dispatch each request in it and build the reply.

        session_id      -- passed on to every dispatched function and used
                           to collect the session's pending responses.
        data            -- JSON text holding one request or a list of them.
        dispatch_method -- accepted but not used.

        Returns a string: the single response, a JSON array when there are
        several, or '' when there is nothing to send. Undecodable input
        yields a -32700 fault response.
        """
        try:
            request = jsonrpclib.loads(data)
        except Exception as e:
            fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e))
            return fault.response()

        responses = []
        if type(request) is not types.ListType:
            request = [ request ]

        for req_entry in request:
            result = validate_request(req_entry)
            if type(result) is Fault:
                responses.append(result.response())
                continue
            resp_entry = self._marshaled_single_dispatch(session_id, req_entry)
            if resp_entry is not None:
                responses.append(resp_entry)

        # Append whatever has been queued for this session in the meantime.
        for item in self.poll_session(session_id):
            responses.append(json.dumps(item))

        if len(responses) > 1:
            response = '[%s]' % ','.join(responses)
        elif len(responses) == 1:
            response = responses[0]
        else:
            response = ''

        return response

    def _marshaled_single_dispatch(self, session_id, request):
        """Dispatch one validated request and return its encoded response.

        Returns None for notifications (requests whose 'id' is missing or
        None), otherwise the JSON text of the response or of a -32603
        fault when dispatching or encoding fails.
        """
        # TODO - Use the multiprocessing and skip the response if
        # it is a notification
        # Put in support for custom dispatcher here
        # (See SimpleXMLRPCServer._marshaled_dispatch)
        method = request.get('method')
        try:
            response = self._dispatch(method, session_id, request)
        except:
            exc_type, exc_value, exc_tb = sys.exc_info()
            fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
            return fault.response()

        # The id is read only after dispatching, as before: the dispatched
        # function receives the request dict itself and may rewrite it.
        if request.get('id') is None:
            # It's a notification
            return None

        try:
            response = jsonrpclib.dumps(response,
                                        methodresponse=True,
                                        rpcid=request['id']
                                        )
            return response
        except:
            exc_type, exc_value, exc_tb = sys.exc_info()
            fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
            return fault.response()

    def _dispatch(self, method, session_id, request):
        """Find the callable registered for method and invoke it.

        Lookup order: self.funcs, then the registered instance (its own
        _dispatch if it has one, otherwise a dotted attribute lookup).

        Returns the callable's result, or a Fault: -32601 when no callable
        is found, -32602 when the call raises TypeError, -32603 for any
        other exception raised by the callable.
        """
        func = None
        try:
            func = self.funcs[method]
        except KeyError:
            if self.instance is not None:
                if hasattr(self.instance, '_dispatch'):
                    # The instance hook takes (method, params); the params
                    # come from the request ('params' used to be an
                    # undefined name here and raised NameError).
                    return self.instance._dispatch(method,
                                                   request.get('params'))
                else:
                    try:
                        func = SimpleXMLRPCServer.resolve_dotted_attribute(
                            self.instance,
                            method,
                            True
                            )
                    except AttributeError:
                        pass
        if func is not None:
            try:
                response = func(session_id, request)
                return response
            except TypeError:
                return Fault(-32602, 'Invalid parameters.')
            except:
                err_lines = traceback.format_exc().splitlines()
                trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
                fault = jsonrpclib.Fault(-32603, 'Server error: %s' % 
                                         trace_string)
                return fault
        else:
            return Fault(-32601, 'Method %s not supported.' % method)
189
class StratumJSONRPCRequestHandler(
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    """HTTP request handler that ties JSON-RPC traffic to cookie sessions.

    The session id travels in a SESSION cookie. A GET dispatches an empty
    batch, so its reply carries only what the server's _marshaled_dispatch
    adds for the session; a POST dispatches the JSON-RPC request(s) in the
    request body.
    """

    def _session_id_from_cookie(self):
        """Return the value of the SESSION cookie, or None if there is none.

        The Cookie header may carry several 'name=value' pairs separated by
        ';'; the first SESSION pair is used. An empty value counts as
        missing.
        """
        cookie_header = self.headers.get('cookie')
        if not cookie_header:
            return None
        for part in cookie_header.split(';'):
            part = part.strip()
            if part[0:8] == 'SESSION=':
                return part[8:] or None
        return None

    def _read_body(self):
        """Read and return the request body announced by content-length."""
        max_chunk_size = 10*1024*1024
        size_remaining = int(self.headers["content-length"])
        chunks = []
        while size_remaining > 0:
            chunk = self.rfile.read(min(size_remaining, max_chunk_size))
            if not chunk:
                # The peer sent less than it announced; stop instead of
                # looping forever on empty reads.
                break
            chunks.append(chunk)
            size_remaining -= len(chunk)
        return ''.join(chunks)

    def _error_response(self):
        """Build the fault reply for the exception currently being handled.

        Must be called from inside an 'except' block.
        """
        err_lines = traceback.format_exc().splitlines()
        trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
        fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
        response = fault.response()
        # print() with one pre-joined argument gives the same output as the
        # Python 2 statement form 'print a, b'.
        print("%s %s" % ("500", trace_string))
        return response

    def _send_json(self, status, response, session_id):
        """Send status, the session cookie and response, then shut down."""
        if response is None:
            response = ''

        self.send_response(status)
        if session_id:
            self.send_header("Set-Cookie", "SESSION=%s"%session_id)

        self.send_header("Content-type", "application/json-rpc")
        self.send_header("Content-length", str(len(response)))
        self.end_headers()
        self.wfile.write(response)
        self.wfile.flush()
        self.connection.shutdown(1)

    def do_GET(self):
        """Answer a GET by dispatching an empty batch for the session."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        session_id = None
        try:
            session_id = self._session_id_from_cookie()
            # A cookie naming a session the server does not know would make
            # every request fail, so such a client gets a fresh session.
            if session_id is None or session_id not in self.server.sessions:
                session_id = self.server.create_session()

            data = json.dumps([])
            response = self.server._marshaled_dispatch(session_id, data)
            status = 200
        except Exception:
            status = 500
            response = self._error_response()
        self._send_json(status, response, session_id)

    def do_POST(self):
        """Dispatch the JSON-RPC request(s) found in the POST body."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        # Bound before the try block so the reply can still be sent when
        # reading the body fails (e.g. missing content-length header).
        session_id = None
        try:
            data = self._read_body()

            session_id = self._session_id_from_cookie()
            if session_id is not None:
                print("%s %s" % ("found cookie", session_id))

            # See do_GET: unknown session ids are replaced by a new session.
            if session_id is None or session_id not in self.server.sessions:
                session_id = self.server.create_session()
                print("%s %s" % ("setting cookie", session_id))

            response = self.server._marshaled_dispatch(session_id, data)
            status = 200
        except Exception:
            status = 500
            response = self._error_response()
        self._send_json(status, response, session_id)
279
280
class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
    """TCP server combined with the Stratum JSON-RPC dispatcher.

    Keeps the table of HTTP sessions (self.sessions, keyed by session id)
    and provides the poll_session() method the dispatcher calls.
    """

    allow_reuse_address = True

    def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
                 logRequests=False, encoding=None, bind_and_activate=True,
                 address_family=socket.AF_INET):
        """Create the server socket for addr.

        addr              -- address handed to TCPServer; a filesystem path
                             when address_family is socket.AF_UNIX.
        requestHandler    -- request handler class.
        logRequests       -- stored as self.logRequests.
        encoding          -- forwarded to the dispatcher.
        bind_and_activate -- forwarded to TCPServer on Python 2.6+.
        address_family    -- socket address family to use.
        """
        self.logRequests = logRequests
        StratumJSONRPCDispatcher.__init__(self, encoding)
        # TCPServer.__init__ has an extra parameter on 2.6+, so
        # check Python version and decide on how to call it
        vi = sys.version_info
        self.address_family = address_family
        if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
            # Unix sockets can't be bound if they already exist in the
            # filesystem. The convention of e.g. X11 is to unlink
            # before binding again.
            if os.path.exists(addr): 
                try:
                    os.unlink(addr)
                except OSError:
                    logging.warning("Could not unlink socket %s", addr)
        # if python 2.5 and lower
        if vi[0] < 3 and vi[1] < 6:
            SocketServer.TCPServer.__init__(self, addr, requestHandler)
        else:
            SocketServer.TCPServer.__init__(self, addr, requestHandler,
                bind_and_activate)
        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
            # Mark the listening socket close-on-exec.
            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
            flags |= fcntl.FD_CLOEXEC
            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)

        self.sessions = {}

    def create_session(self):
        """Register a new HttpSession and return its random 10-char id."""
        session_id = random_string(10)
        self.sessions[session_id] = HttpSession(session_id)
        return session_id

    def poll_session(self,session_id):
        """Return and clear the responses queued for session_id.

        An unknown session id yields an empty list instead of raising
        KeyError.
        """
        session = self.sessions.get(session_id)
        if session is None:
            responses = []
        else:
            # Take the queued list and install a fresh one in a single
            # assignment, so no append can fall between a copy and a reset.
            # NOTE(review): no lock is held here; confirm whether stronger
            # synchronisation with the code that queues responses is needed.
            responses, session.pending_responses = \
                session.pending_responses, []
        print("poll: %d responses"%len(responses))
        return responses
328
329
330 from processor import Session
331
class HttpSession(Session):
    """Session that collects its responses in a list, pending_responses."""

    def __init__(self, session_id):
        Session.__init__(self)
        # Responses queued by send_response(), in arrival order.
        self.pending_responses = []
        # print() with one pre-joined argument gives the same output as the
        # Python 2 statement form 'print a, b'.
        print("%s %s" % ("new http session", session_id))

    def send_response(self, response):
        """Append response to pending_responses.

        The response is run through json.dumps first, so an object that
        cannot be encoded raises here; the encoded text is discarded and
        the original object is what gets queued.
        """
        json.dumps(response)
        self.pending_responses.append(response)
342
class HttpServer(threading.Thread):
    """Daemon thread that runs the threaded Stratum JSON-RPC HTTP server."""

    def __init__(self, shared, _processor, host, port):
        """Store the collaborators and the address to listen on.

        shared     -- object whose stop() is called by the 'stop' method.
        _processor -- object providing store_session_id() and process().
        host, port -- address the server binds to when the thread runs.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = shared
        self.processor = _processor
        self.host = host
        self.port = port
        self.lock = threading.Lock()

    def run(self):
        """Create the server, register the RPC methods and serve forever."""
        # see http://code.google.com/p/jsonrpclib/
        from SocketServer import ThreadingMixIn

        class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer):
            pass

        # Every one of these method names is routed to self.process.
        rpc_methods = (
            'server.peers', 'server.banner', 'transaction.broadcast',
            'address.get_history', 'address.subscribe',
            'numblocks.subscribe', 'client.version',
        )

        self.server = StratumThreadedJSONRPCServer((self.host, self.port))
        for name in rpc_methods:
            self.server.register_function(self.process, name)
        self.server.register_function(self.do_stop, 'stop')

        print("HTTP server started.")
        self.server.serve_forever()

    def process(self, session_id, request):
        """Forward request to the processor on behalf of a known session.

        The request's 'id' is replaced by whatever the processor's
        store_session_id() returns before the request is handed over.
        Requests for unknown sessions are ignored. Always returns None.
        """
        session = self.server.sessions.get(session_id)
        if not session:
            return
        request['id'] = self.processor.store_session_id(session, request['id'])
        self.processor.process(request)

    def do_stop(self, session, request):
        """RPC method 'stop': call shared.stop() and return 'ok'."""
        self.shared.stop()
        return 'ok'
379
380
381