From: Forrest Voight
Date: Fri, 12 Oct 2012 20:21:06 +0000 (-0400)
Subject: limit max amount of tx data stored to 1.5 MB
X-Git-Tag: 8.0^2~12
X-Git-Url: https://git.novaco.in/?p=p2pool.git;a=commitdiff_plain;h=1254fa83fe69af66c9dd9b9001e15b71035c7589

limit max amount of tx data stored to 1.5 MB
---

diff --git a/p2pool/p2p.py b/p2pool/p2p.py
index 49a239d..e80d97e 100644
--- a/p2pool/p2p.py
+++ b/p2pool/p2p.py
@@ -24,6 +24,8 @@ def fragment(f, **kwargs):
         return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))
 
 class Protocol(p2protocol.Protocol):
+    max_remembered_txs_size = 2500000
+    
     def __init__(self, node, incoming):
         p2protocol.Protocol.__init__(self, node.net.PREFIX, 1000000, node.traffic_happened)
         self.node = node
@@ -70,7 +72,10 @@ class Protocol(p2protocol.Protocol):
         )
         
         self.remote_tx_hashes = set() # view of peer's known_txs # not actually initially empty, but sending txs instead of tx hashes won't hurt
+        self.remote_remembered_txs_size = 0
+        
         self.remembered_txs = {} # view of peer's mining_txs
+        self.remembered_txs_size = 0
     
     def _connect_timeout(self):
         self.timeout_delayed = None
@@ -168,12 +173,17 @@ class Protocol(p2protocol.Protocol):
             added = set(after) - set(before)
             removed = set(before) - set(after)
             if added:
+                self.remote_remembered_txs_size += sum(len(bitcoin_data.tx_type.pack(after[x])) for x in added)
+                assert self.remote_remembered_txs_size <= self.max_remembered_txs_size
                 fragment(self.send_remember_tx, tx_hashes=[x for x in added if x in self.remote_tx_hashes], txs=[after[x] for x in added if x not in self.remote_tx_hashes])
             if removed:
                 self.send_forget_tx(tx_hashes=removed)
+                self.remote_remembered_txs_size -= sum(len(bitcoin_data.tx_type.pack(before[x])) for x in removed)
         watch_id2 = self.node.mining_txs_var.transitioned.watch(update_remote_view_of_my_mining_txs)
         self.connection_lost_event.watch(lambda: self.node.mining_txs_var.transitioned.unwatch(watch_id2))
         
+        self.remote_remembered_txs_size += sum(len(bitcoin_data.tx_type.pack(x)) for x in self.node.mining_txs_var.value.values())
+        assert self.remote_remembered_txs_size <= self.max_remembered_txs_size
         fragment(self.send_remember_tx, tx_hashes=[], txs=self.node.mining_txs_var.value.values())
     
     message_ping = pack.ComposedType([])
@@ -248,12 +258,19 @@ class Protocol(p2protocol.Protocol):
             if share.hash in include_txs_with:
                 tx_hashes.update(share.get_other_tx_hashes(tracker))
         
-        hashes_to_send = [x for x in tx_hashes if x not in self.node.mining_txs_var.value]
+        hashes_to_send = [x for x in tx_hashes if x not in self.node.mining_txs_var.value and x in known_txs]
        
-        fragment(self.send_remember_tx, tx_hashes=[x for x in hashes_to_send if x in self.remote_tx_hashes], txs=[known_txs[x] for x in hashes_to_send if x not in self.remote_tx_hashes and x in known_txs]) # last one required?
+        new_remote_remembered_txs_size = self.remote_remembered_txs_size + sum(len(bitcoin_data.tx_type.pack(known_txs[x])) for x in hashes_to_send)
+        if new_remote_remembered_txs_size > self.max_remembered_txs_size:
+            raise ValueError('shares have too many txs')
+        self.remote_remembered_txs_size = new_remote_remembered_txs_size
+        
+        fragment(self.send_remember_tx, tx_hashes=[x for x in hashes_to_send if x in self.remote_tx_hashes], txs=[known_txs[x] for x in hashes_to_send if x not in self.remote_tx_hashes])
         
         fragment(self.send_shares, shares=[share.as_share() for share in shares])
         
         res = self.send_forget_tx(tx_hashes=hashes_to_send)
+        self.remote_remembered_txs_size -= sum(len(bitcoin_data.tx_type.pack(known_txs[x])) for x in hashes_to_send)
+        
         return res
@@ -310,18 +327,24 @@ class Protocol(p2protocol.Protocol):
         for tx_hash in tx_hashes:
             if tx_hash not in self.remembered_txs:
                 self.remembered_txs[tx_hash] = self.node.known_txs_var.value[tx_hash]
+                self.remembered_txs_size += len(bitcoin_data.tx_type.pack(self.node.known_txs_var.value[tx_hash]))
         new_known_txs = dict(self.node.known_txs_var.value)
         for tx in txs:
             tx_hash = bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))
             if tx_hash not in self.remembered_txs:
                 self.remembered_txs[tx_hash] = tx
+                self.remembered_txs_size += len(bitcoin_data.tx_type.pack(tx))
             new_known_txs[tx_hash] = tx
         self.node.known_txs_var.set(new_known_txs)
+        if self.remembered_txs_size >= self.max_remembered_txs_size:
+            raise PeerMisbehavingError('too much transaction data stored')
     
     message_forget_tx = pack.ComposedType([
         ('tx_hashes', pack.ListType(pack.IntType(256))),
     ])
     
     def handle_forget_tx(self, tx_hashes):
         for tx_hash in tx_hashes:
+            self.remembered_txs_size -= len(bitcoin_data.tx_type.pack(self.remembered_txs[tx_hash]))
+            assert self.remembered_txs_size >= 0
             del self.remembered_txs[tx_hash]
diff --git a/p2pool/test/test_p2p.py b/p2pool/test/test_p2p.py
index a771e8c..7707ce7 100644
--- a/p2pool/test/test_p2p.py
+++ b/p2pool/test/test_p2p.py
@@ -1,9 +1,11 @@
 import random
 
-from twisted.internet import defer
+from twisted.internet import defer, endpoints, protocol, reactor
 from twisted.trial import unittest
 
 from p2pool import networks, p2p
+from p2pool.bitcoin import data as bitcoin_data
+from p2pool.util import deferral
 
 
 class Test(unittest.TestCase):
@@ -29,3 +31,56 @@ class Test(unittest.TestCase):
             yield df
         finally:
             yield n.stop()
+    
+    @defer.inlineCallbacks
+    def test_tx_limit(self):
+        class MyNode(p2p.Node):
+            def __init__(self, df):
+                p2p.Node.__init__(self, lambda: None, 29333, networks.nets['litecoin_testnet'], {}, set([('127.0.0.1', 19338)]), 0, 0, 0, 0)
+                
+                self.df = df
+                self.sent_time = 0
+            
+            @defer.inlineCallbacks
+            def got_conn(self, conn):
+                p2p.Node.got_conn(self, conn)
+                huge_tx = dict(
+                    version=0,
+                    tx_ins=[],
+                    tx_outs=[dict(
+                        value=0,
+                        script='x'*900000,
+                    )],
+                    lock_time=0,
+                )
+                new_mining_txs = dict(self.mining_txs_var.value)
+                new_mining_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(huge_tx))] = huge_tx
+                self.mining_txs_var.set(new_mining_txs)
+                
+                yield deferral.sleep(1)
+                
+                huge_tx = dict(
+                    version=0,
+                    tx_ins=[],
+                    tx_outs=[dict(
+                        value=0,
+                        script='x'*900000,
+                    )],
+                    lock_time=1,
+                )
+                new_mining_txs = dict(self.mining_txs_var.value)
+                new_mining_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(huge_tx))] = huge_tx
+                self.mining_txs_var.set(new_mining_txs)
+                self.sent_time = reactor.seconds()
+            
+            
+            def lost_conn(self, conn, reason):
+                self.df.callback(None)
+        
+        df = defer.Deferred()
+        n = MyNode(df)
+        n.start()
+        yield df
+        if not (n.sent_time <= reactor.seconds() <= n.sent_time + 1):
+            raise ValueError('node did not disconnect within 1 seconds of receiving too much tx data')
+        yield n.stop()
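
Note (not part of the patch): below is a minimal, self-contained sketch of the size-capped bookkeeping that this change adds around Protocol.handle_remember_tx and handle_forget_tx. The names MAX_REMEMBERED_TXS_SIZE, serialized_size and RememberedTxStore are hypothetical stand-ins for Protocol.max_remembered_txs_size, bitcoin_data.tx_type.pack and the per-peer state kept in p2p.py; only the pattern is the same.

    # Hypothetical illustration of a size-capped per-peer transaction store.
    MAX_REMEMBERED_TXS_SIZE = 2500000  # cap on serialized tx bytes kept per peer

    def serialized_size(tx):
        # stand-in for len(bitcoin_data.tx_type.pack(tx)); here tx is already bytes
        return len(tx)

    class PeerMisbehavingError(Exception):
        pass

    class RememberedTxStore(object):
        def __init__(self):
            self.remembered_txs = {}      # tx_hash -> serialized tx
            self.remembered_txs_size = 0  # running total of serialized bytes

        def remember(self, tx_hash, tx):
            if tx_hash in self.remembered_txs:
                return  # already counted; do not double-count its size
            self.remembered_txs[tx_hash] = tx
            self.remembered_txs_size += serialized_size(tx)
            if self.remembered_txs_size >= MAX_REMEMBERED_TXS_SIZE:
                # mirrors the patch: a peer pushing too much tx data is rejected
                raise PeerMisbehavingError('too much transaction data stored')

        def forget(self, tx_hash):
            self.remembered_txs_size -= serialized_size(self.remembered_txs.pop(tx_hash))
            assert self.remembered_txs_size >= 0

Keeping a running byte total alongside the dict makes the cap check constant-time per message instead of re-serializing every remembered transaction, which is why the patch adjusts remembered_txs_size (and remote_remembered_txs_size) on both the remember_tx and forget_tx paths.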