raise ValueError('''gentx doesn't match hash_link''')
return gentx # only used by as_block
- def as_block(self, tracker):
+ def get_other_tx_hashes(self, tracker):
+ return []
+
+ def get_other_txs(self, tracker, known_txs):
+ return []
+
+ def get_other_txs_size(self, tracker, known_txs):
+ return 0
+
+ def get_new_txs_size(self, known_txs):
+ return 0
+
+ def as_block(self, tracker, known_txs):
if self.other_txs is None:
raise ValueError('share does not contain all txs')
return dict(header=self.header, txs=[self.check(tracker)] + self.other_txs)
return gentx # only used by as_block
- def as_block(self, tracker):
- other_tx_hashes = [tracker.items[tracker.get_nth_parent_hash(self.hash, x['share_count'])].share_info['new_transaction_hashes'][x['tx_count']] for x in self.share_info['transaction_hash_refs']]
+ def get_other_tx_hashes(self, tracker):
+ return [tracker.items[tracker.get_nth_parent_hash(self.hash, x['share_count'])].share_info['new_transaction_hashes'][x['tx_count']] for x in self.share_info['transaction_hash_refs']]
+
+ def as_block(self, tracker, known_txs):
+ other_tx_hashes = self.get_other_tx_hashes(tracker)
+ if not all(tx_hash in known_txs for tx_hash in other_tx_hashes):
+ return None # not all txs present
- print [tx_hash in self.peer.remembered_txs for tx_hash in other_tx_hashes]
- txs = [self.check(tracker)] + [self.peer.remembered_txs[tx_hash] for tx_hash in other_tx_hashes]
- print
- print 'SUCCESS'
- print
- return dict(header=self.header, txs=txs)
+ return dict(header=self.header, txs=[self.check(tracker)] + [known_txs[tx_hash] for tx_hash in other_tx_hashes])
class WeightsSkipList(forest.TrackerSkipList):
self.verified.add(share)
return True
- def think(self, block_rel_height_func, previous_block, bits):
+ def think(self, block_rel_height_func, previous_block, bits, known_txs):
desired = set()
# O(len(self.heads))
#self.items[h].peer is None,
self.items[h].pow_hash <= self.items[h].header['bits'].target, # is block solution
(self.items[h].header['previous_block'], self.items[h].header['bits']) == (previous_block, bits) or self.items[h].peer is None,
+ self.items[h].as_block(self, known_txs) is not None,
-self.items[h].time_seen,
), h) for h in self.verified.tails.get(best_tail, []))
if p2pool.DEBUG:
if p2pool.DEBUG:
print 'Stale detected! %x < %x' % (best_share.header['previous_block'], previous_block)
best = best_share.previous_hash
+ elif best_share.as_block(self, known_txs) is None:
+ print 'Share with incomplete transactions detected! Jumping from %s to %s!' % (format_hash(best), format_hash(best_share.previous_hash))
+ best = best_share.previous_hash
timestamp_cutoff = min(int(time.time()), best_share.timestamp) - 3600
target_cutoff = 2**256//(self.net.SHARE_PERIOD*best_tail_score[1] + 1) * 2 if best_tail_score[1] is not None else 2**256-1
# BEST SHARE
+ known_txs_var = variable.Variable({}) # hash -> tx
+ mining_txs_var = variable.Variable({}) # hash -> tx
get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
best_share_var = variable.Variable(None)
desired_var = variable.Variable(None)
def set_best_share():
- best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
+ best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
best_share_var.set(best)
desired_var.set(desired)
# setup p2p logic and join p2pool network
- known_txs_var = variable.Variable({}) # hash -> tx
- mining_txs_var = variable.Variable({}) # hash -> tx
# update mining_txs according to getwork results
- @bitcoind_work.changed.watch
- def _(work):
+ @bitcoind_work.changed.run_and_watch
+ def _(_=None):
new_mining_txs = {}
- for tx in work['transactions']:
- new_mining_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
+ new_known_txs = dict(known_txs_var.value)
+ for tx in bitcoind_work.value['transactions']:
+ tx_hash = bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))
+ new_mining_txs[tx_hash] = tx
+ new_known_txs[tx_hash] = tx
mining_txs_var.set(new_mining_txs)
+ known_txs_var.set(new_known_txs)
# forward transactions seen to bitcoind
@known_txs_var.transitioned.watch
def _(before, after):
@tracker.verified.added.watch
def _(share):
if share.pow_hash <= share.header['bits'].target:
- submit_block(share.as_block(tracker), ignore_failure=True)
+ block = share.as_block(tracker, known_txs_var.value)
+ if block is None:
+ print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
+ return
+ submit_block(block, ignore_failure=True)
print
print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
print
shares.append(share)
for peer in list(p2p_node.peers.itervalues()):
- yield peer.sendShares([share for share in shares if share.peer is not peer])
+ yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
# send share when the chain changes to their chain
best_share_var.changed.watch(broadcast_share)
class PeerMisbehavingError(Exception):
pass
+
+def fragment(f, **kwargs):
+ try:
+ return f(**kwargs)
+ except p2protocol.TooLong:
+ att(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems()))
+ return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))
+
class Protocol(p2protocol.Protocol):
def __init__(self, node, incoming):
p2protocol.Protocol.__init__(self, node.net.PREFIX, 1000000, node.traffic_happened)
added = set(after) - set(before)
removed = set(before) - set(after)
if added:
- self.send_remember_tx(tx_hashes=[x for x in added if x in self.remote_tx_hashes], txs=[after[x] for x in added if x not in self.remote_tx_hashes])
+ fragment(self.send_remember_tx, tx_hashes=[x for x in added if x in self.remote_tx_hashes], txs=[after[x] for x in added if x not in self.remote_tx_hashes])
if removed:
self.send_forget_tx(tx_hashes=removed)
watch_id2 = self.node.mining_txs_var.transitioned.watch(update_remote_view_of_my_mining_txs)
self.connection_lost_event.watch(lambda: self.node.mining_txs_var.transitioned.unwatch(watch_id2))
- self.send_remember_tx(tx_hashes=[], txs=self.node.mining_txs_var.value.values())
+ fragment(self.send_remember_tx, tx_hashes=[], txs=self.node.mining_txs_var.value.values())
message_ping = pack.ComposedType([])
def handle_ping(self):
def handle_shares(self, shares):
self.node.handle_shares([p2pool_data.load_share(share, self.node.net, self) for share in shares if share['type'] not in [6, 7]], self)
- def sendShares(self, shares):
- def att(f, **kwargs):
- try:
- return f(**kwargs)
- except p2protocol.TooLong:
- att(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems()))
- return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))
- if shares:
- return att(self.send_shares, shares=[share.as_share() for share in shares])
- else:
+ def sendShares(self, shares, tracker, known_txs, include_txs_with=[]):
+ if not shares:
return defer.succeed(None)
+
+ tx_hashes = set()
+ for share in shares:
+ if share.hash in include_txs_with:
+ tx_hashes.update(share.get_other_tx_hashes(tracker))
+
+ hashes_to_send = [x for x in tx_hashes if x not in self.node.mining_txs_var.value]
+
+ fragment(self.send_remember_tx, tx_hashes=[x for x in hashes_to_send if x in self.remote_tx_hashes], txs=[known_txs[x] for x in hashes_to_send if x not in self.remote_tx_hashes and x in known_txs]) # last one required?
+ fragment(self.send_shares, shares=[share.as_share() for share in shares])
+ res = self.send_forget_tx(tx_hashes=hashes_to_send)
+
+ return res
message_sharereq = pack.ComposedType([
self.node.lost_conn(proto, reason)
class Node(object):
- def __init__(self, best_share_hash_func, port, net, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event()):
+ def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event(), known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({})):
self.best_share_hash_func = best_share_hash_func
self.port = port
self.net = net
- self.known_txs_var = known_txs_var
- self.mining_txs_var = mining_txs_var
self.addr_store = dict(addr_store)
self.connect_addrs = connect_addrs
self.preferred_storage = preferred_storage
self.traffic_happened = traffic_happened
+ self.known_txs_var = known_txs_var
+ self.mining_txs_var = mining_txs_var
self.nonce = random.randrange(2**64)
self.peers = {}
self._once = None
self.times = 0
+ def run_and_watch(self, func):
+ func()
+ return self.watch(func)
def watch(self, func):
id = self.id_generator.next()
self.observers[id] = func