Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(init): speed up initialization #392

Merged
merged 1 commit into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hathor/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,8 @@ class InvalidNewTransaction(HathorError):
"""Raised when a new received tx/block is not valid.
"""
pass


class InitializationError(HathorError):
    """Raised when something goes wrong during initialization and the initialization must be aborted.
    """
8 changes: 6 additions & 2 deletions hathor/indexes/address_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import TYPE_CHECKING, Iterable, List, Optional

from structlog import get_logger

from hathor.indexes.base_index import BaseIndex
from hathor.transaction import BaseTransaction

if TYPE_CHECKING: # pragma: no cover
Expand All @@ -25,11 +26,14 @@
logger = get_logger()


class AddressIndex(ABC):
class AddressIndex(BaseIndex):
""" Index of inputs/outputs by address
"""
pubsub: Optional['PubSubManager']

def init_loop_step(self, tx: BaseTransaction) -> None:
    """ Initialization step for this index: simply delegates to `add_tx` with the given tx.
    """
    self.add_tx(tx)

def publish_tx(self, tx: BaseTransaction, *, addresses: Optional[Iterable[str]] = None) -> None:
""" Publish WALLET_ADDRESS_HISTORY for all addresses of a transaction.
"""
Expand Down
47 changes: 47 additions & 0 deletions hathor/indexes/base_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2021 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Optional

from hathor.transaction.base_transaction import BaseTransaction


class BaseIndex(ABC):
    """ All indexes must inherit from this index.

    This class exists so we can interact with indexes without knowing anything specific to their implementation. It
    was created to generalize how we initialize indexes and keep track of which ones are up-to-date.
    """

    @abstractmethod
    def get_db_name(self) -> Optional[str]:
        """ The returned string is used to generate the relevant attributes for storing an index's state in the db.

        If None is returned, the database will not store the index initialization state and the index will always be
        initialized. This is the expected mode that memory-only indexes will use.
        """
        raise NotImplementedError

    @abstractmethod
    def init_loop_step(self, tx: BaseTransaction) -> None:
        """ When the index needs to be initialized, this function will be called for every tx in topological order.
        """
        raise NotImplementedError

    @abstractmethod
    def force_clear(self) -> None:
        """ Clear any existing data in the index.
        """
        raise NotImplementedError
11 changes: 9 additions & 2 deletions hathor/indexes/deps_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import TYPE_CHECKING, Iterator, List

from hathor.indexes.base_index import BaseIndex
from hathor.transaction import BaseTransaction, Block

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -48,7 +49,7 @@ def get_requested_from_height(tx: BaseTransaction) -> int:
return block.get_metadata().height


class DepsIndex(ABC):
class DepsIndex(BaseIndex):
""" Index of dependencies between transactions

This index exists to accelerate queries related to the partial validation mechanism needed by sync-v2. More
Expand Down Expand Up @@ -104,6 +105,12 @@ class DepsIndex(ABC):
them.
"""

def init_loop_step(self, tx: BaseTransaction) -> None:
    """ Initialization step: add `tx` to the index with `partial=False`, unless the tx is voided.
    """
    # voided transactions are left out of this index during initialization
    if not tx.get_metadata().voided_by:
        self.add_tx(tx, partial=False)

