Skip to content

Commit

Permalink
feat(init): speed up initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
jansegre committed May 9, 2022
1 parent b064c37 commit f37ecdc
Show file tree
Hide file tree
Showing 28 changed files with 605 additions and 217 deletions.
11 changes: 9 additions & 2 deletions hathor/indexes/address_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import TYPE_CHECKING, Iterable, List, Optional

from structlog import get_logger

from hathor.indexes.base_index import BaseIndex
from hathor.transaction import BaseTransaction

if TYPE_CHECKING: # pragma: no cover
Expand All @@ -25,11 +26,17 @@
logger = get_logger()


class AddressIndex(ABC):
class AddressIndex(BaseIndex):
""" Index of inputs/outputs by address
"""
pubsub: Optional['PubSubManager']

def init_loop_step(self, tx: BaseTransaction) -> None:
    """ Feed one transaction to the address index while the index is being initialized.

    Transactions whose metadata has a truthy `voided_by` are skipped; every other transaction is handed to
    `self.add_tx`.
    """
    tx_meta = tx.get_metadata()
    # voided transactions are left out of the index
    if tx_meta.voided_by:
        return
    self.add_tx(tx)

def publish_tx(self, tx: BaseTransaction, *, addresses: Optional[Iterable[str]] = None) -> None:
""" Publish WALLET_ADDRESS_HISTORY for all addresses of a transaction.
"""
Expand Down
47 changes: 47 additions & 0 deletions hathor/indexes/base_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2021 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Optional

from hathor.transaction.base_transaction import BaseTransaction


class BaseIndex(ABC):
    """ All indexes must inherit from this index.

    This class exists so we can interact with indexes without knowing anything specific to their implementation. It
    was created to generalize how we initialize indexes and keep track of which ones are up-to-date.
    """

    @abstractmethod
    def get_db_name(self) -> Optional[str]:
        """ The returned string is used to generate the relevant attributes for storing an index's state in the db.

        If None is returned, the database will not store the index initialization state and the index will always be
        initialized. This is the expected mode that memory-only indexes will use.
        """
        raise NotImplementedError

    @abstractmethod
    def init_loop_step(self, tx: BaseTransaction) -> None:
        """ When the index needs to be initialized, this function will be called for each tx in the database.
        """
        raise NotImplementedError

    @abstractmethod
    def force_clear(self) -> None:
        """ Clear any existing data in the index.
        """
        raise NotImplementedError
11 changes: 9 additions & 2 deletions hathor/indexes/deps_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import TYPE_CHECKING, Iterator, List

from hathor.indexes.base_index import BaseIndex
from hathor.transaction import BaseTransaction, Block

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -49,7 +50,7 @@ def get_requested_from_height(tx: BaseTransaction) -> int:
return block.get_metadata().height


class DepsIndex(ABC):
class DepsIndex(BaseIndex):
""" Index of dependencies between transactions
This index exists to accelerate queries related to the partial validation mechanism needed by sync-v2. More
Expand Down Expand Up @@ -105,6 +106,12 @@ class DepsIndex(ABC):
them.
"""

def init_loop_step(self, tx: BaseTransaction) -> None:
    """ Feed one transaction to the deps index while the index is being initialized.

    Transactions whose metadata has a truthy `voided_by` are skipped; every other transaction is handed to
    `self.add_tx`, leaving its `partial` argument at the default.
    """
    tx_meta = tx.get_metadata()
    # voided transactions are left out of the index
    if tx_meta.voided_by:
        return
    self.add_tx(tx)

