code style fixes
[p2pool.git] / p2pool / worker_interface.py
1 from __future__ import division
2
3 import json
4 import random
5
6 from twisted.internet import defer, reactor
7 from twisted.python import log
8
9 import p2pool
10 from p2pool.util import jsonrpc, deferred_resource, variable
11
12 # TODO: branch on User-Agent to remove overhead of workarounds
13
14 def get_memory(user_agent):
15     user_agent2 = '' if user_agent is None else user_agent.lower()
16     if 'java' in user_agent2: return 0 # hopefully diablominer...
17     if 'cpuminer' in user_agent2: return 0
18     if 'cgminer' in user_agent2: return 1
19     if 'poclbm' in user_agent2: return 1
20     if 'phoenix' in user_agent2: return 2
21     print 'Unknown miner User-Agent:', repr(user_agent)
22     return 0
23
24 def get_id(request):
25     return request.getClientIP(), request.getHeader('Authorization'), request.getHeader('User-Agent')
26
27 last_cache_invalidation = {} # XXX remove global
28
29 def merge(gw1, gw2):
30     if gw1['hash1'] != gw2['hash1']:
31         raise ValueError()
32     if gw1['target'] != gw2['target']:
33         raise ValueError()
34     return dict(
35         data=gw1['data'],
36         midstate=gw2['midstate'],
37         hash1=gw1['hash1'],
38         target=gw1['target'],
39     )
40
41 class LongPollingWorkerInterface(deferred_resource.DeferredResource):
42     def __init__(self, work, compute):
43         self.work = work
44         self.compute = compute
45     
46     @defer.inlineCallbacks
47     def render_GET(self, request):
48         try:
49             try:
50                 id = random.randrange(10000)
51                 if p2pool.DEBUG:
52                     print 'LONG POLL', id
53                 
54                 request_id = get_id(request)
55                 memory = get_memory(request.getHeader('User-Agent'))
56                 
57                 if request_id not in last_cache_invalidation:
58                     last_cache_invalidation[request_id] = variable.Variable((None, None))
59                 
60                 while True:
61                     work = self.work.value
62                     thought_work = last_cache_invalidation[request_id].value
63                     if work != thought_work[-1]:
64                         break
65                     if p2pool.DEBUG:
66                         print 'POLL %i WAITING' % (id,)
67                     yield defer.DeferredList([self.work.changed.get_deferred(), last_cache_invalidation[request_id].changed.get_deferred()], fireOnOneCallback=True)
68                 
69                 if thought_work[-1] is not None and work != thought_work[-1] and any(x is None or work['previous_block'] == x['previous_block'] for x in thought_work[-memory or len(thought_work):]):
70                     # clients won't believe the update
71                     newwork = work.copy()
72                     newwork['previous_block'] = random.randrange(2**256)
73                     if p2pool.DEBUG:
74                         print 'longpoll faked', id
75                     res = self.compute(work, request.getHeader('X-All-Targets') is not None)
76                     newres = self.compute(newwork, request.getHeader('X-All-Targets') is not None)
77                 else:
78                     newwork = work
79                     newres = res = self.compute(work, request.getHeader('X-All-Targets') is not None)
80                 
81                 reactor.callLater(.01, lambda: last_cache_invalidation[request_id].set((thought_work[-1], newwork)))
82                 
83                 request.setHeader('X-Long-Polling', '/long-polling')
84                 request.setHeader('Content-Type', 'application/json')
85                 request.write(json.dumps({
86                     'jsonrpc': '2.0',
87                     'id': 0,
88                     'result': merge(newres.getwork(), res.getwork()),
89                     'error': None,
90                 }))
91             except jsonrpc.Error:
92                 raise
93             except Exception:
94                 log.err(None, 'Squelched long polling error:')
95                 raise jsonrpc.Error(-32099, u'Unknown error')
96         
97         except jsonrpc.Error, e:
98             request.write(json.dumps({
99                 'jsonrpc': '2.0',
100                 'id': 0,
101                 'result': None,
102                 'error': e._to_obj(),
103             }))
104         
105         
106         if p2pool.DEBUG:
107             print 'END POLL %i %x' % (id, work['best_share_hash'] % 2**32 if work['best_share_hash'] is not None else 0)
108     render_POST = render_GET
109
110 class RateInterface(deferred_resource.DeferredResource):
111     def __init__(self, get_rate):
112         self.get_rate = get_rate
113     
114     def render_GET(self, request):
115         request.setHeader('Content-Type', 'application/json')
116         request.write(json.dumps(self.get_rate()))
117
118 class WorkerInterface(jsonrpc.Server):
119     def __init__(self, work, compute, response_callback, get_rate):
120         jsonrpc.Server.__init__(self)
121         
122         self.work = work
123         self.compute = compute
124         self.response_callback = response_callback
125         self.get_rate = get_rate
126         
127         self.putChild('long-polling',
128             LongPollingWorkerInterface(self.work, self.compute))
129         self.putChild('rate',
130             RateInterface(get_rate))
131         self.putChild('', self)
132     
133     def rpc_getwork(self, request, data=None):
134         request.setHeader('X-Long-Polling', '/long-polling')
135         
136         if data is not None:
137             return self.response_callback(data)
138         
139         request_id = get_id(request)
140         memory = get_memory(request.getHeader('User-Agent'))
141         
142         if request_id not in last_cache_invalidation:
143             last_cache_invalidation[request_id] = variable.Variable((None, None))
144         
145         work = self.work.value
146         thought_work = last_cache_invalidation[request_id].value
147         
148         if thought_work[-1] is not None and work != thought_work[-1] and any(x is None or work['previous_block'] == x['previous_block'] for x in thought_work[-memory or len(thought_work):]):
149             # clients won't believe the update
150             newwork = work.copy()
151             newwork['previous_block'] = random.randrange(2**256)
152             if p2pool.DEBUG:
153                 print 'getwork faked'
154             res = self.compute(work, request.getHeader('X-All-Targets') is not None)
155             newres = self.compute(newwork, request.getHeader('X-All-Targets') is not None)
156         else:
157             newwork = work
158             newres = res = self.compute(work, request.getHeader('X-All-Targets') is not None)
159         
160         reactor.callLater(.01, lambda: last_cache_invalidation[request_id].set((thought_work[-1], newwork)))
161         if p2pool.DEBUG:
162             print 'END GETWORK %i' % (work['best_share_hash'] % 2**32 if work['best_share_hash'] is not None else 0,)
163         
164         return merge(newres.getwork(), res.getwork())
165     rpc_getwork.takes_request = True