moved worker_interface.py to bitcoin/
[p2pool.git] / p2pool / bitcoin / worker_interface.py
1 from __future__ import division
2
3 import base64
4 import json
5 import random
6
7 from twisted.internet import defer, reactor
8 from twisted.python import log
9
10 import p2pool
11 from p2pool import data as p2pool_data
12 from p2pool.util import jsonrpc, deferred_resource, variable
13 from p2pool.bitcoin import getwork
14
def get_memory(request):
    """Return the 'memory' value (0, 1 or 2) assumed for the requesting miner.

    WorkerInterface.getwork uses the result as the number of most recently
    remembered work units to compare new work against (0 disables that
    comparison). Presumably this models how many old work units the miner
    itself keeps around - confirm against the miners listed below.

    Miners that announce the 'workidentifier' extension or send an
    X-Work-Identifier header always get 0; otherwise the value is chosen by
    the first known marker found in the lower-cased User-Agent.
    """
    extensions = request.getHeader('X-Miner-Extensions')
    if extensions is not None and 'workidentifier' in extensions.split(' '):
        return 0
    if request.getHeader('X-Work-Identifier') is not None:
        return 0

    raw_agent = request.getHeader('User-Agent')
    agent = raw_agent.lower() if raw_agent is not None else ''

    # Checked in order; the first marker contained in the User-Agent wins.
    known_miners = (
        ('java', 0), # hopefully diablominer...
        ('diablominer', 0),
        ('cpuminer', 0),
        ('tenebrix miner', 0),
        ('ufasoft', 0), # not confirmed
        ('cgminer', 1),
        ('poclbm', 1),
        ('phoenix', 2),
    )
    for marker, memory in known_miners:
        if marker in agent:
            return memory

    # Single-argument form prints the same text on Python 2 and Python 3.
    print('%s %s' % ('Unknown miner User-Agent:', repr(raw_agent)))
    return 0
31
def get_max_target(request): # inclusive
    """Return the largest share target (inclusive) to hand to the requesting miner.

    Miners that send an X-All-Targets header, or list the 'alltargets'
    extension in X-Miner-Extensions, get the full range 2**256-1. Otherwise
    the value is picked by the first known marker found in the lower-cased
    User-Agent; unknown miners are logged and get 2**256//2**32-1.

    The extension list is split on spaces and matched as whole tokens, the
    same way get_memory looks for 'workidentifier', so a header value that
    merely contains the text 'alltargets' inside another token does not
    enable the full range.
    """
    full_range = 2**256-1
    restricted = 2**256//2**32-1

    if request.getHeader('X-All-Targets') is not None:
        return full_range
    extensions = request.getHeader('X-Miner-Extensions')
    if extensions is not None and 'alltargets' in extensions.split(' '):
        return full_range

    user_agent = request.getHeader('User-Agent')
    user_agent2 = '' if user_agent is None else user_agent.lower()

    # Checked in order; the first marker contained in the User-Agent wins.
    known_miners = (
        ('java', restricted), # hopefully diablominer...
        ('diablominer', restricted),
        ('cpuminer', restricted),
        ('tenebrix miner', full_range),
        ('cgminer', restricted),
        ('poclbm', restricted),
        ('phoenix', restricted),
    )
    for marker, max_target in known_miners:
        if marker in user_agent2:
            return max_target

    # Single-argument form prints the same text on Python 2 and Python 3.
    print('%s %s' % ('Unknown miner User-Agent:', repr(user_agent)))
    return restricted
45
def get_username(request):
    """Return the user name from the request's HTTP Basic Authorization header.

    The header is expected to look like '<scheme> <base64 of "user:password">';
    the part of the decoded credentials before the first ':' is returned (as
    the byte string produced by base64 decoding).

    Returns None when the header is missing or cannot be parsed. Only the
    errors that malformed input can cause are caught, so unrelated failures
    (and KeyboardInterrupt/SystemExit) are no longer silently swallowed.
    """
    authorization = request.getHeader('Authorization')
    if authorization is None:
        return None
    try:
        encoded = authorization.split(' ', 1)[1]
        # b':' equals ':' on Python 2 and matches the bytes result on Python 3.
        return base64.b64decode(encoded).split(b':')[0]
    except (IndexError, TypeError, ValueError):
        # IndexError: header has no credentials part after the scheme.
        # TypeError (Python 2) / ValueError (binascii.Error, Python 3): bad base64.
        return None
51
def get_id(request):
    """Return the (client IP, raw Authorization header) pair identifying a requester."""
    client_ip = request.getClientIP()
    authorization = request.getHeader('Authorization')
    return client_ip, authorization
54
class Holds(object):
    """Registry of temporary per-request-id holds that expire after a delay.

    self.holds maps a request id to a variable.Event carrying a 'status'
    attribute (0 while the hold is active, 1 once it has been released).
    """

    def __init__(self):
        self.holds = {}

    @defer.inlineCallbacks
    def wait_hold(self, request_id):
        """Wait until no hold is registered for request_id.

        Re-checks after every wake-up, so a hold that was re-established in
        the meantime is waited on as well.
        """
        while request_id in self.holds:
            pending = self.holds[request_id]
            yield pending.get_deferred()

    def set_hold(self, request_id, dt):
        """Register a hold for request_id that is released after dt (passed to reactor.callLater).

        Raises ValueError if a hold for request_id already exists.
        """
        if request_id in self.holds:
            raise ValueError('hold already present!')

        hold = variable.Event()
        hold.status = 0
        self.holds[request_id] = hold

        def release():
            current = self.holds[request_id]
            # A hold must only ever be released once.
            if current.status != 0:
                raise AssertionError()
            current.status = 1
            # Remove the hold before signalling the event.
            self.holds.pop(request_id).happened()

        reactor.callLater(dt, release)
