Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: Change HathorManager.on_new_tx() to accept only fully valid…
Browse files Browse the repository at this point in the history
…ated transactions
msbrogli committed Jun 14, 2023

Verified

This commit was signed with the committer’s verified signature.
987Nabil Nabil Abdel-Hafeez
1 parent fbc4f0a commit c4b2393
Showing 1 changed file with 54 additions and 85 deletions.
139 changes: 54 additions & 85 deletions hathor/manager.py
Original file line number Diff line number Diff line change
@@ -437,7 +437,7 @@ def _initialize_components_full_verification(self) -> None:
if self.tx_storage.indexes.mempool_tips is not None:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
if self.tx_storage.indexes.deps is not None:
self.sync_v2_step_validations([tx])
self.sync_v2_step_validations([tx], quiet=True)
self.tx_storage.save_transaction(tx, only_metadata=True)
else:
assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
@@ -653,7 +653,7 @@ def _sync_v2_resume_validations(self) -> None:
tx = self.tx_storage.get_transaction(tx_hash)
if tx.get_metadata().validation.is_final():
depended_final_txs.append(tx)
self.sync_v2_step_validations(depended_final_txs)
self.sync_v2_step_validations(depended_final_txs, quiet=False)
self.log.debug('pending validations finished')

def add_listen_address(self, addr: str) -> None:
@@ -910,8 +910,7 @@ def propagate_tx(self, tx: BaseTransaction, fails_silently: bool = True) -> bool
@cpu.profiler('on_new_tx')
def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = None,
quiet: bool = False, fails_silently: bool = True, propagate_to_peers: bool = True,
skip_block_weight_verification: bool = False, sync_checkpoints: bool = False,
partial: bool = False, reject_locked_reward: bool = True) -> bool:
skip_block_weight_verification: bool = False, reject_locked_reward: bool = True) -> bool:
""" New method for adding transactions or blocks that steps the validation state machine.
:param tx: transaction to be added
@@ -920,10 +919,6 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
:param fails_silently: if False will raise an exception when tx cannot be added
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
:param skip_block_weight_verification: if True will not check the tx PoW
:param sync_checkpoints: if True and also partial=True, will try to validate as a checkpoint and set the proper
validation state, this is used for adding txs from the sync-checkpoints phase
:param partial: if True will accept txs that can't be fully validated yet (because of missing parent/input) but
will run a basic validation of what can be validated (PoW and other basic fields)
"""
assert tx.hash is not None
if self.tx_storage.transaction_exists(tx.hash):
@@ -958,94 +953,66 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
self.log.warn('on_new_tx(): previously marked as invalid', tx=tx.hash_hex)
return False

# if partial=False (the default) we don't even try to partially validate transactions
if not partial or (metadata.validation.is_fully_connected() or tx.can_validate_full()):
if not metadata.validation.is_fully_connected():
try:
tx.validate_full(sync_checkpoints=sync_checkpoints, reject_locked_reward=reject_locked_reward)
except HathorError as e:
if not fails_silently:
raise InvalidNewTransaction('full validation failed') from e
self.log.warn('on_new_tx(): full validation failed', tx=tx.hash_hex, exc_info=True)
return False

# The method below adds the tx as a child of the parents
# This needs to be called right before the save because we were adding the children
# in the tx parents even if the tx was invalid (failing the verifications above)
# then I would have a children that was not in the storage
tx.update_initial_metadata(save=False)
self.tx_storage.save_transaction(tx)
self.tx_storage.add_to_indexes(tx)
if not metadata.validation.is_fully_connected():
try:
self.consensus_algorithm.update(tx)
tx.validate_full(reject_locked_reward=reject_locked_reward)
except HathorError as e:
if not fails_silently:
raise InvalidNewTransaction('consensus update failed') from e
self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex)
return False
else:
assert tx.validate_full(skip_block_weight_verification=True, reject_locked_reward=reject_locked_reward)
self.tx_storage.indexes.update(tx)
if self.tx_storage.indexes.mempool_tips:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
self.tx_fully_validated(tx)
elif sync_checkpoints:
assert self.tx_storage.indexes.deps is not None
metadata.children = self.tx_storage.indexes.deps.known_children(tx)
try:
tx.validate_checkpoint(self.checkpoints)
except HathorError:
if not fails_silently:
raise InvalidNewTransaction('checkpoint validation failed')
self.log.warn('on_new_tx(): checkpoint validation failed', tx=tx.hash_hex, exc_info=True)
raise InvalidNewTransaction('full validation failed') from e
self.log.warn('on_new_tx(): full validation failed', tx=tx.hash_hex, exc_info=True)
return False
self.tx_storage.save_transaction(tx)
else:
if isinstance(tx, Block) and not tx.has_basic_block_parent():
if not fails_silently:
raise InvalidNewTransaction('block parent needs to be at least basic-valid')
self.log.warn('on_new_tx(): block parent needs to be at least basic-valid', tx=tx.hash_hex)
return False
if not tx.validate_basic():
if not fails_silently:
raise InvalidNewTransaction('basic validation failed')
self.log.warn('on_new_tx(): basic validation failed', tx=tx.hash_hex)
return False