@abstractmethod
def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
"""Update 'deps' and 'needed' sub-indexes, removing them when necessary (i.e. validation is complete).
Expand Down
19 changes: 16 additions & 3 deletions hathor/indexes/height_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import List, NamedTuple, Optional, Tuple

from hathor.transaction import Block
from hathor.indexes.base_index import BaseIndex
from hathor.transaction import BaseTransaction, Block
from hathor.transaction.genesis import BLOCK_GENESIS
from hathor.util import not_none

Expand All @@ -35,10 +36,22 @@ class _AddToIndexItem(NamedTuple):
timestamp: int


class HeightIndex(ABC):
class HeightIndex(BaseIndex):
"""Store the block hash for each given height
"""

def init_loop_step(self, tx: BaseTransaction) -> None:
    """ Initialization step: register a block in the index through `add_new`.

    Nothing is added for transactions, for the genesis, or for blocks whose metadata has `voided_by` set.
    """
    if not tx.is_block or tx.is_genesis:
        return
    assert isinstance(tx, Block)
    assert tx.hash is not None
    meta = tx.get_metadata()
    if not meta.voided_by:
        self.add_new(meta.height, tx.hash, tx.timestamp)

@abstractmethod
def add_new(self, height: int, block_hash: bytes, timestamp: int) -> None:
"""Add a new block to the height index that **MUST NOT** result in a re-org"""
Expand Down
137 changes: 119 additions & 18 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional
from enum import Enum, auto
from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple

from structlog import get_logger

from hathor.indexes.address_index import AddressIndex
from hathor.indexes.base_index import BaseIndex
from hathor.indexes.deps_index import DepsIndex
from hathor.indexes.height_index import HeightIndex
from hathor.indexes.mempool_tips_index import MempoolTipsIndex
from hathor.indexes.timestamp_index import TimestampIndex
from hathor.indexes.tips_index import TipsIndex
from hathor.indexes.tokens_index import TokensIndex
from hathor.transaction import BaseTransaction
from hathor.util import progress

if TYPE_CHECKING: # pragma: no cover
import rocksdb
Expand All @@ -35,13 +38,21 @@
logger = get_logger()


class _IndexFilter(Enum):
    """ Selects which transactions are fed to an index while the indexes are being initialized.
    """
    ALL = auto()  # block or tx, voided or not
    # NOTE(review): the initialization loop only checks `tx.is_block` for this filter, so voided blocks are fed to
    #               the index as well -- confirm whether that is the intended behavior
    ALL_BLOCKS = auto()  # only blocks
    VALID_TXS = auto()  # only transactions that are not voided
msbrogli marked this conversation as resolved.
Show resolved Hide resolved


class IndexesManager(ABC):
""" IndexesManager manages all the indexes that we will have in the system

The idea is for the manager to handle all method calls to indexes,
so it will know which index is better to use in each moment
"""

log = get_logger()

all_tips: TipsIndex
block_tips: TipsIndex
tx_tips: TipsIndex
Expand All @@ -56,6 +67,39 @@ class IndexesManager(ABC):
addresses: Optional[AddressIndex]
tokens: Optional[TokensIndex]

def __init_checks__(self):
    """ Sanity checks that implementations must run as the **last** step of their __init__.

    Raises ValueError when two indexes report the same (non-None) db name.
    """
    seen_db_names = set()
    db_names = (idx.get_db_name() for idx in self.iter_all_indexes())
    for db_name in db_names:
        # indexes that do not have a db name are not subject to the uniqueness check
        if db_name is None:
            continue
        if db_name in seen_db_names:
            raise ValueError(f'duplicate index name "{db_name}", already in use by another index')
        seen_db_names.add(db_name)

def iter_all_indexes(self) -> Iterator[BaseIndex]:
    """ Yield every index handled by this manager, leaving out the filter that accompanies each one internally."""
    yield from (index for _filter, index in self._iter_all_indexes_with_filter())

def _iter_all_indexes_with_filter(self) -> Iterator[Tuple[_IndexFilter, BaseIndex]]:
    """ Same as `iter_all_indexes()`, but includes a filter for what transactions an index is interested in.

    Each yielded item is a `(filter, index)` pair. The optional indexes (`addresses` and `tokens`) are only yielded
    when they are not None.
    """
    # tips indexes
    yield _IndexFilter.ALL, self.all_tips
    yield _IndexFilter.ALL_BLOCKS, self.block_tips
    yield _IndexFilter.VALID_TXS, self.tx_tips
    # timestamp-sorted indexes
    yield _IndexFilter.ALL, self.sorted_all
    yield _IndexFilter.ALL_BLOCKS, self.sorted_blocks
    yield _IndexFilter.VALID_TXS, self.sorted_txs
    # these indexes receive every tx from the manager
    yield _IndexFilter.ALL, self.deps
    yield _IndexFilter.ALL, self.height
    yield _IndexFilter.ALL, self.mempool_tips
    # optional indexes, they are None when they are not enabled
    if self.addresses is not None:
        yield _IndexFilter.ALL, self.addresses
    if self.tokens is not None:
        yield _IndexFilter.ALL, self.tokens

@abstractmethod
def enable_address_index(self, pubsub: 'PubSubManager') -> None:
"""Enable address index. It does nothing if it has already been enabled."""
Expand All @@ -66,23 +110,74 @@ def enable_tokens_index(self) -> None:
"""Enable tokens index. It does nothing if it has already been enabled."""
raise NotImplementedError

def _manually_initialize_tips_indexes(self, tx_storage: 'TransactionStorage') -> None:
""" Initialize the tips indexes, populating them from a tx_storage that is otherwise complete.
def force_clear_all(self) -> None:
    """ Call `force_clear()` on every index yielded by `iter_all_indexes()`.
    """
    all_indexes = self.iter_all_indexes()
    for each_index in all_indexes:
        each_index.force_clear()