75
class LongPollingWorkerInterface(deferred_resource.DeferredResource):
    """Resource answering GET/POST with the result of parent.getwork(request, long_poll=True).

    The reply is written as a JSON-RPC 2.0 style envelope with id 0. A
    jsonrpc.Error is reported in the envelope's 'error' field; any other
    exception is logged and reported as error -32099 'Unknown error'.
    """

    def __init__(self, parent):
        self.parent = parent

    @defer.inlineCallbacks
    def render_GET(self, request):
        for header_name, header_value in (
            ('Content-Type', 'application/json'),
            ('X-Long-Polling', '/long-polling'),
            ('X-Roll-NTime', 'expire=60'),
        ):
            request.setHeader(header_name, header_value)

        try:
            try:
                work_result = yield self.parent.getwork(request, long_poll=True)
                # Serialising and writing stay inside the try so that their
                # failures are squelched like failures of getwork itself.
                reply = json.dumps({
                    'jsonrpc': '2.0',
                    'id': 0,
                    'result': work_result,
                    'error': None,
                })
                request.write(reply)
            except jsonrpc.Error:
                # Already a protocol-level error; report it unchanged below.
                raise
            except Exception:
                log.err(None, 'Squelched long polling error:')
                raise jsonrpc.Error(-32099, u'Unknown error')
        except jsonrpc.Error as e:
            failure_reply = json.dumps({
                'jsonrpc': '2.0',
                'id': 0,
                'result': None,
                'error': e._to_obj(),
            })
            request.write(failure_reply)

    # POST requests are handled exactly like GET requests.
    render_POST = render_GET
106
class WorkerInterface(jsonrpc.Server):
    """JSON-RPC server handing out work to miners and accepting their results.

    work              -- object exposing the current work as .value and a
                         .changed event (used via .changed.get_deferred())
    compute           -- callable(work, request) building the response object
                         for a piece of work
    response_callback -- callable(decoded_data, request) invoked when a miner
                         submits data through getwork

    The server is registered as its own '' child and exposes a
    LongPollingWorkerInterface under 'long-polling'.
    """

    def __init__(self, work, compute, response_callback):
        jsonrpc.Server.__init__(self)

        self.work = work
        self.compute = compute
        self.response_callback = response_callback
        self.holds = Holds()
        # request id -> variable.Variable holding the last two work units
        # recorded for that requester as (previous, latest).
        self.last_cache_invalidation = {}

        self.putChild('long-polling', LongPollingWorkerInterface(self))
        self.putChild('', self)

    @defer.inlineCallbacks
    def rpc_getwork(self, request, data=None):
        """Handle the 'getwork' RPC: submit data when given, otherwise return new work."""
        request.setHeader('X-Long-Polling', '/long-polling')
        request.setHeader('X-Roll-NTime', 'expire=60')

        if data is None:
            fresh_work = yield self.getwork(request)
            defer.returnValue(fresh_work)

        decoded = getwork.decode_data(data)
        defer.returnValue(self.response_callback(decoded, request))
    rpc_getwork.takes_request = True

    @defer.inlineCallbacks
    def getwork(self, request, long_poll=False):
        """Produce a getwork response for request.

        With long_poll=True and no new work since the last response to this
        requester, waits until the work changes or the requester's record is
        updated. If the new work shares its 'previous_block' with work the
        miner is assumed to remember (see get_memory), a random
        'previous_block' is substituted and a short hold is placed on the
        requester.
        """
        request_id = get_id(request)
        memory = get_memory(request)

        # Random tag used only to correlate the debug lines of one poll.
        poll_id = random.randrange(10000)
        if p2pool.DEBUG:
            print('POLL %i START long_poll=%r user_agent=%r x-work-identifier=%r user=%r' % (poll_id, long_poll, request.getHeader('User-Agent'), request.getHeader('X-Work-Identifier'), get_username(request)))

        if request_id not in self.last_cache_invalidation:
            self.last_cache_invalidation[request_id] = variable.Variable((None, None))

        yield self.holds.wait_hold(request_id)
        work = self.work.value
        thought_work = self.last_cache_invalidation[request_id].value
        last_sent = thought_work[-1]

        if long_poll and work == last_sent:
            if p2pool.DEBUG:
                print('POLL %i WAITING user=%r' % (poll_id, get_username(request)))
            wake_ups = [
                self.work.changed.get_deferred(),
                self.last_cache_invalidation[request_id].changed.get_deferred(),
            ]
            yield defer.DeferredList(wake_ups, fireOnOneCallback=True)
        # Re-read: the work may have changed while waiting.
        work = self.work.value

        if last_sent is not None and work != last_sent:
            # memory == 0 yields an empty window, so nothing is ever faked.
            remembered = thought_work[-memory or len(thought_work):]
            if any(seen is None or work['previous_block'] == seen['previous_block'] for seen in remembered):
                # clients won't believe the update
                work = work.copy()
                work['previous_block'] = random.randrange(2**256)
                if p2pool.DEBUG:
                    print('POLL %i FAKED user=%r' % (poll_id, get_username(request)))
                # NOTE(review): set_hold raises ValueError if a hold already
                # exists for request_id - confirm concurrent polls cannot hit this.
                self.holds.set_hold(request_id, .01)
        res = self.compute(work, request)

        self.last_cache_invalidation[request_id].set((last_sent, work))
        if p2pool.DEBUG:
            print('POLL %i END %s user=%r' % (poll_id, p2pool_data.format_hash(work['best_share_hash']), get_username(request)))

        # Never hand out a share target above what this miner accepts.
        capped_target = min(res.share_target, get_max_target(request))
        res = res.update(share_target=capped_target)

        defer.returnValue(res.getwork(identifier=str(work['best_share_hash'])))