Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(events): improve events during the load phase
Browse files Browse the repository at this point in the history
glevco committed Jun 9, 2023

Verified

This commit was signed with the committer’s verified signature.
snyk-bot Snyk bot
1 parent 6985b29 commit 392c44e
Showing 9 changed files with 103 additions and 54 deletions.
73 changes: 45 additions & 28 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Iterator, Optional
from typing import Callable, Iterable, Iterator, Optional

from structlog import get_logger

@@ -23,7 +23,7 @@
from hathor.event.websocket import EventWebsocketFactory
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction
from hathor.util import Reactor
from hathor.util import Reactor, generic_progress

logger = get_logger()

@@ -36,8 +36,6 @@
}

_SUBSCRIBE_EVENTS = [
HathorEvents.MANAGER_ON_START,
HathorEvents.LOAD_FINISHED,
HathorEvents.NETWORK_NEW_TX_ACCEPTED,
HathorEvents.REORG_STARTED,
HathorEvents.REORG_FINISHED,
@@ -125,6 +123,26 @@ def _subscribe_events(self) -> None:
for event in _SUBSCRIBE_EVENTS:
self._pubsub.subscribe(event, self._handle_hathor_event)

def load_started(self):
if not self._is_running:
return

self._handle_event(
event_type=EventType.LOAD_STARTED,
event_args=EventArguments(),
)
self._event_storage.save_node_state(NodeState.LOAD)

def load_finished(self):
if not self._is_running:
return

self._handle_event(
event_type=EventType.LOAD_FINISHED,
event_args=EventArguments(),
)
self._event_storage.save_node_state(NodeState.SYNC)

def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
"""Handles a PubSub 'HathorEvents' event."""
event_type = EventType.from_hathor_event(hathor_event)
@@ -136,14 +154,6 @@ def _handle_event(self, event_type: EventType, event_args: EventArguments) -> No
assert self._is_running, 'Cannot handle event, EventManager is not started.'
assert self._event_ws_factory is not None

event_specific_handlers = {
EventType.LOAD_STARTED: self._handle_load_started,
EventType.LOAD_FINISHED: self._handle_load_finished
}

if event_specific_handler := event_specific_handlers.get(event_type):
event_specific_handler()

event = self._handle_event_creation(event_type, event_args)

self._event_storage.save_event(event)
@@ -221,14 +231,6 @@ def _create_event(
group_id=group_id,
)

def _handle_load_started(self) -> None:
"""Event specific handler for EventType.LOAD_STARTED."""
self._event_storage.save_node_state(NodeState.LOAD)

def _handle_load_finished(self) -> None:
"""Event specific handler for EventType.LOAD_FINISHED."""
self._event_storage.save_node_state(NodeState.SYNC)

def _should_reload_events(self) -> bool:
"""Returns whether events should be reloaded or not."""
return self._previous_node_state in [None, NodeState.LOAD]
@@ -241,7 +243,12 @@ def save_event_queue_state(self, state: bool) -> None:
"""Saves whether the event queue feature is enabled from the storage."""
self._event_storage.save_event_queue_state(state)

def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None:
def handle_load_phase_vertices(
self,
*,
topological_iterator: Iterator[BaseTransaction],
total_vertices: int
) -> None:
"""
Either generates load phase events or not, depending on previous node state.
Does so asynchronously so events generated here are not processed before normal event handling.
@@ -251,10 +258,20 @@ def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransact
if not self._should_reload_events():
return

for vertex in topological_iterator:
self._reactor.callLater(
delay=0,
callable=self._handle_event,
event_type=EventType.NEW_VERTEX_ACCEPTED,
event_args=EventArguments(tx=vertex)
)
def create_event_batch() -> Iterable[BaseEvent]:
assert self._event_ws_factory is not None
self.log.info('Starting creating events from existing database...')

for vertex in generic_progress(topological_iterator, log=self.log, total=total_vertices):
event = self._handle_event_creation(
event_type=EventType.NEW_VERTEX_ACCEPTED,
event_args=EventArguments(tx=vertex)
)

yield event
self._event_ws_factory.broadcast_event(event)
self._last_event = event

self.log.info('Finished creating events from existing database.')

self._event_storage.save_events(create_event_batch())
2 changes: 0 additions & 2 deletions hathor/event/model/event_type.py
Original file line number Diff line number Diff line change
@@ -40,8 +40,6 @@ def data_type(self) -> type[BaseEventData]:


_HATHOR_EVENT_TO_EVENT_TYPE = {
HathorEvents.MANAGER_ON_START: EventType.LOAD_STARTED,
HathorEvents.LOAD_FINISHED: EventType.LOAD_FINISHED,
HathorEvents.NETWORK_NEW_TX_ACCEPTED: EventType.NEW_VERTEX_ACCEPTED,
HathorEvents.REORG_STARTED: EventType.REORG_STARTED,
HathorEvents.REORG_FINISHED: EventType.REORG_FINISHED,
7 changes: 6 additions & 1 deletion hathor/event/storage/event_storage.py
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Iterator, Optional
from typing import Iterable, Iterator, Optional

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
@@ -25,6 +25,11 @@ def save_event(self, event: BaseEvent) -> None:
""" Saves an event in the storage"""
raise NotImplementedError

@abstractmethod
def save_events(self, events: Iterable[BaseEvent]) -> None:
""" Saves an event batch in the storage"""
raise NotImplementedError

@abstractmethod
def get_event(self, key: int) -> Optional[BaseEvent]:
""" Get a stored event by key"""
6 changes: 5 additions & 1 deletion hathor/event/storage/memory_storage.py
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterator, Optional
from typing import Iterable, Iterator, Optional

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
@@ -35,6 +35,10 @@ def save_event(self, event: BaseEvent) -> None:
self._last_group_id = event.group_id
self._events.append(event)

def save_events(self, events: Iterable[BaseEvent]) -> None:
for event in events:
self.save_event(event)

def get_event(self, key: int) -> Optional[BaseEvent]:
if key < 0:
raise ValueError(f'event.id \'{key}\' must be non-negative')
22 changes: 19 additions & 3 deletions hathor/event/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterator, Optional
from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Union

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
@@ -21,6 +21,10 @@
from hathor.transaction.util import bytes_to_int, int_to_bytes
from hathor.util import json_dumpb

if TYPE_CHECKING:
import rocksdb


_CF_NAME_EVENT = b'event'
_CF_NAME_META = b'event-metadata'
_KEY_LAST_GROUP_ID = b'last-group-id'
@@ -66,17 +70,29 @@ def _db_get_last_group_id(self) -> Optional[int]:
return bytes_to_int(last_group_id)

def save_event(self, event: BaseEvent) -> None:
self._save_event(event, database=self._db)

def _save_event(self, event: BaseEvent, *, database: Union['rocksdb.DB', 'rocksdb.WriteBatch']) -> None:
if (self._last_event is None and event.id != 0) or \
(self._last_event is not None and event.id != self._last_event.id + 1):
raise ValueError('invalid event.id, ids must be sequential and leave no gaps')
event_data = json_dumpb(event.dict())
key = int_to_bytes(event.id, 8)
self._db.put((self._cf_event, key), event_data)
database.put((self._cf_event, key), event_data)
self._last_event = event
if event.group_id is not None:
self._db.put((self._cf_meta, _KEY_LAST_GROUP_ID), int_to_bytes(event.group_id, 8))
database.put((self._cf_meta, _KEY_LAST_GROUP_ID), int_to_bytes(event.group_id, 8))
self._last_group_id = event.group_id

def save_events(self, events: Iterable[BaseEvent]) -> None:
import rocksdb
batch = rocksdb.WriteBatch()

for event in events:
self._save_event(event, database=batch)

self._db.write(batch)

def get_event(self, key: int) -> Optional[BaseEvent]:
if key < 0:
raise ValueError(f'event.id \'{key}\' must be non-negative')
8 changes: 6 additions & 2 deletions hathor/manager.py
Original file line number Diff line number Diff line change
@@ -258,6 +258,7 @@ def start(self) -> None:

self.state = self.NodeState.INITIALIZING
self.pubsub.publish(HathorEvents.MANAGER_ON_START)
self._event_manager.load_started()
self.connections.start()
self.pow_thread_pool.start()

@@ -623,10 +624,13 @@ def _initialize_components_new(self) -> None:

if self._enable_event_queue:
topological_iterator = self.tx_storage.topological_iterator()
self._event_manager.handle_load_phase_vertices(topological_iterator)
self._event_manager.handle_load_phase_vertices(
topological_iterator=topological_iterator,
total_vertices=self.tx_storage.indexes.info.get_vertices_count()
)

self._event_manager.load_finished()
self.state = self.NodeState.READY
self.pubsub.publish(HathorEvents.LOAD_FINISHED)

t1 = time.time()
total_load_time = LogDuration(t1 - t0)
5 changes: 0 additions & 5 deletions hathor/pubsub.py
Original file line number Diff line number Diff line change
@@ -82,9 +82,6 @@ class HathorEvents(Enum):
WALLET_ELEMENT_VOIDED:
Triggered when a wallet element is marked as voided
LOAD_FINISHED
Triggered when manager finishes reading local data and it is ready to sync
REORG_STARTED
Trigerred when consensus algorithm finds that a reorg started to happen
@@ -126,8 +123,6 @@ class HathorEvents(Enum):

WALLET_ELEMENT_VOIDED = 'wallet:element_voided'

LOAD_FINISHED = 'manager:load_finished'

REORG_STARTED = 'reorg:started'

REORG_FINISHED = 'reorg:finished'
Loading

0 comments on commit 392c44e

Please sign in to comment.