Fix indentation
[electrum-server.git] / transports / stratum_http.py
1 #!/usr/bin/env python
2 # Copyright(C) 2012 thomasv@gitorious
3
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program.  If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
17
18 import jsonrpclib
19 from jsonrpclib import Fault
20 from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
21 import SimpleXMLRPCServer
22 import SocketServer
23 import socket
24 import logging
25 import os, time
26 import types
27 import traceback
28 import sys, threading
29
30 try:
31     import fcntl
32 except ImportError:
33     # For Windows
34     fcntl = None
35
36 import json
37
38
39 """
40 sessions are identified with cookies
41  - each session has a buffer of responses to requests
42
43
44 from the processor point of view: 
45  - the user only defines process() ; the rest is session management.  thus sessions should not belong to processor
46
47 """
48
49
50 from processor import random_string
51
52
def get_version(request):
    """Guess the JSON-RPC version of a decoded request.

    `request` must be a dict.  Returns 2.0 when it carries a 'jsonrpc'
    key, 1.0 when it carries an 'id' key (and no 'jsonrpc' key), and
    None when it carries neither.
    """
    # Probed in priority order: 'jsonrpc' wins over 'id'.
    for marker, version in (('jsonrpc', 2.0), ('id', 1.0)):
        if marker in request:
            return version
    return None
60     
def validate_request(request):
    """Check that a decoded JSON-RPC request is well formed.

    Returns True when the request is acceptable, otherwise a Fault with
    code -32600 describing what is wrong.  As a side effect, a request
    that passes the version check and has no 'params' entry gets an
    empty list stored under that key.
    """
    if type(request) is not types.DictType:
        return Fault(-32600, 'Request must be {}, not %s.' % type(request))

    request_id = request.get('id', None)
    if not get_version(request):
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=request_id)

    # Fill in an empty parameter list when none was supplied.
    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')

    # The method must be a non-empty string; the parameters must be a
    # list, dict or tuple.
    method_ok = bool(method) and type(method) in types.StringTypes
    params_ok = type(params) in (
        types.ListType, types.DictType, types.TupleType
    )
    if method_ok and params_ok:
        return True
    return Fault(
        -32600, 'Invalid request parameters or method.', rpcid=request_id
    )
83
class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
    """SimpleXMLRPCDispatcher subclass that serves JSON-RPC requests.

    `_marshaled_dispatch` relies on two members this class does not
    define itself: a `dispatcher` attribute (it calls
    get_session_by_address() and do_dispatch() on it) and a
    `poll_session()` method.
    """

    def __init__(self, encoding=None):
        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(
            self, allow_none=True, encoding=encoding)

    def _marshaled_dispatch(self, session_id, data, dispatch_method = None):
        """Decode `data`, dispatch every request in it and collect the reply.

        session_id      -- address used to look the session up
        data            -- raw JSON text: a single request or a batch (list)
        dispatch_method -- accepted but not used by this implementation

        Returns the text to send back: a JSON fault when `data` cannot
        be decoded, the literal 'Error: session not found' when the
        session is unknown, '{"result": "ok"}' as soon as a
        'server.stop' request has been dispatched, and otherwise the
        validation faults plus the session's pending responses -- '' for
        none, the bare item for one, a JSON array for several.
        """
        try:
            request = jsonrpclib.loads(data)
        except Exception as e:
            return Fault(
                -32700, 'Request %s invalid. (%s)' % (data, e)).response()

        session = self.dispatcher.get_session_by_address(session_id)
        if not session:
            return 'Error: session not found'
        # Record the time of this request on the session.
        session.time = time.time()

        # Treat a lone request as a batch of one.
        batch = request if type(request) is types.ListType else [request]

        responses = []
        for entry in batch:
            outcome = validate_request(entry)
            if type(outcome) is Fault:
                responses.append(outcome.response())
                continue

            self.dispatcher.do_dispatch(session, entry)

            # A stop request ends processing at once; anything collected
            # so far is dropped in favour of a plain acknowledgement.
            if entry['method'] == 'server.stop':
                return json.dumps({'result':'ok'})

        responses.extend(
            json.dumps(item) for item in self.poll_session(session))

        if not responses:
            return ''
        if len(responses) == 1:
            return responses[0]
        return '[%s]' % ','.join(responses)
