1 from __future__ import division
8 from twisted.internet import defer
11 from p2pool.bitcoin import getwork
12 from p2pool.util import expiring_dict, jsonrpc, variable
14 class _Provider(object):
15 def __init__(self, parent, long_poll):
17 self.long_poll = long_poll
19 def rpc_getwork(self, request, data=None):
20 return self.parent._getwork(request, data, long_poll=self.long_poll)
22 class _GETableServer(jsonrpc.Server):
23 def __init__(self, provider, render_get_func):
24 jsonrpc.Server.__init__(self, provider)
25 self.render_GET = render_get_func
27 class WorkerBridge(object):
29 self.new_work_event = variable.Event()
31 def preprocess_request(self, request):
32 return request, # *args to self.compute
34 def get_work(self, request):
35 raise NotImplementedError()
37 class WorkerInterface(object):
38 def __init__(self, worker_bridge):
39 self.worker_bridge = worker_bridge
41 self.worker_views = {}
44 self.work_cache_times = self.worker_bridge.new_work_event.times
46 self.merkle_root_to_handler = expiring_dict.ExpiringDict(300)
48 def attach_to(self, res, get_handler=None):
49 res.putChild('', _GETableServer(_Provider(self, long_poll=False), get_handler))
52 request.content = StringIO.StringIO(json.dumps(dict(id=0, method='getwork')))
53 return s.render_POST(request)
54 s = _GETableServer(_Provider(self, long_poll=True), repost)
55 res.putChild('long-polling', s)
57 @defer.inlineCallbacks
58 def _getwork(self, request, data, long_poll):
59 request.setHeader('X-Long-Polling', '/long-polling')
60 request.setHeader('X-Roll-NTime', 'expire=10')
61 request.setHeader('X-Is-P2Pool', 'true')
64 header = getwork.decode_data(data)
65 if header['merkle_root'] not in self.merkle_root_to_handler:
66 print >>sys.stderr, '''Couldn't link returned work's merkle root with its handler. This should only happen if this process was recently restarted!'''
67 defer.returnValue(False)
68 defer.returnValue(self.merkle_root_to_handler[header['merkle_root']](header, request.getUser() if request.getUser() is not None else ''))
71 id = random.randrange(1000, 10000)
72 print 'POLL %i START is_long_poll=%r user_agent=%r user=%r' % (id, long_poll, request.getHeader('User-Agent'), request.getUser())
75 request_id = request.getClientIP(), request.getHeader('Authorization')
76 if self.worker_views.get(request_id, self.worker_bridge.new_work_event.times) != self.worker_bridge.new_work_event.times:
78 print 'POLL %i PUSH' % (id,)
81 print 'POLL %i WAITING' % (id,)
82 yield self.worker_bridge.new_work_event.get_deferred()
83 self.worker_views[request_id] = self.worker_bridge.new_work_event.times
85 key = self.worker_bridge.preprocess_request(request.getUser() if request.getUser() is not None else '')
87 if self.work_cache_times != self.worker_bridge.new_work_event.times:
89 self.work_cache_times = self.worker_bridge.new_work_event.times
91 if key in self.work_cache:
92 res, orig_timestamp, handler = self.work_cache.pop(key)
94 res, handler = self.worker_bridge.get_work(*key)
95 assert res.merkle_root not in self.merkle_root_to_handler
96 orig_timestamp = res.timestamp
98 self.merkle_root_to_handler[res.merkle_root] = handler
100 if res.timestamp + 120 < orig_timestamp + 1800:
101 self.work_cache[key] = res.update(timestamp=res.timestamp + 120), orig_timestamp, handler
104 print 'POLL %i END identifier=%i' % (id, self.worker_bridge.new_work_event.times)
106 defer.returnValue(res.getwork(identifier=str(self.worker_bridge.new_work_event.times), submitold=True))