XXX: this method requires timestamp indexes to be complete and up-to-date with the rest of the database
XXX: this method is not yet being used
def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
""" Initialize the indexes, checking the indexes that need initialization, and the optimal iterator to use.
"""
for tx in tx_storage._topological_sort_timestamp_index():
tx_meta = tx.get_metadata()
if not tx_meta.validation.is_final():
continue
from hathor.transaction.genesis import BLOCK_GENESIS
from hathor.transaction.storage.transaction_storage import NULL_INDEX_LAST_STARTED_AT

self.all_tips.add_tx(tx)
db_last_started_at = tx_storage.get_last_started_at()

indexes_to_init: List[Tuple[_IndexFilter, BaseIndex]] = []
for index_filter, index in self._iter_all_indexes_with_filter():
index_db_name = index.get_db_name()
if index_db_name is None:
indexes_to_init.append((index_filter, index))
continue
index_last_started_at = tx_storage.get_index_last_started_at(index_db_name)
if db_last_started_at != index_last_started_at:
indexes_to_init.append((index_filter, index))

if indexes_to_init:
self.log.debug('there are indexes that need initialization',
indexes_to_init=[i for _, i in indexes_to_init])
else:
self.log.debug('there are no indexes that need initialization')

# make sure that all the indexes that we're rebuilding are cleared
for _, index in indexes_to_init:
index_db_name = index.get_db_name()
if index_db_name:
tx_storage.set_index_last_started_at(index_db_name, NULL_INDEX_LAST_STARTED_AT)
index.force_clear()

block_count = 0
tx_count = 0
latest_timestamp = BLOCK_GENESIS.timestamp
first_timestamp = BLOCK_GENESIS.timestamp

for tx in progress(tx_storage.topological_iterator(), log=self.log):
# XXX: these would probably make more sense to be their own simple "indexes" instead of how it is here
latest_timestamp = max(tx.timestamp, latest_timestamp)
first_timestamp = min(tx.timestamp, first_timestamp)
if tx.is_block:
self.block_tips.add_tx(tx)
elif not tx_meta.voided_by:
self.tx_tips.add_tx(tx)
block_count += 1
else:
tx_count += 1

tx_meta = tx.get_metadata()

# feed each transaction to the indexes that they are interested in
for index_filter, index in indexes_to_init:
msbrogli marked this conversation as resolved.
Show resolved Hide resolved
if index_filter is _IndexFilter.ALL:
index.init_loop_step(tx)
elif index_filter is _IndexFilter.ALL_BLOCKS:
if tx.is_block:
jansegre marked this conversation as resolved.
Show resolved Hide resolved
index.init_loop_step(tx)
elif index_filter is _IndexFilter.VALID_TXS:
# XXX: all indexes that use this filter treat soft-voided as voided, nothing special needed
if tx.is_transaction and not tx_meta.voided_by:
index.init_loop_step(tx)
else:
assert False, 'impossible filter'

