stratum support
[p2pool.git] / p2pool / util / jsonrpc.py
1 from __future__ import division
2
3 import json
4 import weakref
5
6 from twisted.internet import defer
7 from twisted.protocols import basic
8 from twisted.python import failure, log
9 from twisted.web import client, error
10
11 from p2pool.util import deferral, deferred_resource, memoize
12
13 class Error(Exception):
14     def __init__(self, code, message, data=None):
15         if type(self) is Error:
16             raise TypeError("can't directly instantiate Error class; use Error_for_code")
17         if not isinstance(code, int):
18             raise TypeError('code must be an int')
19         #if not isinstance(message, unicode):
20         #    raise TypeError('message must be a unicode')
21         self.code, self.message, self.data = code, message, data
22     def __str__(self):
23         return '%i %s' % (self.code, self.message) + (' %r' % (self.data, ) if self.data is not None else '')
24     def _to_obj(self):
25         return {
26             'code': self.code,
27             'message': self.message,
28             'data': self.data,
29         }
30
31 @memoize.memoize_with_backing(weakref.WeakValueDictionary())
32 def Error_for_code(code):
33     class NarrowError(Error):
34         def __init__(self, *args, **kwargs):
35             Error.__init__(self, code, *args, **kwargs)
36     return NarrowError
37
38
39 class Proxy(object):
40     def __init__(self, func, services=[]):
41         self._func = func
42         self._services = services
43     
44     def __getattr__(self, attr):
45         if attr.startswith('rpc_'):
46             return lambda *params: self._func('.'.join(self._services + [attr[len('rpc_'):]]), params)
47         elif attr.startswith('svc_'):
48             return Proxy(self._func, self._services + [attr[len('svc_'):]])
49         else:
50             raise AttributeError('%r object has no attribute %r' % (self.__class__.__name__, attr))
51
52 @defer.inlineCallbacks
53 def _handle(data, provider, preargs=(), response_handler=None):
54         id_ = None
55         
56         try:
57             try:
58                 try:
59                     req = json.loads(data)
60                 except Exception:
61                     raise Error_for_code(-32700)(u'Parse error')
62                 
63                 if 'result' in req or 'error' in req:
64                     response_handler(req['id'], req['result'] if 'error' not in req or req['error'] is None else
65                         failure.Failure(Error_for_code(req['error']['code'])(req['error']['message'], req['error'].get('data', None))))
66                     defer.returnValue(None)
67                 
68                 id_ = req.get('id', None)
69                 method = req.get('method', None)
70                 if not isinstance(method, basestring):
71                     raise Error_for_code(-32600)(u'Invalid Request')
72                 params = req.get('params', [])
73                 if not isinstance(params, list):
74                     raise Error_for_code(-32600)(u'Invalid Request')
75                 
76                 for service_name in method.split('.')[:-1]:
77                     provider = getattr(provider, 'svc_' + service_name, None)
78                     if provider is None:
79                         raise Error_for_code(-32601)(u'Service not found')
80                 
81                 method_meth = getattr(provider, 'rpc_' + method.split('.')[-1], None)
82                 if method_meth is None:
83                     raise Error_for_code(-32601)(u'Method not found')
84                 
85                 result = yield method_meth(*list(preargs) + list(params))
86                 error = None
87             except Error:
88                 raise
89             except Exception:
90                 log.err(None, 'Squelched JSON error:')
91                 raise Error_for_code(-32099)(u'Unknown error')
92         except Error, e:
93             result = None
94             error = e._to_obj()
95         
96         defer.returnValue(json.dumps(dict(
97             jsonrpc='2.0',
98             id=id_,
99             result=result,
100             error=error,
101         )))
102
103 # HTTP
104
105 @defer.inlineCallbacks
106 def _http_do(url, headers, timeout, method, params):
107     id_ = 0
108     
109     try:
110         data = yield client.getPage(
111             url=url,
112             method='POST',
113             headers=dict(headers, **{'Content-Type': 'application/json'}),
114             postdata=json.dumps({
115                 'jsonrpc': '2.0',
116                 'method': method,
117                 'params': params,
118                 'id': id_,
119             }),
120             timeout=timeout,
121         )
122     except error.Error, e:
123         try:
124             resp = json.loads(e.response)
125         except:
126             raise e
127     else:
128         resp = json.loads(data)
129     
130     if resp['id'] != id_:
131         raise ValueError('invalid id')
132     if 'error' in resp and resp['error'] is not None:
133         raise Error_for_code(resp['error']['code'])(resp['error']['message'], resp['error'].get('data', None))
134     defer.returnValue(resp['result'])
135 HTTPProxy = lambda url, headers={}, timeout=5: Proxy(lambda method, params: _http_do(url, headers, timeout, method, params))
136
137 class HTTPServer(deferred_resource.DeferredResource):
138     def __init__(self, provider):
139         deferred_resource.DeferredResource.__init__(self)
140         self._provider = provider
141     
142     @defer.inlineCallbacks
143     def render_POST(self, request):
144         data = yield _handle(request.content.read(), self._provider, preargs=[request])
145         assert data is not None
146         request.setHeader('Content-Type', 'application/json')
147         request.setHeader('Content-Length', len(data))
148         request.write(data)
149
150 class LineBasedPeer(basic.LineOnlyReceiver):
151     delimiter = '\n'
152     
153     def __init__(self):
154         #basic.LineOnlyReceiver.__init__(self)
155         self._matcher = deferral.GenericDeferrer(max_id=2**30, func=lambda id, method, params: self.sendLine(json.dumps({
156             'jsonrpc': '2.0',
157             'method': method,
158             'params': params,
159             'id': id,
160         })))
161         self.other = Proxy(self._matcher)
162     
163     def lineReceived(self, line):
164         _handle(line, self, response_handler=self._matcher.got_response).addCallback(lambda line2: self.sendLine(line2) if line2 is not None else None)