Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(reliable-integration): update peer metadata schema #804

Merged
merged 1 commit into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
TransactionRocksDBStorage,
TransactionStorage,
)
from hathor.util import Random, Reactor, get_environment_info
from hathor.util import Random, Reactor, get_environment_info, not_none
from hathor.verification.verification_service import VerificationService, VertexVerifiers
from hathor.wallet import BaseWallet, Wallet

Expand Down Expand Up @@ -396,9 +396,16 @@ def _get_or_create_event_storage(self) -> EventStorage:

def _get_or_create_event_manager(self) -> EventManager:
if self._event_manager is None:
peer_id = self._get_peer_id()
settings = self._get_or_create_settings()
reactor = self._get_reactor()
storage = self._get_or_create_event_storage()
factory = EventWebsocketFactory(reactor, storage)
factory = EventWebsocketFactory(
peer_id=not_none(peer_id.id),
network=settings.NETWORK_NAME,
reactor=reactor,
event_storage=storage,
)
self._event_manager = EventManager(
reactor=reactor,
pubsub=self._get_or_create_pubsub(),
Expand Down
9 changes: 7 additions & 2 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from hathor.p2p.utils import discover_hostname, get_genesis_short_hash
from hathor.pubsub import PubSubManager
from hathor.stratum import StratumFactory
from hathor.util import Random, Reactor
from hathor.util import Random, Reactor, not_none
from hathor.verification.verification_service import VerificationService, VertexVerifiers
from hathor.wallet import BaseWallet, HDWallet, Wallet

Expand Down Expand Up @@ -157,7 +157,12 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
pubsub = PubSubManager(reactor)

if self._args.x_enable_event_queue:
self.event_ws_factory = EventWebsocketFactory(reactor, event_storage)
self.event_ws_factory = EventWebsocketFactory(
peer_id=not_none(peer_id.id),
network=network,
reactor=reactor,
event_storage=event_storage
)

event_manager = EventManager(
event_storage=event_storage,
Expand Down
4 changes: 3 additions & 1 deletion hathor/cli/events_simulator/events_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def execute(args: Namespace) -> None:

forwarding_ws_factory = EventForwardingWebsocketFactory(
simulator=simulator,
peer_id='simulator_peer_id',
network='simulator_network',
reactor=reactor,
event_storage=event_ws_factory._event_storage
)
Expand All @@ -80,7 +82,7 @@ def execute(args: Namespace) -> None:

log.info('Started simulating events', scenario=args.scenario, seed=simulator.seed)

forwarding_ws_factory.start(stream_id='simulator')
forwarding_ws_factory.start(stream_id='simulator_stream_id')
scenario.simulate(simulator, manager)
reactor.listenTCP(args.port, site)
reactor.run()
Expand Down
3 changes: 1 addition & 2 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(
pubsub: PubSubManager,
reactor: Reactor,
event_ws_factory: Optional[EventWebsocketFactory] = None,
):
) -> None:
self.log = logger.new()

self._reactor = reactor
Expand Down Expand Up @@ -233,7 +233,6 @@ def _create_event(
"""Actually creates a BaseEvent."""
return BaseEvent.from_event_arguments(
event_id=0 if self._last_event is None else self._last_event.id + 1,
peer_id=self._peer_id,
timestamp=self._reactor.seconds(),
event_type=event_type,
event_args=event_args,
Expand Down
4 changes: 0 additions & 4 deletions hathor/event/model/base_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@


class BaseEvent(BaseModel, use_enum_values=True):
# Full node id, because different full nodes can have different sequences of events
peer_id: str
# Event unique id, determines event order
id: NonNegativeInt
# Timestamp in which the event was emitted, this follows the unix_timestamp format, it's only informative, events
Expand All @@ -42,7 +40,6 @@ class BaseEvent(BaseModel, use_enum_values=True):
@classmethod
def from_event_arguments(
cls,
peer_id: str,
event_id: NonNegativeInt,
timestamp: float,
event_type: EventType,
Expand All @@ -53,7 +50,6 @@ def from_event_arguments(
event_data_type = event_type.data_type()

return cls(
peer_id=peer_id,
id=event_id,
timestamp=timestamp,
type=event_type,
Expand Down
13 changes: 12 additions & 1 deletion hathor/event/websocket/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,18 @@ class EventWebsocketFactory(WebSocketServerFactory):
# The unique stream ID
_stream_id: Optional[str] = None

def __init__(self, reactor: Reactor, event_storage: EventStorage):
def __init__(
self,
*,
peer_id: str,
network: str,
reactor: Reactor,
event_storage: EventStorage
) -> None:
super().__init__()
self.log = logger.new()
self._peer_id = peer_id
self._network = network
self._reactor = reactor
self._event_storage = event_storage
self._connections: set[EventWebsocketProtocol] = set()
Expand Down Expand Up @@ -113,6 +122,8 @@ def _send_event_to_connection(self, connection: EventWebsocketProtocol, event: B
assert self._latest_event_id is not None, '_latest_event_id must be set.'

response = EventResponse(
peer_id=self._peer_id,
network=self._network,
event=event,
latest_event_id=self._latest_event_id,
stream_id=not_none(self._stream_id)
Expand Down
4 changes: 4 additions & 0 deletions hathor/event/websocket/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ class EventResponse(Response):
Args:
type: The type of the response.
peer_id: Full node id, because different full nodes can have different sequences of events.
network: The network for which this event was generated.
event: The event.
latest_event_id: The ID of the latest event known by the server.
stream_id: The ID of the current stream.
"""

type: str = Field(default='EVENT', const=True)
peer_id: str
network: str
event: BaseEvent
latest_event_id: NonNegativeInt
stream_id: str
Expand Down
5 changes: 0 additions & 5 deletions tests/event/test_base_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
@pytest.mark.parametrize('group_id', [None, 0, 1, 1000])
def test_create_base_event(event_id, group_id):
event = BaseEvent(
peer_id='some_peer',
id=event_id,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
Expand All @@ -34,7 +33,6 @@ def test_create_base_event(event_id, group_id):
)

expected = dict(
peer_id='some_peer',
id=event_id,
timestamp=123.3,
type='VERTEX_METADATA_CHANGED',
Expand Down Expand Up @@ -76,7 +74,6 @@ def test_create_base_event(event_id, group_id):
def test_create_base_event_fail_id(event_id):
with pytest.raises(ValidationError):
BaseEvent(
peer_id='some_peer',
id=event_id,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
Expand All @@ -88,7 +85,6 @@ def test_create_base_event_fail_id(event_id):
def test_create_base_event_fail_group_id(group_id):
with pytest.raises(ValidationError):
BaseEvent(
peer_id='some_peer',
id=0,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
Expand All @@ -100,7 +96,6 @@ def test_create_base_event_fail_group_id(group_id):
def test_create_base_event_fail_data_type():
with pytest.raises(ValidationError):
BaseEvent(
peer_id='some_peer',
id=0,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
Expand Down
Loading