Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(indexes): efficient rocksdb mempool-tips initialization #938

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ def enable_utxo_index(self) -> None:
self.utxo = RocksDBUtxoIndex(self._db)

def enable_mempool_index(self) -> None:
from hathor.indexes.memory_mempool_tips_index import MemoryMempoolTipsIndex
from hathor.indexes.rocksdb_mempool_tips_index import RocksDBMempoolTipsIndex
if self.mempool_tips is None:
# XXX: use of RocksDBMempoolTipsIndex is very slow and was suspended
self.mempool_tips = MemoryMempoolTipsIndex()
self.mempool_tips = RocksDBMempoolTipsIndex(self._db)
42 changes: 34 additions & 8 deletions hathor/indexes/mempool_tips_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from abc import abstractmethod
from collections.abc import Collection
from itertools import chain
from typing import TYPE_CHECKING, Iterable, Iterator, Optional, cast

import structlog
Expand All @@ -27,22 +28,33 @@
from hathor.transaction.storage import TransactionStorage

SCOPE = Scope(
include_blocks=True,
include_blocks=False,
include_txs=True,
include_voided=True,
topological_order=False,
msbrogli marked this conversation as resolved.
Show resolved Hide resolved
)


def any_non_voided(tx_storage: 'TransactionStorage', hashes: Iterable[bytes]) -> bool:
"""
If there's any vertex with hash in hashes that is not voided, this function returns True, otherwise False.

Notice that this means that an empty `hashes` also returns False.
"""
for tx_hash in hashes:
tx = tx_storage.get_transaction(tx_hash)
tx_meta = tx.get_metadata()
if not tx_meta.voided_by:
return True
return False


class MempoolTipsIndex(BaseIndex):
"""Index to access the tips of the mempool transactions, which haven't been confirmed by a block."""

def get_scope(self) -> Scope:
return SCOPE

def init_loop_step(self, tx: BaseTransaction) -> None:
self.update(tx)

# originally tx_storage.update_mempool_tips
@abstractmethod
def update(self, tx: BaseTransaction, *, remove: Optional[bool] = None) -> None:
"""
Expand All @@ -52,23 +64,20 @@ def update(self, tx: BaseTransaction, *, remove: Optional[bool] = None) -> None:
"""
raise NotImplementedError

# originally tx_storage.iter_mempool_tips
@abstractmethod
def iter(self, tx_storage: 'TransactionStorage', max_timestamp: Optional[float] = None) -> Iterator[Transaction]:
"""
Iterate over txs that are tips, a subset of the mempool (i.e. not tx-parent of another tx on the mempool).
"""
raise NotImplementedError

# originally tx_storage.iter_mempool
@abstractmethod
def iter_all(self, tx_storage: 'TransactionStorage') -> Iterator[Transaction]:
"""
Iterate over the transactions on the "mempool", even the ones that are not tips.
"""
raise NotImplementedError

# originally tx_storage.get_mempool_tips_index
@abstractmethod
def get(self) -> set[bytes]:
"""
Expand All @@ -85,6 +94,23 @@ class ByteCollectionMempoolTipsIndex(MempoolTipsIndex):
log: 'structlog.stdlib.BoundLogger'
_index: 'Collection[bytes]'

def init_loop_step(self, tx: BaseTransaction) -> None:
assert tx.hash is not None
assert tx.storage is not None
tx_meta = tx.get_metadata()
# do not include transactions that have been confirmed
if tx_meta.first_block:
return
tx_storage = tx.storage
# do not include transactions that have a non-voided child
if any_non_voided(tx_storage, tx_meta.children):
return
# do not include transactions that have a non-voided spent output
if any_non_voided(tx_storage, chain(*tx_meta.spent_outputs.values())):
return
# include them otherwise
self._add(tx.hash)

@abstractmethod
def _discard(self, tx: bytes) -> None:
raise NotImplementedError()
Expand Down
Loading