Skip to content

Commit

Permalink
refactor(sync-v2): Modify TransactionStreamingClient to process verti…
Browse files Browse the repository at this point in the history
…ces asynchronously
  • Loading branch information
msbrogli committed Nov 10, 2023
1 parent 1171e10 commit e85ad70
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 21 deletions.
6 changes: 4 additions & 2 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from structlog import get_logger
from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.internet.task import LoopingCall
from twisted.internet.task import LoopingCall, deferLater

from hathor.conf.get_settings import get_settings
from hathor.p2p.messages import ProtocolMessages
Expand Down Expand Up @@ -569,10 +569,12 @@ def find_best_common_block(self,
self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi)
return lo

def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> None:
@inlineCallbacks
def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> Generator[Any, Any, None]:
"""This method is called when a block and its transactions are downloaded."""
for tx in vertex_list:
self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False)
yield deferLater(self.reactor, 0, lambda: None)

self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)

Expand Down
101 changes: 82 additions & 19 deletions hathor/p2p/sync_v2/transaction_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Iterator
from collections import deque
from typing import TYPE_CHECKING, Any, Generator, Iterator, Optional

from structlog import get_logger
from twisted.internet.defer import Deferred
from twisted.internet.defer import Deferred, inlineCallbacks

from hathor.p2p.sync_v2.exception import (
InvalidVertexError,
Expand Down Expand Up @@ -45,23 +46,43 @@ def __init__(self,
self.protocol = self.sync_agent.protocol
self.tx_storage = self.sync_agent.tx_storage
self.manager = self.sync_agent.manager
self.reactor = self.manager.reactor

self.log = logger.new(peer=self.protocol.get_short_peer_id())

# List of blocks from which we will receive transactions.
self.partial_blocks = partial_blocks

# True if we are processing a transaction.
self._is_processing: bool = False

# Deferred return to the sync agent.
self._deferred: Deferred[StreamEnd] = Deferred()

# Number of transactions received.
self._tx_received: int = 0

# Maximum number of transactions to be received.
self._tx_max_quantity = limit

# Queue of transactions waiting to be processed.
self._queue: deque[BaseTransaction] = deque()

# Keeps the response code if the streaming has ended.
self._response_code: Optional[StreamEnd] = None

# Index to the current block.
self._idx: int = 0
self._buffer: list[VertexId] = []

# Set of hashes we are waiting to receive.
self._waiting_for: set[VertexId] = set()

# In-memory database of transactions already received but still
# waiting for dependencies.
self._db: dict[VertexId, BaseTransaction] = {}

self._prepare_block(self.partial_blocks[0])
assert self._waiting_for

def wait(self) -> Deferred[StreamEnd]:
"""Return the deferred."""
Expand All @@ -71,6 +92,7 @@ def resume(self) -> Deferred[StreamEnd]:
"""Resume receiving vertices."""
assert self._deferred.called
self._tx_received = 0
self._response_code = None
self._deferred = Deferred()
return self._deferred

Expand All @@ -92,9 +114,37 @@ def handle_transaction(self, tx: BaseTransaction) -> None:
return

assert tx.hash is not None

self.log.debug('tx received', tx_id=tx.hash.hex())

self._queue.append(tx)
assert len(self._queue) <= self._tx_max_quantity
if not self._is_processing:
self.reactor.callLater(0, self.process_queue)

@inlineCallbacks
def process_queue(self) -> Generator[Any, Any, None]:
"""Process next transaction in the queue."""
if self._is_processing:
return

if not self._queue:
self.check_end()
return

self._is_processing = True
try:
tx = self._queue.popleft()
yield self._process_transaction(tx)
finally:
self._is_processing = False

self.reactor.callLater(0, self.process_queue)

@inlineCallbacks
def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None]:
"""Process transaction."""
assert tx.hash is not None

# Run basic verification.
if not tx.is_genesis:
try:
Expand All @@ -120,11 +170,13 @@ def handle_transaction(self, tx: BaseTransaction) -> None:
self._waiting_for.add(dep)

self._db[tx.hash] = tx
self._buffer.append(tx.hash)

if not self._waiting_for:
self.log.debug('no pending dependencies, processing buffer')
self._execute_and_prepare_next()
while not self._waiting_for:
result = yield self._execute_and_prepare_next()
if not result:
break
else:
self.log.debug('pending dependencies', counter=len(self._waiting_for))

Expand All @@ -144,38 +196,49 @@ def handle_transactions_end(self, response_code: StreamEnd) -> None:
"""This method is called by the sync agent when a TRANSACTIONS-END message is received."""
if self._deferred.called:
return
self.log.info('transactions streaming ended', reason=response_code, waiting_for=len(self._waiting_for))
self._deferred.callback(response_code)
assert self._response_code is None
self._response_code = response_code
self.check_end()

def check_end(self) -> None:
"""Check if the streaming has ended."""
if self._response_code is None:
return

if self._queue:
return

self.log.info('transactions streaming ended', reason=self._response_code, waiting_for=len(self._waiting_for))
self._deferred.callback(self._response_code)

def _execute_and_prepare_next(self) -> None:
@inlineCallbacks
def _execute_and_prepare_next(self) -> Generator[Any, Any, bool]:
"""Add the block and its vertices to the DAG."""
assert not self._waiting_for

blk = self.partial_blocks[self._idx]
vertex_list = [self._db[_id] for _id in self._buffer]
vertex_list = list(self._db.values())
vertex_list.sort(key=lambda v: v.timestamp)

try:
self.sync_agent.on_block_complete(blk, vertex_list)
yield self.sync_agent.on_block_complete(blk, vertex_list)
except HathorError as e:
self.fails(InvalidVertexError(repr(e)))
return
return False

self._idx += 1
if self._idx < len(self.partial_blocks):
self._prepare_block(self.partial_blocks[self._idx])
if self._idx >= len(self.partial_blocks):
return False

self._prepare_block(self.partial_blocks[self._idx])
return True

def _prepare_block(self, blk: 'Block') -> None:
"""Reset everything for the next block. It also adds blocks that have no dependencies."""
self._buffer.clear()
self._waiting_for.clear()
self._db.clear()

# Add pending dependencies from block.
for dep in blk.get_all_dependencies():
if not self.tx_storage.transaction_exists(dep):
self._waiting_for.add(dep)

# If block is ready to be added then do it.
if not self._waiting_for:
self._execute_and_prepare_next()

0 comments on commit e85ad70

Please sign in to comment.