Skip to content

Commit

Permalink
refactor(events): improve events during the load phase
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Jun 16, 2023
1 parent a651eb1 commit 72bc871
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 54 deletions.
73 changes: 45 additions & 28 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Iterator, Optional
from typing import Callable, Iterable, Iterator, Optional

from structlog import get_logger

Expand All @@ -23,7 +23,7 @@
from hathor.event.websocket import EventWebsocketFactory
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction
from hathor.util import Reactor
from hathor.util import Reactor, progress

logger = get_logger()

Expand All @@ -36,8 +36,6 @@
}

_SUBSCRIBE_EVENTS = [
HathorEvents.MANAGER_ON_START,
HathorEvents.LOAD_FINISHED,
HathorEvents.NETWORK_NEW_TX_ACCEPTED,
HathorEvents.REORG_STARTED,
HathorEvents.REORG_FINISHED,
Expand Down Expand Up @@ -125,6 +123,26 @@ def _subscribe_events(self) -> None:
for event in _SUBSCRIBE_EVENTS:
self._pubsub.subscribe(event, self._handle_hathor_event)

def load_started(self):
if not self._is_running:
return

self._handle_event(
event_type=EventType.LOAD_STARTED,
event_args=EventArguments(),
)
self._event_storage.save_node_state(NodeState.LOAD)

def load_finished(self):
if not self._is_running:
return

self._handle_event(
event_type=EventType.LOAD_FINISHED,
event_args=EventArguments(),
)
self._event_storage.save_node_state(NodeState.SYNC)

def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
"""Handles a PubSub 'HathorEvents' event."""
event_type = EventType.from_hathor_event(hathor_event)
Expand All @@ -136,14 +154,6 @@ def _handle_event(self, event_type: EventType, event_args: EventArguments) -> No
assert self._is_running, 'Cannot handle event, EventManager is not started.'
assert self._event_ws_factory is not None

event_specific_handlers = {
EventType.LOAD_STARTED: self._handle_load_started,
EventType.LOAD_FINISHED: self._handle_load_finished
}

if event_specific_handler := event_specific_handlers.get(event_type):
event_specific_handler()

event = self._handle_event_creation(event_type, event_args)

self._event_storage.save_event(event)
Expand Down Expand Up @@ -221,14 +231,6 @@ def _create_event(
group_id=group_id,
)

def _handle_load_started(self) -> None:
"""Event specific handler for EventType.LOAD_STARTED."""
self._event_storage.save_node_state(NodeState.LOAD)

def _handle_load_finished(self) -> None:
"""Event specific handler for EventType.LOAD_FINISHED."""
self._event_storage.save_node_state(NodeState.SYNC)

def _should_reload_events(self) -> bool:
"""Returns whether events should be reloaded or not."""
return self._previous_node_state in [None, NodeState.LOAD]
Expand All @@ -241,7 +243,12 @@ def save_event_queue_state(self, state: bool) -> None:
"""Saves whether the event queue feature is enabled from the storage."""
self._event_storage.save_event_queue_state(state)

def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None:
def handle_load_phase_vertices(
self,
*,
topological_iterator: Iterator[BaseTransaction],
total_vertices: int
) -> None:
"""
Either generates load phase events or not, depending on previous node state.
Does so asynchronously so events generated here are not processed before normal event handling.
Expand All @@ -251,10 +258,20 @@ def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransact
if not self._should_reload_events():
return

for vertex in topological_iterator:
self._reactor.callLater(
delay=0,
callable=self._handle_event,
event_type=EventType.NEW_VERTEX_ACCEPTED,
event_args=EventArguments(tx=vertex)
)
def create_event_batch() -> Iterable[BaseEvent]:
assert self._event_ws_factory is not None
self.log.info('Starting creating events from existing database...')

for vertex in progress(topological_iterator, log=self.log, total=total_vertices):
event = self._handle_event_creation(
event_type=EventType.NEW_VERTEX_ACCEPTED,
event_args=EventArguments(tx=vertex)
)

yield event
self._event_ws_factory.broadcast_event(event)
self._last_event = event

self.log.info('Finished creating events from existing database.')

self._event_storage.save_events(create_event_batch())
2 changes: 0 additions & 2 deletions hathor/event/model/event_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ def data_type(self) -> type[BaseEventData]:


_HATHOR_EVENT_TO_EVENT_TYPE = {
HathorEvents.MANAGER_ON_START: EventType.LOAD_STARTED,
HathorEvents.LOAD_FINISHED: EventType.LOAD_FINISHED,
HathorEvents.NETWORK_NEW_TX_ACCEPTED: EventType.NEW_VERTEX_ACCEPTED,
HathorEvents.REORG_STARTED: EventType.REORG_STARTED,
HathorEvents.REORG_FINISHED: EventType.REORG_FINISHED,
Expand Down
7 changes: 6 additions & 1 deletion hathor/event/storage/event_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Iterator, Optional
from typing import Iterable, Iterator, Optional

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
Expand All @@ -25,6 +25,11 @@ def save_event(self, event: BaseEvent) -> None:
""" Saves an event in the storage"""
raise NotImplementedError

@abstractmethod
def save_events(self, events: Iterable[BaseEvent]) -> None:
""" Saves an event batch in the storage"""
raise NotImplementedError

@abstractmethod
def get_event(self, key: int) -> Optional[BaseEvent]:
""" Get a stored event by key"""
Expand Down
6 changes: 5 additions & 1 deletion hathor/event/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterator, Optional
from typing import Iterable, Iterator, Optional

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
Expand All @@ -35,6 +35,10 @@ def save_event(self, event: BaseEvent) -> None:
self._last_group_id = event.group_id
self._events.append(event)

def save_events(self, events: Iterable[BaseEvent]) -> None:
for event in events:
self.save_event(event)

def get_event(self, key: int) -> Optional[BaseEvent]:
if key < 0:
raise ValueError(f'event.id \'{key}\' must be non-negative')
Expand Down
22 changes: 19 additions & 3 deletions hathor/event/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterator, Optional
from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Union

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
Expand All @@ -21,6 +21,10 @@
from hathor.transaction.util import bytes_to_int, int_to_bytes
from hathor.util import json_dumpb

if TYPE_CHECKING:
import rocksdb


_CF_NAME_EVENT = b'event'
_CF_NAME_META = b'event-metadata'
_KEY_LAST_GROUP_ID = b'last-group-id'
Expand Down Expand Up @@ -66,17 +70,29 @@ def _db_get_last_group_id(self) -> Optional[int]:
return bytes_to_int(last_group_id)

def save_event(self, event: BaseEvent) -> None:
self._save_event(event, database=self._db)

def _save_event(self, event: BaseEvent, *, database: Union['rocksdb.DB', 'rocksdb.WriteBatch']) -> None:
if (self._last_event is None and event.id != 0) or \
(self._last_event is not None and event.id != self._last_event.id + 1):
raise ValueError('invalid event.id, ids must be sequential and leave no gaps')
event_data = json_dumpb(event.dict())
key = int_to_bytes(event.id, 8)
self._db.put((self._cf_event, key), event_data)
database.put((self._cf_event, key), event_data)
self._last_event = event
if event.group_id is not None:
self._db.put((self._cf_meta, _KEY_LAST_GROUP_ID), int_to_bytes(event.group_id, 8))
database.put((self._cf_meta, _KEY_LAST_GROUP_ID), int_to_bytes(event.group_id, 8))
self._last_group_id = event.group_id

def save_events(self, events: Iterable[BaseEvent]) -> None:
import rocksdb
batch = rocksdb.WriteBatch()

for event in events:
self._save_event(event, database=batch)

self._db.write(batch)

def get_event(self, key: int) -> Optional[BaseEvent]:
if key < 0:
raise ValueError(f'event.id \'{key}\' must be non-negative')
Expand Down
8 changes: 6 additions & 2 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ def start(self) -> None:

self.state = self.NodeState.INITIALIZING
self.pubsub.publish(HathorEvents.MANAGER_ON_START)
self._event_manager.load_started()
self.connections.start()
self.pow_thread_pool.start()

Expand Down Expand Up @@ -557,10 +558,13 @@ def _initialize_components_new(self) -> None:

if self._enable_event_queue:
topological_iterator = self.tx_storage.topological_iterator()
self._event_manager.handle_load_phase_vertices(topological_iterator)
self._event_manager.handle_load_phase_vertices(
topological_iterator=topological_iterator,
total_vertices=self.tx_storage.indexes.info.get_vertices_count()
)

self._event_manager.load_finished()
self.state = self.NodeState.READY
self.pubsub.publish(HathorEvents.LOAD_FINISHED)

t1 = time.time()
total_load_time = LogDuration(t1 - t0)
Expand Down
5 changes: 0 additions & 5 deletions hathor/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ class HathorEvents(Enum):
WALLET_ELEMENT_VOIDED:
Triggered when a wallet element is marked as voided
LOAD_FINISHED
Triggered when manager finishes reading local data and it is ready to sync
REORG_STARTED
Trigerred when consensus algorithm finds that a reorg started to happen
Expand Down Expand Up @@ -126,8 +123,6 @@ class HathorEvents(Enum):

WALLET_ELEMENT_VOIDED = 'wallet:element_voided'

LOAD_FINISHED = 'manager:load_finished'

REORG_STARTED = 'reorg:started'

REORG_FINISHED = 'reorg:finished'
Expand Down
Loading

0 comments on commit 72bc871

Please sign in to comment.