import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
-from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
+from util import fixargparse, jsonrpc, variable, deferral, math, logging
from . import p2p, networks, web, work
import p2pool, p2pool.data as p2pool_data
# BEST SHARE
get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
- requested = expiring_dict.ExpiringDict(300)
- peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
best_share_var = variable.Variable(None)
+ desired_var = variable.Variable(None)
def set_best_share():
best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
best_share_var.set(best)
-
- t = time.time()
- for peer2, share_hash in desired:
- if share_hash not in tracker.tails: # was received in the time tracker.think was running
- continue
- last_request_time, count = requested.get(share_hash, (None, 0))
- if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
- continue
- potential_peers = set()
- for head in tracker.tails[share_hash]:
- potential_peers.update(peer_heads.get(head, set()))
- potential_peers = [peer for peer in potential_peers if peer.connected2]
- if count == 0 and peer2 is not None and peer2.connected2:
- peer = peer2
- else:
- peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
- if peer is None:
- continue
-
- print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
- peer.send_getshares(
- hashes=[share_hash],
- parents=2000,
- stops=list(set(tracker.heads) | set(
- tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
- ))[:100],
- )
- requested[share_hash] = t, count + 1
+ desired_var.set(desired)
bitcoind_work.changed.watch(lambda _: set_best_share())
set_best_share()
-
print ' ...success!'
print
tracker.add(share)
- if shares and peer is not None:
- peer_heads.setdefault(shares[0].hash, set()).add(peer)
-
if new_count:
set_best_share()
if len(shares) > 5:
print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
+ @defer.inlineCallbacks
def handle_share_hashes(self, hashes, peer):
- t = time.time()
- get_hashes = []
- for share_hash in hashes:
- if share_hash in tracker.items:
- continue
- last_request_time, count = requested.get(share_hash, (None, 0))
- if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
- continue
- print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
- get_hashes.append(share_hash)
- requested[share_hash] = t, count + 1
-
- if hashes and peer is not None:
- peer_heads.setdefault(hashes[0], set()).add(peer)
- if get_hashes:
- peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
+ new_hashes = [x for x in hashes if x not in tracker.items]
+ if not new_hashes:
+ return
+ try:
+ shares = yield peer.get_shares(
+ hashes=new_hashes,
+ parents=0,
+ stops=[],
+ )
+ except:
+ log.err(None, 'in handle_share_hashes:')
+ else:
+ self.handle_shares(shares, peer)
def handle_get_shares(self, hashes, parents, stops, peer):
parents = min(parents, 1000//len(hashes))
ss.add_verified_hash(share.hash)
task.LoopingCall(save_shares).start(60)
+ @apply
+ @defer.inlineCallbacks
+ def download_shares():
+ while True:
+ desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
+ peer2, share_hash = random.choice(desired)
+
+ if len(p2p_node.peers) == 0:
+ yield deferral.sleep(1)
+ continue
+ peer = random.choice(p2p_node.peers.values())
+
+ print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
+ try:
+ shares = yield peer.get_shares(
+ hashes=[share_hash],
+ parents=500,
+ stops=[],
+ )
+ except:
+ log.err(None, 'in download_shares:')
+ else:
+ p2p_node.handle_shares(shares, peer)
+
print ' ...success!'
print
self.other_version = None
self.connected2 = False
-
- self.get_shares = deferral.GenericDeferrer(2**256, lambda id, hashes, parents, stops: self.send_sharereq(id=id, hashes=hashes, parents=parents, stops=stops))
def connectionMade(self):
p2protocol.Protocol.connectionMade(self)
)
self.timeout_delayed = reactor.callLater(10, self._connect_timeout)
+
+ self.get_shares = deferral.GenericDeferrer(
+ max_id=2**256,
+ func=lambda id, hashes, parents, stops: self.send_sharereq(id=id, hashes=hashes, parents=parents, stops=stops),
+ timeout=15,
+ on_timeout=self.transport.loseConnection,
+ )
def _connect_timeout(self):
self.timeout_delayed = None
])
def handle_sharereply(self, id, result, shares):
if result == 'good':
- res = shares
+ res = [p2pool_data.load_share(share, self.node.net, self) for share in shares if share['type'] not in [6, 7]]
else:
res = failure.Failure("sharereply result: " + result)
self.get_shares.got_response(id, res)
Converts query with identifier/got response interface to deferred interface
'''
- def __init__(self, max_id, func, timeout=5):
+ def __init__(self, max_id, func, timeout=5, on_timeout=lambda: None):
self.max_id = max_id
self.func = func
self.timeout = timeout
+ self.on_timeout = on_timeout
self.map = {}
def __call__(self, *args, **kwargs):
def timeout():
self.map.pop(id)
df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
+ self.on_timeout()
timer = reactor.callLater(self.timeout, timeout)
self.func(id, *args, **kwargs)
self.map[id] = df, timer
def respond_all(self, resp):
while self.map:
- df, timer = self.map.popitem()
+ id, (df, timer) = self.map.popitem()
timer.cancel()
df.errback(resp)