Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(reliable-integration): update peer metadata schema
Browse files Browse the repository at this point in the history
glevco committed Oct 28, 2023

Verified

This commit was signed with the committer’s verified signature.
IvanGoncharov Ivan Goncharov
1 parent ac435e0 commit 5a0a15a
Showing 13 changed files with 164 additions and 139 deletions.
11 changes: 9 additions & 2 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@
TransactionRocksDBStorage,
TransactionStorage,
)
from hathor.util import Random, Reactor, get_environment_info
from hathor.util import Random, Reactor, get_environment_info, not_none
from hathor.verification.verification_service import VerificationService, VertexVerifiers
from hathor.wallet import BaseWallet, Wallet

@@ -396,9 +396,16 @@ def _get_or_create_event_storage(self) -> EventStorage:

def _get_or_create_event_manager(self) -> EventManager:
if self._event_manager is None:
peer_id = self._get_peer_id()
settings = self._get_or_create_settings()
reactor = self._get_reactor()
storage = self._get_or_create_event_storage()
factory = EventWebsocketFactory(reactor, storage)
factory = EventWebsocketFactory(
peer_id=not_none(peer_id.id),
network=settings.NETWORK_NAME,
reactor=reactor,
event_storage=storage,
)
self._event_manager = EventManager(
reactor=reactor,
pubsub=self._get_or_create_pubsub(),
9 changes: 7 additions & 2 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@
from hathor.p2p.utils import discover_hostname, get_genesis_short_hash
from hathor.pubsub import PubSubManager
from hathor.stratum import StratumFactory
from hathor.util import Random, Reactor
from hathor.util import Random, Reactor, not_none
from hathor.verification.verification_service import VerificationService, VertexVerifiers
from hathor.wallet import BaseWallet, HDWallet, Wallet

@@ -157,7 +157,12 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
pubsub = PubSubManager(reactor)

if self._args.x_enable_event_queue:
self.event_ws_factory = EventWebsocketFactory(reactor, event_storage)
self.event_ws_factory = EventWebsocketFactory(
peer_id=not_none(peer_id.id),
network=network,
reactor=reactor,
event_storage=event_storage
)

event_manager = EventManager(
event_storage=event_storage,
4 changes: 3 additions & 1 deletion hathor/cli/events_simulator/events_simulator.py
Original file line number Diff line number Diff line change
@@ -66,6 +66,8 @@ def execute(args: Namespace) -> None:

forwarding_ws_factory = EventForwardingWebsocketFactory(
simulator=simulator,
peer_id='simulator_peer_id',
network='simulator_network',
reactor=reactor,
event_storage=event_ws_factory._event_storage
)
@@ -80,7 +82,7 @@ def execute(args: Namespace) -> None:

log.info('Started simulating events', scenario=args.scenario, seed=simulator.seed)

forwarding_ws_factory.start(stream_id='simulator')
forwarding_ws_factory.start(stream_id='simulator_stream_id')
scenario.simulate(simulator, manager)
reactor.listenTCP(args.port, site)
reactor.run()
3 changes: 1 addition & 2 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ def __init__(
pubsub: PubSubManager,
reactor: Reactor,
event_ws_factory: Optional[EventWebsocketFactory] = None,
):
) -> None:
self.log = logger.new()

self._reactor = reactor
@@ -233,7 +233,6 @@ def _create_event(
"""Actually creates a BaseEvent."""
return BaseEvent.from_event_arguments(
event_id=0 if self._last_event is None else self._last_event.id + 1,
peer_id=self._peer_id,
timestamp=self._reactor.seconds(),
event_type=event_type,
event_args=event_args,
4 changes: 0 additions & 4 deletions hathor/event/model/base_event.py
Original file line number Diff line number Diff line change
@@ -23,8 +23,6 @@


class BaseEvent(BaseModel, use_enum_values=True):
# Full node id, because different full nodes can have different sequences of events
peer_id: str
# Event unique id, determines event order
id: NonNegativeInt
# Timestamp in which the event was emitted, this follows the unix_timestamp format, it's only informative, events
@@ -42,7 +40,6 @@ class BaseEvent(BaseModel, use_enum_values=True):
@classmethod
def from_event_arguments(
cls,
peer_id: str,
event_id: NonNegativeInt,
timestamp: float,
event_type: EventType,
@@ -53,7 +50,6 @@ def from_event_arguments(
event_data_type = event_type.data_type()

return cls(
peer_id=peer_id,
id=event_id,
timestamp=timestamp,
type=event_type,
13 changes: 12 additions & 1 deletion hathor/event/websocket/factory.py
Original file line number Diff line number Diff line change
@@ -40,9 +40,18 @@ class EventWebsocketFactory(WebSocketServerFactory):
# The unique stream ID
_stream_id: Optional[str] = None

def __init__(self, reactor: Reactor, event_storage: EventStorage):
def __init__(
self,
*,
peer_id: str,
network: str,
reactor: Reactor,
event_storage: EventStorage
) -> None:
super().__init__()
self.log = logger.new()
self._peer_id = peer_id
self._network = network
self._reactor = reactor
self._event_storage = event_storage
self._connections: set[EventWebsocketProtocol] = set()
@@ -113,6 +122,8 @@ def _send_event_to_connection(self, connection: EventWebsocketProtocol, event: B
assert self._latest_event_id is not None, '_latest_event_id must be set.'

response = EventResponse(
peer_id=self._peer_id,
network=self._network,
event=event,
latest_event_id=self._latest_event_id,
stream_id=not_none(self._stream_id)
4 changes: 4 additions & 0 deletions hathor/event/websocket/response.py
Original file line number Diff line number Diff line change
@@ -29,12 +29,16 @@ class EventResponse(Response):
Args:
type: The type of the response.
peer_id: Full node id, because different full nodes can have different sequences of events.
network: The network for which this event was generated.
event: The event.
latest_event_id: The ID of the latest event known by the server.
stream_id: The ID of the current stream.
"""

type: str = Field(default='EVENT', const=True)
peer_id: str
network: str
event: BaseEvent
latest_event_id: NonNegativeInt
stream_id: str
5 changes: 0 additions & 5 deletions tests/event/test_base_event.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@
@pytest.mark.parametrize('group_id', [None, 0, 1, 1000])
def test_create_base_event(event_id, group_id):
event = BaseEvent(
peer_id='some_peer',
id=event_id,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
@@ -34,7 +33,6 @@ def test_create_base_event(event_id, group_id):
)

expected = dict(
peer_id='some_peer',
id=event_id,
timestamp=123.3,
type='VERTEX_METADATA_CHANGED',
@@ -76,7 +74,6 @@ def test_create_base_event(event_id, group_id):
def test_create_base_event_fail_id(event_id):
with pytest.raises(ValidationError):
BaseEvent(
peer_id='some_peer',
id=event_id,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
@@ -88,7 +85,6 @@ def test_create_base_event_fail_id(event_id):
def test_create_base_event_fail_group_id(group_id):
with pytest.raises(ValidationError):
BaseEvent(
peer_id='some_peer',
id=0,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
@@ -100,7 +96,6 @@ def test_create_base_event_fail_group_id(group_id):
def test_create_base_event_fail_data_type():
with pytest.raises(ValidationError):
BaseEvent(
peer_id='some_peer',
id=0,
timestamp=123.3,
type=EventType.VERTEX_METADATA_CHANGED,
Loading

0 comments on commit 5a0a15a

Please sign in to comment.