Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(events): improve events during the load phase #652

Merged
merged 1 commit into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 45 additions & 28 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Iterator, Optional
from typing import Callable, Iterable, Iterator, Optional

from structlog import get_logger

Expand All @@ -23,7 +23,7 @@
from hathor.event.websocket import EventWebsocketFactory
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction
from hathor.util import Reactor
from hathor.util import Reactor, progress

logger = get_logger()

Expand All @@ -36,8 +36,6 @@
}

_SUBSCRIBE_EVENTS = [
HathorEvents.MANAGER_ON_START,
HathorEvents.LOAD_FINISHED,
HathorEvents.NETWORK_NEW_TX_ACCEPTED,
HathorEvents.REORG_STARTED,
HathorEvents.REORG_FINISHED,
Expand Down Expand Up @@ -125,6 +123,26 @@ def _subscribe_events(self) -> None:
for event in _SUBSCRIBE_EVENTS:
self._pubsub.subscribe(event, self._handle_hathor_event)

def load_started(self):
if not self._is_running:
return

self._handle_event(
event_type=EventType.LOAD_STARTED,
event_args=EventArguments(),
)
self._event_storage.save_node_state(NodeState.LOAD)

def load_finished(self):
if not self._is_running:
return

self._handle_event(
event_type=EventType.LOAD_FINISHED,
event_args=EventArguments(),
)
self._event_storage.save_node_state(NodeState.SYNC)

def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
"""Handles a PubSub 'HathorEvents' event."""
event_type = EventType.from_hathor_event(hathor_event)
Expand All @@ -136,14 +154,6 @@ def _handle_event(self, event_type: EventType, event_args: EventArguments) -> No
assert self._is_running, 'Cannot handle event, EventManager is not started.'
assert self._event_ws_factory is not None

event_specific_handlers = {
EventType.LOAD_STARTED: self._handle_load_started,
EventType.LOAD_FINISHED: self._handle_load_finished
}

if event_specific_handler := event_specific_handlers.get(event_type):
event_specific_handler()

event = self._handle_event_creation(event_type, event_args)

self._event_storage.save_event(event)
Expand Down Expand Up @@ -221,14 +231,6 @@ def _create_event(
group_id=group_id,
)

def _handle_load_started(self) -> None:
"""Event specific handler for EventType.LOAD_STARTED."""
self._event_storage.save_node_state(NodeState.LOAD)

def _handle_load_finished(self) -> None:
"""Event specific handler for EventType.LOAD_FINISHED."""
self._event_storage.save_node_state(NodeState.SYNC)

def _should_reload_events(self) -> bool:
"""Returns whether events should be reloaded or not."""
return self._previous_node_state in [None, NodeState.LOAD]
Expand All @@ -241,7 +243,12 @@ def save_event_queue_state(self, state: bool) -> None:
"""Saves whether the event queue feature is enabled from the storage."""
self._event_storage.save_event_queue_state(state)

def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None:
def handle_load_phase_vertices(
self,
*,
topological_iterator: Iterator[BaseTransaction],
total_vertices: int
) -> None:
"""
Either generates load phase events or not, depending on previous node state.
Does so asynchronously so events generated here are not processed before normal event handling.
Expand All @@ -251,10 +258,20 @@ def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransact
if not self._should_reload_events():
return

for vertex in topological_iterator:
self._reactor.callLater(
delay=0,
callable=self._handle_event,
event_type=EventType.NEW_VERTEX_ACCEPTED,
event_args=EventArguments(tx=vertex)
)
def create_event_batch() -> Iterable[BaseEvent]:
assert self._event_ws_factory is not None
self.log.info('Starting creating events from existing database...')

for vertex in progress(topological_iterator, log=self.log, total=total_vertices):
msbrogli marked this conversation as resolved.
Show resolved Hide resolved
event = self._handle_event_creation(
event_type=EventType.NEW_VERTEX_ACCEPTED,
event_args=EventArguments(tx=vertex)
)

yield event
self._event_ws_factory.broadcast_event(event)
self._last_event = event

self.log.info('Finished creating events from existing database.')

self._event_storage.save_events(create_event_batch())
2 changes: 0 additions & 2 deletions hathor/event/model/event_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ def data_type(self) -> type[BaseEventData]:


_HATHOR_EVENT_TO_EVENT_TYPE = {
HathorEvents.MANAGER_ON_START: EventType.LOAD_STARTED,
HathorEvents.LOAD_FINISHED: EventType.LOAD_FINISHED,
HathorEvents.NETWORK_NEW_TX_ACCEPTED: EventType.NEW_VERTEX_ACCEPTED,
HathorEvents.REORG_STARTED: EventType.REORG_STARTED,
HathorEvents.REORG_FINISHED: EventType.REORG_FINISHED,
Expand Down
7 changes: 6 additions & 1 deletion hathor/event/storage/event_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Iterator, Optional
from typing import Iterable, Iterator, Optional

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
Expand All @@ -25,6 +25,11 @@ def save_event(self, event: BaseEvent) -> None:
""" Saves an event in the storage"""
raise NotImplementedError

@abstractmethod
def save_events(self, events: Iterable[BaseEvent]) -> None:
""" Saves an event batch in the storage"""
raise NotImplementedError

@abstractmethod
def get_event(self, key: int) -> Optional[BaseEvent]:
""" Get a stored event by key"""
Expand Down
6 changes: 5 additions & 1 deletion hathor/event/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterator, Optional
from typing import Iterable, Iterator, Optional

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
Expand All @@ -35,6 +35,10 @@ def save_event(self, event: BaseEvent) -> None:
self._last_group_id = event.group_id
self._events.append(event)

def save_events(self, events: Iterable[BaseEvent]) -> None:
for event in events:
self.save_event(event)

def get_event(self, key: int) -> Optional[BaseEvent]:
if key < 0:
raise ValueError(f'event.id \'{key}\' must be non-negative')
Expand Down
22 changes: 19 additions & 3 deletions hathor/event/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterator, Optional
from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Union

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
Expand All @@ -21,6 +21,10 @@
from hathor.transaction.util import bytes_to_int, int_to_bytes
from hathor.util import json_dumpb

if TYPE_CHECKING:
import rocksdb


_CF_NAME_EVENT = b'event'
_CF_NAME_META = b'event-metadata'
_KEY_LAST_GROUP_ID = b'last-group-id'
Expand Down Expand Up @@ -66,17 +70,29 @@ def _db_get_last_group_id(self) -> Optional[int]:
return bytes_to_int(last_group_id)

def save_event(self, event: BaseEvent) -> None:
self._save_event(event, database=self._db)

def _save_event(self, event: BaseEvent, *, database: Union['rocksdb.DB', 'rocksdb.WriteBatch']) -> None:
if (self._last_event is None and event.id != 0) or \
(self._last_event is not None and event.id != self._last_event.id + 1):
raise ValueError('invalid event.id, ids must be sequential and leave no gaps')
event_data = json_dumpb(event.dict())
key = int_to_bytes(event.id, 8)
self._db.put((self._cf_event, key), event_data)
database.put((self._cf_event, key), event_data)
self._last_event = event
if event.group_id is not None:
self._db.put((self._cf_meta, _KEY_LAST_GROUP_ID), int_to_bytes(event.group_id, 8))
database.put((self._cf_meta, _KEY_LAST_GROUP_ID), int_to_bytes(event.group_id, 8))
self._last_group_id = event.group_id

def save_events(self, events: Iterable[BaseEvent]) -> None:
import rocksdb
batch = rocksdb.WriteBatch()

for event in events:
self._save_event(event, database=batch)

self._db.write(batch)

def get_event(self, key: int) -> Optional[BaseEvent]:
if key < 0:
raise ValueError(f'event.id \'{key}\' must be non-negative')
Expand Down
8 changes: 6 additions & 2 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ def start(self) -> None:

self.state = self.NodeState.INITIALIZING
self.pubsub.publish(HathorEvents.MANAGER_ON_START)
self._event_manager.load_started()
self.connections.start()
self.pow_thread_pool.start()

Expand Down Expand Up @@ -557,10 +558,13 @@ def _initialize_components_new(self) -> None:

if self._enable_event_queue:
topological_iterator = self.tx_storage.topological_iterator()
self._event_manager.handle_load_phase_vertices(topological_iterator)
self._event_manager.handle_load_phase_vertices(
topological_iterator=topological_iterator,
total_vertices=self.tx_storage.indexes.info.get_vertices_count()
)

self._event_manager.load_finished()
self.state = self.NodeState.READY
self.pubsub.publish(HathorEvents.LOAD_FINISHED)

t1 = time.time()
total_load_time = LogDuration(t1 - t0)
Expand Down
5 changes: 0 additions & 5 deletions hathor/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ class HathorEvents(Enum):
WALLET_ELEMENT_VOIDED:
Triggered when a wallet element is marked as voided

LOAD_FINISHED
Triggered when manager finishes reading local data and it is ready to sync

REORG_STARTED
Trigerred when consensus algorithm finds that a reorg started to happen

Expand Down Expand Up @@ -126,8 +123,6 @@ class HathorEvents(Enum):

WALLET_ELEMENT_VOIDED = 'wallet:element_voided'

LOAD_FINISHED = 'manager:load_finished'

REORG_STARTED = 'reorg:started'

REORG_FINISHED = 'reorg:finished'
Expand Down
Loading