refactored jsonrpc: broke generic jsonrpc handling out of HTTP-specific implementatio...
authorForrest Voight <forrest.voight@gmail.com>
Fri, 28 Dec 2012 09:41:02 +0000 (04:41 -0500)
committerForrest Voight <forrest.voight@gmail.com>
Fri, 28 Dec 2012 09:43:29 +0000 (04:43 -0500)
p2pool/bitcoin/worker_interface.py
p2pool/main.py
p2pool/test/test_node.py
p2pool/util/jsonrpc.py
p2pool/work.py

index dee637d..2f49a4e 100644 (file)
@@ -19,9 +19,9 @@ class _Provider(object):
     def rpc_getwork(self, request, data=None):
         return self.parent._getwork(request, data, long_poll=self.long_poll)
 
-class _GETableServer(jsonrpc.Server):
+class _GETableServer(jsonrpc.HTTPServer):
     def __init__(self, provider, render_get_func):
-        jsonrpc.Server.__init__(self, provider)
+        jsonrpc.HTTPServer.__init__(self, provider)
         self.render_GET = render_get_func
 
 class WorkerBridge(object):
index 098eb3a..998b426 100644 (file)
@@ -47,7 +47,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         # connect to bitcoind over JSON-RPC and do initial getmemorypool
         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
-        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
+        bitcoind = jsonrpc.HTTPProxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
         yield helper.check(bitcoind, net)
         temp_work = yield helper.getwork(bitcoind)
         
index 1279a05..28fa48d 100644 (file)
@@ -177,7 +177,7 @@ class Test(unittest.TestCase):
         bitd = bitcoind()
         
         mm_root = resource.Resource()
-        mm_root.putChild('', jsonrpc.Server(mm_provider))
+        mm_root.putChild('', jsonrpc.HTTPServer(mm_provider))
         mm_port = reactor.listenTCP(0, server.Site(mm_root))
         
         n = node.Node(bitd, bitd, [], [], mynet)
@@ -188,7 +188,7 @@ class Test(unittest.TestCase):
         worker_interface.WorkerInterface(wb).attach_to(web_root)
         port = reactor.listenTCP(0, server.Site(web_root))
         
-        proxy = jsonrpc.Proxy('http://127.0.0.1:' + str(port.getHost().port))
+        proxy = jsonrpc.HTTPProxy('http://127.0.0.1:' + str(port.getHost().port))
         
         yield deferral.sleep(3)
         
@@ -230,7 +230,7 @@ class Test(unittest.TestCase):
         yield deferral.sleep(3)
         
         for i in xrange(SHARES):
-            proxy = jsonrpc.Proxy('http://127.0.0.1:' + str(random.choice(nodes).web_port.getHost().port))
+            proxy = jsonrpc.HTTPProxy('http://127.0.0.1:' + str(random.choice(nodes).web_port.getHost().port))
             blah = yield proxy.rpc_getwork()
             yield proxy.rpc_getwork(blah['data'])
             yield deferral.sleep(.02)
index aa334a6..35c1ec6 100644 (file)
@@ -34,66 +34,36 @@ def Error_for_code(code):
             Error.__init__(self, code, *args, **kwargs)
     return NarrowError
 
+
 class Proxy(object):
-    def __init__(self, url, headers={}, timeout=5):
-        self._url = url
-        self._headers = headers
-        self._timeout = timeout
-    
-    @defer.inlineCallbacks
-    def callRemote(self, method, *params):
-        id_ = 0
-        
-        try:
-            data = yield client.getPage(
-                url=self._url,
-                method='POST',
-                headers=dict(self._headers, **{'Content-Type': 'application/json'}),
-                postdata=json.dumps({
-                    'jsonrpc': '2.0',
-                    'method': method,
-                    'params': params,
-                    'id': id_,
-                }),
-                timeout=self._timeout,
-            )
-        except error.Error, e:
-            try:
-                resp = json.loads(e.response)
-            except:
-                raise e
-        else:
-            resp = json.loads(data)
-        
-        if resp['id'] != id_:
-            raise ValueError('invalid id')
-        if 'error' in resp and resp['error'] is not None:
-            raise Error_for_code(resp['error']['code'])(resp['error']['message'], resp['error'].get('data', None))
-        defer.returnValue(resp['result'])
+    def __init__(self, func, services=[]):
+        self._func = func
+        self._services = services
     
     def __getattr__(self, attr):
         if attr.startswith('rpc_'):
