Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sync-v2): stop _process_transaction on error #877

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions hathor/p2p/sync_v2/transaction_streaming_client.py
msbrogli marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ def resume(self) -> Deferred[StreamEnd]:

def fails(self, reason: 'StreamingError') -> None:
"""Fail the execution by resolving the deferred with an error."""
if self._deferred.called:
self.log.warn('already failed before', new_reason=repr(reason))
return
self._deferred.errback(reason)

def handle_transaction(self, tx: BaseTransaction) -> None:
Expand All @@ -125,6 +128,9 @@ def handle_transaction(self, tx: BaseTransaction) -> None:
@inlineCallbacks
def process_queue(self) -> Generator[Any, Any, None]:
"""Process next transaction in the queue."""
if self._deferred.called:
return

if self._is_processing:
return

Expand Down
59 changes: 58 additions & 1 deletion tests/p2p/test_sync_v2.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import base64
import re

import pytest
from twisted.internet.defer import inlineCallbacks, succeed
from twisted.python.failure import Failure

from hathor.conf import HathorSettings
from hathor.p2p.messages import ProtocolMessages
from hathor.p2p.peer_id import PeerId
from hathor.p2p.sync_v2.agent import _HeightInfo
from hathor.simulator import FakeConnection
from hathor.simulator.trigger import StopAfterNMinedBlocks, StopAfterNTransactions, StopWhenTrue, Trigger
from hathor.simulator.trigger import (
StopAfterNMinedBlocks,
StopAfterNTransactions,
StopWhenSendLineMatch,
StopWhenTrue,
Trigger,
)
from hathor.transaction.storage.traversal import DFSWalk
from tests.simulation.base import SimulatorTestCase
from tests.utils import HAS_ROCKSDB
Expand Down Expand Up @@ -321,3 +331,50 @@ def fake_get_peer_block_hashes(heights):
sync_agent.get_peer_block_hashes = fake_get_peer_block_hashes
common_block_info = yield sync_agent.find_best_common_block(my_best_block, fake_peer_best_block)
self.assertIsNone(common_block_info)

def test_multiple_unexpected_txs(self) -> None:
manager1 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True)
manager1.allow_mining_without_peers()

# mine some blocks (10, could be any amount)
miner1 = self.simulator.create_miner(manager1, hashpower=10e6)
miner1.start()
self.assertTrue(self.simulator.run(3 * 3600, trigger=StopAfterNMinedBlocks(miner1, quantity=100)))
miner1.stop()

# generate some transactions (10, could by any amount >1)
gen_tx1 = self.simulator.create_tx_generator(manager1, rate=3., hashpower=10e9, ignore_no_funds=True)
gen_tx1.start()
self.assertTrue(self.simulator.run(3 * 3600, trigger=StopAfterNTransactions(gen_tx1, quantity=10)))
gen_tx1.stop()

# mine some blocks (2 to be sure, 1 should be enough)
miner1.start()
self.assertTrue(self.simulator.run(3 * 3600, trigger=StopAfterNMinedBlocks(miner1, quantity=2)))
miner1.stop()

# create a new peer and run sync and stop when it requests transactions, so we can inject it with invalid ones
manager2 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True)
conn12 = FakeConnection(manager1, manager2, latency=0.05)
self.simulator.add_connection(conn12)
regex = re.compile(rf'{ProtocolMessages.GET_TRANSACTIONS_BFS.value} '.encode('ascii'))
self.assertTrue(self.simulator.run(2 * 60, trigger=StopWhenSendLineMatch(conn12._proto2, regex)))

# make up some transactions that the node isn't expecting
best_block = manager1.tx_storage.get_best_block()
existing_tx = manager1.tx_storage.get_transaction(list(best_block.get_tx_parents())[0])
fake_txs = []
for i in range(3):
fake_tx = existing_tx.clone()
fake_tx.timestamp += 1 + i # incrementally add timestamp so something is guaranteed to change
manager1.cpu_mining_service.resolve(fake_tx)
fake_txs.append(fake_tx)

# send fake transactions to manager2, before the fix the first should fail with no issue, but the second would
# end up on an AlreadyCalledError because the deferred.errback will be called twice
for fake_tx in fake_txs:
sync_node2 = conn12.proto2.state.sync_agent
sync_node2.handle_transaction(base64.b64encode(fake_tx.get_struct()).decode())

# force the processing of async code, nothing should break
self.simulator.run(0)