@abstractmethod
def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
"""Update 'deps' and 'needed' sub-indexes, removing them when necessary (i.e. validation is complete).
Expand Down
19 changes: 16 additions & 3 deletions hathor/indexes/height_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import List, NamedTuple, Optional, Tuple

from hathor.transaction import Block
from hathor.indexes.base_index import BaseIndex
from hathor.transaction import BaseTransaction, Block
from hathor.transaction.genesis import BLOCK_GENESIS
from hathor.util import not_none

Expand All @@ -35,10 +36,22 @@ class _AddToIndexItem(NamedTuple):
timestamp: int


class HeightIndex(ABC):
class HeightIndex(BaseIndex):
"""Store the block hash for each given height
"""

def init_loop_step(self, tx: BaseTransaction) -> None:
    """ Feed one transaction to the height index while the index is being initialized.

    Only non-genesis blocks whose metadata has no `voided_by` are added. They are added through `self.add_new`,
    using the height stored in the block's metadata together with the block's hash and timestamp.
    """
    # this index only stores blocks, plain transactions are ignored
    if not tx.is_block:
        return
    # NOTE(review): genesis is skipped here, presumably because the index already accounts for it -- confirm
    if tx.is_genesis:
        return
    assert isinstance(tx, Block)
    assert tx.hash is not None
    tx_meta = tx.get_metadata()
    # voided blocks are left out of the index
    if tx_meta.voided_by:
        return
    self.add_new(tx_meta.height, tx.hash, tx.timestamp)

@abstractmethod
def add_new(self, height: int, block_hash: bytes, timestamp: int) -> None:
"""Add a new block to the height index that **MUST NOT** result in a re-org"""
Expand Down
130 changes: 114 additions & 16 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional
from enum import Enum, auto
from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple

from structlog import get_logger

from hathor.indexes.address_index import AddressIndex
from hathor.indexes.base_index import BaseIndex
from hathor.indexes.deps_index import DepsIndex
from hathor.indexes.height_index import HeightIndex
from hathor.indexes.mempool_tips_index import MempoolTipsIndex
from hathor.indexes.timestamp_index import TimestampIndex
from hathor.indexes.tips_index import TipsIndex
from hathor.indexes.tokens_index import TokensIndex
from hathor.transaction import BaseTransaction
from hathor.util import progress

if TYPE_CHECKING: # pragma: no cover
import rocksdb
Expand All @@ -35,13 +38,21 @@
logger = get_logger()


class _IndexFilter(Enum):
    """ Selects which transactions an index is fed while the indexes are being initialized."""

    ALL = auto()  # block or tx, voided or not
    # NOTE(review): the initialization loop in this file forwards every block for VALID_BLOCKS without looking at
    # `voided_by`; confirm whether the comment below or the loop reflects the intended behavior.
    VALID_BLOCKS = auto()  # only blocks that are not voided
    VALID_TXS = auto()  # only transactions that are not voided


class IndexesManager(ABC):
""" IndexesManager manages all the indexes that we will have in the system
The idea is for the manager to handle all method calls to indexes,
so it will know which index is better to use in each moment
"""

log = get_logger()

all_tips: TipsIndex
block_tips: TipsIndex
tx_tips: TipsIndex
Expand All @@ -56,6 +67,39 @@ class IndexesManager(ABC):
addresses: Optional[AddressIndex]
tokens: Optional[TokensIndex]

def __init_checks__(self) -> None:
    """ Implementations must call this at the **end** of their __init__ for running ValueError checks.

    Raises:
        ValueError: when two indexes report the same non-None name from `get_db_name()`.
    """
    # check if every index has a unique db_name
    indexes_db_names = set()
    for index in self.iter_all_indexes():
        index_db_name = index.get_db_name()
        # indexes that return None have no db name, so they cannot clash with anything
        if index_db_name is None:
            continue
        if index_db_name in indexes_db_names:
            raise ValueError(f'duplicate index name "{index_db_name}", already in use by another index')
        indexes_db_names.add(index_db_name)

def iter_all_indexes(self) -> Iterator[BaseIndex]:
    """ Iterate over all of the indexes abstracted by this manager, hiding their specific implementation details.

    Yields the same indexes, in the same order, as `_iter_all_indexes_with_filter()`, dropping the filter.
    """
    for _, index in self._iter_all_indexes_with_filter():
        yield index

def _iter_all_indexes_with_filter(self) -> Iterator[Tuple[_IndexFilter, BaseIndex]]:
    """ Same as `iter_all_indexes()`, but includes a filter for what transactions an index is interested in.

    Each yielded item is a `(filter, index)` pair. The `addresses` and `tokens` indexes are only yielded when they
    are not None.
    """
    # tips indexes
    yield _IndexFilter.ALL, self.all_tips
    yield _IndexFilter.VALID_BLOCKS, self.block_tips
    yield _IndexFilter.VALID_TXS, self.tx_tips
    # timestamp-sorted indexes
    yield _IndexFilter.ALL, self.sorted_all
    yield _IndexFilter.VALID_BLOCKS, self.sorted_blocks
    yield _IndexFilter.VALID_TXS, self.sorted_txs
    # the remaining indexes receive every transaction
    yield _IndexFilter.ALL, self.deps
    yield _IndexFilter.ALL, self.height
    yield _IndexFilter.ALL, self.mempool_tips
    # optional indexes
    if self.addresses is not None:
        yield _IndexFilter.ALL, self.addresses
    if self.tokens is not None:
        yield _IndexFilter.ALL, self.tokens

@abstractmethod
def enable_address_index(self, pubsub: 'PubSubManager') -> None:
"""Enable address index. It does nothing if it has already been enabled."""
Expand All @@ -66,23 +110,71 @@ def enable_tokens_index(self) -> None:
"""Enable tokens index. It does nothing if it has already been enabled."""
raise NotImplementedError

def _manually_initialize_tips_indexes(self, tx_storage: 'TransactionStorage') -> None:
""" Initialize the tips indexes, populating them from a tx_storage that is otherwise complete.
XXX: this method requires timestamp indexes to be complete and up-to-date with the rest of the database
XXX: this method is not yet being used
def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
""" Initialize the indexes, checking the indexes that need initialization, and the optimal iterator to use.
"""
for tx in tx_storage._topological_fast():
tx_meta = tx.get_metadata()
if not tx_meta.validation.is_final():
from hathor.transaction.genesis import BLOCK_GENESIS

