Skip to content

Commit

Permalink
feat(reliable-integration): update peer metadata schema
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Oct 3, 2023
1 parent 064f84b commit 4687a5f
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 139 deletions.
11 changes: 9 additions & 2 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
TransactionRocksDBStorage,
TransactionStorage,
)
from hathor.util import Random, Reactor, get_environment_info
from hathor.util import Random, Reactor, get_environment_info, not_none
from hathor.verification.verification_service import VerificationService
from hathor.wallet import BaseWallet, Wallet

Expand Down Expand Up @@ -394,9 +394,16 @@ def _get_or_create_event_storage(self) -> EventStorage:

def _get_or_create_event_manager(self) -> EventManager:
if self._event_manager is None:
peer_id = self._get_peer_id()
settings = self._get_or_create_settings()
reactor = self._get_reactor()
storage = self._get_or_create_event_storage()
factory = EventWebsocketFactory(reactor, storage)
factory = EventWebsocketFactory(
peer_id=not_none(peer_id.id),
network=settings.NETWORK_NAME,
reactor=reactor,
event_storage=storage,
)
self._event_manager = EventManager(
reactor=reactor,
pubsub=self._get_or_create_pubsub(),
Expand Down
9 changes: 7 additions & 2 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from hathor.p2p.utils import discover_hostname
from hathor.pubsub import PubSubManager
from hathor.stratum import StratumFactory
from hathor.util import Random, Reactor
from hathor.util import Random, Reactor, not_none
from hathor.verification.verification_service import VerificationService
from hathor.wallet import BaseWallet, HDWallet, Wallet

Expand Down Expand Up @@ -158,7 +158,12 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
pubsub = PubSubManager(reactor)

if self._args.x_enable_event_queue:
self.event_ws_factory = EventWebsocketFactory(reactor, event_storage)
self.event_ws_factory = EventWebsocketFactory(
peer_id=not_none(peer_id.id),
network=network,
reactor=reactor,
event_storage=event_storage
)

event_manager = EventManager(
event_storage=event_storage,
Expand Down
4 changes: 3 additions & 1 deletion hathor/cli/events_simulator/events_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def execute(args: Namespace) -> None:

forwarding_ws_factory = EventForwardingWebsocketFactory(
simulator=simulator,
peer_id='simulator_peer_id',
network='simulator_network',
reactor=reactor,
event_storage=event_ws_factory._event_storage
)
Expand All @@ -80,7 +82,7 @@ def execute(args: Namespace) -> None:

log.info('Started simulating events', scenario=args.scenario, seed=simulator.seed)

forwarding_ws_factory.start(stream_id='simulator')
forwarding_ws_factory.start(stream_id='simulator_stream_id')
scenario.simulate(simulator, manager)
reactor.listenTCP(args.port, site)
reactor.run()
Expand Down
3 changes: 1 addition & 2 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(
pubsub: PubSubManager,
reactor: Reactor,
event_ws_factory: Optional[EventWebsocketFactory] = None,
):
) -> None:
self.log = logger.new()

self._reactor = reactor
Expand Down Expand Up @@ -233,7 +233,6 @@ def _create_event(
"""Actually creates a BaseEvent."""
return BaseEvent.from_event_arguments(
event_id=0 if self._last_event is None else self._last_event.id + 1,
peer_id=self._peer_id,
timestamp=self._reactor.seconds(),
event_type=event_type,
event_args=event_args,
Expand Down
4 changes: 0 additions & 4 deletions hathor/event/model/base_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@


class BaseEvent(BaseModel, use_enum_values=True):
# Full node id, because different full nodes can have different sequences of events
peer_id: str
# Event unique id, determines event order
id: NonNegativeInt
# Timestamp in which the event was emitted, this follows the unix_timestamp format, it's only informative, events
Expand All @@ -42,7 +40,6 @@ class BaseEvent(BaseModel, use_enum_values=True):
@classmethod
def from_event_arguments(
cls,
peer_id: str,
event_id: NonNegativeInt,
timestamp: float,
event_type: EventType,
Expand All @@ -53,7 +50,6 @@ def from_event_arguments(
event_data_type = event_type.data_type()

return cls(
peer_id=peer_id,
id=event_id,
timestamp=timestamp,
type=event_type,
Expand Down
13 changes: 12 additions & 1 deletion hathor/event/websocket/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,18 @@ class EventWebsocketFactory(WebSocketServerFactory):
# The unique stream ID
_stream_id: Optional[str] = None

def __init__(self, reactor: Reactor, event_storage: EventStorage):
def __init__(
self,
*,
peer_id: str,
network: str,
reactor: Reactor,
event_storage: EventStorage
) -> None:
super().__init__()
self.log = logger.new()
self._peer_id = peer_id
self._network = network
self._reactor = reactor
self._event_storage = event_storage
self._connections: set[EventWebsocketProtocol] = set()
Expand Down Expand Up @@ -113,6 +122,8 @@ def _send_event_to_connection(self, connection: EventWebsocketProtocol, event: B
assert self._latest_event_id is not None, '_latest_event_id must be set.'

response = EventResponse(
peer_id=self._peer_id,
network=self._network,
event=event,
latest_event_id=self._latest_event_id,
stream_id=not_none(self._stream_id)
Expand Down
4 changes: 4 additions & 0 deletions hathor/event/websocket/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ class EventResponse(Response):
Args:
type: The type of the response.
peer_id: Full node id, because different full nodes can have different sequences of events.
network: The network for which this event was generated.
event: The event.
latest_event_id: The ID of the latest event known by the server.
stream_id: The ID of the current stream.
"""

type: str = Field(default='EVENT', const=True)
peer_id: str
network: str
event: BaseEvent
latest_event_id: NonNegativeInt
stream_id: str
Expand Down
5 changes: 0 additions & 5 deletions tests/event/test_base_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
@pytest.mark.parametrize('group_id', [None, 0, 1, 1000])
def test_create_base_event(event_id, group_id):
event = BaseEvent(
peer_id='some_peer',
id=event_id,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
Expand All @@ -34,7 +33,6 @@ def test_create_base_event(event_id, group_id):
)

expected = dict(
peer_id='some_peer',
id=event_id,
timestamp=123.3,
type='VERTEX_METADATA_CHANGED',
Expand Down Expand Up @@ -76,7 +74,6 @@ def test_create_base_event(event_id, group_id):
def test_create_base_event_fail_id(event_id):
with pytest.raises(ValidationError):
BaseEvent(
peer_id='some_peer',
id=event_id,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
Expand All @@ -88,7 +85,6 @@ def test_create_base_event_fail_id(event_id):
def test_create_base_event_fail_group_id(group_id):
with pytest.raises(ValidationError):
BaseEvent(
peer_id='some_peer',
id=0,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
Expand All @@ -100,7 +96,6 @@ def test_create_base_event_fail_group_id(group_id):
def test_create_base_event_fail_data_type():
with pytest.raises(ValidationError):
BaseEvent(
peer_id='some_peer',
id=0,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
Expand Down
Loading

0 comments on commit 4687a5f

Please sign in to comment.