# The method below adds the tx as a child of the parents
# This needs to be called right before the save because we were adding the children
# in the tx parents even if the tx was invalid (failing the verifications above)
# then I would have a children that was not in the storage
tx.update_initial_metadata(save=False)
self.tx_storage.save_transaction(tx)

if tx.is_transaction and self.tx_storage.indexes.deps is not None:
self.tx_storage.indexes.deps.remove_from_needed_index(tx.hash)

if self.tx_storage.indexes.deps is not None:
try:
self.sync_v2_step_validations([tx])
except (AssertionError, HathorError) as e:
if not fails_silently:
raise InvalidNewTransaction('step validations failed') from e
self.log.warn('on_new_tx(): step validations failed', tx=tx.hash_hex, exc_info=True)
return False
# The method below adds the tx as a child of the parents
# This needs to be called right before the save because we were adding the children
# in the tx parents even if the tx was invalid (failing the verifications above)
# then I would have a children that was not in the storage
tx.update_initial_metadata(save=False)
self.tx_storage.save_transaction(tx)
self.tx_storage.add_to_indexes(tx)
try:
self.consensus_algorithm.update(tx)
except HathorError as e:
if not fails_silently:
raise InvalidNewTransaction('consensus update failed') from e
self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex)
return False

if not quiet:
ts_date = datetime.datetime.fromtimestamp(tx.timestamp)
now = datetime.datetime.fromtimestamp(self.reactor.seconds())
if tx.is_block:
self.log.info('new block', tx=tx, ts_date=ts_date, time_from_now=tx.get_time_from_now(now))
else:
self.log.info('new tx', tx=tx, ts_date=ts_date, time_from_now=tx.get_time_from_now(now))
assert tx.validate_full(skip_block_weight_verification=True, reject_locked_reward=reject_locked_reward)
self.tx_storage.indexes.update(tx)
if self.tx_storage.indexes.mempool_tips:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
self.tx_fully_validated(tx, quiet=quiet)

if propagate_to_peers:
# Propagate to our peers.
self.connections.send_tx_to_peers(tx)

return True

def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None:
def log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
""" A shortcut for logging additional information for block/txs.
"""
metadata = tx.get_metadata()
now = datetime.datetime.fromtimestamp(self.reactor.seconds())
kwargs = {
'tx': tx,
'ts_date': datetime.datetime.fromtimestamp(tx.timestamp),
'time_from_now': tx.get_time_from_now(now),
'validation': metadata.validation.name,
}
if tx.is_block:
message = message_fmt.format('block')
if isinstance(tx, Block):
kwargs['height'] = tx.get_height()
else:
message = message_fmt.format('tx')
if not quiet:
log_func = self.log.info
else:
log_func = self.log.debug
log_func(message, **kwargs)

def sync_v2_step_validations(self, txs: Iterable[BaseTransaction], *, quiet: bool) -> None:
""" Step all validations until none can be stepped anymore.
"""
assert self.tx_storage.indexes is not None
@@ -1072,9 +1039,9 @@ def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None:
self.tx_storage.indexes.update(tx)
if self.tx_storage.indexes.mempool_tips:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
self.tx_fully_validated(tx)
self.tx_fully_validated(tx, quiet=quiet)

def tx_fully_validated(self, tx: BaseTransaction) -> None:
def tx_fully_validated(self, tx: BaseTransaction, *, quiet: bool) -> None:
""" Handle operations that need to happen once the tx becomes fully validated.
This might happen immediately after we receive the tx, if we have all dependencies
@@ -1093,6 +1060,8 @@ def tx_fully_validated(self, tx: BaseTransaction) -> None:
# TODO Remove it and use pubsub instead.
self.wallet.on_new_tx(tx)

self.log_new_object(tx, 'new {}', quiet=quiet)

def listen(self, description: str, use_ssl: Optional[bool] = None) -> None:
endpoint = self.connections.listen(description, use_ssl)
# XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases

0 comments on commit c4b2393

Please sign in to comment.