-            return lambda *params: self.callRemote(attr[len('rpc_'):], *params)
-        raise AttributeError('%r object has no attribute %r' % (self.__class__.__name__, attr))
+            return lambda *params: self._func('.'.join(self._services + [attr[len('rpc_'):]]), params)
+        elif attr.startswith('svc_'):
+            return Proxy(self._func, self._services + [attr[len('svc_'):]])
+        else:
+            raise AttributeError('%r object has no attribute %r' % (self.__class__.__name__, attr))
 
-class Server(deferred_resource.DeferredResource):
-    def __init__(self, provider):
-        deferred_resource.DeferredResource.__init__(self)
-        self._provider = provider
-    
-    @defer.inlineCallbacks
-    def render_POST(self, request):
+@defer.inlineCallbacks
+def _handle(data, provider, preargs=(), response_handler=None):
         id_ = None
         
         try:
             try:
-                data = request.content.read()
-                
                 try:
                     req = json.loads(data)
                 except Exception:
                     raise Error_for_code(-32700)(u'Parse error')
                 
+                if 'result' in req or 'error' in req:
+                    response_handler(req['id'], req['result'] if 'error' not in req or req['error'] is None else
+                        failure.Failure(Error_for_code(resp['error']['code'])(resp['error']['message'], resp['error'].get('data', None))))
+                    defer.returnValue(None)
+                
                 id_ = req.get('id', None)
                 method = req.get('method', None)
                 if not isinstance(method, basestring):
@@ -102,11 +72,16 @@ class Server(deferred_resource.DeferredResource):
                 if not isinstance(params, list):
                     raise Error_for_code(-32600)(u'Invalid Request')
                 
-                method_meth = getattr(self._provider, 'rpc_' + method, None)
+                for service_name in method.split('.')[:-1]:
+                    provider = getattr(provider, 'svc_' + service_name, None)
+                    if provider is None:
+                        raise Error_for_code(-32601)(u'Service not found')
+                
+                method_meth = getattr(provider, 'rpc_' + method.split('.')[-1], None)
                 if method_meth is None:
                     raise Error_for_code(-32601)(u'Method not found')
                 
-                result = yield method_meth(request, *params)
+                result = yield method_meth(*list(preargs) + list(params))
                 error = None
             except Error:
                 raise
@@ -117,12 +92,56 @@ class Server(deferred_resource.DeferredResource):
             result = None
             error = e._to_obj()
         
-        data = json.dumps(dict(
+        defer.returnValue(json.dumps(dict(
             jsonrpc='2.0',
             id=id_,
             result=result,
             error=error,
-        ))
+        )))
+
+# HTTP
+
+@defer.inlineCallbacks
+def _http_do(url, headers, timeout, method, params):
+    id_ = 0
+    
+    try:
+        data = yield client.getPage(
+            url=url,
+            method='POST',
+            headers=dict(headers, **{'Content-Type': 'application/json'}),
+            postdata=json.dumps({
+                'jsonrpc': '2.0',
+                'method': method,
+                'params': params,
+                'id': id_,
+            }),
+            timeout=timeout,
+        )
+    except error.Error, e:
+        try:
+            resp = json.loads(e.response)
+        except:
+            raise e
+    else:
+        resp = json.loads(data)
+    
+    if resp['id'] != id_:
+        raise ValueError('invalid id')
+    if 'error' in resp and resp['error'] is not None:
+        raise Error_for_code(resp['error']['code'])(resp['error']['message'], resp['error'].get('data', None))
+    defer.returnValue(resp['result'])
+HTTPProxy = lambda url, headers={}, timeout=5: Proxy(lambda method, params: _http_do(url, headers, timeout, method, params))
+
+class HTTPServer(deferred_resource.DeferredResource):
+    def __init__(self, provider):
+        deferred_resource.DeferredResource.__init__(self)
+        self._provider = provider
+    
+    @defer.inlineCallbacks
+    def render_POST(self, request):
+        data = yield _handle(request.content.read(), self._provider, preargs=[request])
+        assert data is not None
         request.setHeader('Content-Type', 'application/json')
         request.setHeader('Content-Length', len(data))
         request.write(data)
index 6315796..5f4f2a4 100644 (file)
@@ -60,7 +60,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
         
         @defer.inlineCallbacks
         def set_merged_work(merged_url, merged_userpass):
-            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
+            merged_proxy = jsonrpc.HTTPProxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
             while self.running:
                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 30)(merged_proxy.rpc_getauxblock)()
                 self.merged_work.set(dict(self.merged_work.value, **{auxblock['chainid']: dict(