print a less scary, more informative message when a share request times out
[p2pool.git] / p2pool / node.py
1 import random
2 import sys
3 import time
4
5 from twisted.internet import defer, reactor, task
6 from twisted.python import log
7
8 from p2pool import data as p2pool_data, p2p
9 from p2pool.bitcoin import data as bitcoin_data, helper, height_tracker
10 from p2pool.util import deferral, variable
11
12
13 class P2PNode(p2p.Node):
14     def __init__(self, node, **kwargs):
15         self.node = node
16         p2p.Node.__init__(self,
17             best_share_hash_func=lambda: node.best_share_var.value,
18             net=node.net,
19             known_txs_var=node.known_txs_var,
20             mining_txs_var=node.mining_txs_var,
21         **kwargs)
22     
23     def handle_shares(self, shares, peer):
24         if len(shares) > 5:
25             print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
26         
27         new_count = 0
28         for share in shares:
29             if share.hash in self.node.tracker.items:
30                 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
31                 continue
32             
33             new_count += 1
34             
35             #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
36             
37             self.node.tracker.add(share)
38         
39         if new_count:
40             self.node.set_best_share()
41         
42         if len(shares) > 5:
43             print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(self.node.tracker.items), 2*self.node.net.CHAIN_LENGTH)
44     
45     @defer.inlineCallbacks
46     def handle_share_hashes(self, hashes, peer):
47         new_hashes = [x for x in hashes if x not in self.node.tracker.items]
48         if not new_hashes:
49             return
50         try:
51             shares = yield peer.get_shares(
52                 hashes=new_hashes,
53                 parents=0,
54                 stops=[],
55             )
56         except:
57             log.err(None, 'in handle_share_hashes:')
58         else:
59             self.handle_shares(shares, peer)
60     
61     def handle_get_shares(self, hashes, parents, stops, peer):
62         parents = min(parents, 1000//len(hashes))
63         stops = set(stops)
64         shares = []
65         for share_hash in hashes:
66             for share in self.node.tracker.get_chain(share_hash, min(parents + 1, self.node.tracker.get_height(share_hash))):
67                 if share.hash in stops:
68                     break
69                 shares.append(share)
70         print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
71         return shares
72     
73     def handle_bestblock(self, header, peer):
74         if self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
75             raise p2p.PeerMisbehavingError('received block header fails PoW test')
76         self.node.handle_header(header)
77     
78     @defer.inlineCallbacks
79     def broadcast_share(self, share_hash):
80         shares = []
81         for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))):
82             if share.hash in self.shared_share_hashes:
83                 break
84             self.shared_share_hashes.add(share.hash)
85             shares.append(share)
86         
87         for peer in list(self.peers.itervalues()):
88             yield peer.sendShares([share for share in shares if share.peer is not peer], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
89     
90     def start(self):
91         p2p.Node.start(self)
92         
93         self.shared_share_hashes = set(self.node.tracker.items)
94         self.node.tracker.removed.watch_weakref(self, lambda self, share: self.shared_share_hashes.discard(share.hash))
95         
96         @apply
97         @defer.inlineCallbacks
98         def download_shares():
99             while True:
100                 desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0)
101                 peer2, share_hash = random.choice(desired)
102                 
103                 if len(self.peers) == 0:
104                     yield deferral.sleep(1)
105                     continue
106                 peer = random.choice(self.peers.values())
107                 
108                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
109                 try:
110                     shares = yield peer.get_shares(
111                         hashes=[share_hash],
112                         parents=49,
113                         stops=list(set(self.node.tracker.heads) | set(
114                             self.node.tracker.get_nth_parent_hash(head, min(max(0, self.node.tracker.get_height_and_last(head)[0] - 1), 10)) for head in self.node.tracker.heads
115                         ))[:100],
116                     )
117                 except defer.TimeoutError:
118                     print 'Share request timed out!'
119                     continue
120                 except:
121                     log.err(None, 'in download_shares:')
122                     continue
123                 
124                 if not shares:
125                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
126                     continue
127                 self.handle_shares(shares, peer)
128         
129         
130         @self.node.best_block_header.changed.watch
131         def _(header):
132             for peer in self.peers.itervalues():
133                 peer.send_bestblock(header=header)
134         
135         # send share when the chain changes to their chain
136         self.node.best_share_var.changed.watch(self.broadcast_share)
137         
138         @self.node.tracker.verified.added.watch
139         def _(share):
140             if not (share.pow_hash <= share.header['bits'].target):
141                 return
142             
143             def spread():
144                 if (self.node.get_height_rel_highest(share.header['previous_block']) > -5 or
145                     self.node.bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
146                     self.broadcast_share(share.hash)
147             spread()
148             reactor.callLater(5, spread) # so get_height_rel_highest can update
149         
150
151 class Node(object):
152     def __init__(self, factory, bitcoind, shares, known_verified_share_hashes, net):
153         self.factory = factory
154         self.bitcoind = bitcoind
155         self.net = net
156         
157         self.tracker = p2pool_data.OkayTracker(self.net)
158         
159         for share in shares:
160             self.tracker.add(share)
161         
162         for share_hash in known_verified_share_hashes:
163             if share_hash in self.tracker.items:
164                 self.tracker.verified.add(self.tracker.items[share_hash])
165         
166         self.p2p_node = None # overwritten externally
167     
168     @defer.inlineCallbacks
169     def start(self):
170         stop_signal = variable.Event()
171         self.stop = stop_signal.happened
172         
173         # BITCOIND WORK
174         
175         self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind)))
176         @defer.inlineCallbacks
177         def work_poller():
178             while stop_signal.times == 0:
179                 flag = self.factory.new_block.get_deferred()
180                 try:
181                     self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate'])))
182                 except:
183                     log.err()
184                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
185         work_poller()
186         
187         # PEER WORK
188         
189         self.best_block_header = variable.Variable(None)
190         def handle_header(new_header):
191             # check that header matches current target
192             if not (self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= self.bitcoind_work.value['bits'].target):
193                 return
194             bitcoind_best_block = self.bitcoind_work.value['previous_block']
195             if (self.best_block_header.value is None
196                 or (
197                     new_header['previous_block'] == bitcoind_best_block and
198                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(self.best_block_header.value)) == bitcoind_best_block
199                 ) # new is child of current and previous is current
200                 or (
201                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
202                     self.best_block_header.value['previous_block'] != bitcoind_best_block
203                 )): # new is current and previous is not a child of current
204                 self.best_block_header.set(new_header)
205         self.handle_header = handle_header
206         @defer.inlineCallbacks
207         def poll_header():
208             if self.factory.conn.value is None:
209                 return
210             handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block'])))
211         self.bitcoind_work.changed.watch(lambda _: poll_header())
212         yield deferral.retry('Error while requesting best block header:')(poll_header)()
213         
214         # BEST SHARE
215         
216         self.known_txs_var = variable.Variable({}) # hash -> tx
217         self.mining_txs_var = variable.Variable({}) # hash -> tx
218         self.get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(self.bitcoind, self.factory, lambda: self.bitcoind_work.value['previous_block'], self.net)
219         
220         self.best_share_var = variable.Variable(None)
221         self.desired_var = variable.Variable(None)
222         self.bitcoind_work.changed.watch(lambda _: self.set_best_share())
223         self.set_best_share()
224         
225         # setup p2p logic and join p2pool network
226         
227         # update mining_txs according to getwork results
228         @self.bitcoind_work.changed.run_and_watch
229         def _(_=None):
230             new_mining_txs = {}
231             new_known_txs = dict(self.known_txs_var.value)
232             for tx_hash, tx in zip(self.bitcoind_work.value['transaction_hashes'], self.bitcoind_work.value['transactions']):
233                 new_mining_txs[tx_hash] = tx
234                 new_known_txs[tx_hash] = tx
235             self.mining_txs_var.set(new_mining_txs)
236             self.known_txs_var.set(new_known_txs)
237         # add p2p transactions from bitcoind to known_txs
238         @self.factory.new_tx.watch
239         def _(tx):
240             new_known_txs = dict(self.known_txs_var.value)
241             new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
242             self.known_txs_var.set(new_known_txs)
243         # forward transactions seen to bitcoind
244         @self.known_txs_var.transitioned.watch
245         @defer.inlineCallbacks
246         def _(before, after):
247             yield deferral.sleep(random.expovariate(1/1))
248             if self.factory.conn.value is None:
249                 return
250             for tx_hash in set(after) - set(before):
251                 self.factory.conn.value.send_tx(tx=after[tx_hash])
252         
253         @self.tracker.verified.added.watch
254         def _(share):
255             if not (share.pow_hash <= share.header['bits'].target):
256                 return
257             
258             block = share.as_block(self.tracker, self.known_txs_var.value)
259             if block is None:
260                 print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
261                 return
262             helper.submit_block(block, True, self.factory, self.bitcoind, self.bitcoind_work, self.net)
263             print
264             print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
265             print
266         
267         def forget_old_txs():
268             new_known_txs = {}
269             if self.p2p_node is not None:
270                 for peer in self.p2p_node.peers.itervalues():
271                     new_known_txs.update(peer.remembered_txs)
272             new_known_txs.update(self.mining_txs_var.value)
273             for share in self.tracker.get_chain(self.best_share_var.value, min(120, self.tracker.get_height(self.best_share_var.value))):
274                 for tx_hash in share.new_transaction_hashes:
275                     if tx_hash in self.known_txs_var.value:
276                         new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
277             self.known_txs_var.set(new_known_txs)
278         t = task.LoopingCall(forget_old_txs)
279         t.start(10)
280         stop_signal.watch(t.stop)
281         
282         t = task.LoopingCall(self.clean_tracker)
283         t.start(5)
284         stop_signal.watch(t.stop)
285     
286     def set_best_share(self):
287         best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
288         
289         self.best_share_var.set(best)
290         self.desired_var.set(desired)
291     
292     def get_current_txouts(self):
293         return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net)
294     
295     def clean_tracker(self):
296         best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
297         
298         # eat away at heads
299         if decorated_heads:
300             for i in xrange(1000):
301                 to_remove = set()
302                 for share_hash, tail in self.tracker.heads.iteritems():
303                     if share_hash in [head_hash for score, head_hash in decorated_heads[-5:]]:
304                         #print 1
305                         continue
306                     if self.tracker.items[share_hash].time_seen > time.time() - 300:
307                         #print 2
308                         continue
309                     if share_hash not in self.tracker.verified.items and max(self.tracker.items[after_tail_hash].time_seen for after_tail_hash in self.tracker.reverse.get(tail)) > time.time() - 120: # XXX stupid
310                         #print 3
311                         continue
312                     to_remove.add(share_hash)
313                 if not to_remove:
314                     break
315                 for share_hash in to_remove:
316                     if share_hash in self.tracker.verified.items:
317                         self.tracker.verified.remove(share_hash)
318                     self.tracker.remove(share_hash)
319                 #print "_________", to_remove
320         
321         # drop tails
322         for i in xrange(1000):
323             to_remove = set()
324             for tail, heads in self.tracker.tails.iteritems():
325                 if min(self.tracker.get_height(head) for head in heads) < 2*self.tracker.net.CHAIN_LENGTH + 10:
326                     continue
327                 to_remove.update(self.tracker.reverse.get(tail, set()))
328             if not to_remove:
329                 break
330             # if removed from this, it must be removed from verified
331             #start = time.time()
332             for aftertail in to_remove:
333                 if self.tracker.items[aftertail].previous_hash not in self.tracker.tails:
334                     print "erk", aftertail, self.tracker.items[aftertail].previous_hash
335                     continue
336                 if aftertail in self.tracker.verified.items:
337                     self.tracker.verified.remove(aftertail)
338                 self.tracker.remove(aftertail)
339             #end = time.time()
340             #print "removed! %i %f" % (len(to_remove), (end - start)/len(to_remove))
341         
342         self.set_best_share()