added timeout to rpc calls
[p2pool.git] / p2pool / util / jsonrpc.py
1 from __future__ import division
2
3 import base64
4 import json
5
6 from twisted.internet import defer
7 from twisted.python import log
8 from twisted.web import client, error
9
10 import deferred_resource
11
12 class Error(Exception):
13     def __init__(self, code, message, data=None):
14         if not isinstance(code, int):
15             raise TypeError('code must be an int')
16         if not isinstance(message, unicode):
17             raise TypeError('message must be a unicode')
18         self._code, self._message, self._data = code, message, data
19     def __str__(self):
20         return '%i %s' % (self._code, self._message) + (' %r' % (self._data, ) if self._data is not None else '')
21     def _to_obj(self):
22         return {
23             'code': self._code,
24             'message': self._message,
25             'data': self._data,
26         }
27
28 class Proxy(object):
29     def __init__(self, url, auth=None, timeout=5):
30         self._url = url
31         self._auth = auth
32         self._timeout = timeout
33     
34     @defer.inlineCallbacks
35     def callRemote(self, method, *params):
36         id_ = 0
37         
38         headers = {
39             'Content-Type': 'text/json',
40         }
41         if self._auth is not None:
42             headers['Authorization'] = 'Basic ' + base64.b64encode(':'.join(self._auth))
43         try:
44             data = yield client.getPage(
45                 url=self._url,
46                 method='POST',
47                 headers=headers,
48                 postdata=json.dumps({
49                     'jsonrpc': '2.0',
50                     'method': method,
51                     'params': params,
52                     'id': id_,
53                 }),
54                 timeout=self._timeout,
55             )
56         except error.Error, e:
57             try:
58                 resp = json.loads(e.response)
59             except:
60                 raise e
61         else:
62             resp = json.loads(data)
63         
64         if resp['id'] != id_:
65             raise ValueError('invalid id')
66         if 'error' in resp and resp['error'] is not None:
67             raise Error(**resp['error'])
68         defer.returnValue(resp['result'])
69     
70     def __getattr__(self, attr):
71         if attr.startswith('rpc_'):
72             return lambda *params: self.callRemote(attr[len('rpc_'):], *params)
73         raise AttributeError('%r object has no attribute %r' % (self.__class__.__name__, attr))
74
75 class Server(deferred_resource.DeferredResource):
76     extra_headers = None
77     
78     @defer.inlineCallbacks
79     def render_POST(self, request):
80         # missing batching, 1.0 notifications
81         data = request.content.read()
82         
83         if self.extra_headers is not None:
84             for name, value in self.extra_headers.iteritems():
85                 request.setHeader(name, value)
86         
87         try:
88             try:
89                 req = json.loads(data)
90             except Exception:
91                 raise RemoteError(-32700, u'Parse error')
92         except Error, e:
93             # id unknown
94             request.write(json.dumps({
95                 'jsonrpc': '2.0',
96                 'id': None,
97                 'result': None,
98                 'error': e._to_obj(),
99             }))
100         
101         id_ = req.get('id', None)
102         
103         try:
104             try:
105                 method = req['method']
106                 if not isinstance(method, unicode):
107                     raise ValueError()
108                 params = req.get('params', [])
109                 if not isinstance(params, list):
110                     raise ValueError()
111             except Exception:
112                 raise Error(-32600, u'Invalid Request')
113             
114             method_name = 'rpc_' + method
115             if not hasattr(self, method_name):
116                 raise Error(-32601, u'Method not found')
117             method_meth = getattr(self, method_name)
118             
119             if hasattr(method_meth, "takes_headers"):
120                 params = [request.received_headers] + list(params)
121             
122             try:
123                 result = yield method_meth(*params)
124             except Exception:
125                 print 'Squelched JSON method error:'
126                 log.err()
127                 raise Error(-32099, u'Unknown error')
128             
129             if id_ is None:
130                 return
131             
132             request.write(json.dumps({
133                 'jsonrpc': '2.0',
134                 'id': id_,
135                 'result': result,
136                 'error': None,
137             }))
138         except Error, e:
139             request.write(json.dumps({
140                 'jsonrpc': '2.0',
141                 'id': id_,
142                 'result': None,
143                 'error': e._to_obj(),
144             }))