diff --git a/hathor/p2p/sync_v2/transaction_streaming_client.py b/hathor/p2p/sync_v2/transaction_streaming_client.py index 07f420618..f83ad68c0 100644 --- a/hathor/p2p/sync_v2/transaction_streaming_client.py +++ b/hathor/p2p/sync_v2/transaction_streaming_client.py @@ -99,6 +99,9 @@ def resume(self) -> Deferred[StreamEnd]: def fails(self, reason: 'StreamingError') -> None: """Fail the execution by resolving the deferred with an error.""" + if self._deferred.called: + self.log.warn('already failed before', new_reason=repr(reason)) + return self._deferred.errback(reason) def handle_transaction(self, tx: BaseTransaction) -> None: @@ -125,6 +128,9 @@ def handle_transaction(self, tx: BaseTransaction) -> None: @inlineCallbacks def process_queue(self) -> Generator[Any, Any, None]: """Process next transaction in the queue.""" + if self._deferred.called: + return + if self._is_processing: return diff --git a/tests/p2p/test_sync_v2.py b/tests/p2p/test_sync_v2.py index 17ea451c0..e393431c3 100644 --- a/tests/p2p/test_sync_v2.py +++ b/tests/p2p/test_sync_v2.py @@ -1,12 +1,22 @@ +import base64 +import re + import pytest from twisted.internet.defer import inlineCallbacks, succeed from twisted.python.failure import Failure from hathor.conf import HathorSettings +from hathor.p2p.messages import ProtocolMessages from hathor.p2p.peer_id import PeerId from hathor.p2p.sync_v2.agent import _HeightInfo from hathor.simulator import FakeConnection -from hathor.simulator.trigger import StopAfterNMinedBlocks, StopAfterNTransactions, StopWhenTrue, Trigger +from hathor.simulator.trigger import ( + StopAfterNMinedBlocks, + StopAfterNTransactions, + StopWhenSendLineMatch, + StopWhenTrue, + Trigger, +) from hathor.transaction.storage.traversal import DFSWalk from tests.simulation.base import SimulatorTestCase from tests.utils import HAS_ROCKSDB @@ -321,3 +331,50 @@ def fake_get_peer_block_hashes(heights): sync_agent.get_peer_block_hashes = fake_get_peer_block_hashes common_block_info = yield sync_agent.find_best_common_block(my_best_block, fake_peer_best_block) self.assertIsNone(common_block_info) + + def test_multiple_unexpected_txs(self) -> None: + manager1 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True) + manager1.allow_mining_without_peers() + + # mine some blocks (10, could be any amount) + miner1 = self.simulator.create_miner(manager1, hashpower=10e6) + miner1.start() + self.assertTrue(self.simulator.run(3 * 3600, trigger=StopAfterNMinedBlocks(miner1, quantity=100))) + miner1.stop() + + # generate some transactions (10, could by any amount >1) + gen_tx1 = self.simulator.create_tx_generator(manager1, rate=3., hashpower=10e9, ignore_no_funds=True) + gen_tx1.start() + self.assertTrue(self.simulator.run(3 * 3600, trigger=StopAfterNTransactions(gen_tx1, quantity=10))) + gen_tx1.stop() + + # mine some blocks (2 to be sure, 1 should be enough) + miner1.start() + self.assertTrue(self.simulator.run(3 * 3600, trigger=StopAfterNMinedBlocks(miner1, quantity=2))) + miner1.stop() + + # create a new peer and run sync and stop when it requests transactions, so we can inject it with invalid ones + manager2 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True) + conn12 = FakeConnection(manager1, manager2, latency=0.05) + self.simulator.add_connection(conn12) + regex = re.compile(rf'{ProtocolMessages.GET_TRANSACTIONS_BFS.value} '.encode('ascii')) + self.assertTrue(self.simulator.run(2 * 60, trigger=StopWhenSendLineMatch(conn12._proto2, regex))) + + # make up some transactions that the node isn't expecting + best_block = manager1.tx_storage.get_best_block() + existing_tx = manager1.tx_storage.get_transaction(list(best_block.get_tx_parents())[0]) + fake_txs = [] + for i in range(3): + fake_tx = existing_tx.clone() + fake_tx.timestamp += 1 + i # incrementally add timestamp so something is guaranteed to change + manager1.cpu_mining_service.resolve(fake_tx) + fake_txs.append(fake_tx) + + # send fake transactions to manager2, before the fix the first should fail with no issue, but the second would + # end up on an AlreadyCalledError because the deferred.errback will be called twice + for fake_tx in fake_txs: + sync_node2 = conn12.proto2.state.sync_agent + sync_node2.handle_transaction(base64.b64encode(fake_tx.get_struct()).decode()) + + # force the processing of async code, nothing should break + self.simulator.run(0)