From 12c7d647123ad33fabbb02caea0a412a6b1a5dbb Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Thu, 15 Jun 2023 13:07:07 -0300 Subject: [PATCH] refactor(events): improve events during the load phase --- hathor/event/event_manager.py | 73 ++++++++++++------- hathor/event/model/event_type.py | 2 - hathor/event/storage/event_storage.py | 7 +- hathor/event/storage/memory_storage.py | 6 +- hathor/event/storage/rocksdb_storage.py | 22 +++++- hathor/manager.py | 8 +- hathor/pubsub.py | 5 -- .../event/test_event_simulation_scenarios.py | 24 +++--- tests/event/test_event_storage.py | 10 +++ 9 files changed, 103 insertions(+), 54 deletions(-) diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index 08716728c..d93c0b8af 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Iterator, Optional +from typing import Callable, Iterable, Iterator, Optional from structlog import get_logger @@ -23,7 +23,7 @@ from hathor.event.websocket import EventWebsocketFactory from hathor.pubsub import EventArguments, HathorEvents, PubSubManager from hathor.transaction import BaseTransaction -from hathor.util import Reactor +from hathor.util import Reactor, progress logger = get_logger() @@ -36,8 +36,6 @@ } _SUBSCRIBE_EVENTS = [ - HathorEvents.MANAGER_ON_START, - HathorEvents.LOAD_FINISHED, HathorEvents.NETWORK_NEW_TX_ACCEPTED, HathorEvents.REORG_STARTED, HathorEvents.REORG_FINISHED, @@ -125,6 +123,26 @@ def _subscribe_events(self) -> None: for event in _SUBSCRIBE_EVENTS: self._pubsub.subscribe(event, self._handle_hathor_event) + def load_started(self): + if not self._is_running: + return + + self._handle_event( + event_type=EventType.LOAD_STARTED, + event_args=EventArguments(), + ) + self._event_storage.save_node_state(NodeState.LOAD) + + def load_finished(self): + if not self._is_running: + return + + self._handle_event( + event_type=EventType.LOAD_FINISHED, + event_args=EventArguments(), + ) + self._event_storage.save_node_state(NodeState.SYNC) + def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None: """Handles a PubSub 'HathorEvents' event.""" event_type = EventType.from_hathor_event(hathor_event) @@ -136,14 +154,6 @@ def _handle_event(self, event_type: EventType, event_args: EventArguments) -> No assert self._is_running, 'Cannot handle event, EventManager is not started.' assert self._event_ws_factory is not None - event_specific_handlers = { - EventType.LOAD_STARTED: self._handle_load_started, - EventType.LOAD_FINISHED: self._handle_load_finished - } - - if event_specific_handler := event_specific_handlers.get(event_type): - event_specific_handler() - event = self._handle_event_creation(event_type, event_args) self._event_storage.save_event(event) @@ -221,14 +231,6 @@ def _create_event( group_id=group_id, ) - def _handle_load_started(self) -> None: - """Event specific handler for EventType.LOAD_STARTED.""" - self._event_storage.save_node_state(NodeState.LOAD) - - def _handle_load_finished(self) -> None: - """Event specific handler for EventType.LOAD_FINISHED.""" - self._event_storage.save_node_state(NodeState.SYNC) - def _should_reload_events(self) -> bool: """Returns whether events should be reloaded or not.""" return self._previous_node_state in [None, NodeState.LOAD] @@ -241,7 +243,12 @@ def save_event_queue_state(self, state: bool) -> None: """Saves whether the event queue feature is enabled from the storage.""" self._event_storage.save_event_queue_state(state) - def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransaction]) -> None: + def handle_load_phase_vertices( + self, + *, + topological_iterator: Iterator[BaseTransaction], + total_vertices: int + ) -> None: """ Either generates load phase events or not, depending on previous node state. Does so asynchronously so events generated here are not processed before normal event handling. @@ -251,10 +258,20 @@ def handle_load_phase_vertices(self, topological_iterator: Iterator[BaseTransact if not self._should_reload_events(): return - for vertex in topological_iterator: - self._reactor.callLater( - delay=0, - callable=self._handle_event, - event_type=EventType.NEW_VERTEX_ACCEPTED, - event_args=EventArguments(tx=vertex) - ) + def create_event_batch() -> Iterable[BaseEvent]: + assert self._event_ws_factory is not None + self.log.info('Starting creating events from existing database...') + + for vertex in progress(topological_iterator, log=self.log, total=total_vertices): + event = self._handle_event_creation( + event_type=EventType.NEW_VERTEX_ACCEPTED, + event_args=EventArguments(tx=vertex) + ) + + yield event + self._event_ws_factory.broadcast_event(event) + self._last_event = event + + self.log.info('Finished creating events from existing database.') + + self._event_storage.save_events(create_event_batch()) diff --git a/hathor/event/model/event_type.py b/hathor/event/model/event_type.py index 59aa20f3e..7c697fbc8 100644 --- a/hathor/event/model/event_type.py +++ b/hathor/event/model/event_type.py @@ -40,8 +40,6 @@ def data_type(self) -> type[BaseEventData]: _HATHOR_EVENT_TO_EVENT_TYPE = { - HathorEvents.MANAGER_ON_START: EventType.LOAD_STARTED, - HathorEvents.LOAD_FINISHED: EventType.LOAD_FINISHED, HathorEvents.NETWORK_NEW_TX_ACCEPTED: EventType.NEW_VERTEX_ACCEPTED, HathorEvents.REORG_STARTED: EventType.REORG_STARTED, HathorEvents.REORG_FINISHED: EventType.REORG_FINISHED, diff --git a/hathor/event/storage/event_storage.py b/hathor/event/storage/event_storage.py index 2cff3ad33..00677c297 100644 --- a/hathor/event/storage/event_storage.py +++ b/hathor/event/storage/event_storage.py @@ -13,7 +13,7 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import Iterator, Optional +from typing import Iterable, Iterator, Optional from hathor.event.model.base_event import BaseEvent from hathor.event.model.node_state import NodeState @@ -25,6 +25,11 @@ def save_event(self, event: BaseEvent) -> None: """ Saves an event in the storage""" raise NotImplementedError + @abstractmethod + def save_events(self, events: Iterable[BaseEvent]) -> None: + """ Saves an event batch in the storage""" + raise NotImplementedError + @abstractmethod def get_event(self, key: int) -> Optional[BaseEvent]: """ Get a stored event by key""" diff --git a/hathor/event/storage/memory_storage.py b/hathor/event/storage/memory_storage.py index 0603b2826..d7ec9cb7c 100644 --- a/hathor/event/storage/memory_storage.py +++ b/hathor/event/storage/memory_storage.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterator, Optional +from typing import Iterable, Iterator, Optional from hathor.event.model.base_event import BaseEvent from hathor.event.model.node_state import NodeState @@ -35,6 +35,10 @@ def save_event(self, event: BaseEvent) -> None: self._last_group_id = event.group_id self._events.append(event) + def save_events(self, events: Iterable[BaseEvent]) -> None: + for event in events: + self.save_event(event) + def get_event(self, key: int) -> Optional[BaseEvent]: if key < 0: raise ValueError(f'event.id \'{key}\' must be non-negative') diff --git a/hathor/event/storage/rocksdb_storage.py b/hathor/event/storage/rocksdb_storage.py index edda8dcd0..1c675e2f9 100644 --- a/hathor/event/storage/rocksdb_storage.py +++ b/hathor/event/storage/rocksdb_storage.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterator, Optional +from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Union from hathor.event.model.base_event import BaseEvent from hathor.event.model.node_state import NodeState @@ -21,6 +21,10 @@ from hathor.transaction.util import bytes_to_int, int_to_bytes from hathor.util import json_dumpb +if TYPE_CHECKING: + import rocksdb + + _CF_NAME_EVENT = b'event' _CF_NAME_META = b'event-metadata' _KEY_LAST_GROUP_ID = b'last-group-id' @@ -66,17 +70,29 @@ def _db_get_last_group_id(self) -> Optional[int]: return bytes_to_int(last_group_id) def save_event(self, event: BaseEvent) -> None: + self._save_event(event, database=self._db) + + def _save_event(self, event: BaseEvent, *, database: Union['rocksdb.DB', 'rocksdb.WriteBatch']) -> None: if (self._last_event is None and event.id != 0) or \ (self._last_event is not None and event.id != self._last_event.id + 1): raise ValueError('invalid event.id, ids must be sequential and leave no gaps') event_data = json_dumpb(event.dict()) key = int_to_bytes(event.id, 8) - self._db.put((self._cf_event, key), event_data) + database.put((self._cf_event, key), event_data) self._last_event = event if event.group_id is not None: - self._db.put((self._cf_meta, _KEY_LAST_GROUP_ID), int_to_bytes(event.group_id, 8)) + database.put((self._cf_meta, _KEY_LAST_GROUP_ID), int_to_bytes(event.group_id, 8)) self._last_group_id = event.group_id + def save_events(self, events: Iterable[BaseEvent]) -> None: + import rocksdb + batch = rocksdb.WriteBatch() + + for event in events: + self._save_event(event, database=batch) + + self._db.write(batch) + def get_event(self, key: int) -> Optional[BaseEvent]: if key < 0: raise ValueError(f'event.id \'{key}\' must be non-negative') diff --git a/hathor/manager.py b/hathor/manager.py index 252841f9e..b1effa871 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -258,6 +258,7 @@ def start(self) -> None: self.state = self.NodeState.INITIALIZING self.pubsub.publish(HathorEvents.MANAGER_ON_START) + self._event_manager.load_started() self.connections.start() self.pow_thread_pool.start() @@ -557,10 +558,13 @@ def _initialize_components_new(self) -> None: if self._enable_event_queue: topological_iterator = self.tx_storage.topological_iterator() - self._event_manager.handle_load_phase_vertices(topological_iterator) + self._event_manager.handle_load_phase_vertices( + topological_iterator=topological_iterator, + total_vertices=self.tx_storage.indexes.info.get_vertices_count() + ) + self._event_manager.load_finished() self.state = self.NodeState.READY - self.pubsub.publish(HathorEvents.LOAD_FINISHED) t1 = time.time() total_load_time = LogDuration(t1 - t0) diff --git a/hathor/pubsub.py b/hathor/pubsub.py index 2c03190d0..10aaa6d9e 100644 --- a/hathor/pubsub.py +++ b/hathor/pubsub.py @@ -82,9 +82,6 @@ class HathorEvents(Enum): WALLET_ELEMENT_VOIDED: Triggered when a wallet element is marked as voided - LOAD_FINISHED - Triggered when manager finishes reading local data and it is ready to sync - REORG_STARTED Trigerred when consensus algorithm finds that a reorg started to happen @@ -126,8 +123,6 @@ class HathorEvents(Enum): WALLET_ELEMENT_VOIDED = 'wallet:element_voided' - LOAD_FINISHED = 'manager:load_finished' - REORG_STARTED = 'reorg:started' REORG_FINISHED = 'reorg:finished' diff --git a/tests/event/test_event_simulation_scenarios.py b/tests/event/test_event_simulation_scenarios.py index 53784e791..c5f8398f7 100644 --- a/tests/event/test_event_simulation_scenarios.py +++ b/tests/event/test_event_simulation_scenarios.py @@ -47,9 +47,9 @@ def test_only_load(self) -> None: # LOAD_STATED EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=0, timestamp=1578878880.0, type=EventType.LOAD_STARTED, data=EmptyData(), group_id=None), latest_event_id=4), # noqa: E501 # One NEW_VERTEX_ACCEPTED for each genesis (1 block and 2 txs) - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=1, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', nonce=2, timestamp=1572636345, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=4), # noqa: E501 - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=2, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', nonce=0, timestamp=1572636343, version=0, weight=2.0, inputs=[], outputs=[TxOutput(value=100000000000, script='dqkU/QUFm2AGJJVDuC82h2oXxz/SJnuIrA==', token_data=0)], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=4), # noqa: E501 - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=3, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', nonce=6, timestamp=1572636344, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=4), # noqa: E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=1, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', nonce=0, timestamp=1572636343, version=0, weight=2.0, inputs=[], outputs=[TxOutput(value=100000000000, script='dqkU/QUFm2AGJJVDuC82h2oXxz/SJnuIrA==', token_data=0)], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=4), # noqa: E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=2, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', nonce=6, timestamp=1572636344, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=4), # noqa: E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=3, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', nonce=2, timestamp=1572636345, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=4), # noqa: E501 # LOAD_FINISHED EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=4, timestamp=1578878880.0, type=EventType.LOAD_FINISHED, data=EmptyData(), group_id=None), latest_event_id=4) # noqa: E501 ] @@ -75,9 +75,9 @@ def test_single_chain_one_block(self): # LOAD_STATED EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=0, timestamp=1578878880.0, type=EventType.LOAD_STARTED, data=EmptyData(), group_id=None), latest_event_id=8), # noqa E501 # One NEW_VERTEX_ACCEPTED for each genesis (1 block and 2 txs) - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=1, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', nonce=2, timestamp=1572636345, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=8), # noqa E501 - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=2, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', nonce=0, timestamp=1572636343, version=0, weight=2.0, inputs=[], outputs=[TxOutput(value=100000000000, script='dqkU/QUFm2AGJJVDuC82h2oXxz/SJnuIrA==', token_data=0)], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=8), # noqa E501 - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=3, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', nonce=6, timestamp=1572636344, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=8), # noqa E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=1, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', nonce=0, timestamp=1572636343, version=0, weight=2.0, inputs=[], outputs=[TxOutput(value=100000000000, script='dqkU/QUFm2AGJJVDuC82h2oXxz/SJnuIrA==', token_data=0)], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=8), # noqa: E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=2, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', nonce=6, timestamp=1572636344, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=8), # noqa: E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=3, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', nonce=2, timestamp=1572636345, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=8), # noqa: E501 # LOAD_FINISHED EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=4, timestamp=1578878880.0, type=EventType.LOAD_FINISHED, data=EmptyData(), group_id=None), latest_event_id=8), # noqa E501 ], @@ -126,9 +126,9 @@ def test_single_chain_blocks_and_transactions(self): # LOAD_STATED EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=0, timestamp=1578878880.0, type=EventType.LOAD_STARTED, data=EmptyData(), group_id=None), latest_event_id=36), # noqa E501 # One NEW_VERTEX_ACCEPTED for each genesis (1 block and 2 txs) - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=1, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', nonce=2, timestamp=1572636345, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=36), # noqa E501 - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=2, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', nonce=0, timestamp=1572636343, version=0, weight=2.0, inputs=[], outputs=[TxOutput(value=100000000000, script='dqkU/QUFm2AGJJVDuC82h2oXxz/SJnuIrA==', token_data=0)], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=36), # noqa E501 - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=3, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', nonce=6, timestamp=1572636344, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=36), # noqa E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=1, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', nonce=0, timestamp=1572636343, version=0, weight=2.0, inputs=[], outputs=[TxOutput(value=100000000000, script='dqkU/QUFm2AGJJVDuC82h2oXxz/SJnuIrA==', token_data=0)], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=36), # noqa: E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=2, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', nonce=6, timestamp=1572636344, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=36), # noqa: E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=3, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', nonce=2, timestamp=1572636345, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=36), # noqa: E501 # LOAD_FINISHED EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=4, timestamp=1578878880.0, type=EventType.LOAD_FINISHED, data=EmptyData(), group_id=None), latest_event_id=36), # noqa E501 ], @@ -224,9 +224,9 @@ def test_reorg(self): # LOAD_STATED EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=0, timestamp=1578878880.0, type=EventType.LOAD_STARTED, data=EmptyData(), group_id=None), latest_event_id=20), # noqa E501 # One NEW_VERTEX_ACCEPTED for each genesis (1 block and 2 txs) - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=1, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', nonce=2, timestamp=1572636345, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=20), # noqa E501 - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=2, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', nonce=0, timestamp=1572636343, version=0, weight=2.0, inputs=[], outputs=[TxOutput(value=100000000000, script='dqkU/QUFm2AGJJVDuC82h2oXxz/SJnuIrA==', token_data=0)], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=20), # noqa E501 - EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=3, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', nonce=6, timestamp=1572636344, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=20), # noqa E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=1, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', nonce=0, timestamp=1572636343, version=0, weight=2.0, inputs=[], outputs=[TxOutput(value=100000000000, script='dqkU/QUFm2AGJJVDuC82h2oXxz/SJnuIrA==', token_data=0)], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='339f47da87435842b0b1b528ecd9eac2495ce983b3e9c923a37e1befbe12c792', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=20), # noqa: E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=2, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', nonce=6, timestamp=1572636344, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='16ba3dbe424c443e571b00840ca54b9ff4cff467e10b6a15536e718e2008f952', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=20), # noqa: E501 + EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=3, timestamp=1578878880.0, type=EventType.NEW_VERTEX_ACCEPTED, data=TxData(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', nonce=2, timestamp=1572636345, version=1, weight=2.0, inputs=[], outputs=[], parents=[], tokens=[], token_name=None, token_symbol=None, metadata=TxMetadata(hash='33e14cb555a96967841dcbe0f95e9eab5810481d01de8f4f73afb8cce365e869', spent_outputs=[], conflict_with=[], voided_by=[], received_by=[], children=[], twins=[], accumulated_weight=2.0, score=2.0, first_block=None, height=0, validation='full'), aux_pow=None), group_id=None), latest_event_id=20), # noqa: E501 # LOAD_FINISHED EventResponse(type='EVENT', event=BaseEvent(peer_id=self.peer_id, id=4, timestamp=1578878880.0, type=EventType.LOAD_FINISHED, data=EmptyData(), group_id=None), latest_event_id=20), # noqa E501 ], diff --git a/tests/event/test_event_storage.py b/tests/event/test_event_storage.py index 84427e1d7..1013fb25f 100644 --- a/tests/event/test_event_storage.py +++ b/tests/event/test_event_storage.py @@ -27,6 +27,16 @@ def test_save_event_and_retrieve(self): assert event_retrieved == event + def test_save_events_and_retrieve(self): + event1 = self.event_mocker.generate_mocked_event() + event2 = self.event_mocker.generate_mocked_event() + self.event_storage.save_events([event1, event2]) + event1_retrieved = self.event_storage.get_event(event1.id) + event2_retrieved = self.event_storage.get_event(event2.id) + + assert event1_retrieved == event1 + assert event2_retrieved == event2 + def test_get_negative_key(self): with self.assertRaises(ValueError) as cm: self.event_storage.get_event(-1)