Skip to content

Commit

Permalink
refactor(builder): Build IndexesManager on builders
Browse files Browse the repository at this point in the history
  • Loading branch information
msbrogli committed Jun 7, 2023
1 parent 0cf0e1d commit 229dac2
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 135 deletions.
76 changes: 55 additions & 21 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@
from hathor.event import EventManager
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.indexes import IndexesManager
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
from hathor.manager import HathorManager
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer_id import PeerId
from hathor.pubsub import PubSubManager
from hathor.storage import RocksDBStorage
from hathor.stratum import StratumFactory
from hathor.transaction.storage import TransactionMemoryStorage, TransactionRocksDBStorage, TransactionStorage
from hathor.transaction.storage import (
TransactionCacheStorage,
TransactionMemoryStorage,
TransactionRocksDBStorage,
TransactionStorage,
)
from hathor.util import Random, Reactor, get_environment_info
from hathor.wallet import BaseWallet, Wallet

Expand Down Expand Up @@ -92,8 +97,11 @@ def __init__(self) -> None:
self._rocksdb_path: Optional[str] = None
self._rocksdb_storage: Optional[RocksDBStorage] = None
self._rocksdb_cache_capacity: Optional[int] = None
self._rocksdb_with_index: Optional[bool] = None

self._tx_storage_cache: bool = False
self._tx_storage_cache_capacity: Optional[int] = None

self._indexes_manager: Optional[IndexesManager] = None
self._tx_storage: Optional[TransactionStorage] = None
self._event_storage: Optional[EventStorage] = None

Expand Down Expand Up @@ -138,9 +146,8 @@ def build(self) -> BuildArtifacts:

wallet = self._get_or_create_wallet()
event_manager = self._get_or_create_event_manager()
tx_storage = self._get_or_create_tx_storage()
indexes = tx_storage.indexes
assert indexes is not None
indexes = self._get_or_create_indexes_manager()
tx_storage = self._get_or_create_tx_storage(indexes)

if self._enable_address_index:
indexes.enable_address_index(pubsub)
Expand Down Expand Up @@ -299,28 +306,51 @@ def _get_p2p_manager(self) -> ConnectionsManager:
)
return p2p_manager

def _get_or_create_tx_storage(self) -> TransactionStorage:
def _get_or_create_indexes_manager(self) -> IndexesManager:
if self._indexes_manager is not None:
return self._indexes_manager

if self._force_memory_index or self._storage_type == StorageType.MEMORY:
self._indexes_manager = MemoryIndexesManager()

elif self._storage_type == StorageType.ROCKSDB:
rocksdb_storage = self._get_or_create_rocksdb_storage()
self._indexes_manager = RocksDBIndexesManager(rocksdb_storage)

else:
raise NotImplementedError

return self._indexes_manager

def _get_or_create_tx_storage(self, indexes: IndexesManager) -> TransactionStorage:
if self._tx_storage is not None:
# If a tx storage is provided, set the indexes manager to it.
self._tx_storage.indexes = indexes
return self._tx_storage

store_indexes: Optional[IndexesManager] = indexes
if self._tx_storage_cache:
cache_indexes = indexes
store_indexes = None

if self._storage_type == StorageType.MEMORY:
return TransactionMemoryStorage()
self._tx_storage = TransactionMemoryStorage(indexes=store_indexes)

if self._storage_type == StorageType.ROCKSDB:
elif self._storage_type == StorageType.ROCKSDB:
rocksdb_storage = self._get_or_create_rocksdb_storage()
use_memory_index = self._force_memory_index
self._tx_storage = TransactionRocksDBStorage(rocksdb_storage, indexes=store_indexes)

kwargs = {}
if self._rocksdb_with_index is not None:
kwargs = dict(with_index=self._rocksdb_with_index)
else:
raise NotImplementedError

