refactor(indexes): Create a generic TxGroupIndex and use it in AddressIndex #524

Merged
merged 1 commit on Feb 16, 2023
25 changes: 21 additions & 4 deletions hathor/indexes/address_index.py
@@ -17,24 +17,41 @@

from structlog import get_logger

from hathor.indexes.base_index import BaseIndex
from hathor.indexes.tx_group_index import TxGroupIndex
from hathor.pubsub import HathorEvents
from hathor.transaction import BaseTransaction

if TYPE_CHECKING: # pragma: no cover
from hathor.pubsub import PubSubManager
from hathor.pubsub import EventArguments, PubSubManager

logger = get_logger()


class AddressIndex(BaseIndex):
class AddressIndex(TxGroupIndex[str]):
""" Index of inputs/outputs by address
"""
pubsub: Optional['PubSubManager']

def init_loop_step(self, tx: BaseTransaction) -> None:
self.add_tx(tx)

def publish_tx(self, tx: BaseTransaction, *, addresses: Optional[Iterable[str]] = None) -> None:
def _handle_tx_event(self, key: HathorEvents, args: 'EventArguments') -> None:
""" This method is called when pubsub publishes an event that we subscribed
"""
data = args.__dict__
tx = data['tx']
meta = tx.get_metadata()
if meta.has_voided_by_changed_since_last_call() or meta.has_spent_by_changed_since_last_call():
self._publish_tx(tx)

def _subscribe_pubsub_events(self) -> None:
""" Subscribe wallet index to receive voided/winner tx pubsub events
"""
assert self.pubsub is not None
# Subscribe to voided/winner events
self.pubsub.subscribe(HathorEvents.CONSENSUS_TX_UPDATE, self._handle_tx_event)

def _publish_tx(self, tx: BaseTransaction, *, addresses: Optional[Iterable[str]] = None) -> None:
""" Publish WALLET_ADDRESS_HISTORY for all addresses of a transaction.
"""
from hathor.pubsub import HathorEvents
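Note: the new abstract base `hathor/indexes/tx_group_index.py` that `AddressIndex` now extends is not included in this excerpt. Reconstructed from how it is used in this diff, its interface is roughly the sketch below; the `BaseIndex` parent, the `Sized` bound on the key type and the method names come from the concrete implementations shown here, everything else is an assumption, not the actual file from the PR.

```python
# Sketch of the TxGroupIndex interface, inferred from its usage in this diff.
from abc import abstractmethod
from typing import Generic, Iterable, Sized, TypeVar

from hathor.indexes.base_index import BaseIndex
from hathor.transaction import BaseTransaction

KT = TypeVar('KT', bound=Sized)


class TxGroupIndex(BaseIndex, Generic[KT]):
    """Index that groups transactions under a key of type KT (here, an address string)."""

    @abstractmethod
    def add_tx(self, tx: BaseTransaction) -> None:
        """Add a tx to every group it belongs to."""
        raise NotImplementedError

    @abstractmethod
    def remove_tx(self, tx: BaseTransaction) -> None:
        """Remove a tx from every group it belongs to."""
        raise NotImplementedError

    @abstractmethod
    def _get_from_key(self, key: KT) -> Iterable[bytes]:
        """Iterate over the tx hashes stored under `key`, in no particular order."""
        raise NotImplementedError

    @abstractmethod
    def _get_sorted_from_key(self, key: KT) -> Iterable[bytes]:
        """Iterate over the tx hashes stored under `key`, sorted."""
        raise NotImplementedError

    @abstractmethod
    def _is_key_empty(self, key: KT) -> bool:
        """Return whether no tx is stored under `key`."""
        raise NotImplementedError
```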
65 changes: 13 additions & 52 deletions hathor/indexes/memory_address_index.py
@@ -12,84 +12,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
from typing import TYPE_CHECKING, DefaultDict, List, Optional, Set
from typing import TYPE_CHECKING, Iterable, List, Optional

from structlog import get_logger

from hathor.indexes.address_index import AddressIndex
from hathor.pubsub import HathorEvents
from hathor.indexes.memory_tx_group_index import MemoryTxGroupIndex
from hathor.transaction import BaseTransaction

if TYPE_CHECKING: # pragma: no cover
from hathor.pubsub import EventArguments, PubSubManager
from hathor.pubsub import PubSubManager

logger = get_logger()


class MemoryAddressIndex(AddressIndex):
class MemoryAddressIndex(MemoryTxGroupIndex[str], AddressIndex):
""" Index of inputs/outputs by address
"""

index: DefaultDict[str, Set[bytes]]

def __init__(self, pubsub: Optional['PubSubManager'] = None) -> None:
super().__init__()
self.pubsub = pubsub
self.force_clear()
if self.pubsub:
self.subscribe_pubsub_events()
self._subscribe_pubsub_events()

def get_db_name(self) -> Optional[str]:
return None

def force_clear(self) -> None:
self.index = defaultdict(set)

def subscribe_pubsub_events(self) -> None:
""" Subscribe wallet index to receive voided/winner tx pubsub events
"""
assert self.pubsub is not None
# Subscribe to voided/winner events
self.pubsub.subscribe(HathorEvents.CONSENSUS_TX_UPDATE, self.handle_tx_event)
def _extract_keys(self, tx: BaseTransaction) -> Iterable[str]:
return tx.get_related_addresses()

def add_tx(self, tx: BaseTransaction) -> None:
""" Add tx inputs and outputs to the wallet index (indexed by its addresses).
"""
assert tx.hash is not None

addresses = tx.get_related_addresses()
for address in addresses:
self.index[address].add(tx.hash)

self.publish_tx(tx, addresses=addresses)

def remove_tx(self, tx: BaseTransaction) -> None:
""" Remove tx inputs and outputs from the wallet index (indexed by its addresses).
"""
assert tx.hash is not None

addresses = tx.get_related_addresses()
for address in addresses:
self.index[address].discard(tx.hash)

def handle_tx_event(self, key: HathorEvents, args: 'EventArguments') -> None:
""" This method is called when pubsub publishes an event that we subscribed
"""
data = args.__dict__
tx = data['tx']
meta = tx.get_metadata()
if meta.has_voided_by_changed_since_last_call() or meta.has_spent_by_changed_since_last_call():
self.publish_tx(tx)
super().add_tx(tx)
self._publish_tx(tx)

def get_from_address(self, address: str) -> List[bytes]:
""" Get list of transaction hashes of an address
"""
return list(self.index[address])
return list(self._get_from_key(address))

def get_sorted_from_address(self, address: str) -> List[bytes]:
""" Get a sorted list of transaction hashes of an address
"""
return sorted(self.index[address])
return list(self._get_sorted_from_key(address))

def is_address_empty(self, address: str) -> bool:
return not bool(self.index[address])
return self._is_key_empty(address)
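A side note on the multiple inheritance above: `super().add_tx(tx)` inside `MemoryAddressIndex.add_tx` dispatches to the generic `MemoryTxGroupIndex.add_tx`, because the mixin comes first in the bases, while the address-specific pubsub publishing stays in `AddressIndex._publish_tx`. A quick sketch to visualize this (the exact tail of the MRO depends on code not shown in this excerpt):

```python
# Sketch: inspect the method resolution order of the refactored class.
from hathor.indexes.memory_address_index import MemoryAddressIndex

print([cls.__name__ for cls in MemoryAddressIndex.__mro__])
# Expected to start with roughly:
#   MemoryAddressIndex, MemoryTxGroupIndex, AddressIndex, TxGroupIndex, BaseIndex, ...
# so super().add_tx() resolves to the generic in-memory implementation.
```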
69 changes: 69 additions & 0 deletions hathor/indexes/memory_tx_group_index.py
@@ -0,0 +1,69 @@
# Copyright 2021 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from collections import defaultdict
from typing import DefaultDict, Iterable, Set, Sized, TypeVar

from structlog import get_logger

from hathor.indexes.tx_group_index import TxGroupIndex
from hathor.transaction import BaseTransaction
from hathor.util import not_none

logger = get_logger()

KT = TypeVar('KT', bound=Sized)


class MemoryTxGroupIndex(TxGroupIndex[KT]):
"""Memory implementation of the TxGroupIndex. This class is abstract and cannot be used directly.
"""

index: DefaultDict[KT, Set[bytes]]

def __init__(self) -> None:
self.force_clear()

def force_clear(self) -> None:
self.index = defaultdict(set)

def _add_tx(self, key: KT, tx: BaseTransaction) -> None:
self.index[key].add(not_none(tx.hash))

@abstractmethod
def _extract_keys(self, tx: BaseTransaction) -> Iterable[KT]:
"""Extract the keys related to a given tx. The transaction will be added to all extracted keys."""
raise NotImplementedError

def add_tx(self, tx: BaseTransaction) -> None:
assert tx.hash is not None

for key in self._extract_keys(tx):
self._add_tx(key, tx)

def remove_tx(self, tx: BaseTransaction) -> None:
assert tx.hash is not None

for key in self._extract_keys(tx):
self.index[key].discard(tx.hash)

def _get_from_key(self, key: KT) -> Iterable[bytes]:
yield from self.index[key]

def _get_sorted_from_key(self, key: KT) -> Iterable[bytes]:
return sorted(self.index[key])

def _is_key_empty(self, key: KT) -> bool:
return not bool(self.index[key])
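For illustration of the intent behind this new generic class (not part of the PR): a concrete in-memory index only has to pick a key type and implement `_extract_keys`; storage, removal and the query helpers come from `MemoryTxGroupIndex`. The `MemoryScriptIndex` name, the grouping by output script, and the trivial `get_db_name` below are all hypothetical, and other `BaseIndex` hooks are omitted for brevity.

```python
# Hypothetical example only: reusing MemoryTxGroupIndex for a different grouping.
from typing import Iterable, Optional, Set

from hathor.indexes.memory_tx_group_index import MemoryTxGroupIndex
from hathor.transaction import BaseTransaction


class MemoryScriptIndex(MemoryTxGroupIndex[bytes]):
    """Hypothetical index that groups txs by the scripts of their outputs."""

    def get_db_name(self) -> Optional[str]:
        # Memory-only index: nothing to persist.
        return None

    def _extract_keys(self, tx: BaseTransaction) -> Iterable[bytes]:
        # Assumes each output exposes its raw script bytes, as TxOutput does.
        keys: Set[bytes] = {txout.script for txout in tx.outputs}
        return keys
```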
132 changes: 22 additions & 110 deletions hathor/indexes/rocksdb_address_index.py
@@ -12,150 +12,62 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from typing import TYPE_CHECKING, Iterable, List, Optional

from structlog import get_logger

from hathor.indexes.address_index import AddressIndex
from hathor.indexes.rocksdb_tx_group_index import RocksDBTxGroupIndex
from hathor.indexes.rocksdb_utils import RocksDBIndexUtils
from hathor.pubsub import HathorEvents
from hathor.transaction import BaseTransaction

if TYPE_CHECKING: # pragma: no cover
import rocksdb

from hathor.pubsub import EventArguments, PubSubManager
from hathor.pubsub import PubSubManager

logger = get_logger()

_CF_NAME_ADDRESS_INDEX = b'address-index'
_DB_NAME: str = 'address'


class RocksDBAddressIndex(AddressIndex, RocksDBIndexUtils):
class RocksDBAddressIndex(RocksDBTxGroupIndex[str], AddressIndex, RocksDBIndexUtils):
""" Index of inputs/outputs by address.
"""

This index uses rocksdb and the following key format:

key = [address][tx.timestamp][tx.hash]
|--34b--||--4 bytes---||--32b--|

It works nicely because rocksdb uses a tree sorted by key under the hood.
_KEY_SIZE = 34

The timestamp must be serialized in big-endian, so ts1 > ts2 implies that bytes(ts1) > bytes(ts2),
hence the transactions are sorted by timestamp.
"""
def __init__(self, db: 'rocksdb.DB', *, cf_name: Optional[bytes] = None,
pubsub: Optional['PubSubManager'] = None) -> None:
self.log = logger.new()
RocksDBIndexUtils.__init__(self, db, cf_name or _CF_NAME_ADDRESS_INDEX)
RocksDBTxGroupIndex.__init__(self, db, cf_name or _CF_NAME_ADDRESS_INDEX)

self.pubsub = pubsub
if self.pubsub:
self.subscribe_pubsub_events()
self._subscribe_pubsub_events()

def _serialize_key(self, key: str) -> bytes:
return key.encode('ascii')

def _deserialize_key(self, key_bytes: bytes) -> str:
return key_bytes.decode('ascii')

def _extract_keys(self, tx: BaseTransaction) -> Iterable[str]:
return tx.get_related_addresses()

def get_db_name(self) -> Optional[str]:
# XXX: we don't need it to be parametrizable, so this is fine
return _DB_NAME

def force_clear(self) -> None:
self.clear()

def _to_key(self, address: str, tx: Optional[BaseTransaction] = None) -> bytes:
import struct
assert len(address) == 34
key = address.encode('ascii')
if tx:
assert tx.hash is not None
assert len(tx.hash) == 32
key += struct.pack('>I', tx.timestamp) + tx.hash
assert len(key) == 34 + 4 + 32
return key

def _from_key(self, key: bytes) -> Tuple[str, int, bytes]:
import struct
assert len(key) == 34 + 4 + 32
address = key[:34].decode('ascii')
timestamp: int
(timestamp,) = struct.unpack('>I', key[34:38])
tx_hash = key[38:]
assert len(address) == 34
assert len(tx_hash) == 32
return address, timestamp, tx_hash

def subscribe_pubsub_events(self) -> None:
""" Subscribe wallet index to receive voided/winner tx pubsub events
"""
assert self.pubsub is not None
# Subscribe to voided/winner events
self.pubsub.subscribe(HathorEvents.CONSENSUS_TX_UPDATE, self.handle_tx_event)

def add_tx(self, tx: BaseTransaction) -> None:
""" Add tx inputs and outputs to the wallet index (indexed by its addresses).
"""
assert tx.hash is not None

addresses = tx.get_related_addresses()
for address in addresses:
self.log.debug('put address', address=address)
self._db.put((self._cf, self._to_key(address, tx)), b'')

self.publish_tx(tx, addresses=addresses)

def remove_tx(self, tx: BaseTransaction) -> None:
""" Remove tx inputs and outputs from the wallet index (indexed by its addresses).
"""
assert tx.hash is not None

addresses = tx.get_related_addresses()
for address in addresses:
self.log.debug('delete address', address=address)
self._db.delete((self._cf, self._to_key(address, tx)))

def handle_tx_event(self, key: HathorEvents, args: 'EventArguments') -> None:
""" This method is called when pubsub publishes an event that we subscribed
"""
data = args.__dict__
tx = data['tx']
meta = tx.get_metadata()
if meta.has_voided_by_changed_since_last_call() or meta.has_spent_by_changed_since_last_call():
self.publish_tx(tx)

def _get_from_address_iter(self, address: str) -> Iterable[bytes]:
self.log.debug('seek to', address=address)
it = self._db.iterkeys(self._cf)
it.seek(self._to_key(address))
for _cf, key in it:
addr, _, tx_hash = self._from_key(key)
if addr != address:
break
self.log.debug('seek found', tx=tx_hash.hex())
yield tx_hash
self.log.debug('seek end')
super().add_tx(tx)
self._publish_tx(tx)

def get_from_address(self, address: str) -> List[bytes]:
""" Get list of transaction hashes of an address
"""
return list(self._get_from_address_iter(address))
return list(self._get_from_key(address))

def get_sorted_from_address(self, address: str) -> List[bytes]:
""" Get a sorted list of transaction hashes of an address
"""
return list(self._get_from_address_iter(address))
return list(self._get_sorted_from_key(address))

def is_address_empty(self, address: str) -> bool:
self.log.debug('seek to', address=address)
it = self._db.iterkeys(self._cf)
seek_key = self._to_key(address)
it.seek(seek_key)
cf_key = it.get()
if not cf_key:
return True
_cf, key = cf_key
# XXX: this means we reached the end it did not found any key
if key == seek_key:
return True
addr, _, _ = self._from_key(key)
is_empty = addr != address
self.log.debug('seek empty', is_empty=is_empty)
return is_empty
return self._is_key_empty(address)
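The docstring removed above describes the key layout that the generic `RocksDBTxGroupIndex` now owns: `[address][timestamp][hash]` with a 34-byte address, a 4-byte big-endian timestamp and a 32-byte hash. A small standalone sketch (fake address and hashes) of why the big-endian timestamp keeps the key-sorted iteration in time order:

```python
# Standalone sketch of the key layout described in the removed docstring.
# The address and hashes below are fake; only the ordering property is the point.
import struct


def make_key(address: str, timestamp: int, tx_hash: bytes) -> bytes:
    assert len(address) == 34 and len(tx_hash) == 32
    # 34-byte ascii address + 4-byte big-endian timestamp + 32-byte tx hash
    return address.encode('ascii') + struct.pack('>I', timestamp) + tx_hash


addr = 'H' + 'x' * 33  # fake 34-char address
k1 = make_key(addr, 1_000_000, b'\xff' * 32)
k2 = make_key(addr, 2_000_000, b'\x00' * 32)

# Big-endian timestamps make lexicographic byte order agree with time order,
# so rocksdb's key-sorted iteration yields an address's txs sorted by timestamp.
assert k1 < k2
```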