tx_storage._update_caches(block_count, tx_count, latest_timestamp, first_timestamp)

def add_tx(self, tx: BaseTransaction) -> bool:
""" Add a transaction to the indexes
Expand Down Expand Up @@ -162,6 +257,9 @@ def __init__(self) -> None:
self.mempool_tips = MemoryMempoolTipsIndex()
self.deps = MemoryDepsIndex()

# XXX: this has to be at the end of __init__, after everything has been initialized
self.__init_checks__()

def enable_address_index(self, pubsub: 'PubSubManager') -> None:
from hathor.indexes.memory_address_index import MemoryAddressIndex
if self.addresses is None:
Expand All @@ -176,8 +274,8 @@ def enable_tokens_index(self) -> None:
class RocksDBIndexesManager(IndexesManager):
def __init__(self, db: 'rocksdb.DB') -> None:
from hathor.indexes.memory_deps_index import MemoryDepsIndex
msbrogli marked this conversation as resolved.
Show resolved Hide resolved
from hathor.indexes.memory_mempool_tips_index import MemoryMempoolTipsIndex
msbrogli marked this conversation as resolved.
Show resolved Hide resolved
from hathor.indexes.rocksdb_height_index import RocksDBHeightIndex
from hathor.indexes.rocksdb_mempool_tips_index import RocksDBMempoolTipsIndex
from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex

self._db = db
Expand All @@ -186,16 +284,19 @@ def __init__(self, db: 'rocksdb.DB') -> None:
self.block_tips = TipsIndex()
self.tx_tips = TipsIndex()

self.sorted_all = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-all')
self.sorted_blocks = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-blocks')
self.sorted_txs = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-txs')
self.sorted_all = RocksDBTimestampIndex(self._db, 'all')
self.sorted_blocks = RocksDBTimestampIndex(self._db, 'blocks')
self.sorted_txs = RocksDBTimestampIndex(self._db, 'txs')

self.addresses = None
self.tokens = None
self.height = RocksDBHeightIndex(self._db)
self.mempool_tips = RocksDBMempoolTipsIndex(self._db)
self.mempool_tips = MemoryMempoolTipsIndex() # use of RocksDBMempoolTipsIndex is very slow and was suspended
msbrogli marked this conversation as resolved.
Show resolved Hide resolved
self.deps = MemoryDepsIndex() # use of RocksDBDepsIndex is currently suspended until it is fixed

# XXX: this has to be at the end of __init__, after everything has been initialized
self.__init_checks__()

def enable_address_index(self, pubsub: 'PubSubManager') -> None:
from hathor.indexes.rocksdb_address_index import RocksDBAddressIndex
if self.addresses is None:
Expand Down
11 changes: 10 additions & 1 deletion hathor/indexes/memory_address_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,21 @@
class MemoryAddressIndex(AddressIndex):
""" Index of inputs/outputs by address
"""

index: DefaultDict[str, Set[bytes]]

def __init__(self, pubsub: Optional['PubSubManager'] = None) -> None:
self.index: DefaultDict[str, Set[bytes]] = defaultdict(set)
self.pubsub = pubsub
self.force_clear()
if self.pubsub:
self.subscribe_pubsub_events()

def get_db_name(self) -> Optional[str]:
    """ This index does not have a db name, None is always returned.
    """
    return None

def force_clear(self) -> None:
    """ Discard all entries by replacing `self.index` with a new empty `defaultdict(set)`.
    """
    self.index = defaultdict(set)

def subscribe_pubsub_events(self) -> None:
""" Subscribe wallet index to receive voided/winner tx pubsub events
"""
Expand Down
Loading