From 718bc002de0527af398c3708670bedfeaeefda1f Mon Sep 17 00:00:00 2001 From: Jan Segre Date: Tue, 26 Apr 2022 21:20:27 +0200 Subject: [PATCH] feat(init): speed up initialization --- hathor/exception.py | 5 + hathor/indexes/address_index.py | 8 +- hathor/indexes/base_index.py | 47 ++++++ hathor/indexes/deps_index.py | 11 +- hathor/indexes/height_index.py | 19 ++- hathor/indexes/manager.py | 137 +++++++++++++--- hathor/indexes/memory_address_index.py | 11 +- hathor/indexes/memory_deps_index.py | 25 ++- hathor/indexes/memory_height_index.py | 6 + hathor/indexes/memory_mempool_tips_index.py | 8 +- hathor/indexes/memory_timestamp_index.py | 6 + hathor/indexes/memory_tokens_index.py | 6 + hathor/indexes/mempool_tips_index.py | 8 +- hathor/indexes/rocksdb_address_index.py | 14 +- hathor/indexes/rocksdb_deps_index.py | 12 +- hathor/indexes/rocksdb_height_index.py | 13 +- hathor/indexes/rocksdb_mempool_tips_index.py | 8 + hathor/indexes/rocksdb_timestamp_index.py | 13 +- hathor/indexes/rocksdb_tokens_index.py | 12 +- hathor/indexes/rocksdb_utils.py | 51 +++--- hathor/indexes/timestamp_index.py | 8 +- hathor/indexes/tips_index.py | 19 ++- hathor/indexes/tokens_index.py | 9 +- hathor/manager.py | 129 ++++++++++++++- hathor/transaction/base_transaction.py | 6 +- .../storage/transaction_storage.py | 135 ++++++++++++++-- tests/tx/test_indexes2.py | 2 +- tests/tx/test_indexes3.py | 17 +- tests/tx/test_indexes4.py | 150 ++++++++++++++++++ tests/tx/test_tx_storage.py | 27 ++++ 30 files changed, 800 insertions(+), 122 deletions(-) create mode 100644 hathor/indexes/base_index.py create mode 100644 tests/tx/test_indexes4.py diff --git a/hathor/exception.py b/hathor/exception.py index 90e3e9958..25be223ef 100644 --- a/hathor/exception.py +++ b/hathor/exception.py @@ -22,3 +22,8 @@ class InvalidNewTransaction(HathorError): """Raised when a new received tx/block is not valid. """ pass + + +class InitializationError(HathorError): + """Raised when there's anything wrong during initialization that should cause it to be aborted. + """ diff --git a/hathor/indexes/address_index.py b/hathor/indexes/address_index.py index 18d6f798c..ebf848ada 100644 --- a/hathor/indexes/address_index.py +++ b/hathor/indexes/address_index.py @@ -12,11 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC, abstractmethod +from abc import abstractmethod from typing import TYPE_CHECKING, Iterable, List, Optional from structlog import get_logger +from hathor.indexes.base_index import BaseIndex from hathor.transaction import BaseTransaction if TYPE_CHECKING: # pragma: no cover @@ -25,11 +26,14 @@ logger = get_logger() -class AddressIndex(ABC): +class AddressIndex(BaseIndex): """ Index of inputs/outputs by address """ pubsub: Optional['PubSubManager'] + def init_loop_step(self, tx: BaseTransaction) -> None: + self.add_tx(tx) + def publish_tx(self, tx: BaseTransaction, *, addresses: Optional[Iterable[str]] = None) -> None: """ Publish WALLET_ADDRESS_HISTORY for all addresses of a transaction. """ diff --git a/hathor/indexes/base_index.py b/hathor/indexes/base_index.py new file mode 100644 index 000000000..000688601 --- /dev/null +++ b/hathor/indexes/base_index.py @@ -0,0 +1,47 @@ +# Copyright 2021 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import Optional + +from hathor.transaction.base_transaction import BaseTransaction + + +class BaseIndex(ABC): + """ All indexes must inherit from this index. + + This class exists so we can interact with indexes without knowing anything specific to their implementation. It was + created to generalize how we initialize indexes and keep track of which ones are up-to-date. + """ + + @abstractmethod + def get_db_name(self) -> Optional[str]: + """ The returned string is used to generate the relevant attributes for storing an index's state in the db. + + If None is returned, the database will not store the index initialization state and the index will always be + initialized. This is the expected mode that memory-only indexes will use. + """ + raise NotImplementedError + + @abstractmethod + def init_loop_step(self, tx: BaseTransaction) -> None: + """ When the index needs to be initialized, this function will be called for every tx in topological order. + """ + raise NotImplementedError + + @abstractmethod + def force_clear(self) -> None: + """ Clear any existing data in the index. + """ + raise NotImplementedError diff --git a/hathor/indexes/deps_index.py b/hathor/indexes/deps_index.py index 6d98c6f56..5c2b56a57 100644 --- a/hathor/indexes/deps_index.py +++ b/hathor/indexes/deps_index.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC, abstractmethod +from abc import abstractmethod from typing import TYPE_CHECKING, Iterator, List +from hathor.indexes.base_index import BaseIndex from hathor.transaction import BaseTransaction, Block if TYPE_CHECKING: # pragma: no cover @@ -48,7 +49,7 @@ def get_requested_from_height(tx: BaseTransaction) -> int: return block.get_metadata().height -class DepsIndex(ABC): +class DepsIndex(BaseIndex): """ Index of dependencies between transactions This index exists to accelerate queries related to the partial validation mechanism needed by sync-v2. More @@ -104,6 +105,12 @@ class DepsIndex(ABC): them. """ + def init_loop_step(self, tx: BaseTransaction) -> None: + tx_meta = tx.get_metadata() + if tx_meta.voided_by: + return + self.add_tx(tx, partial=False) + @abstractmethod def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None: """Update 'deps' and 'needed' sub-indexes, removing them when necessary (i.e. validation is complete). diff --git a/hathor/indexes/height_index.py b/hathor/indexes/height_index.py index e526210da..655fe7e70 100644 --- a/hathor/indexes/height_index.py +++ b/hathor/indexes/height_index.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License.
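Looking back at the BaseIndex contract introduced above: its three abstract methods are the entire surface a new index must implement. For illustration, a minimal sketch of a memory-only index written against that contract (the class name and the counter it keeps are hypothetical, not part of this patch):

from typing import Optional

from hathor.indexes.base_index import BaseIndex
from hathor.transaction.base_transaction import BaseTransaction


class TxCountIndex(BaseIndex):
    """Hypothetical index that just counts vertices, shown only to illustrate the BaseIndex contract."""

    def __init__(self) -> None:
        # the memory indexes in this patch build their initial state by delegating to force_clear()
        self.force_clear()

    def get_db_name(self) -> Optional[str]:
        # None means no persisted initialization state: the index is always rebuilt on start
        return None

    def init_loop_step(self, tx: BaseTransaction) -> None:
        # called once per vertex, in topological order, while the manager iterates the database
        self.count += 1

    def force_clear(self) -> None:
        self.count = 0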
-from abc import ABC, abstractmethod +from abc import abstractmethod from typing import List, NamedTuple, Optional, Tuple -from hathor.transaction import Block +from hathor.indexes.base_index import BaseIndex +from hathor.transaction import BaseTransaction, Block from hathor.transaction.genesis import BLOCK_GENESIS from hathor.util import not_none @@ -35,10 +36,22 @@ class _AddToIndexItem(NamedTuple): timestamp: int -class HeightIndex(ABC): +class HeightIndex(BaseIndex): """Store the block hash for each given height """ + def init_loop_step(self, tx: BaseTransaction) -> None: + if not tx.is_block: + return + if tx.is_genesis: + return + assert isinstance(tx, Block) + assert tx.hash is not None + tx_meta = tx.get_metadata() + if tx_meta.voided_by: + return + self.add_new(tx_meta.height, tx.hash, tx.timestamp) + @abstractmethod def add_new(self, height: int, block_hash: bytes, timestamp: int) -> None: """Add a new block to the height index that **MUST NOT** result in a re-org""" diff --git a/hathor/indexes/manager.py b/hathor/indexes/manager.py index 57f4f9bc7..b7bf7aa05 100644 --- a/hathor/indexes/manager.py +++ b/hathor/indexes/manager.py @@ -13,11 +13,13 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Optional +from enum import Enum, auto +from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple from structlog import get_logger from hathor.indexes.address_index import AddressIndex +from hathor.indexes.base_index import BaseIndex from hathor.indexes.deps_index import DepsIndex from hathor.indexes.height_index import HeightIndex from hathor.indexes.mempool_tips_index import MempoolTipsIndex @@ -25,6 +27,7 @@ from hathor.indexes.tips_index import TipsIndex from hathor.indexes.tokens_index import TokensIndex from hathor.transaction import BaseTransaction +from hathor.util import progress if TYPE_CHECKING: # pragma: no cover import rocksdb @@ -35,6 +38,12 @@ logger = get_logger() +class _IndexFilter(Enum): + ALL = auto() # block or tx, voided or not + ALL_BLOCKS = auto() # all blocks, voided or not (voidedness is handled by each index's init_loop_step) + VALID_TXS = auto() # only transactions that are not voided + + class IndexesManager(ABC): """ IndexesManager manages all the indexes that we will have in the system so it will know which index is better to use in each moment """ + log = get_logger() + all_tips: TipsIndex block_tips: TipsIndex tx_tips: TipsIndex @@ -56,6 +67,39 @@ class IndexesManager(ABC): addresses: Optional[AddressIndex] tokens: Optional[TokensIndex] + def __init_checks__(self): + """ Implementations must call this at the **end** of their __init__ for running ValueError checks.""" + # check if every index has a unique db_name + indexes_db_names = set() + for index in self.iter_all_indexes(): + index_db_name = index.get_db_name() + if index_db_name is None: + continue + if index_db_name in indexes_db_names: + raise ValueError(f'duplicate index name "{index_db_name}", already in use by another index') + indexes_db_names.add(index_db_name) + + def iter_all_indexes(self) -> Iterator[BaseIndex]: + """ Iterate over all of the indexes abstracted by this manager, hiding their specific implementation details""" + for _, index in self._iter_all_indexes_with_filter(): + yield index + + def _iter_all_indexes_with_filter(self) -> Iterator[Tuple[_IndexFilter, BaseIndex]]: + """ Same as `iter_all_indexes()`, but includes a filter for what transactions an index is interested in.""" + yield _IndexFilter.ALL, self.all_tips +
yield _IndexFilter.ALL_BLOCKS, self.block_tips + yield _IndexFilter.VALID_TXS, self.tx_tips + yield _IndexFilter.ALL, self.sorted_all + yield _IndexFilter.ALL_BLOCKS, self.sorted_blocks + yield _IndexFilter.VALID_TXS, self.sorted_txs + yield _IndexFilter.ALL, self.deps + yield _IndexFilter.ALL, self.height + yield _IndexFilter.ALL, self.mempool_tips + if self.addresses is not None: + yield _IndexFilter.ALL, self.addresses + if self.tokens is not None: + yield _IndexFilter.ALL, self.tokens + @abstractmethod def enable_address_index(self, pubsub: 'PubSubManager') -> None: """Enable address index. It does nothing if it has already been enabled.""" @@ -66,23 +110,74 @@ def enable_tokens_index(self) -> None: """Enable tokens index. It does nothing if it has already been enabled.""" raise NotImplementedError - def _manually_initialize_tips_indexes(self, tx_storage: 'TransactionStorage') -> None: - """ Initialize the tips indexes, populating them from a tx_storage that is otherwise complete. + def force_clear_all(self) -> None: + """ Force clear all indexes. + """ + for index in self.iter_all_indexes(): + index.force_clear() - XXX: this method requires timestamp indexes to be complete and up-to-date with the rest of the database - XXX: this method is not yet being used + def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None: + """ Initialize the indexes, checking which ones need initialization and choosing the optimal iterator to use. """ - for tx in tx_storage._topological_sort_timestamp_index(): - tx_meta = tx.get_metadata() - if not tx_meta.validation.is_final(): - continue + from hathor.transaction.genesis import BLOCK_GENESIS + from hathor.transaction.storage.transaction_storage import NULL_INDEX_LAST_STARTED_AT - self.all_tips.add_tx(tx) + db_last_started_at = tx_storage.get_last_started_at() + indexes_to_init: List[Tuple[_IndexFilter, BaseIndex]] = [] + for index_filter, index in self._iter_all_indexes_with_filter(): + index_db_name = index.get_db_name() + if index_db_name is None: + indexes_to_init.append((index_filter, index)) + continue + index_last_started_at = tx_storage.get_index_last_started_at(index_db_name) + if db_last_started_at != index_last_started_at: + indexes_to_init.append((index_filter, index)) + + if indexes_to_init: + self.log.debug('there are indexes that need initialization', + indexes_to_init=[i for _, i in indexes_to_init]) + else: + self.log.debug('there are no indexes that need initialization') + + # make sure that all the indexes that we're rebuilding are cleared + for _, index in indexes_to_init: + index_db_name = index.get_db_name() + if index_db_name: + tx_storage.set_index_last_started_at(index_db_name, NULL_INDEX_LAST_STARTED_AT) + index.force_clear() + + block_count = 0 + tx_count = 0 + latest_timestamp = BLOCK_GENESIS.timestamp + first_timestamp = BLOCK_GENESIS.timestamp + + for tx in progress(tx_storage.topological_iterator(), log=self.log): + # XXX: these would probably make more sense to be their own simple "indexes" instead of how it is here + latest_timestamp = max(tx.timestamp, latest_timestamp) + first_timestamp = min(tx.timestamp, first_timestamp) if tx.is_block: - self.block_tips.add_tx(tx) - elif not tx_meta.voided_by: - self.tx_tips.add_tx(tx) + block_count += 1 + else: + tx_count += 1 + + tx_meta = tx.get_metadata() + + # feed each transaction to the indexes that they are interested in + for index_filter, index in indexes_to_init: + if index_filter is _IndexFilter.ALL: + index.init_loop_step(tx) + elif index_filter is
_IndexFilter.ALL_BLOCKS: + if tx.is_block: + index.init_loop_step(tx) + elif index_filter is _IndexFilter.VALID_TXS: + # XXX: all indexes that use this filter treat soft-voided as voided, nothing special needed + if tx.is_transaction and not tx_meta.voided_by: + index.init_loop_step(tx) + else: + assert False, 'impossible filter' + + tx_storage._update_caches(block_count, tx_count, latest_timestamp, first_timestamp) def add_tx(self, tx: BaseTransaction) -> bool: """ Add a transaction to the indexes @@ -162,6 +257,9 @@ def __init__(self) -> None: self.mempool_tips = MemoryMempoolTipsIndex() self.deps = MemoryDepsIndex() + # XXX: this has to be at the end of __init__, after everything has been initialized + self.__init_checks__() + def enable_address_index(self, pubsub: 'PubSubManager') -> None: from hathor.indexes.memory_address_index import MemoryAddressIndex if self.addresses is None: @@ -176,8 +274,8 @@ def enable_tokens_index(self) -> None: class RocksDBIndexesManager(IndexesManager): def __init__(self, db: 'rocksdb.DB') -> None: from hathor.indexes.memory_deps_index import MemoryDepsIndex + from hathor.indexes.memory_mempool_tips_index import MemoryMempoolTipsIndex from hathor.indexes.rocksdb_height_index import RocksDBHeightIndex - from hathor.indexes.rocksdb_mempool_tips_index import RocksDBMempoolTipsIndex from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex self._db = db @@ -186,16 +284,19 @@ def __init__(self, db: 'rocksdb.DB') -> None: self.block_tips = TipsIndex() self.tx_tips = TipsIndex() - self.sorted_all = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-all') - self.sorted_blocks = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-blocks') - self.sorted_txs = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-txs') + self.sorted_all = RocksDBTimestampIndex(self._db, 'all') + self.sorted_blocks = RocksDBTimestampIndex(self._db, 'blocks') + self.sorted_txs = RocksDBTimestampIndex(self._db, 'txs') self.addresses = None self.tokens = None self.height = RocksDBHeightIndex(self._db) - self.mempool_tips = RocksDBMempoolTipsIndex(self._db) + self.mempool_tips = MemoryMempoolTipsIndex() # use of RocksDBMempoolTipsIndex is very slow and was suspended self.deps = MemoryDepsIndex() # use of RocksDBDepsIndex is currently suspended until it is fixed + # XXX: this has to be at the end of __init__, after everything has been initialized + self.__init_checks__() + def enable_address_index(self, pubsub: 'PubSubManager') -> None: from hathor.indexes.rocksdb_address_index import RocksDBAddressIndex if self.addresses is None: diff --git a/hathor/indexes/memory_address_index.py b/hathor/indexes/memory_address_index.py index 5bed5d104..a771d611c 100644 --- a/hathor/indexes/memory_address_index.py +++ b/hathor/indexes/memory_address_index.py @@ -30,12 +30,21 @@ class MemoryAddressIndex(AddressIndex): """ Index of inputs/outputs by address """ + + index: DefaultDict[str, Set[bytes]] + def __init__(self, pubsub: Optional['PubSubManager'] = None) -> None: - self.index: DefaultDict[str, Set[bytes]] = defaultdict(set) self.pubsub = pubsub + self.force_clear() if self.pubsub: self.subscribe_pubsub_events() + def get_db_name(self) -> Optional[str]: + return None + + def force_clear(self) -> None: + self.index = defaultdict(set) + def subscribe_pubsub_events(self) -> None: """ Subscribe wallet index to receive voided/winner tx pubsub events """ diff --git a/hathor/indexes/memory_deps_index.py b/hathor/indexes/memory_deps_index.py index 85ef79ca4..2c9d77eda 
100644 --- a/hathor/indexes/memory_deps_index.py +++ b/hathor/indexes/memory_deps_index.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Dict, FrozenSet, Iterator, List, Set, Tuple +from typing import TYPE_CHECKING, Dict, FrozenSet, Iterator, List, Optional, Set, Tuple from structlog import get_logger @@ -27,17 +27,26 @@ class MemoryDepsIndex(DepsIndex): + # Reverse dependency mapping + _rev_dep_index: Dict[bytes, Set[bytes]] + + # Ready to be validated cache + _txs_with_deps_ready: Set[bytes] + + # Next to be downloaded + _needed_txs_index: Dict[bytes, Tuple[int, bytes]] + def __init__(self): self.log = logger.new() + self.force_clear() - # Reverse dependency mapping - self._rev_dep_index: Dict[bytes, Set[bytes]] = {} - - # Ready to be validated cache - self._txs_with_deps_ready: Set[bytes] = set() + def get_db_name(self) -> Optional[str]: + return None - # Next to be downloaded - self._needed_txs_index: Dict[bytes, Tuple[int, bytes]] = {} + def force_clear(self) -> None: + self._rev_dep_index = {} + self._txs_with_deps_ready = set() + self._needed_txs_index = {} def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None: assert tx.hash is not None diff --git a/hathor/indexes/memory_height_index.py b/hathor/indexes/memory_height_index.py index 1d94c781d..5b776881c 100644 --- a/hathor/indexes/memory_height_index.py +++ b/hathor/indexes/memory_height_index.py @@ -24,6 +24,12 @@ class MemoryHeightIndex(HeightIndex): _index: List[IndexEntry] def __init__(self) -> None: + self.force_clear() + + def get_db_name(self) -> Optional[str]: + return None + + def force_clear(self) -> None: self._index = [BLOCK_GENESIS_ENTRY] def _add(self, height: int, block_hash: bytes, timestamp: int, *, can_reorg: bool) -> None: diff --git a/hathor/indexes/memory_mempool_tips_index.py b/hathor/indexes/memory_mempool_tips_index.py index 60f0017a1..9a8fd7c9f 100644 --- a/hathor/indexes/memory_mempool_tips_index.py +++ b/hathor/indexes/memory_mempool_tips_index.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
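Stepping back to the `_IndexFilter` dispatch in hathor/indexes/manager.py above: during the initialization loop, the routing rules boil down to the predicate sketched below. This is a condensed restatement of the patch's dispatch code, with a hypothetical helper name:

def _should_feed(index_filter: _IndexFilter, tx: BaseTransaction) -> bool:
    # condensed restatement of the routing in IndexesManager._manually_initialize()
    if index_filter is _IndexFilter.ALL:
        return True  # every vertex: blocks and txs, voided or not
    if index_filter is _IndexFilter.ALL_BLOCKS:
        return tx.is_block  # voidedness is left for each index's init_loop_step to decide
    if index_filter is _IndexFilter.VALID_TXS:
        # soft-voided counts as voided here, so no special-casing is needed
        return tx.is_transaction and not tx.get_metadata().voided_by
    raise AssertionError('impossible filter')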
-from typing import Iterable, Set +from typing import Iterable, Optional, Set from structlog import get_logger @@ -26,6 +26,12 @@ class MemoryMempoolTipsIndex(ByteCollectionMempoolTipsIndex): def __init__(self): self.log = logger.new() + self.force_clear() + + def get_db_name(self) -> Optional[str]: + return None + + def force_clear(self) -> None: self._index = set() def _discard(self, tx: bytes) -> None: diff --git a/hathor/indexes/memory_timestamp_index.py b/hathor/indexes/memory_timestamp_index.py index 5cef631f6..d61e32677 100644 --- a/hathor/indexes/memory_timestamp_index.py +++ b/hathor/indexes/memory_timestamp_index.py @@ -37,6 +37,12 @@ class MemoryTimestampIndex(TimestampIndex): def __init__(self) -> None: self.log = logger.new() + self.force_clear() + + def get_db_name(self) -> Optional[str]: + return None + + def force_clear(self) -> None: self._index = SortedKeyList(key=lambda x: (x.timestamp, x.hash)) def add_tx(self, tx: BaseTransaction) -> bool: diff --git a/hathor/indexes/memory_tokens_index.py b/hathor/indexes/memory_tokens_index.py index 1d132ad8f..bdc8e2fa8 100644 --- a/hathor/indexes/memory_tokens_index.py +++ b/hathor/indexes/memory_tokens_index.py @@ -68,6 +68,12 @@ def iter_melt_utxos(self) -> Iterator[TokenUtxoInfo]: class MemoryTokensIndex(TokensIndex): def __init__(self) -> None: self.log = logger.new() + self.force_clear() + + def get_db_name(self) -> Optional[str]: + return None + + def force_clear(self) -> None: self._tokens: Dict[bytes, MemoryTokenIndexInfo] = defaultdict(MemoryTokenIndexInfo) def _add_to_index(self, tx: BaseTransaction, index: int) -> None: diff --git a/hathor/indexes/mempool_tips_index.py b/hathor/indexes/mempool_tips_index.py index 7e739a246..915e90cd5 100644 --- a/hathor/indexes/mempool_tips_index.py +++ b/hathor/indexes/mempool_tips_index.py @@ -12,12 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from abc import ABC, abstractmethod +from abc import abstractmethod from collections.abc import Collection from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Set, cast import structlog +from hathor.indexes.base_index import BaseIndex from hathor.transaction import BaseTransaction, Transaction from hathor.util import not_none @@ -25,9 +26,12 @@ from hathor.transaction.storage import TransactionStorage -class MempoolTipsIndex(ABC): +class MempoolTipsIndex(BaseIndex): """Index to access the tips of the mempool transactions, which haven't been confirmed by a block.""" + def init_loop_step(self, tx: BaseTransaction) -> None: + self.update(tx) + # originally tx_storage.update_mempool_tips @abstractmethod def update(self, tx: BaseTransaction) -> None: diff --git a/hathor/indexes/rocksdb_address_index.py b/hathor/indexes/rocksdb_address_index.py index e6dde2b90..b68861d0b 100644 --- a/hathor/indexes/rocksdb_address_index.py +++ b/hathor/indexes/rocksdb_address_index.py @@ -29,6 +29,7 @@ logger = get_logger() _CF_NAME_ADDRESS_INDEX = b'address-index' +_DB_NAME: str = 'address' class RocksDBAddressIndex(AddressIndex, RocksDBIndexUtils): @@ -46,17 +47,20 @@ class RocksDBAddressIndex(AddressIndex, RocksDBIndexUtils): """ def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None, pubsub: Optional['PubSubManager'] = None) -> None: - RocksDBIndexUtils.__init__(self, db) self.log = logger.new() - - # column family stuff - self._cf_name = cf_name or _CF_NAME_ADDRESS_INDEX - self._cf = self._fresh_cf(self._cf_name) + RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_ADDRESS_INDEX) self.pubsub = pubsub if self.pubsub: self.subscribe_pubsub_events() + def get_db_name(self) -> Optional[str]: + # XXX: we don't need it to be parametrizable, so this is fine + return _DB_NAME + + def force_clear(self) -> None: + self.clear() + def _to_key(self, address: str, tx: Optional[BaseTransaction] = None) -> bytes: import struct assert len(address) == 34 diff --git a/hathor/indexes/rocksdb_deps_index.py b/hathor/indexes/rocksdb_deps_index.py index 539dd804e..55169ba50 100644 --- a/hathor/indexes/rocksdb_deps_index.py +++ b/hathor/indexes/rocksdb_deps_index.py @@ -31,6 +31,7 @@ logger = get_logger() _CF_NAME_DEPS_INDEX = b'deps-index' +_DB_NAME: str = 'deps' class _Tag(Enum): @@ -73,12 +74,15 @@ def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None, _force: if not _force: # See: https://github.com/HathorNetwork/hathor-core/issues/412 raise TypeError('This class should not be used') - RocksDBIndexUtils.__init__(self, db) self.log = logger.new() + RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_DEPS_INDEX) - # column family stuff - self._cf_name = cf_name or _CF_NAME_DEPS_INDEX - self._cf = self._fresh_cf(self._cf_name) + def get_db_name(self) -> Optional[str]: + # XXX: we don't need it to be parametrizable, so this is fine + return _DB_NAME + + def force_clear(self) -> None: + self.clear() def _to_key_ready(self, tx_hash: Optional[bytes] = None) -> bytes: """Make a key for accessing READY txs 'set'""" diff --git a/hathor/indexes/rocksdb_height_index.py b/hathor/indexes/rocksdb_height_index.py index 21237ae3f..c652ab0c9 100644 --- a/hathor/indexes/rocksdb_height_index.py +++ b/hathor/indexes/rocksdb_height_index.py @@ -27,6 +27,7 @@ logger = get_logger() _CF_NAME_HEIGHT_INDEX = b'height-index' +_DB_NAME: str = 'height' class RocksDBHeightIndex(HeightIndex, RocksDBIndexUtils): @@ -44,15 +45,15 @@ class RocksDBHeightIndex(HeightIndex, RocksDBIndexUtils): """ def 
__init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None: - RocksDBIndexUtils.__init__(self, db) self.log = logger.new() + RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_HEIGHT_INDEX) - # column family stuff - self._cf_name = cf_name or _CF_NAME_HEIGHT_INDEX - self._cf = self._fresh_cf(self._cf_name) + def get_db_name(self) -> Optional[str]: + # XXX: we don't need it to be parametrizable, so this is fine + return _DB_NAME - # XXX: when we stop using a fresh column-family we have to figure-out when to not run this - self._init_db() + def force_clear(self) -> None: + self.clear() def _init_db(self) -> None: """ Initialize the database with the genesis entry.""" diff --git a/hathor/indexes/rocksdb_mempool_tips_index.py b/hathor/indexes/rocksdb_mempool_tips_index.py index 39fdcfb41..a2c6c7ffe 100644 --- a/hathor/indexes/rocksdb_mempool_tips_index.py +++ b/hathor/indexes/rocksdb_mempool_tips_index.py @@ -25,6 +25,7 @@ logger = get_logger() _CF_NAME_MEMPOOL_TIPS_INDEX = b'mempool-tips-index' +_DB_NAME: str = 'mempool_tips' class RocksDBMempoolTipsIndex(ByteCollectionMempoolTipsIndex): @@ -35,6 +36,13 @@ def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None _cf_name = cf_name or _CF_NAME_MEMPOOL_TIPS_INDEX self._index = RocksDBSimpleSet(db, self.log, cf_name=_cf_name) + def get_db_name(self) -> Optional[str]: + # XXX: we don't need it to be parametrizable, so this is fine + return _DB_NAME + + def force_clear(self) -> None: + self._index.clear() + def _discard(self, tx: bytes) -> None: self._index.discard(tx) diff --git a/hathor/indexes/rocksdb_timestamp_index.py b/hathor/indexes/rocksdb_timestamp_index.py index c0f3d3b33..a530b2453 100644 --- a/hathor/indexes/rocksdb_timestamp_index.py +++ b/hathor/indexes/rocksdb_timestamp_index.py @@ -38,13 +38,16 @@ class RocksDBTimestampIndex(TimestampIndex, RocksDBIndexUtils): It works nicely because rocksdb uses a tree sorted by key under the hood. 
""" - def __init__(self, db: 'rocksdb.DB', *, cf_name: bytes) -> None: - RocksDBIndexUtils.__init__(self, db) + def __init__(self, db: 'rocksdb.DB', name: str) -> None: self.log = logger.new() + RocksDBIndexUtils.__init__(self, db, f'timestamp-sorted-{name}'.encode()) + self._name = name - # column family stuff - self._cf_name = cf_name - self._cf = self._fresh_cf(self._cf_name) + def get_db_name(self) -> Optional[str]: + return f'timestamp_{self._name}' + + def force_clear(self) -> None: + self.clear() def _to_key(self, timestamp: int, tx_hash: Optional[bytes] = None) -> bytes: """Make a key for a timestamp and optionally tx_hash, the key represents the membership itself.""" diff --git a/hathor/indexes/rocksdb_tokens_index.py b/hathor/indexes/rocksdb_tokens_index.py index df96e25f0..67e3648ea 100644 --- a/hathor/indexes/rocksdb_tokens_index.py +++ b/hathor/indexes/rocksdb_tokens_index.py @@ -39,6 +39,7 @@ logger = get_logger() _CF_NAME_TOKENS_INDEX = b'tokens-index' +_DB_NAME: str = 'tokens' # the following type is used to help a little bit to distinguish when we're using a byte sequence that should only be # internally used @@ -91,12 +92,15 @@ class RocksDBTokensIndex(TokensIndex, RocksDBIndexUtils): """ def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None: - RocksDBIndexUtils.__init__(self, db) self.log = logger.new() + RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_TOKENS_INDEX) - # column family stuff - self._cf_name = cf_name or _CF_NAME_TOKENS_INDEX - self._cf = self._fresh_cf(self._cf_name) + def get_db_name(self) -> Optional[str]: + # XXX: we don't need it to be parametrizable, so this is fine + return _DB_NAME + + def force_clear(self) -> None: + self.clear() def _to_internal_token_uid(self, token_uid: bytes) -> _InternalUid: """Normalizes a token_uid so that the native token (\x00) will have the same length as custom tokens.""" diff --git a/hathor/indexes/rocksdb_utils.py b/hathor/indexes/rocksdb_utils.py index 72381a4a0..482f745fb 100644 --- a/hathor/indexes/rocksdb_utils.py +++ b/hathor/indexes/rocksdb_utils.py @@ -56,32 +56,37 @@ class RocksDBIndexUtils: _cf: 'rocksdb.ColumnFamilyHandle' log: 'structlog.stdlib.BoundLogger' - def __init__(self, db: 'rocksdb.DB') -> None: + def __init__(self, db: 'rocksdb.DB', cf_name: bytes) -> None: + self._log = self.log.new(cf=cf_name.decode('ascii')) self._db = db + self._cf_name = cf_name + self._ensure_cf_exists(cf_name) + + def _init_db(self): + """ Inheritors of this class may implement this to initialize a column family when it is just created.""" + pass - def _fresh_cf(self, cf_name: bytes) -> 'rocksdb.ColumnFamilyHandle': - """Ensure we have a working and fresh column family""" + def _ensure_cf_exists(self, cf_name: bytes) -> None: + """Ensure we have a working and column family, loading the previous one if it exists""" import rocksdb - log_cf = self.log.new(cf=cf_name.decode('ascii')) - _cf = self._db.get_column_family(cf_name) - # XXX: dropping column because initialization currently expects a fresh index - if _cf is not None: - old_id = _cf.id - log_cf.debug('drop existing column family') - self._db.drop_column_family(_cf) - else: - old_id = None - log_cf.debug('no need to drop column family') - del _cf - log_cf.debug('create fresh column family') - _cf = self._db.create_column_family(cf_name, rocksdb.ColumnFamilyOptions()) - new_id = _cf.id - assert _cf is not None - assert _cf.is_valid + self._cf = self._db.get_column_family(cf_name) + if self._cf is None: + self._cf = 
self._db.create_column_family(cf_name, rocksdb.ColumnFamilyOptions()) + self._init_db() + self._log.debug('got column family', is_valid=self._cf.is_valid, id=self._cf.id) + + def clear(self) -> None: + old_id = self._cf.id + self._log.debug('drop existing column family') + self._db.drop_column_family(self._cf) + del self._cf + self._ensure_cf_exists(self._cf_name) + new_id = self._cf.id + assert self._cf is not None + assert self._cf.is_valid assert new_id != old_id - log_cf.debug('got column family', is_valid=_cf.is_valid, id=_cf.id, old_id=old_id) - return _cf + self._log.debug('got new column family', id=new_id, old_id=old_id) def _clone_into_dict(self) -> Dict[bytes, bytes]: """This method will make a copy of the database into a plain dict, be careful when running on large dbs.""" @@ -93,10 +98,8 @@ def _clone_into_dict(self) -> Dict[bytes, bytes]: # XXX: should be `Collection[bytes]`, which only works on Python 3.9+ class RocksDBSimpleSet(Collection, RocksDBIndexUtils): def __init__(self, db: 'rocksdb.DB', log: 'structlog.stdlib.BoundLogger', *, cf_name: bytes) -> None: - super().__init__(db) self.log = log - self._cf_name = cf_name - self._cf = self._fresh_cf(self._cf_name) + super().__init__(db, cf_name) def __iter__(self) -> Iterator[bytes]: it = self._db.iterkeys(self._cf) diff --git a/hathor/indexes/timestamp_index.py b/hathor/indexes/timestamp_index.py index 8e2d2f243..e2dea623e 100644 --- a/hathor/indexes/timestamp_index.py +++ b/hathor/indexes/timestamp_index.py @@ -12,11 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC, abstractmethod +from abc import abstractmethod from typing import Iterator, List, NamedTuple, Optional, Tuple from structlog import get_logger +from hathor.indexes.base_index import BaseIndex from hathor.transaction import BaseTransaction logger = get_logger() @@ -27,10 +28,13 @@ class RangeIdx(NamedTuple): offset: int -class TimestampIndex(ABC): +class TimestampIndex(BaseIndex): """ Index of transactions sorted by their timestamps. """ + def init_loop_step(self, tx: BaseTransaction) -> None: + self.add_tx(tx) + @abstractmethod def add_tx(self, tx: BaseTransaction) -> bool: """ Add a transaction to the index diff --git a/hathor/indexes/tips_index.py b/hathor/indexes/tips_index.py index 5dc6af275..d865d23c9 100644 --- a/hathor/indexes/tips_index.py +++ b/hathor/indexes/tips_index.py @@ -13,17 +13,18 @@ # limitations under the License. from math import inf -from typing import Dict, Set +from typing import Dict, Optional, Set from intervaltree import Interval, IntervalTree from structlog import get_logger +from hathor.indexes.base_index import BaseIndex from hathor.transaction import BaseTransaction logger = get_logger() -class TipsIndex: +class TipsIndex(BaseIndex): """ Use an interval tree to quick get the tips at a given timestamp. 
The interval of a transaction is in the form [begin, end), where `begin` is @@ -48,8 +49,20 @@ class TipsIndex: def __init__(self) -> None: self.log = logger.new() + self.force_clear() + + def get_db_name(self) -> Optional[str]: + return None + + def force_clear(self) -> None: self.tree = IntervalTree() - self.tx_last_interval = {} # Dict[bytes(hash), Interval] + self.tx_last_interval = {} + + def init_loop_step(self, tx: BaseTransaction) -> None: + tx_meta = tx.get_metadata() + if not tx_meta.validation.is_final(): + return + self.add_tx(tx) def add_tx(self, tx: BaseTransaction) -> bool: """ Add a new transaction to the index diff --git a/hathor/indexes/tokens_index.py b/hathor/indexes/tokens_index.py index 850c285e6..27c38fa1d 100644 --- a/hathor/indexes/tokens_index.py +++ b/hathor/indexes/tokens_index.py @@ -15,6 +15,7 @@ from abc import ABC, abstractmethod from typing import Iterator, List, NamedTuple, Optional, Tuple +from hathor.indexes.base_index import BaseIndex from hathor.transaction import BaseTransaction @@ -57,10 +58,16 @@ def iter_melt_utxos(self) -> Iterator[TokenUtxoInfo]: raise NotImplementedError -class TokensIndex(ABC): +class TokensIndex(BaseIndex): """ Index of tokens by token uid """ + def init_loop_step(self, tx: BaseTransaction) -> None: + tx_meta = tx.get_metadata() + if tx_meta.voided_by: + return + self.add_tx(tx) + @abstractmethod def add_tx(self, tx: BaseTransaction) -> None: """ Checks if this tx has mint or melt inputs/outputs and adds to tokens index diff --git a/hathor/manager.py b/hathor/manager.py index e1345c1c4..47b871320 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -27,7 +27,7 @@ from hathor.checkpoint import Checkpoint from hathor.conf import HathorSettings from hathor.consensus import ConsensusAlgorithm -from hathor.exception import HathorError, InvalidNewTransaction +from hathor.exception import HathorError, InitializationError, InvalidNewTransaction from hathor.mining import BlockTemplate, BlockTemplates from hathor.p2p.peer_discovery import PeerDiscovery from hathor.p2p.peer_id import PeerId @@ -248,11 +248,14 @@ def start(self) -> None: # Disable get transaction lock when initializing components self.tx_storage.disable_lock() # Initialize manager's components. - self._initialize_components() if self._full_verification: + self.tx_storage.reset_indexes() + self._initialize_components() # Before calling self._initialize_components() I start 'full verification' mode and after that I need to # finish it. It's just to know if the full node has stopped a full initialization in the middle self.tx_storage.finish_full_verification() + else: + self._initialize_components_new() self.tx_storage.enable_lock() # Metric starts to capture data @@ -507,6 +510,128 @@ def _initialize_components(self) -> None: tx_rate = '?' if tdt == 0 else cnt / tdt self.log.info('ready', tx_count=cnt, tx_rate=tx_rate, total_dt=tdt, height=h, blocks=block_count, txs=tx_count) + def _initialize_components_new(self) -> None: + """You are not supposed to run this method manually. You should run `doStart()` to initialize the + manager. + + This method runs through all transactions, verifying them and updating our wallet. 
+ """ + self.log.info('initialize') + if self.wallet: + self.wallet._manually_initialize() + + self.tx_storage.pre_init() + assert self.tx_storage.indexes is not None + + started_at = int(time.time()) + last_started_at = self.tx_storage.get_last_started_at() + if last_started_at >= started_at: + # XXX: although last_started_at==started_at is not _techincally_ to the future, it's strange enough to + # deserve a warning, but not special enough to deserve a customized message IMO + self.log.warn('The last started time is to the future of the current time', + started_at=started_at, last_started_at=last_started_at) + + # TODO: this could be either refactored into a migration or at least into it's own method + # After introducing soft voided transactions we need to guarantee the full node is not using + # a database that already has the soft voided transaction before marking them in the metadata + # Any new sync from the beginning should work fine or starting with the latest snapshot + # that already has the soft voided transactions marked + for soft_voided_id in settings.SOFT_VOIDED_TX_IDS: + try: + soft_voided_tx = self.tx_storage.get_transaction(soft_voided_id) + except TransactionDoesNotExist: + # This database does not have this tx that should be soft voided + # so it's fine, we will mark it as soft voided when we get it through sync + pass + else: + soft_voided_meta = soft_voided_tx.get_metadata() + voided_set = soft_voided_meta.voided_by or set() + # If the tx is not marked as soft voided, then we can't continue the initialization + if settings.SOFT_VOIDED_ID not in voided_set: + self.log.error( + 'Error initializing node. Your database is not compatible with the current version of the' + ' full node. You must use the latest available snapshot or sync from the beginning.' + ) + sys.exit(-1) + + assert {soft_voided_id, settings.SOFT_VOIDED_ID}.issubset(voided_set) + + # TODO: move support for full-verification here, currently we rely on the original _initialize_components + # method for full-verification to work, if we implement it here we'll reduce a lot of duplicate and + # complex code + self.tx_storage.indexes._manually_initialize(self.tx_storage) + + # Verify if all checkpoints that exist in the database are correct + try: + self._verify_checkpoints() + except InitializationError: + self.log.exception('Initialization error when checking checkpoints, cannot continue.') + sys.exit() + + # restart all validations possible + self._resume_validations() + + # XXX: last step before actually starting is updating the last started at timestamps + self.tx_storage.update_last_started_at(started_at) + self.state = self.NodeState.READY + self.log.info('ready') + + def _verify_checkpoints(self) -> None: + """ Method to verify if all checkpoints that exist in the database have the correct hash and are winners. + + This method needs the essential indexes to be already initialized. 
+ """ + assert self.tx_storage.indexes is not None + # based on the current best-height, filter-out checkpoints that aren't expected to exist in the database + best_height = self.tx_storage.get_height_best_block() + expected_checkpoints = [cp for cp in self.checkpoints if cp.height <= best_height] + for checkpoint in expected_checkpoints: + # XXX: query the database from checkpoint.hash and verify what comes out + try: + tx = self.tx_storage.get_transaction(checkpoint.hash) + except TransactionDoesNotExist as e: + raise InitializationError(f'Expected checkpoint does not exist in database: {checkpoint}') from e + assert tx.hash is not None + tx_meta = tx.get_metadata() + if tx_meta.height != checkpoint.height: + raise InitializationError( + f'Expected checkpoint of hash {tx.hash_hex} to have height {checkpoint.height}, but instead it has' + f'height {tx_meta.height}' + ) + if tx_meta.voided_by: + pretty_voided_by = list(i.hex() for i in tx_meta.voided_by) + raise InitializationError( + f'Expected checkpoint {checkpoint} to *NOT* be voided, but it is being voided by: ' + f'{pretty_voided_by}' + ) + # XXX: query the height index from checkpoint.height and check that the hash matches + tx_hash = self.tx_storage.indexes.height.get(checkpoint.height) + if tx_hash is None: + raise InitializationError( + f'Expected checkpoint {checkpoint} to be found in the height index, but it was not found' + ) + if tx_hash != tx.hash: + raise InitializationError( + f'Expected checkpoint {checkpoint} to be found in the height index, but it instead the block with ' + f'hash {tx_hash.hex()} was found' + ) + + def _resume_validations(self) -> None: + """ This method will resume running validations that did not run because the node exited. + """ + assert self.tx_storage.indexes is not None + if self.tx_storage.indexes.deps.has_needed_tx(): + self.log.debug('run pending validations') + depended_final_txs: List[BaseTransaction] = [] + for tx_hash in self.tx_storage.indexes.deps.iter(): + if not self.tx_storage.transaction_exists(tx_hash): + continue + tx = self.tx_storage.get_transaction(tx_hash) + if tx.get_metadata().validation.is_final(): + depended_final_txs.append(tx) + self.step_validations(depended_final_txs) + self.log.debug('pending validations finished') + def add_listen_address(self, addr: str) -> None: self.listen_addresses.append(addr) diff --git a/hathor/transaction/base_transaction.py b/hathor/transaction/base_transaction.py index 25a55e223..184ca568e 100644 --- a/hathor/transaction/base_transaction.py +++ b/hathor/transaction/base_transaction.py @@ -832,7 +832,8 @@ def get_metadata(self, *, force_reload: bool = False, use_storage: bool = True) # which requires the use of a storage, this is a workaround that should be fixed, places where this # happens include generating new mining blocks and some tests height = self.calculate_height() if self.storage else 0 - metadata = TransactionMetadata(hash=self.hash, accumulated_weight=self.weight, height=height) + score = self.weight if self.is_genesis else 0 + metadata = TransactionMetadata(hash=self.hash, accumulated_weight=self.weight, height=height, score=score) self._metadata = metadata if not metadata.hash: metadata.hash = self.hash @@ -844,7 +845,8 @@ def reset_metadata(self) -> None: recalculating all metadata. 
""" assert self.storage is not None - self._metadata = TransactionMetadata(hash=self.hash, + score = self.weight if self.is_genesis else 0 + self._metadata = TransactionMetadata(hash=self.hash, score=score, accumulated_weight=self.weight, height=self.calculate_height()) self._metadata._tx_ref = weakref.ref(self) diff --git a/hathor/transaction/storage/transaction_storage.py b/hathor/transaction/storage/transaction_storage.py index eedd6a35f..7816a507a 100644 --- a/hathor/transaction/storage/transaction_storage.py +++ b/hathor/transaction/storage/transaction_storage.py @@ -33,6 +33,13 @@ settings = HathorSettings() +# these are the timestamp values to be used when resetting them, 1 is used for the node instead of 0, so it can be +# greater, that way if both are reset (which also happens on a database that never run this implementation before) we +# guarantee that indexes will be initialized (because they would be "older" than the node timestamp). +NULL_INDEX_LAST_STARTED_AT = 0 +NULL_LAST_STARTED_AT = 1 +INDEX_ATTR_PREFIX = 'index_' + class AllTipsCache(NamedTuple): timestamp: int @@ -50,6 +57,18 @@ class TransactionStorage(ABC): log = get_logger() + # Key storage attribute to save if the network stored is the expected network + _network_attribute: str = 'network' + + # Key storage attribute to save if the full node is running a full verification + _running_full_verification_attribute: str = 'running_full_verification' + + # Key storage attribute to save if the manager is running + _manager_running_attribute: str = 'manager_running' + + # Ket storage attribute to save the last time the node started + _last_start_attribute: str = 'last_start' + def __init__(self): # Weakref is used to guarantee that there is only one instance of each transaction in memory. self._tx_weakref: WeakValueDictionary[bytes, BaseTransaction] = WeakValueDictionary() @@ -83,14 +102,18 @@ def __init__(self): # Initialize cache for genesis transactions. self._genesis_cache: Dict[bytes, BaseTransaction] = {} - # Key storage attribute to save if the network stored is the expected network - self._network_attribute: str = 'network' + # Internal toggle to choose when to select topological DFS iterator, used only on some tests + self._always_use_topological_dfs = False - # Key storage attribute to save if the full node is running a full verification - self._running_full_verification_attribute: str = 'running_full_verification' + @abstractmethod + def _update_caches(self, block_count: int, tx_count: int, latest_timestamp: int, first_timestamp: int) -> None: + """Update ephemeral caches, should only be used internally.""" + raise NotImplementedError - # Key storage attribute to save if the manager is running - self._manager_running_attribute: str = 'manager_running' + @abstractmethod + def reset_indexes(self) -> None: + """Reset all the indexes, making sure that no persisted value is reused.""" + raise NotImplementedError def update_best_block_tips_cache(self, tips_cache: List[bytes]) -> None: # XXX: check that the cache update is working properly, only used in unittests @@ -508,6 +531,40 @@ def _manually_initialize(self) -> None: """ pass + def topological_iterator(self) -> Iterator[BaseTransaction]: + """This method will return the fastest topological iterator available based on the database state. 
+ + This will be: + + - self._topological_sort_timestamp_index() when the timestamp index is up-to-date + - self._topological_sort_metadata() otherwise (metadata is assumed to be up-to-date) + - self._topological_sort_dfs() when the private property `_always_use_topological_dfs` is set to `True` + """ + # TODO: we currently assume that metadata is up-to-date, and thus this method can only run when that assumption + # is known to be true, but we could add a mechanism similar to what indexes use to know they're + # up-to-date and get rid of that assumption so this method can be used without having to make any + # assumptions + assert self.indexes is not None + + if self._always_use_topological_dfs: + return self._topological_sort_dfs() + + db_last_started_at = self.get_last_started_at() + sorted_all_db_name = self.indexes.sorted_all.get_db_name() + if sorted_all_db_name is None: + can_use_timestamp_index = False + else: + sorted_all_index_last_started_at = self.get_index_last_started_at(sorted_all_db_name) + can_use_timestamp_index = db_last_started_at == sorted_all_index_last_started_at + + iter_tx: Iterator[BaseTransaction] + if can_use_timestamp_index: + iter_tx = self._topological_sort_timestamp_index() + else: + iter_tx = self._topological_sort_metadata() + + return iter_tx + @abstractmethod def _topological_sort_dfs(self) -> Iterator[BaseTransaction]: """Return an iterable of the transactions in topological ordering, i.e., from genesis to the most recent @@ -643,6 +700,42 @@ def is_running_manager(self) -> bool: """ return self.get_value(self._manager_running_attribute) == '1' + def get_last_started_at(self) -> int: + """ Return the timestamp when the database was last started. + """ + # XXX: defaults to 1 just to force index initialization, by being higher than 0 + return int(self.get_value(self._last_start_attribute) or NULL_LAST_STARTED_AT) + + def set_last_started_at(self, timestamp: int) -> None: + """ Update the timestamp when the database was last started. + """ + self.add_value(self._last_start_attribute, str(timestamp)) + + def get_index_last_started_at(self, index_db_name: str) -> int: + """ Return the timestamp when an index was last started. + """ + attr_name = INDEX_ATTR_PREFIX + index_db_name + return int(self.get_value(attr_name) or NULL_INDEX_LAST_STARTED_AT) + + def set_index_last_started_at(self, index_db_name: str, timestamp: int) -> None: + """ Update the timestamp when a specific index was last started. + """ + attr_name = INDEX_ATTR_PREFIX + index_db_name + self.add_value(attr_name, str(timestamp)) + + def update_last_started_at(self, timestamp: int) -> None: + """ Updates the respective timestamps of when the node was last started. + + Using this method ensures that the same timestamp is being used and the correct indexes are being selected. + """ + assert self.indexes is not None + self.set_last_started_at(timestamp) + for index in self.indexes.iter_all_indexes(): + index_db_name = index.get_db_name() + if index_db_name is None: + continue + self.set_index_last_started_at(index_db_name, timestamp) + class BaseTransactionStorage(TransactionStorage): def __init__(self, with_index: bool = True, pubsub: Optional[Any] = None) -> None: @@ -654,11 +747,18 @@ def __init__(self, with_index: bool = True, pubsub: Optional[Any] = None) -> Non # Initialize index if needed. self.with_index = with_index if with_index: - self._reset_cache() + self.indexes = self._build_indexes_manager() + self._reset_cache(clear_indexes=False) # Either save or verify all genesis.
self._save_or_verify_genesis() + def _update_caches(self, block_count: int, tx_count: int, latest_timestamp: int, first_timestamp: int) -> None: + self._cache_block_count = block_count + self._cache_tx_count = tx_count + self._latest_timestamp = latest_timestamp + self._first_timestamp = first_timestamp + @property def latest_timestamp(self) -> int: return self._latest_timestamp @@ -674,13 +774,20 @@ def _save_transaction(self, tx: BaseTransaction, *, only_metadata: bool = False) def _build_indexes_manager(self) -> IndexesManager: return MemoryIndexesManager() - def _reset_cache(self) -> None: + def reset_indexes(self) -> None: + self._reset_cache(clear_indexes=True) + + def _reset_cache(self, *, clear_indexes: bool = True) -> None: """Reset all caches. This function should not be called unless you know what you are doing.""" assert self.with_index, 'Cannot reset cache because it has not been enabled.' + + # XXX: these fields would be better off being migrated to the IndexesManager, and probably as proper indexes self._cache_block_count = 0 self._cache_tx_count = 0 - self.indexes = self._build_indexes_manager() + if clear_indexes: + assert self.indexes is not None + self.indexes.force_clear_all() genesis = self.get_all_genesis() if genesis: @@ -794,12 +901,12 @@ def get_newer_txs_after(self, timestamp: int, hash_bytes: bytes, count: int) -> return txs, has_more def _manually_initialize(self) -> None: - self._reset_cache() + self._reset_cache(clear_indexes=False) + self._manually_initialize_indexes() - # We need to construct a topological sort, then iterate from - # genesis to tips. - for tx in self._topological_sort_dfs(): - self.add_to_indexes(tx) + def _manually_initialize_indexes(self) -> None: + if self.indexes is not None: + self.indexes._manually_initialize(self) def _topological_sort_timestamp_index(self) -> Iterator[BaseTransaction]: assert self.indexes is not None diff --git a/tests/tx/test_indexes2.py b/tests/tx/test_indexes2.py index 64892abac..d53212b0e 100644 --- a/tests/tx/test_indexes2.py +++ b/tests/tx/test_indexes2.py @@ -45,7 +45,7 @@ def test_timestamp_index(self): from hathor.indexes.memory_timestamp_index import MemoryTimestampIndex from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex from hathor.indexes.timestamp_index import RangeIdx - rocksdb_index = RocksDBTimestampIndex(self.create_tmp_rocksdb_db(), cf_name=b'foo') + rocksdb_index = RocksDBTimestampIndex(self.create_tmp_rocksdb_db(), 'foo') memory_index = MemoryTimestampIndex() for tx in self.transactions: rocksdb_index.add_tx(tx) diff --git a/tests/tx/test_indexes3.py b/tests/tx/test_indexes3.py index 8e20e472b..bb7c3e935 100644 --- a/tests/tx/test_indexes3.py +++ b/tests/tx/test_indexes3.py @@ -48,8 +48,6 @@ def setUp(self): @pytest.mark.flaky(max_runs=3, min_passes=1) def test_tips_index_initialization(self): - from intervaltree import IntervalTree - # XXX: this test makes use of the internals of TipsIndex tx_storage = self.manager.tx_storage assert tx_storage.indexes is not None @@ -62,9 +60,8 @@ def test_tips_index_initialization(self): base_block_tips_tree = tx_storage.indexes.block_tips.tree.copy() base_tx_tips_tree = tx_storage.indexes.tx_tips.tree.copy() - # reset the indexes and force a manual initialization - tx_storage._reset_cache() - self.manager._initialize_components() + # reset the indexes, which will force a re-initialization of all indexes + tx_storage._manually_initialize() reinit_all_tips_tree = tx_storage.indexes.all_tips.tree.copy() reinit_block_tips_tree =
tx_storage.indexes.block_tips.tree.copy() @@ -74,12 +71,8 @@ def test_tips_index_initialization(self): self.assertEqual(reinit_block_tips_tree, base_block_tips_tree) self.assertEqual(reinit_tx_tips_tree, base_tx_tips_tree) - # reset again but now initilize from the new function - # XXX: manually reset each index, because we're using MemoryTimestampIndex and we need that for the new init - for tip_index in [tx_storage.indexes.all_tips, tx_storage.indexes.block_tips, tx_storage.indexes.tx_tips]: - tip_index.tx_last_interval = {} - tip_index.tree = IntervalTree() - tx_storage.indexes._manually_initialize_tips_indexes(tx_storage) + # reset again + tx_storage._manually_initialize() newinit_all_tips_tree = tx_storage.indexes.all_tips.tree.copy() newinit_block_tips_tree = tx_storage.indexes.block_tips.tree.copy() @@ -110,7 +103,7 @@ def test_topological_iterators(self): ('metadata', tx_storage._topological_sort_metadata()), ] for name, it in iterators: - # collect all transactions + # collect all transactions, while checking that inputs/parents are consistent txs = list(it) # must be complete self.assertEqual(len(txs), total_count, f'iterator "{name}" does not cover all txs') diff --git a/tests/tx/test_indexes4.py b/tests/tx/test_indexes4.py new file mode 100644 index 000000000..b0ea58930 --- /dev/null +++ b/tests/tx/test_indexes4.py @@ -0,0 +1,150 @@ +from hathor.crypto.util import decode_address +from hathor.transaction import Transaction +from hathor.transaction.storage import TransactionMemoryStorage +from hathor.wallet.base_wallet import WalletOutputInfo +from tests import unittest +from tests.utils import add_blocks_unlock_reward, add_new_blocks, gen_new_tx + + +class BaseSimulatorIndexesTestCase(unittest.TestCase): + __test__ = False + + def setUp(self): + super().setUp() + + self.manager = self._build_randomized_blockchain() + + def _build_randomized_blockchain(self): + tx_storage = TransactionMemoryStorage() + manager = self.create_peer('testnet', tx_storage=tx_storage, unlock_wallet=True, wallet_index=True) + + add_new_blocks(manager, 50, advance_clock=15) + + add_blocks_unlock_reward(manager) + address1 = self.get_address(0) + address2 = self.get_address(1) + address3 = self.get_address(2) + output1 = WalletOutputInfo(address=decode_address(address1), value=123, timelock=None) + output2 = WalletOutputInfo(address=decode_address(address2), value=234, timelock=None) + output3 = WalletOutputInfo(address=decode_address(address3), value=345, timelock=None) + outputs = [output1, output2, output3] + + tx1 = manager.wallet.prepare_transaction_compute_inputs(Transaction, outputs, manager.tx_storage) + tx1.weight = 2.0 + tx1.parents = manager.get_new_tx_parents() + tx1.timestamp = int(self.clock.seconds()) + tx1.resolve() + assert manager.propagate_tx(tx1, False) + + tx2 = manager.wallet.prepare_transaction_compute_inputs(Transaction, outputs, manager.tx_storage) + tx2.weight = 2.0 + tx2.parents = [tx1.hash] + manager.get_new_tx_parents()[1:] + self.assertIn(tx1.hash, tx2.parents) + tx2.timestamp = int(self.clock.seconds()) + 1 + tx2.resolve() + assert manager.propagate_tx(tx2, False) + + tx3 = Transaction.create_from_struct(tx2.get_struct()) + tx3.weight = 3.0 + tx3.parents = tx1.parents + tx3.resolve() + assert manager.propagate_tx(tx3, False) + + for _ in range(100): + address = self.get_address(0) + value = 500 + tx = gen_new_tx(manager, address, value) + assert manager.propagate_tx(tx) + return manager + + def test_index_initialization(self): + from copy import deepcopy + + # XXX: this test 
makes use of the internals of TipsIndex + tx_storage = self.manager.tx_storage + assert tx_storage.indexes is not None + + # XXX: sanity check that we've at least produced something + self.assertGreater(tx_storage.get_count_tx_blocks(), 3) + + for tx in tx_storage.get_all_transactions(): + if tx.is_transaction and tx.get_metadata().voided_by: + break + else: + raise AssertionError('no voided tx found') + + # base tips indexes + base_all_tips_tree = tx_storage.indexes.all_tips.tree.copy() + base_block_tips_tree = tx_storage.indexes.block_tips.tree.copy() + base_tx_tips_tree = tx_storage.indexes.tx_tips.tree.copy() + base_address_index = deepcopy(tx_storage.indexes.addresses.index) + + # reset the indexes and force a re-initialization of all indexes + tx_storage._manually_initialize() + tx_storage.indexes.enable_address_index(self.manager.pubsub) + tx_storage._manually_initialize_indexes() + + reinit_all_tips_tree = tx_storage.indexes.all_tips.tree.copy() + reinit_block_tips_tree = tx_storage.indexes.block_tips.tree.copy() + reinit_tx_tips_tree = tx_storage.indexes.tx_tips.tree.copy() + reinit_address_index = deepcopy(tx_storage.indexes.addresses.index) + + self.assertEqual(reinit_all_tips_tree, base_all_tips_tree) + self.assertEqual(reinit_block_tips_tree, base_block_tips_tree) + self.assertEqual(reinit_tx_tips_tree, base_tx_tips_tree) + self.assertEqual(reinit_address_index, base_address_index) + + # reset again + tx_storage._manually_initialize() + tx_storage.indexes.enable_address_index(self.manager.pubsub) + tx_storage._manually_initialize_indexes() + + newinit_all_tips_tree = tx_storage.indexes.all_tips.tree.copy() + newinit_block_tips_tree = tx_storage.indexes.block_tips.tree.copy() + newinit_tx_tips_tree = tx_storage.indexes.tx_tips.tree.copy() + newinit_address_index = deepcopy(tx_storage.indexes.addresses.index) + + self.assertEqual(newinit_all_tips_tree, base_all_tips_tree) + self.assertEqual(newinit_block_tips_tree, base_block_tips_tree) + self.assertEqual(newinit_tx_tips_tree, base_tx_tips_tree) + self.assertEqual(newinit_address_index, base_address_index) + + def test_topological_iterators(self): + tx_storage = self.manager.tx_storage + + # XXX: sanity check that we've at least produced something + total_count = tx_storage.get_count_tx_blocks() + self.assertGreater(total_count, 3) + + # XXX: sanity check that the children metadata is properly set (this is needed for one of the iterators) + for tx in tx_storage.get_all_transactions(): + assert tx.hash is not None + for parent_tx in map(tx_storage.get_transaction, tx.parents): + self.assertIn(tx.hash, parent_tx.get_metadata().children) + + # test iterators, name is used to aid in assert messages + iterators = [ + ('dfs', tx_storage._topological_sort_dfs()), + ('timestamp_index', tx_storage._topological_sort_timestamp_index()), + ('metadata', tx_storage._topological_sort_metadata()), + ] + for name, it in iterators: + # collect all transactions, while checking that inputs/parents are consistent + txs = list(it) + # must be complete + self.assertEqual(len(txs), total_count, f'iterator "{name}" does not cover all txs') + # must be topological + self.assertIsTopological(iter(txs), f'iterator "{name}" is not topological') + + +class SyncV1SimulatorIndexesTestCase(unittest.SyncV1Params, BaseSimulatorIndexesTestCase): + __test__ = True + + +class SyncV2SimulatorIndexesTestCase(unittest.SyncV2Params, BaseSimulatorIndexesTestCase): + __test__ = True + + +# sync-bridge should behave like sync-v2 +class 
SyncBridgeSimulatorIndexesTestCase(unittest.SyncBridgeParams, SyncV2SimulatorIndexesTestCase): + __test__ = True diff --git a/tests/tx/test_tx_storage.py b/tests/tx/test_tx_storage.py index 141039648..36f42e31e 100644 --- a/tests/tx/test_tx_storage.py +++ b/tests/tx/test_tx_storage.py @@ -436,6 +436,10 @@ def tearDown(self): shutil.rmtree(self.directory) super().tearDown() + def test_storage_new_blocks(self): + self.tx_storage._always_use_topological_dfs = True + super().test_storage_new_blocks() + class TransactionCompactStorageTest(BaseTransactionStorageTest): __test__ = True @@ -452,6 +456,10 @@ def test_subfolders(self): subfolders = os.listdir(subfolders_path) self.assertEqual(settings.STORAGE_SUBFOLDERS, len(subfolders)) + def test_storage_new_blocks(self): + self.tx_storage._always_use_topological_dfs = True + super().test_storage_new_blocks() + def tearDown(self): shutil.rmtree(self.directory) super().tearDown() @@ -514,3 +522,22 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.directory) super().tearDown() + + def test_storage_new_blocks(self): + self.tx_storage._always_use_topological_dfs = True + super().test_storage_new_blocks() + + +@pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb') +class CacheRocksDBStorageTest(BaseCacheStorageTest): + __test__ = True + + def setUp(self): + self.directory = tempfile.mkdtemp() + store = TransactionRocksDBStorage(self.directory) + reactor = MemoryReactorHeapClock() + super().setUp(TransactionCacheStorage(store, reactor, capacity=5)) + + def tearDown(self): + shutil.rmtree(self.directory) + super().tearDown()
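To summarize the mechanism this patch introduces: on every clean start, update_last_started_at() stamps the node and every persisted index with the same timestamp, and on the next start any index whose stamp does not match the node's is considered stale and rebuilt. A condensed sketch of that decision, using the names from the diff (the helper function itself is hypothetical):

def index_needs_init(tx_storage: 'TransactionStorage', index: 'BaseIndex') -> bool:
    # memory-only indexes never persist state, so they are always rebuilt
    db_name = index.get_db_name()
    if db_name is None:
        return True
    # a persisted index is stale when it missed the last recorded start; since
    # NULL_INDEX_LAST_STARTED_AT (0) < NULL_LAST_STARTED_AT (1), a database that
    # never ran this code rebuilds everything on its first start
    return tx_storage.get_index_last_started_at(db_name) != tx_storage.get_last_started_at()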