From: Forrest Voight Date: Fri, 28 Dec 2012 09:41:02 +0000 (-0500) Subject: refactored jsonrpc: broke generic jsonrpc handling out of HTTP-specific implementatio... X-Git-Tag: 10.0~5 X-Git-Url: https://git.novaco.in/?p=p2pool.git;a=commitdiff_plain;h=38bda9cbbf508fe8f8e5701e1d7cc644fad5f6ec refactored jsonrpc: broke generic jsonrpc handling out of HTTP-specific implementation, allow trees of rpc calls --- diff --git a/p2pool/bitcoin/worker_interface.py b/p2pool/bitcoin/worker_interface.py index dee637d..2f49a4e 100644 --- a/p2pool/bitcoin/worker_interface.py +++ b/p2pool/bitcoin/worker_interface.py @@ -19,9 +19,9 @@ class _Provider(object): def rpc_getwork(self, request, data=None): return self.parent._getwork(request, data, long_poll=self.long_poll) -class _GETableServer(jsonrpc.Server): +class _GETableServer(jsonrpc.HTTPServer): def __init__(self, provider, render_get_func): - jsonrpc.Server.__init__(self, provider) + jsonrpc.HTTPServer.__init__(self, provider) self.render_GET = render_get_func class WorkerBridge(object): diff --git a/p2pool/main.py b/p2pool/main.py index 098eb3a..998b426 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -47,7 +47,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): # connect to bitcoind over JSON-RPC and do initial getmemorypool url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port) print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username) - bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30) + bitcoind = jsonrpc.HTTPProxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30) yield helper.check(bitcoind, net) temp_work = yield helper.getwork(bitcoind) diff --git a/p2pool/test/test_node.py b/p2pool/test/test_node.py index 1279a05..28fa48d 100644 --- a/p2pool/test/test_node.py +++ b/p2pool/test/test_node.py @@ -177,7 +177,7 @@ class Test(unittest.TestCase): bitd = bitcoind() mm_root = resource.Resource() - mm_root.putChild('', jsonrpc.Server(mm_provider)) + mm_root.putChild('', jsonrpc.HTTPServer(mm_provider)) mm_port = reactor.listenTCP(0, server.Site(mm_root)) n = node.Node(bitd, bitd, [], [], mynet) @@ -188,7 +188,7 @@ class Test(unittest.TestCase): worker_interface.WorkerInterface(wb).attach_to(web_root) port = reactor.listenTCP(0, server.Site(web_root)) - proxy = jsonrpc.Proxy('http://127.0.0.1:' + str(port.getHost().port)) + proxy = jsonrpc.HTTPProxy('http://127.0.0.1:' + str(port.getHost().port)) yield deferral.sleep(3) @@ -230,7 +230,7 @@ class Test(unittest.TestCase): yield deferral.sleep(3) for i in xrange(SHARES): - proxy = jsonrpc.Proxy('http://127.0.0.1:' + str(random.choice(nodes).web_port.getHost().port)) + proxy = jsonrpc.HTTPProxy('http://127.0.0.1:' + str(random.choice(nodes).web_port.getHost().port)) blah = yield proxy.rpc_getwork() yield proxy.rpc_getwork(blah['data']) yield deferral.sleep(.02) diff --git a/p2pool/util/jsonrpc.py b/p2pool/util/jsonrpc.py index aa334a6..35c1ec6 100644 --- a/p2pool/util/jsonrpc.py +++ b/p2pool/util/jsonrpc.py @@ -34,66 +34,36 @@ def Error_for_code(code): Error.__init__(self, code, *args, **kwargs) return NarrowError + class Proxy(object): - def __init__(self, url, headers={}, timeout=5): - self._url = url - self._headers = headers - self._timeout = timeout - - @defer.inlineCallbacks - def callRemote(self, method, *params): - id_ = 0 - - try: - data = yield client.getPage( - url=self._url, - method='POST', - headers=dict(self._headers, **{'Content-Type': 'application/json'}), - postdata=json.dumps({ - 'jsonrpc': '2.0', - 'method': method, - 'params': params, - 'id': id_, - }), - timeout=self._timeout, - ) - except error.Error, e: - try: - resp = json.loads(e.response) - except: - raise e - else: - resp = json.loads(data) - - if resp['id'] != id_: - raise ValueError('invalid id') - if 'error' in resp and resp['error'] is not None: - raise Error_for_code(resp['error']['code'])(resp['error']['message'], resp['error'].get('data', None)) - defer.returnValue(resp['result']) + def __init__(self, func, services=[]): + self._func = func + self._services = services def __getattr__(self, attr): if attr.startswith('rpc_'): - return lambda *params: self.callRemote(attr[len('rpc_'):], *params) - raise AttributeError('%r object has no attribute %r' % (self.__class__.__name__, attr)) + return lambda *params: self._func('.'.join(self._services + [attr[len('rpc_'):]]), params) + elif attr.startswith('svc_'): + return Proxy(self._func, self._services + [attr[len('svc_'):]]) + else: + raise AttributeError('%r object has no attribute %r' % (self.__class__.__name__, attr)) -class Server(deferred_resource.DeferredResource): - def __init__(self, provider): - deferred_resource.DeferredResource.__init__(self) - self._provider = provider - - @defer.inlineCallbacks - def render_POST(self, request): +@defer.inlineCallbacks +def _handle(data, provider, preargs=(), response_handler=None): id_ = None try: try: - data = request.content.read() - try: req = json.loads(data) except Exception: raise Error_for_code(-32700)(u'Parse error') + if 'result' in req or 'error' in req: + response_handler(req['id'], req['result'] if 'error' not in req or req['error'] is None else + failure.Failure(Error_for_code(resp['error']['code'])(resp['error']['message'], resp['error'].get('data', None)))) + defer.returnValue(None) + id_ = req.get('id', None) method = req.get('method', None) if not isinstance(method, basestring): @@ -102,11 +72,16 @@ class Server(deferred_resource.DeferredResource): if not isinstance(params, list): raise Error_for_code(-32600)(u'Invalid Request') - method_meth = getattr(self._provider, 'rpc_' + method, None) + for service_name in method.split('.')[:-1]: + provider = getattr(provider, 'svc_' + service_name, None) + if provider is None: + raise Error_for_code(-32601)(u'Service not found') + + method_meth = getattr(provider, 'rpc_' + method.split('.')[-1], None) if method_meth is None: raise Error_for_code(-32601)(u'Method not found') - result = yield method_meth(request, *params) + result = yield method_meth(*list(preargs) + list(params)) error = None except Error: raise @@ -117,12 +92,56 @@ class Server(deferred_resource.DeferredResource): result = None error = e._to_obj() - data = json.dumps(dict( + defer.returnValue(json.dumps(dict( jsonrpc='2.0', id=id_, result=result, error=error, - )) + ))) + +# HTTP + +@defer.inlineCallbacks +def _http_do(url, headers, timeout, method, params): + id_ = 0 + + try: + data = yield client.getPage( + url=url, + method='POST', + headers=dict(headers, **{'Content-Type': 'application/json'}), + postdata=json.dumps({ + 'jsonrpc': '2.0', + 'method': method, + 'params': params, + 'id': id_, + }), + timeout=timeout, + ) + except error.Error, e: + try: + resp = json.loads(e.response) + except: + raise e + else: + resp = json.loads(data) + + if resp['id'] != id_: + raise ValueError('invalid id') + if 'error' in resp and resp['error'] is not None: + raise Error_for_code(resp['error']['code'])(resp['error']['message'], resp['error'].get('data', None)) + defer.returnValue(resp['result']) +HTTPProxy = lambda url, headers={}, timeout=5: Proxy(lambda method, params: _http_do(url, headers, timeout, method, params)) + +class HTTPServer(deferred_resource.DeferredResource): + def __init__(self, provider): + deferred_resource.DeferredResource.__init__(self) + self._provider = provider + + @defer.inlineCallbacks + def render_POST(self, request): + data = yield _handle(request.content.read(), self._provider, preargs=[request]) + assert data is not None request.setHeader('Content-Type', 'application/json') request.setHeader('Content-Length', len(data)) request.write(data) diff --git a/p2pool/work.py b/p2pool/work.py index 6315796..5f4f2a4 100644 --- a/p2pool/work.py +++ b/p2pool/work.py @@ -60,7 +60,7 @@ class WorkerBridge(worker_interface.WorkerBridge): @defer.inlineCallbacks def set_merged_work(merged_url, merged_userpass): - merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass))) + merged_proxy = jsonrpc.HTTPProxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass))) while self.running: auxblock = yield deferral.retry('Error while calling merged getauxblock:', 30)(merged_proxy.rpc_getauxblock)() self.merged_work.set(dict(self.merged_work.value, **{auxblock['chainid']: dict(