for peer in p2p_node.peers.itervalues():
peer.send_bestblock(header=header)
+ @defer.inlineCallbacks
def broadcast_share(share_hash):
shares = []
for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
shared_share_hashes.add(share.hash)
shares.append(share)
- for peer in p2p_node.peers.itervalues():
- peer.sendShares([share for share in shares if share.peer is not peer])
+ for peer in list(p2p_node.peers.itervalues()):
+ yield peer.sendShares([share for share in shares if share.peer is not peer])
# send share when the chain changes to their chain
best_share_var.changed.watch(broadcast_share)
self.connected2 = False
def connectionMade(self):
+ p2protocol.Protocol.connectionMade(self)
+
self.factory.proto_made_connection(self)
self.addr = self.transport.getPeer().host, self.transport.getPeer().port
def sendShares(self, shares):
def att(f, **kwargs):
try:
- f(**kwargs)
+ return f(**kwargs)
except p2protocol.TooLong:
att(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems()))
- att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))
+ return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))
if shares:
- att(self.send_shares, shares=[share.as_share() for share in shares])
+ return att(self.send_shares, shares=[share.as_share() for share in shares])
+ else:
+ return defer.succeed(None)
message_sharereq = pack.ComposedType([
from twisted.python import log
import p2pool
-from p2pool.util import datachunker
+from p2pool.util import datachunker, variable
class TooLong(Exception):
pass
self._message_prefix = message_prefix
self._max_payload_length = max_payload_length
self.dataReceived = datachunker.DataChunker(self.dataReceiver())
+ self.paused_var = variable.Variable(False)
+
+ def connectionMade(self):
+ self.transport.registerProducer(self, True)
+
+ def pauseProducing(self):
+ self.paused_var.set(True)
+
+ def resumeProducing(self):
+ self.paused_var.set(False)
+
+ def stopProducing(self):
+ pass
def dataReceiver(self):
while True:
if len(payload) > self._max_payload_length:
raise TooLong('payload too long')
self.transport.write(self._message_prefix + struct.pack('<12sI', command, len(payload)) + hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] + payload)
+ return self.paused_var.get_when_satisfies(lambda paused: not paused)
def __getattr__(self, attr):
prefix = 'send_'
self.changed.happened(value)
self.transitioned.happened(oldvalue, value)
+ @defer.inlineCallbacks
+ def get_when_satisfies(self, func):
+ while True:
+ if func(self.value):
+ defer.returnValue(self.value)
+ yield self.changed.once.get_deferred()
+
def get_not_none(self):
- if self.value is not None:
- return defer.succeed(self.value)
- else:
- df = defer.Deferred()
- self.changed.once.watch(df.callback)
- return df
+ return self.get_when_satisfies(lambda val: val is not None)