diff --git a/hathor/indexes/address_index.py b/hathor/indexes/address_index.py
index 18d6f798c..711fe9df6 100644
--- a/hathor/indexes/address_index.py
+++ b/hathor/indexes/address_index.py
@@ -12,11 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from abc import ABC, abstractmethod
+from abc import abstractmethod
 from typing import TYPE_CHECKING, Iterable, List, Optional
 
 from structlog import get_logger
 
+from hathor.indexes.base_index import BaseIndex
 from hathor.transaction import BaseTransaction
 
 if TYPE_CHECKING:  # pragma: no cover
@@ -25,11 +26,17 @@
 logger = get_logger()
 
 
-class AddressIndex(ABC):
+class AddressIndex(BaseIndex):
     """ Index of inputs/outputs by address
     """
     pubsub: Optional['PubSubManager']
 
+    def init_loop_step(self, tx: BaseTransaction) -> None:
+        tx_meta = tx.get_metadata()
+        if tx_meta.voided_by:
+            return
+        self.add_tx(tx)
+
     def publish_tx(self, tx: BaseTransaction, *, addresses: Optional[Iterable[str]] = None) -> None:
         """ Publish WALLET_ADDRESS_HISTORY for all addresses of a transaction.
         """
diff --git a/hathor/indexes/base_index.py b/hathor/indexes/base_index.py
new file mode 100644
index 000000000..0c4852f8a
--- /dev/null
+++ b/hathor/indexes/base_index.py
@@ -0,0 +1,47 @@
+# Copyright 2021 Hathor Labs
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from typing import Optional
+
+from hathor.transaction.base_transaction import BaseTransaction
+
+
+class BaseIndex(ABC):
+    """ All indexes must inherit from this index.
+
+    This class exists so we can interact with indexes without knowing anything specific to their implementation. It
+    was created to generalize how we initialize indexes and keep track of which ones are up-to-date.
+    """
+
+    @abstractmethod
+    def get_db_name(self) -> Optional[str]:
+        """ The returned string is used to generate the relevant attributes for storing an index's state in the db.
+
+        If None is returned, the database will not store the index initialization state and the index will always be
+        initialized. This is the expected mode that memory-only indexes will use.
+        """
+        raise NotImplementedError
+
+    @abstractmethod
+    def init_loop_step(self, tx: BaseTransaction) -> None:
+        """ When the index needs to be initialized, this function will be called for each tx in the database.
+        """
+        raise NotImplementedError
+
+    @abstractmethod
+    def force_clear(self) -> None:
+        """ Clear any existing data in the index.
+        """
+        raise NotImplementedError
diff --git a/hathor/indexes/deps_index.py b/hathor/indexes/deps_index.py
index 6a1dbb9bf..64edfef2c 100644
--- a/hathor/indexes/deps_index.py
+++ b/hathor/indexes/deps_index.py
@@ -12,9 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from abc import ABC, abstractmethod
+from abc import abstractmethod
 from typing import TYPE_CHECKING, Iterator, List
 
+from hathor.indexes.base_index import BaseIndex
 from hathor.transaction import BaseTransaction, Block
 
 if TYPE_CHECKING:  # pragma: no cover
@@ -49,7 +50,7 @@ def get_requested_from_height(tx: BaseTransaction) -> int:
     return block.get_metadata().height
 
 
-class DepsIndex(ABC):
+class DepsIndex(BaseIndex):
     """ Index of dependencies between transactions
 
     This index exists to accelerate queries related to the partial validation mechanism needed by sync-v2. More
@@ -105,6 +106,12 @@ class DepsIndex(ABC):
     them.
     """
 
+    def init_loop_step(self, tx: BaseTransaction) -> None:
+        tx_meta = tx.get_metadata()
+        if tx_meta.voided_by:
+            return
+        self.add_tx(tx)
+
     @abstractmethod
     def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
         """Update 'deps' and 'needed' sub-indexes, removing them when necessary (i.e. validation is complete).
diff --git a/hathor/indexes/height_index.py b/hathor/indexes/height_index.py
index e526210da..655fe7e70 100644
--- a/hathor/indexes/height_index.py
+++ b/hathor/indexes/height_index.py
@@ -12,10 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from abc import ABC, abstractmethod
+from abc import abstractmethod
 from typing import List, NamedTuple, Optional, Tuple
 
-from hathor.transaction import Block
+from hathor.indexes.base_index import BaseIndex
+from hathor.transaction import BaseTransaction, Block
 from hathor.transaction.genesis import BLOCK_GENESIS
 from hathor.util import not_none
 
@@ -35,10 +36,22 @@ class _AddToIndexItem(NamedTuple):
     timestamp: int
 
 
-class HeightIndex(ABC):
+class HeightIndex(BaseIndex):
     """Store the block hash for each given height
     """
 
+    def init_loop_step(self, tx: BaseTransaction) -> None:
+        if not tx.is_block:
+            return
+        if tx.is_genesis:
+            return
+        assert isinstance(tx, Block)
+        assert tx.hash is not None
+        tx_meta = tx.get_metadata()
+        if tx_meta.voided_by:
+            return
+        self.add_new(tx_meta.height, tx.hash, tx.timestamp)
+
     @abstractmethod
     def add_new(self, height: int, block_hash: bytes, timestamp: int) -> None:
         """Add a new block to the height index that **MUST NOT** result in a re-org"""
diff --git a/hathor/indexes/manager.py b/hathor/indexes/manager.py
index 44b582cf7..fbf3f7b73 100644
--- a/hathor/indexes/manager.py
+++ b/hathor/indexes/manager.py
@@ -13,11 +13,13 @@
 # limitations under the License.
 
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Optional
+from enum import Enum, auto
+from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple
 
 from structlog import get_logger
 
 from hathor.indexes.address_index import AddressIndex
+from hathor.indexes.base_index import BaseIndex
 from hathor.indexes.deps_index import DepsIndex
 from hathor.indexes.height_index import HeightIndex
 from hathor.indexes.mempool_tips_index import MempoolTipsIndex
@@ -25,6 +27,7 @@ from hathor.indexes.tips_index import TipsIndex
 from hathor.indexes.tokens_index import TokensIndex
 from hathor.transaction import BaseTransaction
+from hathor.util import progress
 
 if TYPE_CHECKING:  # pragma: no cover
     import rocksdb
@@ -35,6 +38,12 @@
 logger = get_logger()
 
 
+class _IndexFilter(Enum):
+    ALL = auto()  # block or tx, voided or not
+    VALID_BLOCKS = auto()  # only blocks that are not voided
+    VALID_TXS = auto()  # only transactions that are not voided
+
+
 class IndexesManager(ABC):
     """ IndexesManager manages all the indexes that we will have in the system
@@ -42,6 +51,8 @@ class IndexesManager(ABC):
     so it will know which index is better to use in each moment
     """
 
+    log = get_logger()
+
     all_tips: TipsIndex
     block_tips: TipsIndex
     tx_tips: TipsIndex
@@ -56,6 +67,39 @@ class IndexesManager(ABC):
     addresses: Optional[AddressIndex]
     tokens: Optional[TokensIndex]
 
+    def __init_checks__(self):
+        """ Implementations must call this at the **end** of their __init__ for running ValueError checks."""
+        # check if every index has a unique db_name
+        indexes_db_names = set()
+        for index in self.iter_all_indexes():
+            index_db_name = index.get_db_name()
+            if index_db_name is None:
+                continue
+            if index_db_name in indexes_db_names:
+                raise ValueError(f'duplicate index name "{index_db_name}", already in use by another index')
+            indexes_db_names.add(index_db_name)
+
+    def iter_all_indexes(self) -> Iterator[BaseIndex]:
+        """ Iterate over all of the indexes abstracted by this manager, hiding their specific implementation details"""
+        for _, index in self._iter_all_indexes_with_filter():
+            yield index
+
+    def _iter_all_indexes_with_filter(self) -> Iterator[Tuple[_IndexFilter, BaseIndex]]:
+        """ Same as `iter_all_indexes()`, but includes a filter for what transactions an index is interested in."""
+        yield _IndexFilter.ALL, self.all_tips
+        yield _IndexFilter.VALID_BLOCKS, self.block_tips
+        yield _IndexFilter.VALID_TXS, self.tx_tips
+        yield _IndexFilter.ALL, self.sorted_all
+        yield _IndexFilter.VALID_BLOCKS, self.sorted_blocks
+        yield _IndexFilter.VALID_TXS, self.sorted_txs
+        yield _IndexFilter.ALL, self.deps
+        yield _IndexFilter.ALL, self.height
+        yield _IndexFilter.ALL, self.mempool_tips
+        if self.addresses is not None:
+            yield _IndexFilter.ALL, self.addresses
+        if self.tokens is not None:
+            yield _IndexFilter.ALL, self.tokens
+
     @abstractmethod
     def enable_address_index(self, pubsub: 'PubSubManager') -> None:
         """Enable address index. It does nothing if it has already been enabled."""
@@ -66,23 +110,71 @@ def enable_tokens_index(self) -> None:
         """Enable tokens index. It does nothing if it has already been enabled."""
         raise NotImplementedError
 
-    def _manually_initialize_tips_indexes(self, tx_storage: 'TransactionStorage') -> None:
-        """ Initialize the tips indexes, populating them from a tx_storage that is otherwise complete.
-
-        XXX: this method requires timestamp indexes to be complete and up-to-date with the rest of the database
-        XXX: this method is not yet being used
+    def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
+        """ Initialize the indexes, checking which indexes need initialization and choosing the optimal iterator.
         """
-        for tx in tx_storage._topological_fast():
-            tx_meta = tx.get_metadata()
-            if not tx_meta.validation.is_final():
+        from hathor.transaction.genesis import BLOCK_GENESIS
+
+        db_last_started_at = tx_storage.get_last_started_at()
+        sorted_all_db_name = self.sorted_all.get_db_name()
+        if sorted_all_db_name is None:
+            can_use_fast_iterator = False
+        else:
+            sorted_all_index_last_started_at = tx_storage.get_index_last_started_at(sorted_all_db_name)
+            can_use_fast_iterator = db_last_started_at == sorted_all_index_last_started_at
+        if not can_use_fast_iterator:
+            self.log.info('initialization will start soon, please wait')
+
+        indexes_to_init: List[Tuple[_IndexFilter, BaseIndex]] = []
+        for index_filter, index in self._iter_all_indexes_with_filter():
+            index_db_name = index.get_db_name()
+            if index_db_name is None:
+                indexes_to_init.append((index_filter, index))
                 continue
+            index_last_started_at = tx_storage.get_index_last_started_at(index_db_name)
+            if db_last_started_at != index_last_started_at:
+                indexes_to_init.append((index_filter, index))
 
-            self.all_tips.add_tx(tx)
+        iter_tx: Iterator[BaseTransaction]
+        if can_use_fast_iterator:
+            iter_tx = tx_storage._topological_fast()
+        else:
+            iter_tx = tx_storage._topological_sort()
+
+        # make sure that all the indexes that we're rebuilding are cleared
+        for _, index in indexes_to_init:
+            index.force_clear()
+
+        block_count = 0
+        tx_count = 0
+        latest_timestamp = BLOCK_GENESIS.timestamp
+        first_timestamp = BLOCK_GENESIS.timestamp
+
+        for tx in progress(self.log, iter_tx):
+            # XXX: these would probably make more sense as their own simple "indexes" instead of how it is here
+            latest_timestamp = max(tx.timestamp, latest_timestamp)
+            first_timestamp = min(tx.timestamp, first_timestamp)
 
             if tx.is_block:
-                self.block_tips.add_tx(tx)
-            elif not tx_meta.voided_by:
-                self.tx_tips.add_tx(tx)
+                block_count += 1
+            else:
+                tx_count += 1
+
+            tx_meta = tx.get_metadata()
+
+            # feed each transaction to the indexes that they are interested in
+            for index_filter, index in indexes_to_init:
+                if index_filter is _IndexFilter.ALL:
+                    index.init_loop_step(tx)
+                elif index_filter is _IndexFilter.VALID_BLOCKS:
+                    if tx.is_block:
+                        index.init_loop_step(tx)
+                elif index_filter is _IndexFilter.VALID_TXS:
+                    if tx.is_transaction and not tx_meta.voided_by:
+                        index.init_loop_step(tx)
+                else:
+                    assert False, 'impossible filter'
+
+        tx_storage._update_caches(block_count, tx_count, latest_timestamp, first_timestamp)
 
     def add_tx(self, tx: BaseTransaction) -> bool:
         """ Add a transaction to the indexes
@@ -162,6 +254,9 @@ def __init__(self) -> None:
         self.mempool_tips = MemoryMempoolTipsIndex()
         self.deps = MemoryDepsIndex()
 
+        # XXX: this has to be at the end of __init__, after everything has been initialized
+        self.__init_checks__()
+
     def enable_address_index(self, pubsub: 'PubSubManager') -> None:
         from hathor.indexes.memory_address_index import MemoryAddressIndex
         if self.addresses is None:
@@ -186,9 +281,9 @@ def __init__(self, db: 'rocksdb.DB') -> None:
         self.block_tips = TipsIndex()
         self.tx_tips = TipsIndex()
 
-        self.sorted_all = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-all')
-        self.sorted_blocks = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-blocks')
-        self.sorted_txs = RocksDBTimestampIndex(self._db, cf_name=b'timestamp-sorted-txs')
+        self.sorted_all = RocksDBTimestampIndex(self._db, 'all')
+        self.sorted_blocks = RocksDBTimestampIndex(self._db, 'blocks')
+        self.sorted_txs = RocksDBTimestampIndex(self._db, 'txs')
 
         self.addresses = None
         self.tokens = None
@@ -196,6 +291,9 @@ def __init__(self, db: 'rocksdb.DB') -> None:
         self.mempool_tips = RocksDBMempoolTipsIndex(self._db)
         self.deps = RocksDBDepsIndex(self._db)
 
+        # XXX: this has to be at the end of __init__, after everything has been initialized
+        self.__init_checks__()
+
     def enable_address_index(self, pubsub: 'PubSubManager') -> None:
         from hathor.indexes.rocksdb_address_index import RocksDBAddressIndex
         if self.addresses is None:
diff --git a/hathor/indexes/memory_address_index.py b/hathor/indexes/memory_address_index.py
index 5bed5d104..49f24a80b 100644
--- a/hathor/indexes/memory_address_index.py
+++ b/hathor/indexes/memory_address_index.py
@@ -30,11 +30,20 @@ class MemoryAddressIndex(AddressIndex):
     """ Index of inputs/outputs by address
     """
+
+    index: DefaultDict[str, Set[bytes]]
+
     def __init__(self, pubsub: Optional['PubSubManager'] = None) -> None:
-        self.index: DefaultDict[str, Set[bytes]] = defaultdict(set)
         self.pubsub = pubsub
         if self.pubsub:
             self.subscribe_pubsub_events()
+        self.force_clear()
+
+    def get_db_name(self) -> Optional[str]:
+        return None
+
+    def force_clear(self) -> None:
+        self.index = defaultdict(set)
 
     def subscribe_pubsub_events(self) -> None:
         """ Subscribe wallet index to receive voided/winner tx pubsub events
diff --git a/hathor/indexes/memory_deps_index.py b/hathor/indexes/memory_deps_index.py
index bd0366759..058c34a6d 100644
--- a/hathor/indexes/memory_deps_index.py
+++ b/hathor/indexes/memory_deps_index.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import TYPE_CHECKING, Dict, FrozenSet, Iterator, List, Set, Tuple
+from typing import TYPE_CHECKING, Dict, FrozenSet, Iterator, List, Optional, Set, Tuple
 
 from structlog import get_logger
 
@@ -27,17 +27,26 @@
 
 
 class MemoryDepsIndex(DepsIndex):
+    # Reverse dependency mapping
+    _rev_dep_index: Dict[bytes, Set[bytes]]
+
+    # Ready to be validated cache
+    _txs_with_deps_ready: Set[bytes]
+
+    # Next to be downloaded
+    _needed_txs_index: Dict[bytes, Tuple[int, bytes]]
+
     def __init__(self):
         self.log = logger.new()
+        self.force_clear()
 
-        # Reverse dependency mapping
-        self._rev_dep_index: Dict[bytes, Set[bytes]] = {}
-
-        # Ready to be validated cache
-        self._txs_with_deps_ready: Set[bytes] = set()
+    def get_db_name(self) -> Optional[str]:
+        return None
 
-        # Next to be downloaded
-        self._needed_txs_index: Dict[bytes, Tuple[int, bytes]] = {}
+    def force_clear(self) -> None:
+        self._rev_dep_index = {}
+        self._txs_with_deps_ready = set()
+        self._needed_txs_index = {}
 
     def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
         assert tx.hash is not None
diff --git a/hathor/indexes/memory_height_index.py b/hathor/indexes/memory_height_index.py
index 1d94c781d..5b776881c 100644
--- a/hathor/indexes/memory_height_index.py
+++ b/hathor/indexes/memory_height_index.py
@@ -24,6 +24,12 @@ class MemoryHeightIndex(HeightIndex):
     _index: List[IndexEntry]
 
     def __init__(self) -> None:
+        self.force_clear()
+
+    def get_db_name(self) -> Optional[str]:
+        return None
+
+    def force_clear(self) -> None:
         self._index = [BLOCK_GENESIS_ENTRY]
 
     def _add(self, height: int, block_hash: bytes, timestamp: int, *, can_reorg: bool) -> None:
diff --git a/hathor/indexes/memory_mempool_tips_index.py b/hathor/indexes/memory_mempool_tips_index.py
index 60f0017a1..9a8fd7c9f 100644
--- a/hathor/indexes/memory_mempool_tips_index.py
+++ b/hathor/indexes/memory_mempool_tips_index.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Iterable, Set
+from typing import Iterable, Optional, Set
 
 from structlog import get_logger
 
@@ -26,6 +26,12 @@ class MemoryMempoolTipsIndex(ByteCollectionMempoolTipsIndex):
     def __init__(self):
         self.log = logger.new()
+        self.force_clear()
+
+    def get_db_name(self) -> Optional[str]:
+        return None
+
+    def force_clear(self) -> None:
         self._index = set()
 
     def _discard(self, tx: bytes) -> None:
diff --git a/hathor/indexes/memory_timestamp_index.py b/hathor/indexes/memory_timestamp_index.py
index 5cef631f6..d61e32677 100644
--- a/hathor/indexes/memory_timestamp_index.py
+++ b/hathor/indexes/memory_timestamp_index.py
@@ -37,6 +37,12 @@ class MemoryTimestampIndex(TimestampIndex):
 
     def __init__(self) -> None:
         self.log = logger.new()
+        self.force_clear()
+
+    def get_db_name(self) -> Optional[str]:
+        return None
+
+    def force_clear(self) -> None:
         self._index = SortedKeyList(key=lambda x: (x.timestamp, x.hash))
 
     def add_tx(self, tx: BaseTransaction) -> bool:
diff --git a/hathor/indexes/memory_tokens_index.py b/hathor/indexes/memory_tokens_index.py
index 1d132ad8f..bdc8e2fa8 100644
--- a/hathor/indexes/memory_tokens_index.py
+++ b/hathor/indexes/memory_tokens_index.py
@@ -68,6 +68,12 @@ def iter_melt_utxos(self) -> Iterator[TokenUtxoInfo]:
 
 
 class MemoryTokensIndex(TokensIndex):
     def __init__(self) -> None:
         self.log = logger.new()
+        self.force_clear()
+
+    def get_db_name(self) -> Optional[str]:
+        return None
+
+    def force_clear(self) -> None:
         self._tokens: Dict[bytes, MemoryTokenIndexInfo] = defaultdict(MemoryTokenIndexInfo)
 
     def _add_to_index(self, tx: BaseTransaction, index: int) -> None:
diff --git a/hathor/indexes/mempool_tips_index.py b/hathor/indexes/mempool_tips_index.py
index 7e739a246..915e90cd5 100644
--- a/hathor/indexes/mempool_tips_index.py
+++ b/hathor/indexes/mempool_tips_index.py
@@ -12,12 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from abc import ABC, abstractmethod
+from abc import abstractmethod
 from collections.abc import Collection
 from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Set, cast
 
 import structlog
 
+from hathor.indexes.base_index import BaseIndex
 from hathor.transaction import BaseTransaction, Transaction
 from hathor.util import not_none
 
@@ -25,9 +26,12 @@
     from hathor.transaction.storage import TransactionStorage
 
 
-class MempoolTipsIndex(ABC):
+class MempoolTipsIndex(BaseIndex):
     """Index to access the tips of the mempool transactions, which haven't been confirmed by a block."""
 
+    def init_loop_step(self, tx: BaseTransaction) -> None:
+        self.update(tx)
+
     # originally tx_storage.update_mempool_tips
     @abstractmethod
     def update(self, tx: BaseTransaction) -> None:
diff --git a/hathor/indexes/rocksdb_address_index.py b/hathor/indexes/rocksdb_address_index.py
index e6dde2b90..f8e044636 100644
--- a/hathor/indexes/rocksdb_address_index.py
+++ b/hathor/indexes/rocksdb_address_index.py
@@ -29,6 +29,7 @@
 logger = get_logger()
 
 _CF_NAME_ADDRESS_INDEX = b'address-index'
+_DB_NAME: str = 'address_index'
 
 
 class RocksDBAddressIndex(AddressIndex, RocksDBIndexUtils):
@@ -46,17 +47,20 @@ class RocksDBAddressIndex(AddressIndex, RocksDBIndexUtils):
     """
     def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None,
                  pubsub: Optional['PubSubManager'] = None) -> None:
-        RocksDBIndexUtils.__init__(self, db)
         self.log = logger.new()
-
-        # column family stuff
-        self._cf_name = cf_name or _CF_NAME_ADDRESS_INDEX
-        self._cf = self._fresh_cf(self._cf_name)
+        RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_ADDRESS_INDEX)
 
         self.pubsub = pubsub
         if self.pubsub:
             self.subscribe_pubsub_events()
 
+    def get_db_name(self) -> Optional[str]:
+        # XXX: we don't need it to be parametrizable, so this is fine
+        return _DB_NAME
+
+    def force_clear(self) -> None:
+        self.clear()
+
     def _to_key(self, address: str, tx: Optional[BaseTransaction] = None) -> bytes:
         import struct
         assert len(address) == 34
diff --git a/hathor/indexes/rocksdb_deps_index.py b/hathor/indexes/rocksdb_deps_index.py
index 69ffc58cc..ec7afa005 100644
--- a/hathor/indexes/rocksdb_deps_index.py
+++ b/hathor/indexes/rocksdb_deps_index.py
@@ -31,6 +31,7 @@
 logger = get_logger()
 
 _CF_NAME_DEPS_INDEX = b'deps-index'
+_DB_NAME: str = 'deps_index'
 
 
 class _Tag(Enum):
@@ -70,12 +71,15 @@ class RocksDBDepsIndex(DepsIndex, RocksDBIndexUtils):
     """
     def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None:
-        RocksDBIndexUtils.__init__(self, db)
         self.log = logger.new()
+        RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_DEPS_INDEX)
 
-        # column family stuff
-        self._cf_name = cf_name or _CF_NAME_DEPS_INDEX
-        self._cf = self._fresh_cf(self._cf_name)
+    def get_db_name(self) -> Optional[str]:
+        # XXX: we don't need it to be parametrizable, so this is fine
+        return _DB_NAME
+
+    def force_clear(self) -> None:
+        self.clear()
 
     def _to_key_ready(self, tx_hash: Optional[bytes] = None) -> bytes:
         """Make a key for accessing READY txs 'set'"""
diff --git a/hathor/indexes/rocksdb_height_index.py b/hathor/indexes/rocksdb_height_index.py
index 21237ae3f..e3dbf4997 100644
--- a/hathor/indexes/rocksdb_height_index.py
+++ b/hathor/indexes/rocksdb_height_index.py
@@ -27,6 +27,7 @@
 logger = get_logger()
 
 _CF_NAME_HEIGHT_INDEX = b'height-index'
+_DB_NAME: str = 'height_index'
 
 
 class RocksDBHeightIndex(HeightIndex, RocksDBIndexUtils):
@@ -44,15 +45,15 @@ class RocksDBHeightIndex(HeightIndex, RocksDBIndexUtils):
     """
     def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None:
-        RocksDBIndexUtils.__init__(self, db)
         self.log = logger.new()
+        RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_HEIGHT_INDEX)
 
-        # column family stuff
-        self._cf_name = cf_name or _CF_NAME_HEIGHT_INDEX
-        self._cf = self._fresh_cf(self._cf_name)
+    def get_db_name(self) -> Optional[str]:
+        # XXX: we don't need it to be parametrizable, so this is fine
+        return _DB_NAME
 
-        # XXX: when we stop using a fresh column-family we have to figure-out when to not run this
-        self._init_db()
+    def force_clear(self) -> None:
+        self.clear()
 
     def _init_db(self) -> None:
         """ Initialize the database with the genesis entry."""
diff --git a/hathor/indexes/rocksdb_mempool_tips_index.py b/hathor/indexes/rocksdb_mempool_tips_index.py
index 39fdcfb41..bce131f25 100644
--- a/hathor/indexes/rocksdb_mempool_tips_index.py
+++ b/hathor/indexes/rocksdb_mempool_tips_index.py
@@ -25,6 +25,7 @@
 logger = get_logger()
 
 _CF_NAME_MEMPOOL_TIPS_INDEX = b'mempool-tips-index'
+_DB_NAME: str = 'mempool_tips_index'
 
 
 class RocksDBMempoolTipsIndex(ByteCollectionMempoolTipsIndex):
@@ -35,6 +36,13 @@ def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None
         _cf_name = cf_name or _CF_NAME_MEMPOOL_TIPS_INDEX
         self._index = RocksDBSimpleSet(db, self.log, cf_name=_cf_name)
 
+    def get_db_name(self) -> Optional[str]:
+        # XXX: we don't need it to be parametrizable, so this is fine
+        return _DB_NAME
+
+    def force_clear(self) -> None:
+        self._index.clear()
+
     def _discard(self, tx: bytes) -> None:
         self._index.discard(tx)
 
diff --git a/hathor/indexes/rocksdb_timestamp_index.py b/hathor/indexes/rocksdb_timestamp_index.py
index c0f3d3b33..e46939e4c 100644
--- a/hathor/indexes/rocksdb_timestamp_index.py
+++ b/hathor/indexes/rocksdb_timestamp_index.py
@@ -38,13 +38,16 @@ class RocksDBTimestampIndex(TimestampIndex, RocksDBIndexUtils):
 
     It works nicely because rocksdb uses a tree sorted by key under the hood.
""" - def __init__(self, db: 'rocksdb.DB', *, cf_name: bytes) -> None: - RocksDBIndexUtils.__init__(self, db) + def __init__(self, db: 'rocksdb.DB', name: str) -> None: self.log = logger.new() + RocksDBIndexUtils.__init__(self, db, f'timestamp-sorted-{name}'.encode()) + self._name = name - # column family stuff - self._cf_name = cf_name - self._cf = self._fresh_cf(self._cf_name) + def get_db_name(self) -> Optional[str]: + return f'timestamp_{self._name}_index' + + def force_clear(self) -> None: + self.clear() def _to_key(self, timestamp: int, tx_hash: Optional[bytes] = None) -> bytes: """Make a key for a timestamp and optionally tx_hash, the key represents the membership itself.""" diff --git a/hathor/indexes/rocksdb_tokens_index.py b/hathor/indexes/rocksdb_tokens_index.py index df96e25f0..26a61be1b 100644 --- a/hathor/indexes/rocksdb_tokens_index.py +++ b/hathor/indexes/rocksdb_tokens_index.py @@ -39,6 +39,7 @@ logger = get_logger() _CF_NAME_TOKENS_INDEX = b'tokens-index' +_DB_NAME: str = 'tokens_index' # the following type is used to help a little bit to distinguish when we're using a byte sequence that should only be # internally used @@ -91,12 +92,15 @@ class RocksDBTokensIndex(TokensIndex, RocksDBIndexUtils): """ def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None: - RocksDBIndexUtils.__init__(self, db) self.log = logger.new() + RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_TOKENS_INDEX) - # column family stuff - self._cf_name = cf_name or _CF_NAME_TOKENS_INDEX - self._cf = self._fresh_cf(self._cf_name) + def get_db_name(self) -> Optional[str]: + # XXX: we don't need it to be parametrizable, so this is fine + return _DB_NAME + + def force_clear(self) -> None: + self.clear() def _to_internal_token_uid(self, token_uid: bytes) -> _InternalUid: """Normalizes a token_uid so that the native token (\x00) will have the same length as custom tokens.""" diff --git a/hathor/indexes/rocksdb_utils.py b/hathor/indexes/rocksdb_utils.py index dda736a5d..73529ad42 100644 --- a/hathor/indexes/rocksdb_utils.py +++ b/hathor/indexes/rocksdb_utils.py @@ -55,41 +55,44 @@ class RocksDBIndexUtils: _db: 'rocksdb.DB' log: 'structlog.stdlib.BoundLogger' - def __init__(self, db: 'rocksdb.DB') -> None: + def __init__(self, db: 'rocksdb.DB', cf_name: bytes) -> None: + self._log = self.log.new(cf=cf_name.decode('ascii')) self._db = db + self._cf_name = cf_name + self._ensure_cf_exists(cf_name) + + def _init_db(self): + """ Inheritors of this class may implement this to initialize a column family when it is just created.""" + pass - def _fresh_cf(self, cf_name: bytes) -> 'rocksdb.ColumnFamilyHandle': - """Ensure we have a working and fresh column family""" + def _ensure_cf_exists(self, cf_name: bytes) -> None: + """Ensure we have a working and column family, loading the previous one if it exists""" import rocksdb - log_cf = self.log.new(cf=cf_name.decode('ascii')) - _cf = self._db.get_column_family(cf_name) - # XXX: dropping column because initialization currently expects a fresh index - if _cf is not None: - old_id = _cf.id - log_cf.debug('drop existing column family') - self._db.drop_column_family(_cf) - else: - old_id = None - log_cf.debug('no need to drop column family') - del _cf - log_cf.debug('create fresh column family') - _cf = self._db.create_column_family(cf_name, rocksdb.ColumnFamilyOptions()) - new_id = _cf.id - assert _cf is not None - assert _cf.is_valid + self._cf = self._db.get_column_family(cf_name) + if self._cf is None: + self._cf = 
self._db.create_column_family(cf_name, rocksdb.ColumnFamilyOptions()) + self._init_db() + self._log.debug('got column family', is_valid=self._cf.is_valid, id=self._cf.id) + + def clear(self) -> None: + old_id = self._cf.id + self._log.debug('drop existing column family') + self._db.drop_column_family(self._cf) + del self._cf + self._ensure_cf_exists(self._cf_name) + new_id = self._cf.id + assert self._cf is not None + assert self._cf.is_valid assert new_id != old_id - log_cf.debug('got column family', is_valid=_cf.is_valid, id=_cf.id, old_id=old_id) - return _cf + self._log.debug('got new column family', id=new_id, old_id=old_id) # XXX: should be `Collection[bytes]`, which only works on Python 3.9+ class RocksDBSimpleSet(Collection, RocksDBIndexUtils): def __init__(self, db: 'rocksdb.DB', log: 'structlog.stdlib.BoundLogger', *, cf_name: bytes) -> None: - super().__init__(db) self.log = log - self._cf_name = cf_name - self._cf = self._fresh_cf(self._cf_name) + super().__init__(db, cf_name) def __iter__(self) -> Iterator[bytes]: it = self._db.iterkeys(self._cf) diff --git a/hathor/indexes/timestamp_index.py b/hathor/indexes/timestamp_index.py index 8e2d2f243..e2dea623e 100644 --- a/hathor/indexes/timestamp_index.py +++ b/hathor/indexes/timestamp_index.py @@ -12,11 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC, abstractmethod +from abc import abstractmethod from typing import Iterator, List, NamedTuple, Optional, Tuple from structlog import get_logger +from hathor.indexes.base_index import BaseIndex from hathor.transaction import BaseTransaction logger = get_logger() @@ -27,10 +28,13 @@ class RangeIdx(NamedTuple): offset: int -class TimestampIndex(ABC): +class TimestampIndex(BaseIndex): """ Index of transactions sorted by their timestamps. """ + def init_loop_step(self, tx: BaseTransaction) -> None: + self.add_tx(tx) + @abstractmethod def add_tx(self, tx: BaseTransaction) -> bool: """ Add a transaction to the index diff --git a/hathor/indexes/tips_index.py b/hathor/indexes/tips_index.py index 5dc6af275..d865d23c9 100644 --- a/hathor/indexes/tips_index.py +++ b/hathor/indexes/tips_index.py @@ -13,17 +13,18 @@ # limitations under the License. from math import inf -from typing import Dict, Set +from typing import Dict, Optional, Set from intervaltree import Interval, IntervalTree from structlog import get_logger +from hathor.indexes.base_index import BaseIndex from hathor.transaction import BaseTransaction logger = get_logger() -class TipsIndex: +class TipsIndex(BaseIndex): """ Use an interval tree to quick get the tips at a given timestamp. 
     The interval of a transaction is in the form [begin, end), where `begin` is
@@ -48,8 +49,20 @@ class TipsIndex:
 
     def __init__(self) -> None:
         self.log = logger.new()
+        self.force_clear()
+
+    def get_db_name(self) -> Optional[str]:
+        return None
+
+    def force_clear(self) -> None:
         self.tree = IntervalTree()
-        self.tx_last_interval = {}  # Dict[bytes(hash), Interval]
+        self.tx_last_interval = {}
+
+    def init_loop_step(self, tx: BaseTransaction) -> None:
+        tx_meta = tx.get_metadata()
+        if not tx_meta.validation.is_final():
+            return
+        self.add_tx(tx)
 
     def add_tx(self, tx: BaseTransaction) -> bool:
         """ Add a new transaction to the index
diff --git a/hathor/indexes/tokens_index.py b/hathor/indexes/tokens_index.py
index 850c285e6..27c38fa1d 100644
--- a/hathor/indexes/tokens_index.py
+++ b/hathor/indexes/tokens_index.py
@@ -15,6 +15,7 @@
 from abc import ABC, abstractmethod
 from typing import Iterator, List, NamedTuple, Optional, Tuple
 
+from hathor.indexes.base_index import BaseIndex
 from hathor.transaction import BaseTransaction
 
 
@@ -57,10 +58,16 @@ def iter_melt_utxos(self) -> Iterator[TokenUtxoInfo]:
         raise NotImplementedError
 
 
-class TokensIndex(ABC):
+class TokensIndex(BaseIndex):
     """ Index of tokens by token uid """
 
+    def init_loop_step(self, tx: BaseTransaction) -> None:
+        tx_meta = tx.get_metadata()
+        if tx_meta.voided_by:
+            return
+        self.add_tx(tx)
+
     @abstractmethod
     def add_tx(self, tx: BaseTransaction) -> None:
         """ Checks if this tx has mint or melt inputs/outputs and adds to tokens index
diff --git a/hathor/manager.py b/hathor/manager.py
index e24d862d6..c8bad34a6 100644
--- a/hathor/manager.py
+++ b/hathor/manager.py
@@ -35,10 +35,10 @@
 from hathor.profiler import get_cpu_profiler
 from hathor.pubsub import HathorEvents, PubSubManager
 from hathor.transaction import BaseTransaction, Block, MergeMinedBlock, Transaction, TxVersion, sum_weights
-from hathor.transaction.exceptions import TxValidationError
+# from hathor.transaction.exceptions import TxValidationError
 from hathor.transaction.storage import TransactionStorage
 from hathor.transaction.storage.exceptions import TransactionDoesNotExist
-from hathor.util import LogDuration, Random, Reactor
+from hathor.util import Random, Reactor
 from hathor.wallet import BaseWallet
 
 settings = HathorSettings()
@@ -337,19 +337,18 @@ def _initialize_components(self) -> None:
         self.log.info('initialize')
         if self.wallet:
             self.wallet._manually_initialize()
-        t0 = time.time()
-        t1 = t0
-        cnt = 0
-        cnt2 = 0
-        t2 = t0
-        h = 0
-
-        block_count = 0
-        tx_count = 0
 
         self.tx_storage.pre_init()
         assert self.tx_storage.indexes is not None
 
+        started_at = int(time.time())
+        last_started_at = self.tx_storage.get_last_started_at()
+        if last_started_at >= started_at:
+            # XXX: although last_started_at==started_at is not _technically_ in the future, it's strange enough to
+            #      deserve a warning, but not special enough to deserve a customized message IMO
+            self.log.warn('The last started time is in the future of the current time',
+                          started_at=started_at, last_started_at=last_started_at)
+
         # After introducing soft voided transactions we need to guarantee the full node is not using
         # a database that already has the soft voided transaction before marking them in the metadata
         # Any new sync from the beginning should work fine or starting with the latest snapshot
@@ -385,88 +384,91 @@ def _initialize_components(self) -> None:
             for tx in self.tx_storage.get_all_transactions():
                 tx.reset_metadata()
 
-        self.log.debug('load blocks and transactions')
-        for tx in self.tx_storage._topological_sort():
-            if self._full_verification:
-                tx.update_initial_metadata()
-
-            assert tx.hash is not None
-
-            tx_meta = tx.get_metadata()
-
-            t2 = time.time()
-            dt = LogDuration(t2 - t1)
-            dcnt = cnt - cnt2
-            tx_rate = '?' if dt == 0 else dcnt / dt
-            h = max(h, tx_meta.height)
-            if dt > 30:
-                ts_date = datetime.datetime.fromtimestamp(self.tx_storage.latest_timestamp)
-                if h == 0:
-                    self.log.debug('start loading transactions...')
-                else:
-                    self.log.info('load transactions...', tx_rate=tx_rate, tx_new=dcnt, dt=dt,
-                                  total=cnt, latest_ts=ts_date, height=h)
-                t1 = t2
-                cnt2 = cnt
-            cnt += 1
-
-            # It's safe to skip block weight verification during initialization because
-            # we trust the difficulty stored in metadata
-            skip_block_weight_verification = True
-            if block_count % settings.VERIFY_WEIGHT_EVERY_N_BLOCKS == 0:
-                skip_block_weight_verification = False
-
-            try:
-                if self._full_verification:
-                    # TODO: deal with invalid tx
-                    if tx.can_validate_full():
-                        self.tx_storage.add_to_indexes(tx)
-                        assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
-                        self.consensus_algorithm.update(tx)
-                        self.tx_storage.indexes.mempool_tips.update(tx)
-                        self.step_validations([tx])
-                    else:
-                        assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
-                        self.tx_storage.save_transaction(tx, only_metadata=True)
-                else:
-                    # TODO: deal with invalid tx
-                    if not tx_meta.validation.is_final():
-                        if not tx_meta.validation.is_checkpoint():
-                            assert tx_meta.validation.is_at_least_basic(), f'invalid: {tx.hash_hex}'
-                    elif tx.is_transaction and tx_meta.first_block is None and not tx_meta.voided_by:
-                        assert self.tx_storage.indexes is not None
-                        self.tx_storage.indexes.mempool_tips.update(tx)
-                    self.tx_storage.add_to_indexes(tx)
-                    if tx.is_transaction and tx_meta.voided_by:
-                        self.tx_storage.del_from_indexes(tx)
-            except (InvalidNewTransaction, TxValidationError):
-                self.log.error('unexpected error when initializing', tx=tx, exc_info=True)
-                raise
-
-            if tx.is_block:
-                block_count += 1
-
-                # this works because blocks on the best chain are iterated from lower to higher height
-                assert tx.hash is not None
-                assert tx_meta.validation.is_at_least_basic()
-                if not tx_meta.voided_by and tx_meta.validation.is_fully_connected():
-                    # XXX: this might not be needed when making a full init because the consensus should already have
-                    self.tx_storage.indexes.height.add_reorg(tx_meta.height, tx.hash, tx.timestamp)
-
-                # Check if it's a checkpoint block
-                if tx_meta.height in checkpoint_heights:
-                    if tx.hash == checkpoint_heights[tx_meta.height]:
-                        del checkpoint_heights[tx_meta.height]
-                    else:
-                        # If the hash is different from checkpoint hash, we stop the node
-                        self.log.error('Error initializing the node. Checkpoint validation error.')
-                        sys.exit()
-            else:
-                tx_count += 1
-
-            if time.time() - t2 > 1:
-                dt = LogDuration(time.time() - t2)
-                self.log.warn('tx took too long to load', tx=tx.hash_hex, dt=dt)
+        # FIXME: full-verification does not work alongside the new index initialization
+        self.tx_storage.indexes._manually_initialize(self.tx_storage)
+
+        # self.log.debug('load blocks and transactions')
+        # for tx in self.tx_storage._topological_sort():
+        #     if self._full_verification:
+        #         tx.update_initial_metadata()
+
+        #     assert tx.hash is not None
+
+        #     tx_meta = tx.get_metadata()
+
+        #     t2 = time.time()
+        #     dt = LogDuration(t2 - t1)
+        #     dcnt = cnt - cnt2
+        #     tx_rate = '?' if dt == 0 else dcnt / dt
+        #     h = max(h, tx_meta.height)
+        #     if dt > 30:
+        #         ts_date = datetime.datetime.fromtimestamp(self.tx_storage.latest_timestamp)
+        #         if h == 0:
+        #             self.log.debug('start loading transactions...')
+        #         else:
+        #             self.log.info('load transactions...', tx_rate=tx_rate, tx_new=dcnt, dt=dt,
+        #                           total=cnt, latest_ts=ts_date, height=h)
+        #         t1 = t2
+        #         cnt2 = cnt
+        #     cnt += 1
+
+        #     # It's safe to skip block weight verification during initialization because
+        #     # we trust the difficulty stored in metadata
+        #     skip_block_weight_verification = True
+        #     if block_count % settings.VERIFY_WEIGHT_EVERY_N_BLOCKS == 0:
+        #         skip_block_weight_verification = False
+
+        #     try:
+        #         if self._full_verification:
+        #             # TODO: deal with invalid tx
+        #             if tx.can_validate_full():
+        #                 self.tx_storage.add_to_indexes(tx)
+        #                 assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
+        #                 self.consensus_algorithm.update(tx)
+        #                 self.tx_storage.indexes.mempool_tips.update(tx)
+        #                 self.step_validations([tx])
+        #             else:
+        #                 assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
+        #                 self.tx_storage.save_transaction(tx, only_metadata=True)
+        #         else:
+        #             # TODO: deal with invalid tx
+        #             if not tx_meta.validation.is_final():
+        #                 if not tx_meta.validation.is_checkpoint():
+        #                     assert tx_meta.validation.is_at_least_basic(), f'invalid: {tx.hash_hex}'
+        #             elif tx.is_transaction and tx_meta.first_block is None and not tx_meta.voided_by:
+        #                 assert self.tx_storage.indexes is not None
+        #                 self.tx_storage.indexes.mempool_tips.update(tx)
+        #             self.tx_storage.add_to_indexes(tx)
+        #             if tx.is_transaction and tx_meta.voided_by:
+        #                 self.tx_storage.del_from_indexes(tx)
+        #     except (InvalidNewTransaction, TxValidationError):
+        #         self.log.error('unexpected error when initializing', tx=tx, exc_info=True)
+        #         raise
+
+        #     if tx.is_block:
+        #         block_count += 1
+
+        #         # this works because blocks on the best chain are iterated from lower to higher height
+        #         assert tx.hash is not None
+        #         assert tx_meta.validation.is_at_least_basic()
+        #         if not tx_meta.voided_by and tx_meta.validation.is_fully_connected():
+        #             # XXX: this might not be needed when making a full init because the consensus should already have
+        #             self.tx_storage.indexes.height.add_reorg(tx_meta.height, tx.hash, tx.timestamp)
+
+        #         # Check if it's a checkpoint block
+        #         if tx_meta.height in checkpoint_heights:
+        #             if tx.hash == checkpoint_heights[tx_meta.height]:
+        #                 del checkpoint_heights[tx_meta.height]
+        #             else:
+        #                 # If the hash is different from checkpoint hash, we stop the node
+        #                 self.log.error('Error initializing the node. Checkpoint validation error.')
+        #                 sys.exit()
+        #     else:
+        #         tx_count += 1
+
+        #     if time.time() - t2 > 1:
+        #         dt = LogDuration(time.time() - t2)
+        #         self.log.warn('tx took too long to load', tx=tx.hash_hex, dt=dt)
 
         # we have to have a best_block by now
         # assert best_block is not None
@@ -475,7 +477,7 @@ def _initialize_components(self) -> None:
 
         # Check if all checkpoints in database are ok
         my_best_height = self.tx_storage.get_height_best_block()
-        if checkpoint_heights:
+        if checkpoint_heights and False:
             # If I have checkpoints that were not validated I must check if they are all in a height I still don't have
             first = min(list(checkpoint_heights.keys()))
             if first <= my_best_height:
@@ -497,15 +499,14 @@ def _initialize_components(self) -> None:
             self.step_validations(depended_final_txs)
             self.log.debug('pending validations finished')
 
-        best_height = self.tx_storage.get_height_best_block()
-        if best_height != h:
-            self.log.warn('best height doesn\'t match', best_height=best_height, max_height=h)
+        # best_height = self.tx_storage.get_height_best_block()
+        # if best_height != h:
+        #     self.log.warn('best height doesn\'t match', best_height=best_height, max_height=h)
 
-        # self.stop_profiler(save_to='profiles/initializing.prof')
+        # XXX: last step before actually starting is updating the last started at timestamps
+        self.tx_storage.update_last_started_at(started_at)
 
         self.state = self.NodeState.READY
-        tdt = LogDuration(t2 - t0)
-        tx_rate = '?' if tdt == 0 else cnt / tdt
-        self.log.info('ready', tx_count=cnt, tx_rate=tx_rate, total_dt=tdt, height=h, blocks=block_count, txs=tx_count)
+        self.log.info('ready')
 
     def add_listen_address(self, addr: str) -> None:
         self.listen_addresses.append(addr)
diff --git a/hathor/transaction/base_transaction.py b/hathor/transaction/base_transaction.py
index 25a55e223..184ca568e 100644
--- a/hathor/transaction/base_transaction.py
+++ b/hathor/transaction/base_transaction.py
@@ -832,7 +832,8 @@ def get_metadata(self, *, force_reload: bool = False, use_storage: bool = True)
             # which requires the use of a storage, this is a workaround that should be fixed, places where this
             # happens include generating new mining blocks and some tests
             height = self.calculate_height() if self.storage else 0
-            metadata = TransactionMetadata(hash=self.hash, accumulated_weight=self.weight, height=height)
+            score = self.weight if self.is_genesis else 0
+            metadata = TransactionMetadata(hash=self.hash, accumulated_weight=self.weight, height=height, score=score)
             self._metadata = metadata
         if not metadata.hash:
             metadata.hash = self.hash
@@ -844,7 +845,8 @@ def reset_metadata(self) -> None:
         recalculating all metadata.
""" assert self.storage is not None - self._metadata = TransactionMetadata(hash=self.hash, + score = self.weight if self.is_genesis else 0 + self._metadata = TransactionMetadata(hash=self.hash, score=score, accumulated_weight=self.weight, height=self.calculate_height()) self._metadata._tx_ref = weakref.ref(self) diff --git a/hathor/transaction/storage/transaction_storage.py b/hathor/transaction/storage/transaction_storage.py index 0ccbe741d..271ffb984 100644 --- a/hathor/transaction/storage/transaction_storage.py +++ b/hathor/transaction/storage/transaction_storage.py @@ -50,6 +50,18 @@ class TransactionStorage(ABC): log = get_logger() + # Key storage attribute to save if the network stored is the expected network + _network_attribute: str = 'network' + + # Key storage attribute to save if the full node is running a full verification + _running_full_verification_attribute: str = 'running_full_verification' + + # Key storage attribute to save if the manager is running + _manager_running_attribute: str = 'manager_running' + + # Ket storage attribute to save the last time the node started + _last_start_attribute: str = 'last_start' + def __init__(self): # Weakref is used to guarantee that there is only one instance of each transaction in memory. self._tx_weakref: WeakValueDictionary[bytes, BaseTransaction] = WeakValueDictionary() @@ -83,14 +95,10 @@ def __init__(self): # Initialize cache for genesis transactions. self._genesis_cache: Dict[bytes, BaseTransaction] = {} - # Key storage attribute to save if the network stored is the expected network - self._network_attribute: str = 'network' - - # Key storage attribute to save if the full node is running a full verification - self._running_full_verification_attribute: str = 'running_full_verification' - - # Key storage attribute to save if the manager is running - self._manager_running_attribute: str = 'manager_running' + @abstractmethod + def _update_caches(self, block_count: int, tx_count: int, latest_timestamp: int, first_timestamp: int) -> None: + """Update ephemeral caches, should only be used internally.""" + raise NotImplementedError def update_best_block_tips_cache(self, tips_cache: List[bytes]) -> None: # XXX: check that the cache update is working properly, only used in unittests @@ -633,6 +641,40 @@ def is_running_manager(self) -> bool: """ return self.get_value(self._manager_running_attribute) == '1' + def get_last_started_at(self) -> int: + """ Return the timestamp when the database was last started. + """ + # XXX: defaults to 1 just to force indexes initialization, by being higher than 0 + return int(self.get_value(self._last_start_attribute) or 1) + + def set_last_started_at(self, timestamp: int) -> None: + """ Update the timestamp when the database was last started. + """ + self.add_value(self._last_start_attribute, str(timestamp)) + + def get_index_last_started_at(self, index_db_name: str) -> int: + """ Return the timestamp when an index was last started. + """ + return int(self.get_value(index_db_name) or 0) + + def set_index_last_started_at(self, index_db_name: str, timestamp: int) -> None: + """ Update the timestamp when a specific index was last started. + """ + self.add_value(index_db_name, str(timestamp)) + + def update_last_started_at(self, timestamp: int) -> None: + """ Updates the respective timestamps of when the node was last started. + + Using this mehtod ensures that the same timestamp is being used and the correct indexes are being selected. 
+ """ + assert self.indexes is not None + self.set_last_started_at(timestamp) + for index in self.indexes.iter_all_indexes(): + index_db_name = index.get_db_name() + if index_db_name is None: + continue + self.add_value(index_db_name, str(timestamp)) + class BaseTransactionStorage(TransactionStorage): def __init__(self, with_index: bool = True, pubsub: Optional[Any] = None) -> None: @@ -649,6 +691,12 @@ def __init__(self, with_index: bool = True, pubsub: Optional[Any] = None) -> Non # Either save or verify all genesis. self._save_or_verify_genesis() + def _update_caches(self, block_count: int, tx_count: int, latest_timestamp: int, first_timestamp: int) -> None: + self._cache_block_count = block_count + self._cache_tx_count = tx_count + self._latest_timestamp = latest_timestamp + self._first_timestamp = first_timestamp + @property def latest_timestamp(self) -> int: return self._latest_timestamp @@ -667,6 +715,8 @@ def _build_indexes_manager(self) -> IndexesManager: def _reset_cache(self) -> None: """Reset all caches. This function should not be called unless you know what you are doing.""" assert self.with_index, 'Cannot reset cache because it has not been enabled.' + + # XXX: these fields would be better of being migrated to the IndexesManager, and probably as proper indexes self._cache_block_count = 0 self._cache_tx_count = 0 @@ -785,11 +835,8 @@ def get_newer_txs_after(self, timestamp: int, hash_bytes: bytes, count: int) -> def _manually_initialize(self) -> None: self._reset_cache() - - # We need to construct a topological sort, then iterate from - # genesis to tips. - for tx in self._topological_sort(): - self.add_to_indexes(tx) + if self.indexes is not None: + self.indexes._manually_initialize(self) def _topological_fast(self) -> Iterator[BaseTransaction]: assert self.indexes is not None diff --git a/hathor/util.py b/hathor/util.py index 5400d6bf2..e09053d70 100644 --- a/hathor/util.py +++ b/hathor/util.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import json import math +import time import warnings from collections import OrderedDict from enum import Enum @@ -48,7 +50,10 @@ from hathor.conf import HathorSettings if TYPE_CHECKING: + import structlog + from hathor.simulator.clock import HeapClock + from hathor.transaction.base_transaction import BaseTransaction # Reactor = IReactorTime # XXX: Ideally we would want to be able to express Reactor as IReactorTime+IReactorCore, which is what everyone using @@ -422,3 +427,73 @@ def skip_n(it: Iterator[_T], n: int) -> Iterator[_T]: def verified_cast(interface_class: Type[Z], obj: Any) -> Z: verifyObject(interface_class, obj) return obj + + +_DT_ITER_NEXT_WARN = 3 # time in seconds to warn when `next(iter_tx)` takes too long +_DT_LOG_PROGRESS = 30 # time in seconds after which a progress will be logged (it can take longer, but not shorter) +_DT_YIELD_WARN = 1 # time in seconds to warn when `yield tx` takes too long (which is when processing happens) + + +def progress(log: 'structlog.stdlib.BoundLogger', + iter_tx: Iterator['BaseTransaction']) -> Iterator['BaseTransaction']: + """ Log the progress of a transaction iterator while iterating. 
+ """ + t_start = time.time() + h = 0 + ts_tx = 0 + + count = 0 + count_log_prev = 0 + block_count = 0 + tx_count = 0 + + log.debug('load will start') + t_log_prev = t_start + while True: + t_before_next = time.time() + try: + tx: 'BaseTransaction' = next(iter_tx) + except StopIteration: + break + t_after_next = time.time() + dt_next = LogDuration(t_after_next - t_before_next) + if dt_next > _DT_ITER_NEXT_WARN: + log.warn('iterator was slow to yield', took_sec=dt_next) + + assert tx.hash is not None + tx_meta = tx.get_metadata() + ts_tx = max(ts_tx, tx.timestamp) + + t_log = time.time() + dt_log = LogDuration(t_log - t_log_prev) + if dt_log > _DT_LOG_PROGRESS: + t_log_prev = t_log + dcount = count - count_log_prev + tx_rate = '?' if dt_log == 0 else dcount / dt_log + h = max(h, tx_meta.height) + ts = datetime.datetime.fromtimestamp(ts_tx) + if h == 0: + log.debug('start loading...') + else: + log.info('loading...', tx_rate=tx_rate, tx_new=dcount, dt=dt_log, total=count, latest_ts=ts, height=h) + count_log_prev = count + count += 1 + + t_before_yield = time.time() + yield tx + t_after_yield = time.time() + + if tx.is_block: + block_count += 1 + else: + tx_count += 1 + + dt_yield = t_after_yield - t_before_yield + if dt_yield > _DT_YIELD_WARN: + dt = LogDuration(dt_yield) + log.warn('tx took too long to be processed', tx=tx.hash_hex, dt=dt) + + t_final = time.time() + dt_total = LogDuration(t_final - t_start) + tx_rate = '?' if dt_total == 0 else count / dt_total + log.info('loaded', tx_count=count, tx_rate=tx_rate, total_dt=dt_total, height=h, blocks=block_count, txs=tx_count) diff --git a/tests/tx/test_indexes2.py b/tests/tx/test_indexes2.py index 64892abac..d53212b0e 100644 --- a/tests/tx/test_indexes2.py +++ b/tests/tx/test_indexes2.py @@ -45,7 +45,7 @@ def test_timestamp_index(self): from hathor.indexes.memory_timestamp_index import MemoryTimestampIndex from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex from hathor.indexes.timestamp_index import RangeIdx - rocksdb_index = RocksDBTimestampIndex(self.create_tmp_rocksdb_db(), cf_name=b'foo') + rocksdb_index = RocksDBTimestampIndex(self.create_tmp_rocksdb_db(), 'foo') memory_index = MemoryTimestampIndex() for tx in self.transactions: rocksdb_index.add_tx(tx) diff --git a/tests/tx/test_indexes3.py b/tests/tx/test_indexes3.py index 661cda08b..6eaa09904 100644 --- a/tests/tx/test_indexes3.py +++ b/tests/tx/test_indexes3.py @@ -41,8 +41,6 @@ def _build_randomized_blockchain(self): @pytest.mark.flaky(max_runs=3, min_passes=1) def test_tips_index_initialization(self): - from intervaltree import IntervalTree - # XXX: this test makes use of the internals of TipsIndex manager = self._build_randomized_blockchain() tx_storage = manager.tx_storage @@ -56,9 +54,8 @@ def test_tips_index_initialization(self): base_block_tips_tree = tx_storage.indexes.block_tips.tree.copy() base_tx_tips_tree = tx_storage.indexes.tx_tips.tree.copy() - # reset the indexes and force a manual initialization - tx_storage._reset_cache() - manager._initialize_components() + # reset the indexes, which will force a re-initialization of all indexes + tx_storage._manually_initialize() reinit_all_tips_tree = tx_storage.indexes.all_tips.tree.copy() reinit_block_tips_tree = tx_storage.indexes.block_tips.tree.copy() @@ -66,14 +63,11 @@ def test_tips_index_initialization(self): self.assertEqual(reinit_all_tips_tree, base_all_tips_tree) self.assertEqual(reinit_block_tips_tree, base_block_tips_tree) + self.assertEqual(len(reinit_tx_tips_tree), len(base_tx_tips_tree)) 
         self.assertEqual(reinit_tx_tips_tree, base_tx_tips_tree)
 
-        # reset again but now initilize from the new function
-        # XXX: manually reset each index, because we're using MemoryTimestampIndex and we need that for the new init
-        for tip_index in [tx_storage.indexes.all_tips, tx_storage.indexes.block_tips, tx_storage.indexes.tx_tips]:
-            tip_index.tx_last_interval = {}
-            tip_index.tree = IntervalTree()
-        tx_storage.indexes._manually_initialize_tips_indexes(tx_storage)
+        # reset again
+        tx_storage._manually_initialize()
 
         newinit_all_tips_tree = tx_storage.indexes.all_tips.tree.copy()
         newinit_block_tips_tree = tx_storage.indexes.block_tips.tree.copy()
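
For context, the heart of this change is the "last started at" handshake between `TransactionStorage` and each index: on startup the manager compares the timestamp persisted for the whole database against the one persisted per index, and only rebuilds the indexes whose timestamps do not match. The sketch below is a minimal, runnable illustration of that decision; `FakeIndex` and `FakeStorage` are hypothetical stand-ins (the real classes persist these attributes in the database), with method names mirroring the ones added in this diff.

```python
import time
from typing import Dict, List, Optional


class FakeIndex:
    """Stand-in for BaseIndex, reduced to the initialization-state API."""

    def __init__(self, db_name: Optional[str]) -> None:
        self._db_name = db_name

    def get_db_name(self) -> Optional[str]:
        # None means memory-only: no persisted state, always re-initialized
        return self._db_name


class FakeStorage:
    """Stand-in for TransactionStorage's key/value attribute storage."""

    def __init__(self) -> None:
        self._attrs: Dict[str, str] = {}

    def get_last_started_at(self) -> int:
        # defaults to 1 (higher than the indexes' default of 0) so a fresh
        # database still forces a first initialization
        return int(self._attrs.get('last_start', 1))

    def get_index_last_started_at(self, name: str) -> int:
        return int(self._attrs.get(name, 0))

    def update_last_started_at(self, timestamp: int, indexes: List[FakeIndex]) -> None:
        # the same timestamp is written for the db and for every persisted
        # index, so after a clean start they all compare equal on the next boot
        self._attrs['last_start'] = str(timestamp)
        for index in indexes:
            name = index.get_db_name()
            if name is not None:
                self._attrs[name] = str(timestamp)


def needs_init(storage: FakeStorage, index: FakeIndex) -> bool:
    name = index.get_db_name()
    if name is None:
        return True  # memory-only indexes are always rebuilt
    return storage.get_index_last_started_at(name) != storage.get_last_started_at()


storage = FakeStorage()
indexes = [FakeIndex(None), FakeIndex('height_index'), FakeIndex('tokens_index')]
assert sum(needs_init(storage, i) for i in indexes) == 3  # first boot: rebuild all
storage.update_last_started_at(int(time.time()), indexes)
assert sum(needs_init(storage, i) for i in indexes) == 1  # only the memory-only one
```

The same comparison drives the iterator choice in `_manually_initialize()`: `_topological_fast()` is only used when `sorted_all` is known to be up to date with the database, otherwise the slower `_topological_sort()` is the safe fallback.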