return TransactionRocksDBStorage(
rocksdb_storage,
use_memory_indexes=use_memory_index,
**kwargs
)
if self._tx_storage_cache:
reactor = self._get_reactor()
kwargs: Dict[str, Any] = {}
if self._tx_storage_cache_capacity is not None:
kwargs['capacity'] = self._tx_storage_cache_capacity
self._tx_storage = TransactionCacheStorage(self._tx_storage, reactor, indexes=cache_indexes, **kwargs)

raise NotImplementedError
return self._tx_storage

def _get_or_create_event_storage(self) -> EventStorage:
if self._event_storage is not None:
Expand Down Expand Up @@ -354,16 +384,20 @@ def use_memory(self) -> 'Builder':
def use_rocksdb(
self,
path: str,
with_index: Optional[bool] = None,
cache_capacity: Optional[int] = None
) -> 'Builder':
self.check_if_can_modify()
self._storage_type = StorageType.ROCKSDB
self._rocksdb_path = path
self._rocksdb_with_index = with_index
self._rocksdb_cache_capacity = cache_capacity
return self

def use_tx_storage_cache(self, capacity: Optional[int] = None) -> 'Builder':
self.check_if_can_modify()
self._tx_storage_cache = True
self._tx_storage_cache_capacity = capacity
return self

def force_memory_index(self) -> 'Builder':
self.check_if_can_modify()
self._force_memory_index = True
Expand Down
24 changes: 18 additions & 6 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from hathor.consensus import ConsensusAlgorithm
from hathor.event import EventManager
from hathor.exception import BuilderError
from hathor.indexes import IndexesManager
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
from hathor.manager import HathorManager
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer_id import PeerId
Expand Down Expand Up @@ -96,13 +96,15 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa

tx_storage: TransactionStorage
event_storage: EventStorage
indexes: IndexesManager
self.rocksdb_storage: Optional[RocksDBStorage] = None
self.event_ws_factory: Optional[EventWebsocketFactory] = None

if args.memory_storage:
self.check_or_raise(not args.data, '--data should not be used with --memory-storage')
# if using MemoryStorage, no need to have cache
tx_storage = TransactionMemoryStorage()
indexes = MemoryIndexesManager()
tx_storage = TransactionMemoryStorage(indexes)
event_storage = EventMemoryStorage()
self.check_or_raise(not args.x_rocksdb_indexes, 'RocksDB indexes require RocksDB data')
self.log.info('with storage', storage_class=type(tx_storage).__name__)
Expand All @@ -112,15 +114,25 @@ def create_manager(self, reactor: PosixReactorBase, args: Namespace) -> HathorMa
self.log.warn('--rocksdb-storage is now implied, no need to specify it')
cache_capacity = args.rocksdb_cache
self.rocksdb_storage = RocksDBStorage(path=args.data, cache_capacity=cache_capacity)
tx_storage = TransactionRocksDBStorage(self.rocksdb_storage,
with_index=(not args.cache),
use_memory_indexes=args.memory_indexes)

# Initialize indexes manager.
if args.memory_indexes:
indexes = MemoryIndexesManager()
else:
indexes = RocksDBIndexesManager(self.rocksdb_storage)

kwargs = {}
if not args.cache:
# We should only pass indexes if cache is disable. Otherwise,
# only TransactionCacheStorage should have indexes.
kwargs['indexes'] = indexes
tx_storage = TransactionRocksDBStorage(self.rocksdb_storage, **kwargs)
event_storage = EventRocksDBStorage(self.rocksdb_storage)

self.log.info('with storage', storage_class=type(tx_storage).__name__, path=args.data)
if args.cache:
self.check_or_raise(not args.memory_storage, '--cache should not be used with --memory-storage')
tx_storage = TransactionCacheStorage(tx_storage, reactor)
tx_storage = TransactionCacheStorage(tx_storage, reactor, indexes=indexes)
if args.cache_size:
tx_storage.capacity = args.cache_size
if args.cache_interval:
Expand Down
7 changes: 3 additions & 4 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@
from hathor.util import progress

if TYPE_CHECKING: # pragma: no cover
import rocksdb

