in worker_interface, cache merkle roots by moving timestamp forward
[p2pool.git] / p2pool / bitcoin / worker_interface.py
1 from __future__ import division
2
3 import base64
4 import random
5 import weakref
6
7 from twisted.internet import defer
8
9 import p2pool
10 from p2pool import data as p2pool_data
11 from p2pool.util import jsonrpc, variable
12 from p2pool.bitcoin import getwork
13
14 def get_username(request):
15     try:
16         return base64.b64decode(request.getHeader('Authorization').split(' ', 1)[1]).split(':')[0]
17     except: # XXX
18         return None
19
20 def get_id(request):
21     return request.getClientIP(), request.getHeader('Authorization')
22
23 class LongPollingWorkerInterface(jsonrpc.Server):
24     def __init__(self, parent):
25         jsonrpc.Server.__init__(self)
26         self.parent = parent
27     
28     def rpc_getwork(self, request, data=None):
29         return self.parent.getwork(request, data, long_poll=True)
30
31 class WorkerInterface(jsonrpc.Server):
32     def __init__(self, compute, response_callback, new_work_event=variable.Event()):
33         jsonrpc.Server.__init__(self)
34         
35         self.compute = compute
36         self.response_callback = response_callback
37         self.new_work_event = new_work_event
38         
39         self.worker_views = {}
40         
41         self.work_cache = {} # username -> (blockattempt, work-identifier-string)
42         watch_id = new_work_event.watch(lambda *args: self_ref().work_cache.clear())
43         self_ref = weakref.ref(self, lambda _: new_work_event.unwatch(watch_id))
44         
45         self.putChild('long-polling', LongPollingWorkerInterface(self))
46         self.putChild('', self)
47     
48     def rpc_getwork(self, request, data=None):
49         return self.getwork(request, data, long_poll=False)
50     
51     @defer.inlineCallbacks
52     def getwork(self, request, data, long_poll):
53         request.setHeader('X-Long-Polling', '/long-polling')
54         request.setHeader('X-Roll-NTime', 'expire=10')
55         
56         if data is not None:
57             defer.returnValue(self.response_callback(getwork.decode_data(data), request))
58         
59         request_id = get_id(request)
60         
61         if p2pool.DEBUG:
62             id = random.randrange(1000, 10000)
63             print 'POLL %i START long_poll=%r user_agent=%r x-work-identifier=%r user=%r' % (id, long_poll, request.getHeader('User-Agent'), request.getHeader('X-Work-Identifier'), get_username(request))
64         
65         if request_id not in self.worker_views:
66             self.worker_views[request_id] = variable.Variable(None)
67         
68         if long_poll and self.worker_views[request_id].value in [None, self.new_work_event.times]:
69             if p2pool.DEBUG:
70                 print 'POLL %i WAITING user=%r' % (id, get_username(request))
71             yield self.new_work_event.get_deferred()
72         
73         username = get_username(request)
74         
75         if username in self.work_cache:
76             res, identifier = self.work_cache[username]
77         else:
78             res, identifier = self.compute(username)
79         
80         self.work_cache[username] = res.update(timestamp=res.timestamp + 12), identifier # XXX doesn't bound timestamp
81         
82         if long_poll:
83             self.worker_views[request_id].set(self.new_work_event.times)
84         
85         if p2pool.DEBUG:
86             print 'POLL %i END %s user=%r' % (id, p2pool_data.format_hash(identifier), get_username(request)) # XXX identifier is hack
87         
88         defer.returnValue(res.getwork(identifier=str(identifier)))