Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync-v2): Fix issue when a reorg occurs during a streaming of transactions #855

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions hathor/p2p/sync_v2/streamers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class StreamEnd(IntFlag):
STREAM_BECAME_VOIDED = 3 # this will happen when the current chain becomes voided while it is being sent
TX_NOT_CONFIRMED = 4
INVALID_PARAMS = 5
INTERNAL_ERROR = 6

def __str__(self):
if self is StreamEnd.END_HASH_REACHED:
Expand All @@ -54,6 +55,8 @@ def __str__(self):
return 'streamed reached a tx that is not confirmed'
elif self is StreamEnd.INVALID_PARAMS:
return 'streamed with invalid parameters'
elif self is StreamEnd.INTERNAL_ERROR:
return 'internal error'
else:
raise ValueError(f'invalid StreamEnd value: {self.value}')

Expand Down Expand Up @@ -89,7 +92,18 @@ def schedule_if_needed(self) -> None:
if self.delayed_call and self.delayed_call.active():
return

self.delayed_call = self.sync_agent.reactor.callLater(0, self.send_next)
self.delayed_call = self.sync_agent.reactor.callLater(0, self.safe_send_next)

def safe_send_next(self) -> None:
"""Call send_next() and schedule next call."""
try:
self.send_next()
except Exception:
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.INTERNAL_ERROR)
raise
else:
self.schedule_if_needed()

def start(self) -> None:
"""Start pushing."""
Expand Down Expand Up @@ -189,8 +203,6 @@ def send_next(self) -> None:
self.sync_agent.send_blocks_end(StreamEnd.NO_MORE_BLOCKS)
return

self.schedule_if_needed()


class TransactionsStreamingServer(_StreamingServerBase):
"""Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
Expand Down Expand Up @@ -243,6 +255,13 @@ def get_iter(self) -> Iterator[BaseTransaction]:
yield from it
if self.current_block == self.last_block:
break

# Check if this block is still in the best blockchain.
if self.current_block.get_metadata().voided_by:
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.STREAM_BECAME_VOIDED)
return

self.current_block = self.current_block.get_next_block_best_chain()
self.start_from.clear()

Expand All @@ -263,7 +282,6 @@ def send_next(self) -> None:
# Skip blocks.
if cur.is_block:
self.bfs.skip_neighbors(cur)
self.schedule_if_needed()
return

assert isinstance(cur, Transaction)
Expand All @@ -283,7 +301,6 @@ def send_next(self) -> None:
if not_none(first_block.get_metadata().height) < not_none(self.current_block.get_metadata().height):
self.log.debug('skipping tx: out of current block')
self.bfs.skip_neighbors(cur)
self.schedule_if_needed()
return

self.log.debug('send next transaction', tx_id=cur.hash.hex())
Expand All @@ -294,5 +311,3 @@ def send_next(self) -> None:
self.stop()
self.sync_agent.send_transactions_end(StreamEnd.LIMIT_EXCEEDED)
return

self.schedule_if_needed()
Loading