from hathor.pubsub import PubSubManager
from hathor.storage import RocksDBStorage
from hathor.transaction.storage import TransactionStorage

logger = get_logger()
Expand Down Expand Up @@ -331,13 +330,13 @@ def enable_deps_index(self) -> None:


class RocksDBIndexesManager(IndexesManager):
def __init__(self, db: 'rocksdb.DB') -> None:
def __init__(self, rocksdb_storage: 'RocksDBStorage') -> None:
from hathor.indexes.partial_rocksdb_tips_index import PartialRocksDBTipsIndex
from hathor.indexes.rocksdb_height_index import RocksDBHeightIndex
from hathor.indexes.rocksdb_info_index import RocksDBInfoIndex
from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex

self._db = db
self._db = rocksdb_storage.get_db()

self.info = RocksDBInfoIndex(self._db)
self.height = RocksDBHeightIndex(self._db)
Expand Down
3 changes: 1 addition & 2 deletions hathor/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from hathor.transaction.block import Block
from hathor.transaction.storage import TransactionRocksDBStorage, TransactionStorage
from hathor.transaction.storage.cache_storage import TransactionCacheStorage
from hathor.transaction.storage.memory_storage import TransactionMemoryStorage
from hathor.util import Reactor

if TYPE_CHECKING:
Expand Down Expand Up @@ -62,7 +61,7 @@ class Metrics:
pubsub: PubSubManager
avg_time_between_blocks: int
connections: ConnectionsManager
tx_storage: TransactionStorage = TransactionMemoryStorage()
tx_storage: TransactionStorage
# Twisted reactor that handles the time and callLater
reactor: Optional[Reactor] = None

Expand Down
7 changes: 2 additions & 5 deletions hathor/transaction/storage/cache_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class TransactionCacheStorage(BaseTransactionStorage):
dirty_txs: Set[bytes]

def __init__(self, store: 'BaseTransactionStorage', reactor: Reactor, interval: int = 5,
capacity: int = 10000, *, _clone_if_needed: bool = False):
capacity: int = 10000, *, indexes: Optional[IndexesManager], _clone_if_needed: bool = False):
"""
:param store: a subclass of BaseTransactionStorage
:type store: :py:class:`hathor.transaction.storage.BaseTransactionStorage`
Expand Down Expand Up @@ -67,7 +67,7 @@ def __init__(self, store: 'BaseTransactionStorage', reactor: Reactor, interval:

# we need to use only one weakref dict, so we must first initialize super, and then
# attribute the same weakref for both.
super().__init__()
super().__init__(indexes=indexes)
self._tx_weakref = store._tx_weakref

def set_capacity(self, capacity: int) -> None:
Expand Down Expand Up @@ -149,9 +149,6 @@ def save_transaction(self, tx: 'BaseTransaction', *, only_metadata: bool = False
def get_all_genesis(self) -> Set[BaseTransaction]:
return self.store.get_all_genesis()

def _build_indexes_manager(self) -> IndexesManager:
return self.store._build_indexes_manager()

def _save_transaction(self, tx: BaseTransaction, *, only_metadata: bool = False) -> None:
"""Saves the transaction without modifying TimestampIndex entries (in superclass)."""
assert tx.hash is not None
Expand Down
5 changes: 3 additions & 2 deletions hathor/transaction/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from typing import Any, Dict, Iterator, Optional, TypeVar

from hathor.indexes import IndexesManager
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.transaction.storage.migrations import MigrationState
from hathor.transaction.storage.transaction_storage import BaseTransactionStorage
Expand All @@ -24,7 +25,7 @@


class TransactionMemoryStorage(BaseTransactionStorage):
def __init__(self, with_index: bool = True, *, _clone_if_needed: bool = False) -> None:
def __init__(self, indexes: Optional[IndexesManager] = None, *, _clone_if_needed: bool = False) -> None:
"""
:param _clone_if_needed: *private parameter*, defaults to True, controls whether to clone
transaction/blocks/metadata when returning those objects.
Expand All @@ -35,7 +36,7 @@ def __init__(self, with_index: bool = True, *, _clone_if_needed: bool = False) -
# Store custom key/value attributes
self.attributes: Dict[str, Any] = {}
self._clone_if_needed = _clone_if_needed
super().__init__(with_index=with_index)
super().__init__(indexes=indexes)