db_last_started_at = tx_storage.get_last_started_at()
sorted_all_db_name = self.sorted_all.get_db_name()
if sorted_all_db_name is None:
can_use_fast_iterator = False
else:
sorted_all_index_last_started_at = tx_storage.get_index_last_started_at(sorted_all_db_name)
can_use_fast_iterator = db_last_started_at == sorted_all_index_last_started_at
if not can_use_fast_iterator:
self.log.info('initialization will start soon, please wait')

indexes_to_init: List[Tuple[_IndexFilter, BaseIndex]] = []
for index_filter, index in self._iter_all_indexes_with_filter():
index_db_name = index.get_db_name()
if index_db_name is None:
indexes_to_init.append((index_filter, index))
continue
index_last_started_at = tx_storage.get_index_last_started_at(index_db_name)
if db_last_started_at != index_last_started_at:
indexes_to_init.append((index_filter, index))

self.all_tips.add_tx(tx)
iter_tx: Iterator[BaseTransaction]
if can_use_fast_iterator:
iter_tx = tx_storage._topological_fast()
else:
iter_tx = tx_storage._topological_sort()

# make sure that all the indexes that we're rebuilding are cleared
for _, index in indexes_to_init:
index.force_clear()

block_count = 0
tx_count = 0
latest_timestamp = BLOCK_GENESIS.timestamp
first_timestamp = BLOCK_GENESIS.timestamp

for tx in progress(self.log, iter_tx):
# XXX: these would probably make more sense to be their own simple "indexes" instead of how it is here
latest_timestamp = max(tx.timestamp, latest_timestamp)
first_timestamp = min(tx.timestamp, first_timestamp)
if tx.is_block:
self.block_tips.add_tx(tx)
elif not tx_meta.voided_by:
self.tx_tips.add_tx(tx)
block_count += 1
else:
tx_count += 1

tx_meta = tx.get_metadata()

# feed each transaction to the indexes that they are interested in
for index_filter, index in indexes_to_init:
if index_filter is _IndexFilter.ALL:
index.init_loop_step(tx)
elif index_filter is _IndexFilter.VALID_BLOCKS:
if tx.is_block:
index.init_loop_step(tx)
elif index_filter is _IndexFilter.VALID_TXS:
if tx.is_transaction and not tx_meta.voided_by:
index.init_loop_step(tx)
else:
assert False, 'impossible filter'

tx_storage._update_caches(block_count, tx_count, latest_timestamp, first_timestamp)

def add_tx(self, tx: BaseTransaction) -> bool:
""" Add a transaction to the indexes
Expand Down Expand Up @@ -162,6 +254,9 @@ def __init__(self) -> None:
self.mempool_tips = MemoryMempoolTipsIndex()
self.deps = MemoryDepsIndex()

# XXX: this has to be at the end of __init__, after everything has been initialized
self.__init_checks__()

def enable_address_index(self, pubsub: 'PubSubManager') -> None:
from hathor.indexes.memory_address_index import MemoryAddressIndex
if self.addresses is None:
Expand All @@ -186,16 +281,19 @@ def __init__(self, db: 'rocksdb.DB') -> None:
self.block_tips = TipsIndex()
self.tx_tips = TipsIndex()

self.sorted_all = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-all')
self.sorted_blocks = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-blocks')
self.sorted_txs = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-txs')
self.sorted_all = RocksDBTimestampIndex(self._db, 'all')
self.sorted_blocks = RocksDBTimestampIndex(self._db, 'blocks')
self.sorted_txs = RocksDBTimestampIndex(self._db, 'txs')

self.addresses = None
self.tokens = None
self.height = RocksDBHeightIndex(self._db)
self.mempool_tips = RocksDBMempoolTipsIndex(self._db)
self.deps = RocksDBDepsIndex(self._db)

# XXX: this has to be at the end of __init__, after everything has been initialized
self.__init_checks__()

def enable_address_index(self, pubsub: 'PubSubManager') -> None:
from hathor.indexes.rocksdb_address_index import RocksDBAddressIndex
if self.addresses is None:
Expand Down
11 changes: 10 additions & 1 deletion hathor/indexes/memory_address_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,20 @@
class MemoryAddressIndex(AddressIndex):
""" Index of inputs/outputs by address
"""

index: DefaultDict[str, Set[bytes]]

def __init__(self, pubsub: Optional['PubSubManager'] = None) -> None:
self.index: DefaultDict[str, Set[bytes]] = defaultdict(set)
self.pubsub = pubsub
if self.pubsub:
self.subscribe_pubsub_events()
self.force_clear()

def get_db_name(self) -> Optional[str]:
    """ Return None: this memory index has no db name."""
    return None

def force_clear(self) -> None:
    """ Discard everything in the index by replacing `self.index` with a fresh, empty `defaultdict(set)`."""
    self.index = defaultdict(set)

def subscribe_pubsub_events(self) -> None:
""" Subscribe wallet index to receive voided/winner tx pubsub events
Expand Down
Loading

0 comments on commit f37ecdc

Please sign in to comment.