Skip to content

Commit

Permalink
refactor(vertex-handler): remove p2p_manager dependency
Browse files Browse the repository at this point in the history
glevco committed Oct 15, 2024

Verified

This commit was signed with the committer’s verified signature.
panh99 Heng Pan
1 parent 409f3f3 commit dc2977f
Showing 10 changed files with 39 additions and 45 deletions.
1 change: 0 additions & 1 deletion hathor/builder/builder.py
Original file line number Diff line number Diff line change
@@ -614,7 +614,6 @@ def _get_or_create_vertex_handler(self) -> VertexHandler:
tx_storage=self._get_or_create_tx_storage(),
verification_service=self._get_or_create_verification_service(),
consensus=self._get_or_create_consensus(),
p2p_manager=self._get_or_create_p2p_manager(),
feature_service=self._get_or_create_feature_service(),
pubsub=self._get_or_create_pubsub(),
wallet=self._get_or_create_wallet(),
1 change: 0 additions & 1 deletion hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
@@ -334,7 +334,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
tx_storage=tx_storage,
verification_service=verification_service,
consensus=consensus_algorithm,
p2p_manager=p2p_manager,
feature_service=self.feature_service,
pubsub=pubsub,
wallet=self.wallet,
13 changes: 6 additions & 7 deletions hathor/manager.py
Original file line number Diff line number Diff line change
@@ -48,7 +48,6 @@
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_id import PeerId
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.reward_lock import is_spent_reward_locked
@@ -69,7 +68,6 @@
from hathor.websocket.factory import HathorAdminWebsocketFactory

logger = get_logger()
cpu = get_cpu_profiler()


