5 from twisted.internet import defer, reactor, task
6 from twisted.python import log
8 from p2pool import data as p2pool_data, p2p
9 from p2pool.bitcoin import data as bitcoin_data, helper, height_tracker
10 from p2pool.util import deferral, variable
# p2p.Node subclass whose handlers (defined below) operate on a separate `node` object
# reached through `self.node`.
# NOTE(review): this listing omits source lines (embedded numbers skip 15, 18, 21-22), so the
# `self.node` assignment and the tail of the base-class call are not visible here.
13 class P2PNode(p2p.Node):
# Builds the base p2p.Node from values owned by `node`; `**kwargs` is accepted for
# pass-through (presumably forwarded on an omitted line -- confirm).
14 def __init__(self, node, **kwargs):
16 p2p.Node.__init__(self,
# A callable rather than a value: the lambda re-reads node.best_share_var.value on every call.
17 best_share_hash_func=lambda: node.best_share_var.value,
# The same variable objects are shared with `node`, not copied.
19 known_txs_var=node.known_txs_var,
20 mining_txs_var=node.mining_txs_var,
# Adds shares to self.node.tracker and merges the transactions that came with them into
# self.node.known_txs_var.
#   shares: iterable of (share, new_txs) pairs; new_txs may be None.
#   peer:   object with an `addr` (host, port) pair, or None.
# NOTE(review): `all_new_txs` and `new_count` are used below but their initialisation is on
# lines omitted from this listing (numbers 24-28 are partly missing).
23 def handle_shares(self, shares, peer):
# Python 2 print statement. The conditional expression guards peer.addr when peer is None.
25 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
29 for share, new_txs in shares:
30 if new_txs is not None:
# Key every accompanying transaction by hash256 of its packed form.
31 all_new_txs.update((bitcoin_data.hash256(bitcoin_data.tx_type.pack(new_tx)), new_tx) for new_tx in new_txs)
# Membership test against the tracker; the action taken for an already-known share is on an
# omitted line (presumably it is skipped -- confirm).
33 if share.hash in self.node.tracker.items:
34 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
39 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer_addr)
41 self.node.tracker.add(share)
# Copy, update, then set(): the variable receives a new dict instead of being mutated in place.
43 new_known_txs = dict(self.node.known_txs_var.value)
44 new_known_txs.update(all_new_txs)
45 self.node.known_txs_var.set(new_known_txs)
# Recompute the best share; any condition guarding this call is not visible here.
48 self.node.set_best_share()
# Summary line: the last figure printed is 2*CHAIN_LENGTH of the node's network.
51 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(self.node.tracker.items), 2*self.node.net.CHAIN_LENGTH)
# Given share hashes (from `peer`), requests the ones not already in the tracker from that
# peer and feeds the result to handle_shares.
# Generator-based coroutine: `yield` waits on the Deferred returned by peer.get_shares.
53 @defer.inlineCallbacks
54 def handle_share_hashes(self, hashes, peer):
# Keep only the hashes the tracker does not hold yet.
55 new_hashes = [x for x in hashes if x not in self.node.tracker.items]
# NOTE(review): the arguments of get_shares and the surrounding try/except/else lines are
# omitted from this listing (numbers 56-58, 60-64, 66 missing).
59 shares = yield peer.get_shares(
# log.err with None as the first argument logs the exception currently being handled.
65 log.err(None, 'in handle_share_hashes:')
# Shares fetched this way come with no transactions, hence the empty list in each pair.
67 self.handle_shares([(share, []) for share in shares], peer)
# Serves a request for shares: for each requested hash, walks that share's chain in the
# tracker and checks each share against `stops`.
# NOTE(review): creation of `shares`, the action taken on a stop hit, and the return statement
# are on lines omitted from this listing (numbers 71-72, 76-78, 80 missing).
69 def handle_get_shares(self, hashes, parents, stops, peer):
# Caps the per-hash depth so that len(hashes) * parents stays at or below 1000.
# NOTE(review): 1000//len(hashes) raises ZeroDivisionError when `hashes` is empty; no guard
# is visible here.
70 parents = min(parents, 1000//len(hashes))
73 for share_hash in hashes:
# Walk at most parents + 1 shares, never more than the chain height known for this hash.
74 for share in self.node.tracker.get_chain(share_hash, min(parents + 1, self.node.tracker.get_height(share_hash))):
75 if share.hash in stops:
# Python 2 print statement; peer.addr is indexed as (host, port).
79 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
def handle_bestblock(self, header, peer):
    """Check a block header announced by a peer and hand it to the node.

    The header is packed with ``bitcoin_data.block_header_type`` and run
    through the parent network's ``POW_FUNC``; the result must not exceed
    the target carried in ``header['bits']``.

    header -- mapping-like block header; ``header['bits'].target`` is read.
    peer   -- the announcing peer (not used by this method).

    Raises p2p.PeerMisbehavingError when the proof-of-work check fails.
    """
    pow_func = self.node.net.PARENT.POW_FUNC
    pow_value = pow_func(bitcoin_data.block_header_type.pack(header))
    target = header['bits'].target
    if pow_value > target:
        raise p2p.PeerMisbehavingError('received block header fails PoW test')
    # Header passed the check: let the node decide whether to adopt it.
    self.node.handle_header(header)
# Sends the share identified by share_hash, plus recent ancestors not sent before, to the
# connected peers.
# NOTE(review): creation of `shares` and the statements between the checks below are on lines
# omitted from this listing (numbers 88, 91, 93-94 missing).
87 def broadcast_share(self, share_hash):
# Look at no more than 5 shares of the chain starting at share_hash.
89 for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))):
# shared_share_hashes records hashes handled by earlier broadcasts; what happens on a hit
# is on an omitted line.
90 if share.hash in self.shared_share_hashes:
92 self.shared_share_hashes.add(share.hash)
# Python 2 dict iteration over the connected peers.
95 for peer in self.peers.itervalues():
# A peer is not sent shares whose peer_addr equals its own address.
96 peer.sendShares([share for share in shares if share.peer_addr != peer.addr], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
# NOTE(review): the enclosing `def` for these statements is not visible in this listing
# (numbers 98-100 missing).
# Seed the set from the tracker's current items, so broadcast_share treats them as already
# handled.
101 self.shared_share_hashes = set(self.node.tracker.items)
# Drop a hash from the set when its share is removed from the tracker. The callback takes
# `self` as a parameter instead of closing over it (watch_weakref presumably holds only a weak
# reference to this object -- confirm against the variable module).
102 self.node.tracker.removed.watch_weakref(self, lambda self, share: self.shared_share_hashes.discard(share.hash))
# Coroutine that fetches wanted shares: waits until node.desired_var is non-empty, picks one
# entry and one connected peer at random, requests the share and passes the result to
# handle_shares.
# NOTE(review): the loop and try/except/else structure is on lines omitted from this listing
# (numbers 107, 110, 113, 115, 117, 119, 123-124, 127-128, 130-132, 134 missing). `random` is
# used but its import is not among the visible import lines.
105 @defer.inlineCallbacks
106 def download_shares():
# Resumes only once the variable holds a value with at least one element.
108 desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0)
# Each entry is a (peer_addr, share_hash) pair; peer_addr is not used in the visible lines.
109 peer_addr, share_hash = random.choice(desired)
# With no peers connected, wait a second (what follows the sleep is on an omitted line).
111 if len(self.peers) == 0:
112 yield deferral.sleep(1)
# Python 2: dict.values() returns a list, which random.choice requires.
114 peer = random.choice(self.peers.values())
116 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
118 shares = yield peer.get_shares(
120 parents=random.randrange(500), # randomize parents so that we eventually get past a too large block of shares
# Stop hashes: every tracker head, plus for each head an ancestor
# min(max(0, height - 1), 10) steps back.
121 stops=list(set(self.node.tracker.heads) | set(
122 self.node.tracker.get_nth_parent_hash(head, min(max(0, self.node.tracker.get_height_and_last(head)[0] - 1), 10)) for head in self.node.tracker.heads
125 except defer.TimeoutError:
126 print 'Share request timed out!'
# log.err with None logs the exception currently being handled.
129 log.err(None, 'in download_shares:')
133 yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
# Shares fetched this way come with no transactions, hence the empty list in each pair.
135 self.handle_shares([(share, []) for share in shares], peer)
# Event wiring between the node and the connected peers.
# NOTE(review): the `def` lines of the decorated callbacks and of `spread` are omitted from
# this listing (numbers 139, 147, 149-151, 155 missing).
# When the node's best block header changes, announce the new header to every peer.
138 @self.node.best_block_header.changed.watch
140 for peer in self.peers.itervalues():
141 peer.send_bestblock(header=header)
143 # send share when the chain changes to their chain
144 self.node.best_share_var.changed.watch(self.broadcast_share)
# Runs for every share added to the verified tracker.
146 @self.node.tracker.verified.added.watch
# True when the share's proof-of-work hash does not meet the target in its own header; the
# action taken is on an omitted line.
148 if not (share.pow_hash <= share.header['bits'].target):
# Broadcast when the share's previous block is within 5 of the highest known height, or
# bitcoind's current previous_block is that previous block or the share's own header hash.
152 if (self.node.get_height_rel_highest(share.header['previous_block']) > -5 or
153 self.node.bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
154 self.broadcast_share(share.hash)
# Run `spread` a second time after 5 seconds.
156 reactor.callLater(5, spread) # so get_height_rel_highest can update
# Constructor of a second class whose `class` line is not visible in this listing (numbers
# 157-159 missing); the `self.net` assignment and the loop header over `shares` are omitted too.
# Stores the collaborators and builds the share tracker from the supplied shares.
160 def __init__(self, factory, bitcoind, shares, known_verified_share_hashes, net):
161 self.factory = factory
162 self.bitcoind = bitcoind
165 self.tracker = p2pool_data.OkayTracker(self.net)
# Executed for each supplied share (the enclosing loop header is on an omitted line).
168 self.tracker.add(share)
# Mark as verified only those known-verified hashes that are actually present in the tracker.
170 for share_hash in known_verified_share_hashes:
171 if share_hash in self.tracker.items:
172 self.tracker.verified.add(self.tracker.items[share_hash])
174 self.p2p_node = None # overwritten externally
# Opening part of a coroutine whose `def` line is omitted from this listing (number 177
# missing): sets up the stop event and keeps self.bitcoind_work refreshed.
176 @defer.inlineCallbacks
# Event watched further down to stop periodic work; its `happened` attribute is exposed as
# self.stop.
178 stop_signal = variable.Event()
179 self.stop = stop_signal.happened
# The first getwork result is awaited before the Variable is created.
183 self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind)))
# Nested coroutine (its `def` line, number 185, is omitted) that polls until the stop event
# has fired.
184 @defer.inlineCallbacks
186 while stop_signal.times == 0:
# The new-block Deferred is obtained before the getwork request is issued.
187 flag = self.factory.new_block.get_deferred()
# Re-query work, passing along the 'use_getblocktemplate' value of the previous result.
189 self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate'])))
# fireOnOneCallback=True: resume as soon as either the new-block Deferred or the 15 second
# sleep fires.
192 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
# Best-block-header tracking: self.best_block_header holds the adopted header (None at first),
# handle_header decides whether a candidate replaces it, and a polling coroutine fetches the
# header of bitcoind's current previous_block.
# NOTE(review): several lines are omitted from this listing (numbers 201, 204, 208, 215, 217
# missing), including the connectives between the clauses of the condition below.
197 self.best_block_header = variable.Variable(None)
198 def handle_header(new_header):
199 # check that header matches current target
# Proof-of-work of the packed header against the target of the current bitcoind work; the
# action on failure is on an omitted line.
200 if not (self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= self.bitcoind_work.value['bits'].target):
202 bitcoind_best_block = self.bitcoind_work.value['previous_block']
# Adopt the candidate when no header is stored yet, or in the two cases labelled by the
# inline comments below.
203 if (self.best_block_header.value is None
205 new_header['previous_block'] == bitcoind_best_block and
206 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(self.best_block_header.value)) == bitcoind_best_block
207 ) # new is child of current and previous is current
209 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
210 self.best_block_header.value['previous_block'] != bitcoind_best_block
211 )): # new is current and previous is not a child of current
212 self.best_block_header.set(new_header)
# Exposed as an attribute so other objects can call node.handle_header(header).
213 self.handle_header = handle_header
# poll_header coroutine (its `def` line, number 215, is omitted).
214 @defer.inlineCallbacks
# Checked before using the connection; the action when it is None is on an omitted line.
216 if self.factory.conn.value is None:
218 handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block'])))
# Poll again whenever bitcoind_work changes; the Deferred returned by poll_header is not
# awaited here.
219 self.bitcoind_work.changed.watch(lambda _: poll_header())
# Initial poll, wrapped by deferral.retry with the given error message.
220 yield deferral.retry('Error while requesting best block header:')(poll_header)()
# State variables of the node. Order matters: set_best_share() at the end reads
# get_height_rel_highest, bitcoind_work and known_txs_var, and writes best_share_var and
# desired_var.
224 self.known_txs_var = variable.Variable({}) # hash -> tx
225 self.mining_txs_var = variable.Variable({}) # hash -> tx
# The height helper is obtained asynchronously; the lambda re-reads bitcoind's previous_block
# each time it is called.
226 self.get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(self.bitcoind, self.factory, lambda: self.bitcoind_work.value['previous_block'], self.net)
228 self.best_share_var = variable.Variable(None)
229 self.desired_var = variable.Variable(None)
# Recompute the best share whenever bitcoind work changes, and once immediately.
230 self.bitcoind_work.changed.watch(lambda _: self.set_best_share())
231 self.set_best_share()
233 # setup p2p logic and join p2pool network
# NOTE(review): the `def` lines of the first two callbacks and the initialisation of
# `new_mining_txs` are omitted from this listing (numbers 237-238, 247, 257 missing).
235 # update mining_txs according to getwork results
# Registered with run_and_watch rather than watch (presumably also run once at registration
# -- confirm against the variable module).
236 @self.bitcoind_work.changed.run_and_watch
239 new_known_txs = dict(self.known_txs_var.value)
# transaction_hashes and transactions are paired positionally.
240 for tx_hash, tx in zip(self.bitcoind_work.value['transaction_hashes'], self.bitcoind_work.value['transactions']):
241 new_mining_txs[tx_hash] = tx
242 new_known_txs[tx_hash] = tx
243 self.mining_txs_var.set(new_mining_txs)
244 self.known_txs_var.set(new_known_txs)
245 # add p2p transactions from bitcoind to known_txs
246 @self.factory.new_tx.watch
# Copy, add the one transaction keyed by hash256 of its packed form, then set().
248 new_known_txs = dict(self.known_txs_var.value)
249 new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
250 self.known_txs_var.set(new_known_txs)
251 # forward transactions seen to bitcoind
252 @self.known_txs_var.transitioned.watch
253 @defer.inlineCallbacks
254 def _(before, after):
# Random delay drawn from an exponential distribution; 1/1 evaluates to 1, the rate passed
# to expovariate.
255 yield deferral.sleep(random.expovariate(1/1))
# Checked after the sleep; the action when there is no connection is on an omitted line.
256 if self.factory.conn.value is None:
# Only keys present in `after` but not in `before` are sent.
258 for tx_hash in set(after) - set(before):
259 self.factory.conn.value.send_tx(tx=after[tx_hash])
# Runs for every share added to the verified tracker: a share can be turned into a block with
# share.as_block and handed to helper.submit_block.
# NOTE(review): the callback's `def` line and the control flow around the lines below are
# omitted from this listing (numbers 262, 264-265, 267, 269, 271 missing).
261 @self.tracker.verified.added.watch
# True when the share's proof-of-work hash does not meet the target in its header; the
# action taken is on an omitted line.
263 if not (share.pow_hash <= share.header['bits'].target):
# Assemble the block from the share using the currently known transactions.
266 block = share.as_block(self.tracker, self.known_txs_var.value)
# Python 2 print-to-stderr form; `sys` is used but its import is not among the visible
# import lines.
268 print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
270 helper.submit_block(block, True, self.factory, self.bitcoind, self.bitcoind_work, self.net)
272 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
# Rebuilds known_txs_var from the transactions still referenced: those remembered by peers,
# those being mined, and those named by recent shares on the best chain.
# NOTE(review): the initialisation of `new_known_txs` and the statements that start the two
# LoopingCalls are omitted from this listing (numbers 276, 287, 289, 291 missing).
275 def forget_old_txs():
# p2p_node stays None until it is assigned from outside this class.
277 if self.p2p_node is not None:
278 for peer in self.p2p_node.peers.itervalues():
279 new_known_txs.update(peer.remembered_txs)
280 new_known_txs.update(self.mining_txs_var.value)
# Look at no more than 120 shares of the chain ending at the current best share.
281 for share in self.tracker.get_chain(self.best_share_var.value, min(120, self.tracker.get_height(self.best_share_var.value))):
282 for tx_hash in share.new_transaction_hashes:
# Carry over only transactions that are still known.
283 if tx_hash in self.known_txs_var.value:
284 new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
285 self.known_txs_var.set(new_known_txs)
# Periodic task for forget_old_txs, stopped when stop_signal fires.
286 t = task.LoopingCall(forget_old_txs)
288 stop_signal.watch(t.stop)
# `t` is rebound to a second periodic task for clean_tracker, stopped the same way.
290 t = task.LoopingCall(self.clean_tracker)
292 stop_signal.watch(t.stop)
# Asks the tracker for the current best share and the desired shares, then publishes both
# through best_share_var and desired_var.
# NOTE(review): number 296 is missing from this listing; it may be a blank line or an omitted
# statement.
294 def set_best_share(self):
# think() returns a 3-tuple; decorated_heads is not used in the visible lines of this method.
295 best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
297 self.best_share_var.set(best)
298 self.desired_var.set(desired)
def get_current_txouts(self):
    """Return the expected payouts computed for the current best share.

    Thin wrapper around ``p2pool_data.get_expected_payouts``: it supplies
    the tracker, the current best share, the target and subsidy of the
    latest bitcoind work, and the network object. The result is returned
    unchanged.
    """
    share_tracker = self.tracker
    best_share = self.best_share_var.value
    current_work = self.bitcoind_work.value
    block_target = current_work['bits'].target
    subsidy = current_work['subsidy']
    return p2pool_data.get_expected_payouts(
        share_tracker,
        best_share,
        block_target,
        subsidy,
        self.net,
    )
