From 19bf4eac02eb3e12ca168ebc87f61f561e848d90 Mon Sep 17 00:00:00 2001 From: Forrest Voight Date: Sun, 27 Jan 2013 17:21:21 -0500 Subject: [PATCH] Revert "broadcast shares in serial", strongly suspected of causing a memory leak This reverts commit 6f1a456b21db79b06cd6d3edd1904dd3c597b981. Conflicts: p2pool/main.py p2pool/p2p.py p2pool/util/p2protocol.py --- p2pool/node.py | 5 ++--- p2pool/p2p.py | 15 ++++----------- p2pool/util/p2protocol.py | 14 -------------- 3 files changed, 6 insertions(+), 28 deletions(-) diff --git a/p2pool/node.py b/p2pool/node.py index 44ca054..1c98866 100644 --- a/p2pool/node.py +++ b/p2pool/node.py @@ -76,7 +76,6 @@ class P2PNode(p2p.Node): raise p2p.PeerMisbehavingError('received block header fails PoW test') self.node.handle_header(header) - @defer.inlineCallbacks def broadcast_share(self, share_hash): shares = [] for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))): @@ -85,8 +84,8 @@ class P2PNode(p2p.Node): self.shared_share_hashes.add(share.hash) shares.append(share) - for peer in list(self.peers.itervalues()): - yield peer.sendShares([share for share in shares if share.peer_addr != peer.addr], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash]) + for peer in self.peers.itervalues(): + peer.sendShares([share for share in shares if share.peer_addr != peer.addr], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash]) def start(self): p2p.Node.start(self) diff --git a/p2pool/p2p.py b/p2pool/p2p.py index d833961..979b51e 100644 --- a/p2pool/p2p.py +++ b/p2pool/p2p.py @@ -19,10 +19,10 @@ class PeerMisbehavingError(Exception): def fragment(f, **kwargs): try: - return f(**kwargs) + f(**kwargs) except p2protocol.TooLong: fragment(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems())) - return fragment(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems())) + fragment(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems())) class Protocol(p2protocol.Protocol): max_remembered_txs_size = 2500000 @@ -36,8 +36,6 @@ class Protocol(p2protocol.Protocol): self.connected2 = False def connectionMade(self): - p2protocol.Protocol.connectionMade(self) - self.factory.proto_made_connection(self) self.connection_lost_event = variable.Event() @@ -260,9 +258,6 @@ class Protocol(p2protocol.Protocol): self.node.handle_shares([p2pool_data.load_share(share, self.node.net, self.addr) for share in shares if share['type'] >= 9], self) def sendShares(self, shares, tracker, known_txs, include_txs_with=[]): - if not shares: - return defer.succeed(None) - if self.other_version >= 8: tx_hashes = set() for share in shares: @@ -280,14 +275,12 @@ class Protocol(p2protocol.Protocol): fragment(self.send_remember_tx, tx_hashes=[x for x in hashes_to_send if x in self.remote_tx_hashes], txs=[known_txs[x] for x in hashes_to_send if x not in self.remote_tx_hashes]) - res = fragment(self.send_shares, shares=[share.as_share() for share in shares]) + fragment(self.send_shares, shares=[share.as_share() for share in shares]) if self.other_version >= 8: - res = self.send_forget_tx(tx_hashes=hashes_to_send) + self.send_forget_tx(tx_hashes=hashes_to_send) self.remote_remembered_txs_size -= sum(100 + bitcoin_data.tx_type.packed_size(known_txs[x]) for x in hashes_to_send) - - return res message_sharereq = pack.ComposedType([ diff --git a/p2pool/util/p2protocol.py b/p2pool/util/p2protocol.py index e1ba80c..28ace01 100644 --- a/p2pool/util/p2protocol.py +++ b/p2pool/util/p2protocol.py @@ -19,21 +19,8 @@ class Protocol(protocol.Protocol): self._message_prefix = message_prefix self._max_payload_length = max_payload_length self.dataReceived2 = datachunker.DataChunker(self.dataReceiver()) - self.paused_var = variable.Variable(False) self.traffic_happened = traffic_happened - def connectionMade(self): - self.transport.registerProducer(self, True) - - def pauseProducing(self): - self.paused_var.set(True) - - def resumeProducing(self): - self.paused_var.set(False) - - def stopProducing(self): - pass - def dataReceived(self, data): self.traffic_happened.happened('p2p/in', len(data)) self.dataReceived2(data) @@ -96,7 +83,6 @@ class Protocol(protocol.Protocol): data = self._message_prefix + struct.pack('<12sI', command, len(payload)) + hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] + payload self.traffic_happened.happened('p2p/out', len(data)) self.transport.write(data) - return self.paused_var.get_when_satisfies(lambda paused: not paused) def __getattr__(self, attr): prefix = 'send_' -- 1.7.1