Skip to content

Commit

Permalink
fix(indexes): efficient rocksdb mempool-tips initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
jansegre committed Feb 5, 2024
1 parent b6748ee commit 3f52b0a
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 13 deletions.
16 changes: 14 additions & 2 deletions hathor/indexes/base_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,22 @@ def __init__(self) -> None:
self._settings = get_global_settings()
self.log = logger.new()

def init_start(self, indexes_manager: 'IndexesManager') -> None:
""" This method will always be called when starting the index manager, regardless of initialization state.
def pre_init(self, indexes_manager: 'IndexesManager', *, in_init_loop: bool) -> None:
""" This method will always be called when starting the index manager, before running the init loop.
It comes with a no-op implementation by default because usually indexes will not need this.
The boolean parameter `in_init_loop` indicates weather this index is set to have the method `init_loop_step`
called.
"""
pass

def post_init(self, indexes_manager: 'IndexesManager', *, in_init_loop: bool) -> None:
""" This method will always be called when starting the index manager, after running the init loop.
It comes with a no-op implementation by default because usually indexes will not need this.
The boolean parameter `in_init_loop` indicates weather this index had the method `init_loop_step` called.
"""
pass

Expand Down
11 changes: 7 additions & 4 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:

self.log.debug('indexes pre-init')
for index in self.iter_all_indexes():
index.init_start(self)
index.pre_init(self, in_init_loop=(index in indexes_to_init))

if indexes_to_init:
overall_scope = reduce(operator.__or__, map(lambda i: i.get_scope(), indexes_to_init))
Expand All @@ -176,6 +176,10 @@ def _manually_initialize(self, tx_storage: 'TransactionStorage') -> None:
if index.get_scope().matches(tx):
index.init_loop_step(tx)

self.log.debug('indexes post-init')
for index in self.iter_all_indexes():
index.post_init(self, in_init_loop=(index in indexes_to_init))

# Restore cache capacity.
if isinstance(tx_storage, TransactionCacheStorage):
assert cache_capacity is not None
Expand Down Expand Up @@ -348,7 +352,6 @@ def enable_utxo_index(self) -> None:
self.utxo = RocksDBUtxoIndex(self._db)

def enable_mempool_index(self) -> None:
from hathor.indexes.memory_mempool_tips_index import MemoryMempoolTipsIndex
from hathor.indexes.rocksdb_mempool_tips_index import RocksDBMempoolTipsIndex
if self.mempool_tips is None:
# XXX: use of RocksDBMempoolTipsIndex is very slow and was suspended
self.mempool_tips = MemoryMempoolTipsIndex()
self.mempool_tips = RocksDBMempoolTipsIndex(self._db)
2 changes: 1 addition & 1 deletion hathor/indexes/memory_info_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self):
self._first_timestamp = 0
self._latest_timestamp = 0

def init_start(self, indexes_manager: 'IndexesManager') -> None:
def pre_init(self, indexes_manager: 'IndexesManager', *, in_init_loop: bool) -> None:
self.force_clear()

def get_db_name(self) -> Optional[str]:
Expand Down
4 changes: 0 additions & 4 deletions hathor/indexes/mempool_tips_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ def get_scope(self) -> Scope:
def init_loop_step(self, tx: BaseTransaction) -> None:
self.update(tx)

# originally tx_storage.update_mempool_tips
@abstractmethod
def update(self, tx: BaseTransaction, *, remove: Optional[bool] = None) -> None:
"""
Expand All @@ -52,23 +51,20 @@ def update(self, tx: BaseTransaction, *, remove: Optional[bool] = None) -> None:
"""
raise NotImplementedError

# originally tx_storage.iter_mempool_tips
@abstractmethod
def iter(self, tx_storage: 'TransactionStorage', max_timestamp: Optional[float] = None) -> Iterator[Transaction]:
"""
Iterate over txs that are tips, a subset of the mempool (i.e. not tx-parent of another tx on the mempool).
"""
raise NotImplementedError

# originally tx_storage.iter_mempool
@abstractmethod
def iter_all(self, tx_storage: 'TransactionStorage') -> Iterator[Transaction]:
"""
Iterate over the transactions on the "mempool", even the ones that are not tips.
"""
raise NotImplementedError

# originally tx_storage.get_mempool_tips_index
@abstractmethod
def get(self) -> set[bytes]:
"""
Expand Down
2 changes: 1 addition & 1 deletion hathor/indexes/partial_rocksdb_tips_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _from_key(self, key: bytes) -> Interval:
assert len(tx_id) == 32
return Interval(_from_db_value(begin), _from_db_value(end), tx_id)

def init_start(self, indexes_manager: 'IndexesManager') -> None:
def pre_init(self, indexes_manager: 'IndexesManager', *, in_init_loop: bool) -> None:
log = self.log.new(index=f'tips-{self._name}')
total: Optional[int]
if self is indexes_manager.all_tips:
Expand Down
2 changes: 1 addition & 1 deletion hathor/indexes/rocksdb_info_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None
RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_ADDRESS_INDEX)
MemoryInfoIndex.__init__(self)

def init_start(self, indexes_manager: 'IndexesManager') -> None:
def pre_init(self, indexes_manager: 'IndexesManager', *, in_init_loop: bool) -> None:
self._load_all_values()
self.log.info('loaded info-index', block_count=self._block_count, tx_count=self._tx_count,
first_timestamp=self._first_timestamp, latest_timestamp=self._latest_timestamp)
Expand Down
23 changes: 23 additions & 0 deletions hathor/indexes/rocksdb_mempool_tips_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

from structlog import get_logger

from hathor.indexes.memory_mempool_tips_index import MemoryMempoolTipsIndex
from hathor.indexes.mempool_tips_index import ByteCollectionMempoolTipsIndex
from hathor.indexes.rocksdb_utils import RocksDBSimpleSet
from hathor.transaction.base_transaction import BaseTransaction

if TYPE_CHECKING: # pragma: no cover
import rocksdb

from hathor.indexes.manager import IndexesManager

logger = get_logger()

_CF_NAME_MEMPOOL_TIPS_INDEX = b'mempool-tips-index'
Expand All @@ -30,11 +34,30 @@

class RocksDBMempoolTipsIndex(ByteCollectionMempoolTipsIndex):
_index: RocksDBSimpleSet
_temp_mem_index_for_init: Optional[MemoryMempoolTipsIndex]

def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None) -> None:
self.log = logger.new()
_cf_name = cf_name or _CF_NAME_MEMPOOL_TIPS_INDEX
self._index = RocksDBSimpleSet(db, self.log, cf_name=_cf_name)
self._temp_mem_index_for_init = None

def init_loop_step(self, tx: BaseTransaction) -> None:
assert self._temp_mem_index_for_init is not None
self._temp_mem_index_for_init.init_loop_step(tx)

def pre_init(self, indexes_manager: 'IndexesManager', *, in_init_loop: bool) -> None:
if in_init_loop:
self._temp_mem_index_for_init = MemoryMempoolTipsIndex()
self._temp_mem_index_for_init.log = self.log.new(index='init-only')

def post_init(self, indexes_manager: 'IndexesManager', *, in_init_loop: bool) -> None:
if in_init_loop:
assert self._temp_mem_index_for_init is not None
self._index.update(self._temp_mem_index_for_init._index)
del self._temp_mem_index_for_init
self._temp_mem_index_for_init = None
assert self._temp_mem_index_for_init is None

def get_db_name(self) -> Optional[str]:
# XXX: we don't need it to be parametrizable, so this is fine
Expand Down

0 comments on commit 3f52b0a

Please sign in to comment.