Skip to content

Commit

Permalink
feat(multiprocess-verification): implement on_new_vertex_async
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed May 8, 2024
1 parent 1303641 commit 4b6a318
Showing 1 changed file with 66 additions and 12 deletions.
78 changes: 66 additions & 12 deletions hathor/vertex_handler/vertex_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
import datetime

from structlog import get_logger
from twisted.internet.task import deferLater

from hathor.conf.settings import HathorSettings
from hathor.consensus import ConsensusAlgorithm
from hathor.exception import HathorError, InvalidNewTransaction
from hathor.p2p.manager import ConnectionsManager
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol
from hathor.transaction import BaseTransaction, Block
from hathor.transaction import Block, Vertex
from hathor.transaction.storage import TransactionStorage
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.verification.verification_service import VerificationService
Expand Down Expand Up @@ -68,26 +69,29 @@ def __init__(

def on_new_vertex(
self,
vertex: BaseTransaction,
vertex: Vertex,
*,
quiet: bool = False,
fails_silently: bool = True,
propagate_to_peers: bool = True,
reject_locked_reward: bool = True,
) -> bool:
""" New method for adding transactions or blocks that steps the validation state machine.
"""Method for adding vertices (transactions or blocks) that steps the validation state machine, synchronously.
:param vertex: transaction to be added
:param quiet: if True will not log when a new tx is accepted
:param fails_silently: if False will raise an exception when tx cannot be added
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
"""
is_pre_valid = self._pre_validate_vertex(vertex, fails_silently=fails_silently)
if not is_pre_valid:
return False

is_valid = self._validate_vertex(
vertex,
fails_silently=fails_silently,
reject_locked_reward=reject_locked_reward
)

if not is_valid:
return False

Expand All @@ -101,13 +105,46 @@ def on_new_vertex(

return True

def _validate_vertex(
async def on_new_vertex_async(
self,
vertex: BaseTransaction,
vertex: Vertex,
*,
fails_silently: bool,
reject_locked_reward: bool,
quiet: bool = False,
fails_silently: bool = True,
propagate_to_peers: bool = True,
reject_locked_reward: bool = True,
) -> bool:
"""Method for adding vertices (transactions or blocks) that steps the validation state machine, asynchronously.
This is exactly the same method as `on_new_vertex()`, except it calls async verification.
:param vertex: transaction to be added
:param quiet: if True will not log when a new tx is accepted
:param fails_silently: if False will raise an exception when tx cannot be added
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
"""
is_pre_valid = self._pre_validate_vertex(vertex, fails_silently=fails_silently)
if not is_pre_valid:
return False

is_valid = await self._validate_vertex_async(
vertex,
fails_silently=fails_silently,
reject_locked_reward=reject_locked_reward
)
if not is_valid:
return False

self._save_and_run_consensus(vertex)
self._post_consensus(
vertex,
quiet=quiet,
propagate_to_peers=propagate_to_peers,
reject_locked_reward=reject_locked_reward
)

return True

def _pre_validate_vertex(self, vertex: Vertex, *, fails_silently: bool) -> bool:
assert self._tx_storage.is_only_valid_allowed()
already_exists = False
if self._tx_storage.transaction_exists(vertex.hash):
Expand Down Expand Up @@ -144,6 +181,11 @@ def _validate_vertex(
self._log.warn('on_new_tx(): previously marked as invalid', tx=vertex.hash_hex)
return False

return True

def _validate_vertex(self, vertex: Vertex, *, fails_silently: bool, reject_locked_reward: bool) -> bool:
metadata = vertex.get_metadata()

if not metadata.validation.is_fully_connected():
try:
self._verification_service.validate_full(vertex, reject_locked_reward=reject_locked_reward)
Expand All @@ -155,7 +197,19 @@ def _validate_vertex(

return True

def _save_and_run_consensus(self, vertex: BaseTransaction) -> None:
async def _validate_vertex_async(
self,
vertex: Vertex,
*,
fails_silently: bool,
reject_locked_reward: bool,
) -> bool:
# TODO: This method simply calls synchronous verification, but it also releases the reactor loop. This is
# temporary, and soon this method will be changed to call verification on a separate process.
await deferLater(self._reactor, 0, lambda: None)
return self._validate_vertex(vertex, fails_silently=fails_silently, reject_locked_reward=reject_locked_reward)

def _save_and_run_consensus(self, vertex: Vertex) -> None:
# The method below adds the tx as a child of the parents
# This needs to be called right before the save because we were adding the children
# in the tx parents even if the tx was invalid (failing the verifications above)
Expand All @@ -167,7 +221,7 @@ def _save_and_run_consensus(self, vertex: BaseTransaction) -> None:

def _post_consensus(
self,
vertex: BaseTransaction,
vertex: Vertex,
*,
quiet: bool,
propagate_to_peers: bool,
Expand Down Expand Up @@ -205,7 +259,7 @@ def _post_consensus(
# Propagate to our peers.
self._p2p_manager.send_tx_to_peers(vertex)

def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
def _log_new_object(self, tx: Vertex, message_fmt: str, *, quiet: bool) -> None:
""" A shortcut for logging additional information for block/txs.
"""
metadata = tx.get_metadata()
Expand All @@ -228,7 +282,7 @@ def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool)
log_func = self._log.debug
log_func(message, **kwargs)

def _log_feature_states(self, vertex: BaseTransaction) -> None:
def _log_feature_states(self, vertex: Vertex) -> None:
"""Log features states for a block. Used as part of the Feature Activation Phased Testing."""
if not isinstance(vertex, Block):
return
Expand Down

0 comments on commit 4b6a318

Please sign in to comment.