132
133
134
class StratumJSONRPCRequestHandler(
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    """HTTP request handler that maps GET and POST onto JSON-RPC dispatch.

    The session is carried in a cookie header of the form
    ``SESSION=<id>``; when no such cookie is sent, a new session is
    requested from the server via create_session().  Every reply that
    has a session id sets the cookie again.
    """

    def do_OPTIONS(self):
        """Reply to an OPTIONS request with the allowed methods and CORS headers."""
        self.send_response(200)
        self.send_header('Allow', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Headers', 'X-Request, X-Requested-With')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def do_GET(self):
        """Dispatch an empty batch, which returns the session's pending responses."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        self._serve_rpc(lambda: json.dumps([]))

    def do_POST(self):
        """Dispatch the JSON-RPC request(s) found in the request body."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        self._serve_rpc(self._read_request_body)

    def _read_request_body(self):
        """Read and return the request body announced by Content-Length.

        Raises if the Content-Length header is missing or not an integer.
        """
        max_chunk_size = 10*1024*1024
        size_remaining = int(self.headers["content-length"])
        chunks = []
        # `> 0` also keeps a negative Content-Length from turning into
        # an unbounded read.
        while size_remaining > 0:
            chunk = self.rfile.read(min(size_remaining, max_chunk_size))
            if not chunk:
                # The peer closed the connection before sending the whole
                # body; without this check the loop would never end.
                break
            chunks.append(chunk)
            size_remaining -= len(chunk)
        return ''.join(chunks)

    def _resolve_session_id(self):
        """Return the session id from the cookie, or create a new session."""
        cookie = self.headers.get('cookie')
        if cookie and cookie[0:8] == 'SESSION=':
            return cookie[8:]
        return self.server.create_session()

    def _serve_rpc(self, read_data):
        """Run one JSON-RPC exchange and write the HTTP reply.

        read_data -- callable returning the raw JSON text to dispatch.

        Any exception raised while reading, resolving the session or
        dispatching is turned into a 500 reply carrying a JSON-RPC
        fault (code -32603).
        """
        # Bound before the try block so the reply code below can always
        # test it, even when reading the request fails.
        session_id = None
        try:
            data = read_data()
            session_id = self._resolve_session_id()
            response = self.server._marshaled_dispatch(session_id, data)
            self.send_response(200)
        except Exception:
            self.send_response(500)
            err_lines = traceback.format_exc().splitlines()
            trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
            fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
            response = fault.response()
            print("500 %s" % trace_string)
        if response is None:
            response = ''

        if session_id:
            self.send_header("Set-Cookie", "SESSION=%s"%session_id)

        self.send_header("Content-type", "application/json-rpc")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Content-length", str(len(response)))
        self.end_headers()
        self.wfile.write(response)
        self.wfile.flush()
        # shutdown(1) closes the sending side of the socket.
        self.connection.shutdown(1)
234
235
class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
    """TCP server that answers JSON-RPC over HTTP and owns the HTTP sessions.

    A `dispatcher` attribute must be assigned to the instance before
    requests are served; create_session() registers new sessions on it.
    """

    allow_reuse_address = True

    def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
                 logRequests=False, encoding=None, bind_and_activate=True,
                 address_family=socket.AF_INET):
        """Set up the dispatcher and the listening socket.

        addr              -- address handed to TCPServer (a filesystem
                             path when address_family is AF_UNIX)
        requestHandler    -- request handler class
        logRequests       -- stored on the instance as `logRequests`
        encoding          -- passed on to the dispatcher
        bind_and_activate -- passed on to TCPServer where supported
        address_family    -- socket address family to listen on
        """
        self.logRequests = logRequests
        StratumJSONRPCDispatcher.__init__(self, encoding)
        self.address_family = address_family
        if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
            # Unix sockets can't be bound if they already exist in the
            # filesystem. The convention of e.g. X11 is to unlink
            # before binding again.
            if os.path.exists(addr):
                try:
                    os.unlink(addr)
                except OSError:
                    logging.warning("Could not unlink socket %s", addr)
        # TCPServer.__init__ has an extra parameter on 2.6+, so
        # check Python version and decide on how to call it
        if sys.version_info[:2] < (2, 6):
            SocketServer.TCPServer.__init__(self, addr, requestHandler)
        else:
            SocketServer.TCPServer.__init__(self, addr, requestHandler,
                bind_and_activate)
        # Set the close-on-exec flag on the listening socket (fcntl is
        # None on Windows).
        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
            flags |= fcntl.FD_CLOEXEC
            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)

    def create_session(self):
        """Create a session, register it with the dispatcher, return its id."""
        session_id = random_string(10)
        session = HttpSession(session_id)
        self.dispatcher.add_session(session)
        return session_id

    def poll_session(self, session):
        """Remove and return, as a list, every response pending on `session`.

        Never blocks.  The queue is drained with get_nowait() rather than
        an empty()/get() pair: when two requests for the same session are
        served at once, the loser of that race would otherwise block in
        get() until another response arrived.
        """
        q = session.pending_responses
        responses = []
        while True:
            try:
                responses.append(q.get_nowait())
            except Queue.Empty:
                break
        return responses
285
286
287 from processor import Session
288 import Queue
289
class HttpSession(Session):
    """Session of an HTTP client, addressed by its session id.

    Responses are parked in `pending_responses` until they are collected.
    """

    def __init__(self, session_id):
        Session.__init__(self)
        self.pending_responses = Queue.Queue()
        self.address = session_id
        self.name = "HTTP"

    def send_response(self, response):
        """Queue `response` (the object itself, not its JSON text)."""
        # The encoded text is discarded, but the call still raises when
        # `response` cannot be serialised.
        # NOTE(review): presumably an early validity check -- confirm.
        json.dumps(response)
        self.pending_responses.put(response)

    def stopped(self):
        """Return the stopped flag, setting it first after 60s without activity."""
        with self.lock:
            idle = time.time() - self.time
            if idle > 60:
                self._stopped = True
            return self._stopped
307
class HttpServer(threading.Thread):
    """Daemon thread that runs the threaded JSON-RPC HTTP server."""

    def __init__(self, dispatcher, host, port):
        """Remember where to listen and which request dispatcher to use.

        dispatcher -- object exposing `shared` and `request_dispatcher`
        host, port -- address the server binds to in run()
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher.request_dispatcher
        self.host = host
        self.port = port
        self.lock = threading.Lock()

    def run(self):
        """Create the server, wire it to the dispatcher and serve forever."""
        # see http://code.google.com/p/jsonrpclib/
        from SocketServer import ThreadingMixIn

        class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer):
            pass

        server = StratumThreadedJSONRPCServer((self.host, self.port))
        self.server = server
        server.dispatcher = self.dispatcher
        # Both method names are registered with None as the function.
        for method_name in ('server.stop', 'server.info'):
            server.register_function(None, method_name)

        print("HTTP server started.")
        server.serve_forever()
330