class HathorManager:
@@ -171,8 +169,6 @@ def __init__(

self.is_started: bool = False

self.cpu = cpu

# XXX: first checkpoint must be genesis (height=0)
self.checkpoints: list[Checkpoint] = checkpoints or []
self.checkpoints_ready: list[bool] = [False] * len(self.checkpoints)
@@ -960,7 +956,6 @@ def propagate_tx(self, tx: BaseTransaction, fails_silently: bool = True) -> bool

return self.on_new_tx(tx, fails_silently=fails_silently, propagate_to_peers=True)

@cpu.profiler('on_new_tx')
def on_new_tx(
self,
tx: BaseTransaction,
@@ -977,14 +972,18 @@ def on_new_tx(
:param fails_silently: if False will raise an exception when tx cannot be added
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
"""
return self.vertex_handler.on_new_vertex(
result = self.vertex_handler.on_new_vertex(
tx,
quiet=quiet,
fails_silently=fails_silently,
propagate_to_peers=propagate_to_peers,
reject_locked_reward=reject_locked_reward,
)

if propagate_to_peers and result:
self.connections.send_tx_to_peers(tx)

return result

def has_sync_version_capability(self) -> bool:
return self._settings.CAPABILITY_SYNC_VERSION in self.capabilities

8 changes: 6 additions & 2 deletions hathor/p2p/sync_v1/agent.py
Original file line number Diff line number Diff line change
@@ -636,7 +636,9 @@ def handle_data(self, payload: str) -> None:
self.log.info('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id())
# If we have not requested the data, it is a new transaction being propagated
# in the network, thus, we propagate it as well.
result = self.manager.on_new_tx(tx, propagate_to_peers=True)
result = self.manager.vertex_handler.on_new_vertex(tx)
if result:
self.protocol.connections.send_tx_to_peers(tx)
self.update_received_stats(tx, result)

def update_received_stats(self, tx: 'BaseTransaction', result: bool) -> None:
@@ -685,7 +687,9 @@ def on_tx_success(self, tx: 'BaseTransaction') -> 'BaseTransaction':
success = True
else:
# Add tx to the DAG.
success = self.manager.on_new_tx(tx)
success = self.manager.vertex_handler.on_new_vertex(tx)
if success:
self.protocol.connections.send_tx_to_peers(tx)
# Updating stats data
self.update_received_stats(tx, success)
return tx
20 changes: 9 additions & 11 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
@@ -488,7 +488,9 @@ def handle_tips(self, payload: str) -> None:
data = [bytes.fromhex(x) for x in data]
# filter-out txs we already have
try:
self._receiving_tips.extend(VertexId(tx_id) for tx_id in data if not self.partial_vertex_exists(tx_id))
self._receiving_tips.extend(
VertexId(tx_id) for tx_id in data if not self.tx_storage.partial_vertex_exists(tx_id)
)
except ValueError:
self.protocol.send_error_and_close_connection('Invalid trasaction ID received')
# XXX: it's OK to do this *after* the extend because the payload is limited by the line protocol
@@ -553,12 +555,6 @@ def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) ->
assert self.protocol.state is not None
self.protocol.state.send_message(cmd, payload)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
""" Return true if the vertex exists no matter its validation state.
"""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

@inlineCallbacks
def find_best_common_block(self,
my_best_block: _HeightInfo,
@@ -621,11 +617,11 @@ def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> G
try:
for tx in vertex_list:
if not self.tx_storage.transaction_exists(tx.hash):
self.vertex_handler.on_new_vertex(tx, propagate_to_peers=False, fails_silently=False)
self.vertex_handler.on_new_vertex(tx, fails_silently=False)
yield deferLater(self.reactor, 0, lambda: None)

if not self.tx_storage.transaction_exists(blk.hash):
self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False)
self.vertex_handler.on_new_vertex(blk, fails_silently=False)
except InvalidNewTransaction:
self.protocol.send_error_and_close_connection('invalid vertex received')

@@ -1163,7 +1159,7 @@ def handle_data(self, payload: str) -> None:

tx.storage = self.protocol.node.tx_storage

if self.partial_vertex_exists(tx.hash):
if self.tx_storage.partial_vertex_exists(tx.hash):
# transaction already added to the storage, ignore it
# XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs
self.tx_storage.compare_bytes_with_local_tx(tx)
@@ -1174,7 +1170,9 @@ def handle_data(self, payload: str) -> None:
if self.tx_storage.can_validate_full(tx):
self.log.debug('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id())
try:
self.vertex_handler.on_new_vertex(tx, propagate_to_peers=True, fails_silently=False)
result = self.vertex_handler.on_new_vertex(tx, fails_silently=False)
if result:
self.protocol.connections.send_tx_to_peers(tx)
except InvalidNewTransaction:
self.protocol.send_error_and_close_connection('invalid vertex received')
else:
10 changes: 2 additions & 8 deletions hathor/p2p/sync_v2/blockchain_streaming_client.py
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@
from hathor.p2p.sync_v2.streamers import StreamEnd
from hathor.transaction import Block
from hathor.transaction.exceptions import HathorError
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
@@ -75,11 +74,6 @@ def fails(self, reason: 'StreamingError') -> None:
"""Fail the execution by resolving the deferred with an error."""
self._deferred.errback(reason)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
"""Return true if the vertex exists no matter its validation state."""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

def handle_blocks(self, blk: Block) -> None:
"""This method is called by the sync agent when a BLOCKS message is received."""
if self._deferred.called:
@@ -105,7 +99,7 @@ def handle_blocks(self, blk: Block) -> None:

# Check for repeated blocks.
is_duplicated = False
if self.partial_vertex_exists(blk.hash):
if self.tx_storage.partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
@@ -132,7 +126,7 @@ def handle_blocks(self, blk: Block) -> None:

if self.tx_storage.can_validate_full(blk):
try:
self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False)
self.vertex_handler.on_new_vertex(blk, fails_silently=False)
except HathorError:
self.fails(InvalidVertexError(blk.hash.hex()))
return
4 changes: 3 additions & 1 deletion hathor/p2p/sync_v2/mempool.py
Original file line number Diff line number Diff line change
@@ -140,7 +140,9 @@ def _add_tx(self, tx: BaseTransaction) -> None:
if self.tx_storage.transaction_exists(tx.hash):
return
try:
self.vertex_handler.on_new_vertex(tx, fails_silently=False)
result = self.vertex_handler.on_new_vertex(tx, fails_silently=False)
if result:
self.sync_agent.protocol.connections.send_tx_to_peers(tx)
except InvalidNewTransaction:
self.sync_agent.protocol.send_error_and_close_connection('invalid vertex received')
raise
7 changes: 5 additions & 2 deletions hathor/profiler/cpu.py
Original file line number Diff line number Diff line change
@@ -15,12 +15,15 @@
import time
from collections import defaultdict
from functools import wraps
from typing import Any, Callable, Union
from typing import Callable, ParamSpec, TypeVar, Union

from twisted.internet.task import LoopingCall

Key = tuple[str, ...]

T = TypeVar('T')
P = ParamSpec('P')


class ProcItem:
"""Store information for each process."""
@@ -184,7 +187,7 @@ def update(self) -> None:
t1 = time.process_time()
self.measures[('profiler',)].add_time(t1 - t0)

def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[..., Any]], Any]:
def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[P, T]], Callable[P, T]]:
"""Decorator to collect data. The `key` must be the key itself
or a method that returns the key.
5 changes: 5 additions & 0 deletions hathor/transaction/storage/transaction_storage.py
Original file line number Diff line number Diff line change
@@ -1154,6 +1154,11 @@ def can_validate_full(self, vertex: Vertex) -> bool:
return True
return all_exist and all_valid

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
"""Return true if the vertex exists no matter its validation state."""
with self.allow_partially_validated_context():
return self.transaction_exists(vertex_id)


class BaseTransactionStorage(TransactionStorage):
indexes: Optional[IndexesManager]
15 changes: 3 additions & 12 deletions hathor/vertex_handler/vertex_handler.py
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@
from hathor.consensus import ConsensusAlgorithm
from hathor.exception import HathorError, InvalidNewTransaction
from hathor.feature_activation.feature_service import FeatureService
from hathor.p2p.manager import ConnectionsManager
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol
from hathor.transaction import BaseTransaction, Block
@@ -30,6 +30,7 @@
from hathor.wallet import BaseWallet

logger = get_logger()
cpu = get_cpu_profiler()


class VertexHandler:
@@ -40,7 +41,6 @@ class VertexHandler:
'_tx_storage',
'_verification_service',
'_consensus',
'_p2p_manager',
'_feature_service',
'_pubsub',
'_wallet',
@@ -55,7 +55,6 @@ def __init__(
tx_storage: TransactionStorage,
verification_service: VerificationService,
consensus: ConsensusAlgorithm,
p2p_manager: ConnectionsManager,
feature_service: FeatureService,
pubsub: PubSubManager,
wallet: BaseWallet | None,
@@ -67,27 +66,25 @@ def __init__(
self._tx_storage = tx_storage
self._verification_service = verification_service
self._consensus = consensus
self._p2p_manager = p2p_manager
self._feature_service = feature_service
self._pubsub = pubsub
self._wallet = wallet
self._log_vertex_bytes = log_vertex_bytes

@cpu.profiler('on_new_vertex')
def on_new_vertex(
self,
vertex: BaseTransaction,
*,
quiet: bool = False,
fails_silently: bool = True,
propagate_to_peers: bool = True,
reject_locked_reward: bool = True,
) -> bool:
""" New method for adding transactions or blocks that steps the validation state machine.
:param vertex: transaction to be added
:param quiet: if True will not log when a new tx is accepted
:param fails_silently: if False will raise an exception when tx cannot be added
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
"""
is_valid = self._validate_vertex(
vertex,
@@ -102,7 +99,6 @@ def on_new_vertex(
self._post_consensus(
vertex,
quiet=quiet,
propagate_to_peers=propagate_to_peers,
reject_locked_reward=reject_locked_reward
)

@@ -177,7 +173,6 @@ def _post_consensus(
vertex: BaseTransaction,
*,
quiet: bool,
propagate_to_peers: bool,
reject_locked_reward: bool,
) -> None:
""" Handle operations that need to happen once the tx becomes fully validated.
@@ -208,10 +203,6 @@ def _post_consensus(

self._log_new_object(vertex, 'new {}', quiet=quiet)

if propagate_to_peers:
# Propagate to our peers.
self._p2p_manager.send_tx_to_peers(vertex)

def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
""" A shortcut for logging additional information for block/txs.
"""

0 comments on commit dc2977f

Please sign in to comment.