added miner timestamp rolling check
[p2pool.git] / p2pool / bitcoin / worker_interface.py
1 from __future__ import division
2
3 import StringIO
4 import json
5 import random
6 import sys
7
8 from twisted.internet import defer
9
10 import p2pool
11 from p2pool.bitcoin import getwork
12 from p2pool.util import expiring_dict, jsonrpc, variable
13
14 class _Provider(object):
15     def __init__(self, parent, long_poll):
16         self.parent = parent
17         self.long_poll = long_poll
18     
19     def rpc_getwork(self, request, data=None):
20         return self.parent._getwork(request, data, long_poll=self.long_poll)
21
22 class _GETableServer(jsonrpc.Server):
23     def __init__(self, provider, render_get_func):
24         jsonrpc.Server.__init__(self, provider)
25         self.render_GET = render_get_func
26
27 class WorkerBridge(object):
28     def __init__(self):
29         self.new_work_event = variable.Event()
30     
31     def preprocess_request(self, request):
32         return request, # *args to self.compute
33     
34     def get_work(self, request):
35         raise NotImplementedError()
36
37 class WorkerInterface(object):
38     def __init__(self, worker_bridge):
39         self.worker_bridge = worker_bridge
40         
41         self.worker_views = {}
42         
43         self.work_cache = {}
44         self.work_cache_times = self.worker_bridge.new_work_event.times
45         
46         self.merkle_roots = expiring_dict.ExpiringDict(300)
47     
48     def attach_to(self, res, get_handler=None):
49         res.putChild('', _GETableServer(_Provider(self, long_poll=False), get_handler))
50         
51         def repost(request):
52             request.content = StringIO.StringIO(json.dumps(dict(id=0, method='getwork')))
53             return s.render_POST(request)
54         s = _GETableServer(_Provider(self, long_poll=True), repost)
55         res.putChild('long-polling', s)
56     
57     @defer.inlineCallbacks
58     def _getwork(self, request, data, long_poll):
59         request.setHeader('X-Long-Polling', '/long-polling')
60         request.setHeader('X-Roll-NTime', 'expire=10')
61         request.setHeader('X-Is-P2Pool', 'true')
62         
63         if data is not None:
64             header = getwork.decode_data(data)
65             if header['merkle_root'] not in self.merkle_roots:
66                 print >>sys.stderr, '''Couldn't link returned work's merkle root with its handler. This should only happen if this process was recently restarted!'''
67                 defer.returnValue(False)
68             handler, orig_timestamp = self.merkle_roots[header['merkle_root']]
69             dt = header['timestamp'] - orig_timestamp
70             if dt < 0 or dt % 12 == 11 or dt >= 600:
71                 print >>sys.stderr, '''Miner %s @ %s rolled timestamp improperly! This may be a bug in the miner that is causing you to lose work!''' % (request.getUser(), request.getClientIP())
72             defer.returnValue(handler(header, request))
73         
74         if p2pool.DEBUG:
75             id = random.randrange(1000, 10000)
76             print 'POLL %i START is_long_poll=%r user_agent=%r user=%r' % (id, long_poll, request.getHeader('User-Agent'), request.getUser())
77         
78         if long_poll:
79             request_id = request.getClientIP(), request.getHeader('Authorization')
80             if self.worker_views.get(request_id, self.worker_bridge.new_work_event.times) != self.worker_bridge.new_work_event.times:
81                 if p2pool.DEBUG:
82                     print 'POLL %i PUSH' % (id,)
83             else:
84                 if p2pool.DEBUG:
85                     print 'POLL %i WAITING' % (id,)
86                 yield self.worker_bridge.new_work_event.get_deferred()
87             self.worker_views[request_id] = self.worker_bridge.new_work_event.times
88         
89         key = self.worker_bridge.preprocess_request(request)
90         
91         if self.work_cache_times != self.worker_bridge.new_work_event.times:
92             self.work_cache = {}
93             self.work_cache_times = self.worker_bridge.new_work_event.times
94         
95         if key in self.work_cache:
96             res, orig_timestamp, handler = self.work_cache.pop(key)
97         else:
98             res, handler = self.worker_bridge.get_work(*key)
99             assert res.merkle_root not in self.merkle_roots
100             orig_timestamp = res.timestamp
101         
102         self.merkle_roots[res.merkle_root] = handler, orig_timestamp
103         
104         if res.timestamp + 12 < orig_timestamp + 600:
105             self.work_cache[key] = res.update(timestamp=res.timestamp + 12), orig_timestamp, handler
106         
107         if p2pool.DEBUG:
108             print 'POLL %i END identifier=%i' % (id, self.worker_bridge.new_work_event.times)
109         
110         defer.returnValue(res.getwork(identifier=str(self.worker_bridge.new_work_event.times), submitold=True))