# Prunes the share tracker in two phases (removing heads, then removing the shares just after
# a tail), and finishes by recomputing the best share.
# NOTE(review): loop-control lines (initialisation of `to_remove`, continue/break statements)
# are omitted from this listing (numbers 305-307, 309, 312-313, 315-316, 318-319, 321-322,
# 328-329, 331, 334, 336-337, 339, 343, 347, 349 missing). `time` is used but its import is
# not among the visible import lines.
303 def clean_tracker(self):
# Only decorated_heads is used in the visible lines; best and desired are not.
304 best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
# Phase 1: up to 1000 passes over the heads (Python 2 xrange / iteritems).
308 for i in xrange(1000):
310 for share_hash, tail in self.tracker.heads.iteritems():
# Test 1: the head is among the last five entries of decorated_heads. The action for each
# of the three tests is on an omitted line (presumably the head is kept -- confirm).
311 if share_hash in [head_hash for score, head_hash in decorated_heads[-5:]]:
# Test 2: the head was seen within the last 300 seconds.
314 if self.tracker.items[share_hash].time_seen > time.time() - 300:
# Test 3: the head is unverified and some share directly after its tail was seen within the
# last 120 seconds.
# NOTE(review): reverse.get(tail) has no default here, unlike the .get(tail, set()) call
# further down; a missing key would yield None and the generator would raise TypeError.
317 if share_hash not in self.tracker.verified.items and max(self.tracker.items[after_tail_hash].time_seen for after_tail_hash in self.tracker.reverse.get(tail)) > time.time() - 120: # XXX stupid
320 to_remove.add(share_hash)
# Removal is done after the scan over heads, and from the verified tracker first when present.
323 for share_hash in to_remove:
324 if share_hash in self.tracker.verified.items:
325 self.tracker.verified.remove(share_hash)
326 self.tracker.remove(share_hash)
327 #print "_________", to_remove
# Phase 2: up to 1000 passes over the tails.
330 for i in xrange(1000):
332 for tail, heads in self.tracker.tails.iteritems():
# True when the shortest chain hanging off this tail is below 2*CHAIN_LENGTH + 10; the action
# is on an omitted line (presumably such tails are left alone -- confirm).
333 if min(self.tracker.get_height(head) for head in heads) < 2*self.tracker.net.CHAIN_LENGTH + 10:
# Collect the shares whose parent is this tail.
335 to_remove.update(self.tracker.reverse.get(tail, set()))
338 # if removed from this, it must be removed from verified
340 for aftertail in to_remove:
# Diagnostic print when the share's parent is not a tail.
341 if self.tracker.items[aftertail].previous_hash not in self.tracker.tails:
342 print "erk", aftertail, self.tracker.items[aftertail].previous_hash
344 if aftertail in self.tracker.verified.items:
345 self.tracker.verified.remove(aftertail)
346 self.tracker.remove(aftertail)
348 #print "removed! %i %f" % (len(to_remove), (end - start)/len(to_remove))
# The tracker contents may have changed, so recompute and publish the best share.
350 self.set_best_share()