import random
import sys
+import time
-from twisted.internet import defer, reactor, task
+from twisted.internet import defer, reactor
from twisted.python import log
from p2pool import data as p2pool_data, p2p
class P2PNode(p2p.Node):
- def __init__(self, node, p2pool_port, p2pool_conns, addrs, connect_addrs):
+ def __init__(self, node, **kwargs):
self.node = node
p2p.Node.__init__(self,
best_share_hash_func=lambda: node.best_share_var.value,
- port=p2pool_port,
net=node.net,
- addr_store=addrs,
- connect_addrs=connect_addrs,
- max_incoming_conns=p2pool_conns,
known_txs_var=node.known_txs_var,
mining_txs_var=node.mining_txs_var,
- )
+ **kwargs)
def handle_shares(self, shares, peer):
if len(shares) > 5:
print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
new_count = 0
- for share in shares:
+ all_new_txs = {}
+ for share, new_txs in shares:
+ if new_txs is not None:
+ all_new_txs.update((bitcoin_data.hash256(bitcoin_data.tx_type.pack(new_tx)), new_tx) for new_tx in new_txs)
+
if share.hash in self.node.tracker.items:
#print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
continue
new_count += 1
- #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
+ #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer_addr)
self.node.tracker.add(share)
+ new_known_txs = dict(self.node.known_txs_var.value)
+ new_known_txs.update(all_new_txs)
+ self.node.known_txs_var.set(new_known_txs)
+
if new_count:
self.node.set_best_share()
except:
log.err(None, 'in handle_share_hashes:')
else:
- self.handle_shares(shares, peer)
+ self.handle_shares([(share, []) for share in shares], peer)
def handle_get_shares(self, hashes, parents, stops, peer):
parents = min(parents, 1000//len(hashes))
if share.hash in stops:
break
shares.append(share)
- print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
+ if len(shares) > 0:
+ print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
return shares
def handle_bestblock(self, header, peer):
raise p2p.PeerMisbehavingError('received block header fails PoW test')
self.node.handle_header(header)
- @defer.inlineCallbacks
def broadcast_share(self, share_hash):
shares = []
for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))):
self.shared_share_hashes.add(share.hash)
shares.append(share)
- for peer in list(self.peers.itervalues()):
- yield peer.sendShares([share for share in shares if share.peer is not peer], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
+ for peer in self.peers.itervalues():
+ peer.sendShares([share for share in shares if share.peer_addr != peer.addr], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
def start(self):
p2p.Node.start(self)
def download_shares():
while True:
desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0)
- peer2, share_hash = random.choice(desired)
+ peer_addr, share_hash = random.choice(desired)
if len(self.peers) == 0:
yield deferral.sleep(1)
try:
shares = yield peer.get_shares(
hashes=[share_hash],
- parents=500,
- stops=[],
+ parents=random.randrange(500), # randomize parents so that we eventually get past a too large block of shares
+ stops=list(set(self.node.tracker.heads) | set(
+ self.node.tracker.get_nth_parent_hash(head, min(max(0, self.node.tracker.get_height_and_last(head)[0] - 1), 10)) for head in self.node.tracker.heads
+ ))[:100],
)
+ except defer.TimeoutError:
+ print 'Share request timed out!'
+ continue
except:
log.err(None, 'in download_shares:')
continue
if not shares:
yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
continue
- self.handle_shares(shares, peer)
+ self.handle_shares([(share, []) for share in shares], peer)
@self.node.best_block_header.changed.watch
self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind)))
@defer.inlineCallbacks
def work_poller():
- while True:
+ while stop_signal.times == 0:
flag = self.factory.new_block.get_deferred()
try:
self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate'])))
self.handle_header = handle_header
@defer.inlineCallbacks
def poll_header():
+ if self.factory.conn.value is None:
+ return
handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block'])))
self.bitcoind_work.changed.watch(lambda _: poll_header())
yield deferral.retry('Error while requesting best block header:')(poll_header)()
if tx_hash in self.known_txs_var.value:
new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
self.known_txs_var.set(new_known_txs)
- task.LoopingCall(forget_old_txs).start(10)
+ t = deferral.RobustLoopingCall(forget_old_txs)
+ t.start(10)
+ stop_signal.watch(t.stop)
+
+ t = deferral.RobustLoopingCall(self.clean_tracker)
+ t.start(5)
+ stop_signal.watch(t.stop)
def set_best_share(self):
- best, desired = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
+ best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
+ # NOTE(review): tracker.think() now returns a third value (scored heads);
+ # it is unused here and consumed by clean_tracker instead.
self.best_share_var.set(best)
self.desired_var.set(desired)
def get_current_txouts(self):
+ # Expected payouts derived from the tracker at the current best share,
+ # using the current work's difficulty target and block subsidy.
return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net)
+
+ def clean_tracker(self):
+ # Prune the share tracker: repeatedly discard stale heads and over-long
+ # tails, then recompute the best share. Scheduled periodically elsewhere
+ # in this patch via a RobustLoopingCall started with a 5-second interval.
+ best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
+
+ # eat away at heads
+ if decorated_heads:
+ # bounded to 1000 passes; each pass removes the current removable heads
+ for i in xrange(1000):
+ to_remove = set()
+ for share_hash, tail in self.tracker.heads.iteritems():
+ # keep the 5 best-scoring heads
+ if share_hash in [head_hash for score, head_hash in decorated_heads[-5:]]:
+ #print 1
+ continue
+ # keep heads first seen within the last 5 minutes
+ if self.tracker.items[share_hash].time_seen > time.time() - 300:
+ #print 2
+ continue
+ # keep unverified heads whose tail's children were seen in the last 2 minutes
+ if share_hash not in self.tracker.verified.items and max(self.tracker.items[after_tail_hash].time_seen for after_tail_hash in self.tracker.reverse.get(tail)) > time.time() - 120: # XXX stupid
+ #print 3
+ continue
+ to_remove.add(share_hash)
+ if not to_remove:
+ break
+ for share_hash in to_remove:
+ # remove from the verified sub-tracker first, then the main tracker
+ if share_hash in self.tracker.verified.items:
+ self.tracker.verified.remove(share_hash)
+ self.tracker.remove(share_hash)
+ #print "_________", to_remove
+
+ # drop tails
+ for i in xrange(1000):
+ to_remove = set()
+ for tail, heads in self.tracker.tails.iteritems():
+ # only drop a tail once every head above it is past 2*CHAIN_LENGTH + 10
+ if min(self.tracker.get_height(head) for head in heads) < 2*self.tracker.net.CHAIN_LENGTH + 10:
+ continue
+ to_remove.update(self.tracker.reverse.get(tail, set()))
+ if not to_remove:
+ break
+ # if removed from this, it must be removed from verified
+ #start = time.time()
+ for aftertail in to_remove:
+ if self.tracker.items[aftertail].previous_hash not in self.tracker.tails:
+ print "erk", aftertail, self.tracker.items[aftertail].previous_hash
+ continue
+ if aftertail in self.tracker.verified.items:
+ self.tracker.verified.remove(aftertail)
+ self.tracker.remove(aftertail)
+ #end = time.time()
+ #print "removed! %i %f" % (len(to_remove), (end - start)/len(to_remove))
+
+ # re-evaluate the best share now that the tracker has been pruned
+ self.set_best_share()