Skip to content

Commit

Permalink
feat(sync-v2): Fix issue when a reorg occurs during a streaming of tr…
Browse files Browse the repository at this point in the history
…ansactions
  • Loading branch information
msbrogli committed Nov 9, 2023
1 parent db38db4 commit 9fff604
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions hathor/p2p/sync_v2/streamers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class StreamEnd(IntFlag):
STREAM_BECAME_VOIDED = 3 # this will happen when the current chain becomes voided while it is being sent
TX_NOT_CONFIRMED = 4
INVALID_PARAMS = 5
INTERNAL_ERROR = 6

def __str__(self):
if self is StreamEnd.END_HASH_REACHED:
Expand All @@ -54,6 +55,8 @@ def __str__(self):
return 'streamed reached a tx that is not confirmed'
elif self is StreamEnd.INVALID_PARAMS:
return 'streamed with invalid parameters'
elif self is StreamEnd.INTERNAL_ERROR:
return 'internal error'
else:
raise ValueError(f'invalid StreamEnd value: {self.value}')

Expand Down Expand Up @@ -89,7 +92,18 @@ def schedule_if_needed(self) -> None:
if self.delayed_call and self.delayed_call.active():
return

self.delayed_call = self.sync_agent.reactor.callLater(0, self.send_next)
self.delayed_call = self.sync_agent.reactor.callLater(0, self.safe_send_next)

def safe_send_next(self) -> None:
"""Call send_next() and schedule next call."""
try:
self.send_next()
except Exception:
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.INTERNAL_ERROR)
raise
else:
self.schedule_if_needed()

def start(self) -> None:
"""Start pushing."""
Expand Down Expand Up @@ -189,8 +203,6 @@ def send_next(self) -> None:
self.sync_agent.send_blocks_end(StreamEnd.NO_MORE_BLOCKS)
return

self.schedule_if_needed()


class TransactionsStreamingServer(_StreamingServerBase):
"""Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
Expand Down Expand Up @@ -243,6 +255,13 @@ def get_iter(self) -> Iterator[BaseTransaction]:
yield from it
if self.current_block == self.last_block:
break

# Check if this block is still in the best blockchain.
if self.current_block.get_metadata().voided_by:
self.stop()
self.sync_agent.send_blocks_end(StreamEnd.STREAM_BECAME_VOIDED)
return

self.current_block = self.current_block.get_next_block_best_chain()
self.start_from.clear()

Expand All @@ -263,7 +282,6 @@ def send_next(self) -> None:
# Skip blocks.
if cur.is_block:
self.bfs.skip_neighbors(cur)
self.schedule_if_needed()
return

assert isinstance(cur, Transaction)
Expand All @@ -283,7 +301,6 @@ def send_next(self) -> None:
if not_none(first_block.get_metadata().height) < not_none(self.current_block.get_metadata().height):
self.log.debug('skipping tx: out of current block')
self.bfs.skip_neighbors(cur)
self.schedule_if_needed()
return

self.log.debug('send next transaction', tx_id=cur.hash.hex())
Expand All @@ -294,5 +311,3 @@ def send_next(self) -> None:
self.stop()
self.sync_agent.send_transactions_end(StreamEnd.LIMIT_EXCEEDED)
return

self.schedule_if_needed()

0 comments on commit 9fff604

Please sign in to comment.