def _check_and_set_network(self) -> None:
# XXX: does not apply to memory storage, can safely be ignored
Expand Down
14 changes: 3 additions & 11 deletions hathor/transaction/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from structlog import get_logger

from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
from hathor.indexes import IndexesManager
from hathor.storage import RocksDBStorage
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.transaction.storage.migrations import MigrationState
Expand All @@ -43,16 +43,14 @@ class TransactionRocksDBStorage(BaseTransactionStorage):
It uses Protobuf serialization internally.
"""

def __init__(self, rocksdb_storage: RocksDBStorage, with_index: bool = True, use_memory_indexes: bool = False):
self._use_memory_indexes = use_memory_indexes

def __init__(self, rocksdb_storage: RocksDBStorage, indexes: Optional[IndexesManager] = None):
self._cf_tx = rocksdb_storage.get_or_create_column_family(_CF_NAME_TX)
self._cf_meta = rocksdb_storage.get_or_create_column_family(_CF_NAME_META)
self._cf_attr = rocksdb_storage.get_or_create_column_family(_CF_NAME_ATTR)
self._cf_migrations = rocksdb_storage.get_or_create_column_family(_CF_NAME_MIGRATIONS)

self._db = rocksdb_storage.get_db()
super().__init__(with_index=with_index)
super().__init__(indexes=indexes)

def _load_from_bytes(self, tx_data: bytes, meta_data: bytes) -> 'BaseTransaction':
from hathor.transaction.base_transaction import tx_or_block_from_bytes
Expand All @@ -63,12 +61,6 @@ def _load_from_bytes(self, tx_data: bytes, meta_data: bytes) -> 'BaseTransaction
tx.storage = self
return tx

def _build_indexes_manager(self) -> IndexesManager:
if self._use_memory_indexes:
return MemoryIndexesManager()
else:
return RocksDBIndexesManager(self._db)

def _tx_to_bytes(self, tx: 'BaseTransaction') -> bytes:
return bytes(tx)

Expand Down
14 changes: 4 additions & 10 deletions hathor/transaction/storage/transaction_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from structlog import get_logger

from hathor.conf import HathorSettings
from hathor.indexes import IndexesManager, MemoryIndexesManager
from hathor.indexes import IndexesManager
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import PubSubManager
from hathor.transaction.base_transaction import BaseTransaction
Expand Down Expand Up @@ -1037,17 +1037,14 @@ def compute_transactions_that_became_invalid(self) -> List[BaseTransaction]:
class BaseTransactionStorage(TransactionStorage):
indexes: Optional[IndexesManager]

def __init__(self, with_index: bool = True, pubsub: Optional[Any] = None) -> None:
def __init__(self, indexes: Optional[IndexesManager] = None, pubsub: Optional[Any] = None) -> None:
super().__init__()

# Pubsub is used to publish tx voided and winner but it's optional
self.pubsub = pubsub

# Initialize index if needed.
if with_index:
self.indexes = self._build_indexes_manager()
else:
self.indexes = None
# Indexes.
self.indexes = indexes

# Either save or verify all genesis.
self._save_or_verify_genesis()
Expand All @@ -1066,9 +1063,6 @@ def first_timestamp(self) -> int:
def _save_transaction(self, tx: BaseTransaction, *, only_metadata: bool = False) -> None:
raise NotImplementedError

def _build_indexes_manager(self) -> IndexesManager:
return MemoryIndexesManager()

def reset_indexes(self) -> None:
"""Reset all indexes. This function should not be called unless you know what you are doing."""
assert self.indexes is not None, 'Cannot reset indexes because they have not been enabled.'
Expand Down
Loading

0 comments on commit 229dac2

Please sign in to comment.