import os
import types
import traceback
-import sys
+import sys, threading
+
try:
import fcntl
except ImportError:
import json
+
+"""
+sessions are identified with cookies
+ - each session has a buffer of responses to requests
+
+
+from the processor point of view:
+ - the user only defines process() ; the rest is session management. thus sessions should not belong to processor
+
+"""
+
+
+def random_string(N):
+ import random, string
+ return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
+
+
+
+
def get_version(request):
# must be a dict
if 'jsonrpc' in request.keys():
allow_none=True,
encoding=encoding)
- def _marshaled_dispatch(self, data, dispatch_method = None):
+ def _marshaled_dispatch(self, session_id, data, dispatch_method = None):
response = None
try:
request = jsonrpclib.loads(data)
if type(result) is Fault:
responses.append(result.response())
continue
- resp_entry = self._marshaled_single_dispatch(req_entry)
+ resp_entry = self._marshaled_single_dispatch(session_id, req_entry)
if resp_entry is not None:
responses.append(resp_entry)
- # poll
- r = self._marshaled_single_dispatch({'method':'session.poll', 'params':[], 'id':'z' })
- r = jsonrpclib.loads(r)
- r = r.get('result')
+ r = self.poll_session(session_id)
for item in r:
responses.append(json.dumps(item))
return response
- def _marshaled_single_dispatch(self, request):
+ def _marshaled_single_dispatch(self, session_id, request):
# TODO - Use the multiprocessing and skip the response if
# it is a notification
# Put in support for custom dispatcher here
# (See SimpleXMLRPCServer._marshaled_dispatch)
method = request.get('method')
params = request.get('params')
- if params is None: params=[]
- params = [ self.session_id, request['id'] ] + params
- #print method, params
try:
- response = self._dispatch(method, params)
+ response = self._dispatch(method, session_id, request)
except:
exc_type, exc_value, exc_tb = sys.exc_info()
fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
return fault.response()
- def _dispatch(self, method, params):
+ def _dispatch(self, method, session_id, request):
func = None
try:
func = self.funcs[method]
pass
if func is not None:
try:
- if type(params) is types.ListType:
- response = func(*params)
- else:
- response = func(**params)
+ response = func(session_id, request)
return response
except TypeError:
return Fault(-32602, 'Invalid parameters.')
self.report_404()
return
try:
- self.server.session_id = None
+ session_id = None
c = self.headers.get('cookie')
if c:
if c[0:8]=='SESSION=':
- #print "found cookie", c[8:]
- self.server.session_id = c[8:]
+ print "found cookie", c[8:]
+ session_id = c[8:]
- if self.server.session_id is None:
- r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' })
- r = jsonrpclib.loads(r)
- self.server.session_id = r.get('result')
- #print "setting cookie", self.server.session_id
+ if session_id is None:
+ session_id = self.server.create_session()
+ print "setting cookie", session_id
data = json.dumps([])
- response = self.server._marshaled_dispatch(data)
+ response = self.server._marshaled_dispatch(session_id, data)
self.send_response(200)
except Exception, e:
self.send_response(500)
if response == None:
response = ''
- if hasattr(self.server, 'session_id'):
- if self.server.session_id:
- self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id)
- self.session_id = None
+ if session_id:
+ self.send_header("Set-Cookie", "SESSION=%s"%session_id)
self.send_header("Content-type", "application/json-rpc")
self.send_header("Content-length", str(len(response)))
size_remaining -= len(L[-1])
data = ''.join(L)
- self.server.session_id = None
+ session_id = None
c = self.headers.get('cookie')
if c:
if c[0:8]=='SESSION=':
- #print "found cookie", c[8:]
- self.server.session_id = c[8:]
+ print "found cookie", c[8:]
+ session_id = c[8:]
- if self.server.session_id is None:
- r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' })
- r = jsonrpclib.loads(r)
- self.server.session_id = r.get('result')
- #print "setting cookie", self.server.session_id
+ if session_id is None:
+ session_id = self.server.create_session()
+ print "setting cookie", session_id
- response = self.server._marshaled_dispatch(data)
+ response = self.server._marshaled_dispatch(session_id, data)
self.send_response(200)
except Exception, e:
self.send_response(500)
if response == None:
response = ''
- if hasattr(self.server, 'session_id'):
- if self.server.session_id:
- self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id)
- self.session_id = None
+ if session_id:
+ self.send_header("Set-Cookie", "SESSION=%s"%session_id)
self.send_header("Content-type", "application/json-rpc")
self.send_header("Content-length", str(len(response)))
flags |= fcntl.FD_CLOEXEC
fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
+ self.sessions = {}
+
+
+
+ def create_session(self):
+ session_id = random_string(10)
+ self.sessions[session_id] = { 'addresses':{}, 'responses':[]}
+ return session_id
+
+ def poll_session(self,session_id):
+ responses = self.sessions[session_id]['responses']
+ self.sessions[session_id]['responses'] = []
+ print "poll: %d responses"%len(responses)
+ return responses
+
+
+class HttpResponder(threading.Thread):
+ """read responses from the queue and dispatch them to sessions"""
+ def __init__(self, shared, processor):
+ self.shared = shared
+ self.processor = processor
+ threading.Thread.__init__(self)
+
+ def run(self):
+ while not self.shared.stopped():
+ session,response = self.processor.pop_response()
+ if not session.stopped():
+ raw_response = json.dumps(response)
+ session.responses.append(response)
+
+
+
+class HttpServer(threading.Thread):
+ def __init__(self, shared, _processor, host, port):
+ self.shared = shared
+ self.processor = _processor
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.host = host
+ self.port = port
+ self.lock = threading.Lock()
+
+ def run(self):
+ # see http://code.google.com/p/jsonrpclib/
+ from SocketServer import ThreadingMixIn
+ from StratumJSONRPCServer import StratumJSONRPCServer
+ class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
+ server = StratumThreadedJSONRPCServer(( self.host, self.port))
+ for s in ['server.peers', 'server.banner', 'transaction.broadcast', \
+ 'address.get_history','address.subscribe', 'numblocks.subscribe', 'client.version']:
+ server.register_function(self.process, s)
+
+ server.register_function(self.do_stop, 'stop')
+ print "HTTP server started."
+ server.serve_forever()
+
+ def process(self, session, request):
+ print session, request
+
+ def do_stop(self, session, request):
+ self.shared.stop()
+ return 'ok'
+
+