From 657829fec517cb37e874b93f2d29a5002ba15d8b Mon Sep 17 00:00:00 2001 From: Ryan Lubke Date: Tue, 3 Dec 2024 19:18:54 -0800 Subject: [PATCH] NearCache checkpoint --- .coveragerc | 8 + Makefile | 3 +- pyproject.toml | 4 +- src/coherence/client.py | 162 ++++++++++++--- src/coherence/entry.py | 12 +- src/coherence/error.py | 31 --- src/coherence/local_cache.py | 335 +++++++++++++++++++++++++++++++ tests/e2e/test_client.py | 42 ++++ tests/unit/test_cache_options.py | 93 +++++++++ tests/unit/test_local_cache.py | 228 +++++++++++++++++++++ 10 files changed, 857 insertions(+), 61 deletions(-) delete mode 100644 src/coherence/error.py create mode 100644 src/coherence/local_cache.py create mode 100644 tests/unit/test_cache_options.py create mode 100644 tests/unit/test_local_cache.py diff --git a/.coveragerc b/.coveragerc index 33733e8..d1ec1b1 100644 --- a/.coveragerc +++ b/.coveragerc @@ -4,3 +4,11 @@ omit = src/coherence/messages_pb2_grpc.py src/coherence/services_pb2.py src/coherence/services_pb2_grpc.py + src/coherence/cache_service_messages_v1_pb2.py + src/coherence/cache_service_messages_v1_pb2_grpc.py + src/coherence/common_messages_v1_pb2.py + src/coherence/common_messages_v1_pb2_grpc.py + src/coherence/proxy_service_messages_v1_pb2.py + src/coherence/proxy_service_messages_v1_pb2_grpc.py + src/coherence/proxy_service_v1_pb2.py + src/coherence/proxy_service_v1_pb2_grpc.py diff --git a/Makefile b/Makefile index 332f8b9..0dd4a16 100644 --- a/Makefile +++ b/Makefile @@ -69,7 +69,8 @@ $(info COMPOSE = $(COMPOSE)) # ---------------------------------------------------------------------------------------------------------------------- # List of unit tests # ---------------------------------------------------------------------------------------------------------------------- -UNIT_TESTS := tests/unit/test_environment.py \ +UNIT_TESTS := tests/unit/test_cache_options.py \ + tests/unit/test_environment.py \ tests/unit/test_serialization.py \ tests/unit/test_extractors.py diff --git a/pyproject.toml b/pyproject.toml index cacdec2..aa4b604 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ classifiers = [ ] [tool.poetry.dependencies] -python = "^3.8" +python = "^3.9" protobuf = "5.28.3" grpcio = "1.68.0" grpcio-tools = "1.68.0" @@ -29,6 +29,7 @@ jsonpickle = ">=3.0,<4.1" pymitter = ">=0.4,<0.6" typing-extensions = ">=4.11,<4.13" types-protobuf = "5.27.0.20240626" +pympler = "1.1" [tool.poetry.dev-dependencies] pytest = "~8.3" @@ -41,6 +42,7 @@ sphinx-rtd-theme = "~2.0" sphinxcontrib-napoleon = "~0.7" m2r = "~0.3" third-party-license-file-generator = "~2024.8" +pyinstrument="5.0.0" [tool.pytest.ini_options] pythonpath = ["src"] diff --git a/src/coherence/client.py b/src/coherence/client.py index dd41e2e..35c0927 100644 --- a/src/coherence/client.py +++ b/src/coherence/client.py @@ -183,6 +183,94 @@ async def inner_async(self, *args, **kwargs): return inner +class NearCacheOptions: + + def __init__( + self, ttl: int = 0, high_units: int = 0, high_units_memory: int = 0, prune_factor: float = 0.80 + ) -> None: + super().__init__() + if high_units < 0 or high_units_memory < 0: + raise ValueError("values for high_units and high_units_memory must be positive") + if ttl == 0 and high_units == 0 and high_units_memory == 0: + raise ValueError("at least one option must be specified and non-zero") + if high_units != 0 and high_units_memory != 0: + raise ValueError("high_units and high_units_memory cannot be used together; specify one or the other") + if 0.01 < prune_factor > 1: + raise ValueError("prune_factor must be between .01 and 1") + + self._ttl = ttl if ttl >= 0 else -1 + self._high_units = high_units + self._high_units_memory = high_units_memory + self._prune_factor = prune_factor + + def __str__(self) -> str: + return ( + f"NearCacheOptions(ttl={self.ttl}, high_units={self.high_units}, high_units_memory={self.high_unit_memory})" + ) + + def __eq__(self, other: Any) -> bool: + if self is other: + return True + + if isinstance(other, NearCacheOptions): + return ( + self.ttl == other.ttl + and self.high_units == other.high_units + and self.high_unit_memory == other.high_unit_memory + ) + + return False + + @property + def ttl(self) -> int: + return self._ttl + + @property + def high_units(self) -> int: + return self._high_units + + @property + def high_unit_memory(self) -> int: + return self._high_units_memory + + @property + def prune_factor(self) -> float: + return self._prune_factor + + +class CacheOptions: + def __init__(self, default_expiry: int = 0, near_cache_options: Optional[NearCacheOptions] = None): + super().__init__() + self._default_expiry: int = default_expiry if default_expiry >= 0 else -1 + self._near_cache_options = near_cache_options + + def __str__(self) -> str: + result: str = f"CacheOptions(default_expiry={self._default_expiry}" + result += ")" if self.near_cache_options is None else f", near_cache_options={self._near_cache_options})" + return result + + def __eq__(self, other: Any) -> bool: + if self is other: + return True + + if isinstance(other, CacheOptions): + return ( + self.default_expiry == other.default_expiry and True + if self.near_cache_options is None + else self.near_cache_options == other.near_cache_options + ) + + return False + + @property + def default_expiry(self) -> int: + return self._default_expiry + + @property + def near_cache_options(self) -> Optional[NearCacheOptions]: + return self._near_cache_options + + class NamedMap(abc.ABC, Generic[K, V]): # noinspection PyUnresolvedReferences """ @@ -196,7 +284,12 @@ class NamedMap(abc.ABC, Generic[K, V]): @property @abc.abstractmethod def name(self) -> str: - """documentation""" + """Returns the logical name of this NamedMap""" + + @property + @abc.abstractmethod + def session(self) -> Session: + """Returns the Session associated with NamedMap""" @abc.abstractmethod def on(self, event: MapLifecycleEvent, callback: Callable[[str], None]) -> None: @@ -579,14 +672,15 @@ class NamedCache(NamedMap[K, V]): """ @abc.abstractmethod - async def put(self, key: K, value: V, ttl: int = 0) -> Optional[V]: + async def put(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]: """ Associates the specified value with the specified key in this map. If the map previously contained a mapping for this key, the old value is replaced. :param key: the key with which the specified value is to be associated :param value: the value to be associated with the specified key - :param ttl: the expiry time in millis (optional) + :param ttl: the expiry time in millis (optional). If not specific, it will default to the default + ttl defined in the cache options provided when the cache was obtained :return: resolving to the previous value associated with specified key, or `None` if there was no mapping for key. A `None` return can also indicate that the map previously associated `None` with the specified key if the implementation supports `None` values @@ -594,14 +688,15 @@ async def put(self, key: K, value: V, ttl: int = 0) -> Optional[V]: """ @abc.abstractmethod - async def put_if_absent(self, key: K, value: V, ttl: int = 0) -> Optional[V]: + async def put_if_absent(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]: """ If the specified key is not already associated with a value (or is mapped to null) associates it with the given value and returns `None`, else returns the current value. :param key: the key with which the specified value is to be associated :param value: the value to be associated with the specified key - :param ttl: the expiry time in millis (optional) + :param ttl: the expiry time in millis (optional). If not specific, it will default to the default + ttl defined in the cache options provided when the cache was obtained. :return: resolving to the previous value associated with specified key, or `None` if there was no mapping for key. A `None` return can also indicate that the map previously associated `None` with the specified key if the implementation supports `None` values @@ -610,7 +705,9 @@ async def put_if_absent(self, key: K, value: V, ttl: int = 0) -> Optional[V]: class NamedCacheClient(NamedCache[K, V]): - def __init__(self, cache_name: str, session: Session, serializer: Serializer): + def __init__( + self, cache_name: str, session: Session, serializer: Serializer, cache_options: Optional[CacheOptions] = None + ) -> None: self._cache_name: str = cache_name self._serializer: Serializer = serializer self._client_stub: NamedCacheServiceStub = NamedCacheServiceStub(session.channel) @@ -620,6 +717,7 @@ def __init__(self, cache_name: str, session: Session, serializer: Serializer): self._destroyed: bool = False self._released: bool = False self._session: Session = session + self._default_expiry: int = cache_options.default_expiry if cache_options is not None else 0 self._setup_event_handlers() @@ -631,6 +729,10 @@ def __init__(self, cache_name: str, session: Session, serializer: Serializer): def name(self) -> str: return self._cache_name + @property + def session(self) -> Session: + return self._session + @property def destroyed(self) -> bool: return self._destroyed @@ -668,14 +770,14 @@ async def get_all(self, keys: set[K]) -> AsyncIterator[MapEntry[K, V]]: return _Stream(self._request_factory.serializer, stream, _entry_producer) @_pre_call_cache - async def put(self, key: K, value: V, ttl: int = 0) -> Optional[V]: - p = self._request_factory.put_request(key, value, ttl) + async def put(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]: + p = self._request_factory.put_request(key, value, ttl if ttl is not None else self._default_expiry) v = await self._client_stub.put(p) return self._request_factory.serializer.deserialize(v.value) @_pre_call_cache - async def put_if_absent(self, key: K, value: V, ttl: int = 0) -> Optional[V]: - p = self._request_factory.put_if_absent_request(key, value, ttl) + async def put_if_absent(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]: + p = self._request_factory.put_if_absent_request(key, value, ttl if ttl is not None else self._default_expiry) v = await self._client_stub.putIfAbsent(p) return self._request_factory.serializer.deserialize(v.value) @@ -909,7 +1011,9 @@ def __str__(self) -> str: class NamedCacheClientV1(NamedCache[K, V]): - def __init__(self, cache_name: str, session: Session, serializer: Serializer): + def __init__( + self, cache_name: str, session: Session, serializer: Serializer, cache_options: Optional[CacheOptions] = None + ): self._cache_name: str = cache_name self._cache_id: int = 0 self._serializer: Serializer = serializer @@ -921,6 +1025,7 @@ def __init__(self, cache_name: str, session: Session, serializer: Serializer): self._destroyed: bool = False self._released: bool = False self._session: Session = session + self._default_expiry: int = cache_options.default_expiry if cache_options is not None else 0 self._events_manager: _MapEventsManagerV1[K, V] = _MapEventsManagerV1( self, session, serializer, self._internal_emitter, self._request_factory @@ -973,6 +1078,10 @@ def cache_id(self, cache_id: int) -> None: def name(self) -> str: return self._cache_name + @property + def session(self) -> Session: + return self._session + def on(self, event: MapLifecycleEvent, callback: Callable[[str], None]) -> None: self._emitter.on(str(event.value), callback) @@ -998,14 +1107,18 @@ async def get(self, key: K) -> Optional[V]: return dispatcher.result() @_pre_call_cache - async def put(self, key: K, value: V, ttl: int = 0) -> Optional[V]: - dispatcher: UnaryDispatcher[Optional[V]] = self._request_factory.put_request(key, value, ttl) + async def put(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]: + dispatcher: UnaryDispatcher[Optional[V]] = self._request_factory.put_request( + key, value, ttl if ttl is not None else self._default_expiry + ) await dispatcher.dispatch(self._stream_handler) return dispatcher.result() @_pre_call_cache - async def put_if_absent(self, key: K, value: V, ttl: int = 0) -> Optional[V]: - dispatcher: UnaryDispatcher[Optional[V]] = self._request_factory.put_if_absent_request(key, value, ttl) + async def put_if_absent(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]: + dispatcher: UnaryDispatcher[Optional[V]] = self._request_factory.put_if_absent_request( + key, value, ttl if ttl is not None else self._default_expiry + ) await dispatcher.dispatch(self._stream_handler) return dispatcher.result() @@ -1594,9 +1707,6 @@ class Session: """ - DEFAULT_FORMAT: Final[str] = "json" - """The default serialization format""" - def __init__(self, session_options: Optional[Options] = None): """ Construct a new `Session` based on the provided :func:`coherence.client.Options` @@ -1771,22 +1881,22 @@ def __str__(self) -> str: # noinspection PyProtectedMember @_pre_call_session - async def get_cache(self, name: str, ser_format: str = DEFAULT_FORMAT) -> NamedCache[K, V]: + async def get_cache(self, name: str, cache_options: Optional[CacheOptions] = None) -> NamedCache[K, V]: """ Returns a :func:`coherence.client.NamedCache` for the specified cache name. :param name: the cache name - :param ser_format: the serialization format for keys and values stored within the cache + :param cache_options: a :class:`coherence.client.CacheOptions` :return: Returns a :func:`coherence.client.NamedCache` for the specified cache name. """ - serializer = SerializerRegistry.serializer(ser_format) + serializer = SerializerRegistry.serializer(self._session_options.format) if self._protocol_version == 0: with self._lock: c = self._caches.get(name) if c is None: - c = NamedCacheClient(name, self, serializer) + c = NamedCacheClient(name, self, serializer, cache_options) # initialize the event stream now to ensure lifecycle listeners will work as expected await c._events_manager._ensure_stream() self._setup_event_handlers(c) @@ -1796,7 +1906,7 @@ async def get_cache(self, name: str, ser_format: str = DEFAULT_FORMAT) -> NamedC with self._lock: c = self._caches.get(name) if c is None: - c = NamedCacheClientV1(name, self, serializer) + c = NamedCacheClientV1(name, self, serializer, cache_options) await c._ensure_cache() self._setup_event_handlers(c) self._caches.update({name: c}) @@ -1804,16 +1914,16 @@ async def get_cache(self, name: str, ser_format: str = DEFAULT_FORMAT) -> NamedC # noinspection PyProtectedMember @_pre_call_session - async def get_map(self, name: str, ser_format: str = DEFAULT_FORMAT) -> NamedMap[K, V]: + async def get_map(self, name: str, cache_options: Optional[CacheOptions] = None) -> NamedMap[K, V]: """ Returns a :func:`coherence.client.NameMap` for the specified cache name. :param name: the map name - :param ser_format: the serialization format for keys and values stored within the cache + :param cache_options: a :class:`coherence.client.CacheOptions` :return: Returns a :func:`coherence.client.NamedMap` for the specified cache name. """ - return cast(NamedMap[K, V], await self.get_cache(name, ser_format)) + return cast(NamedMap[K, V], await self.get_cache(name, cache_options)) def is_ready(self) -> bool: """ diff --git a/src/coherence/entry.py b/src/coherence/entry.py index 2483066..7700902 100644 --- a/src/coherence/entry.py +++ b/src/coherence/entry.py @@ -12,5 +12,13 @@ class MapEntry(Generic[K, V]): """ def __init__(self, key: K, value: V): - self.key = key - self.value = value + self._key = key + self._value = value + + @property + def key(self) -> K: + return self._key + + @property + def value(self) -> V: + return self._value diff --git a/src/coherence/error.py b/src/coherence/error.py deleted file mode 100644 index 2bfc89f..0000000 --- a/src/coherence/error.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright (c) 2022, 2024, Oracle and/or its affiliates. -# Licensed under the Universal Permissive License v 1.0 as shown at -# https://oss.oracle.com/licenses/upl. - - -class RequestTimeoutError(Exception): - def __init__( # type:ignore - self, *args, message: str = "Request failed to complete within the configured request timeout", **kwargs - ) -> None: - if args: - self.message = args[0] - else: - self.message = message - # noinspection PyArgumentList - super().__init__(self.message, *args, **kwargs) - - -class RequestFailedError(Exception): - def __init__(self, *args, message: str = "Request failed for an unknown reason", **kwargs) -> None: # type:ignore - if args: - self.message = args[0] - else: - self.message = message - # noinspection PyArgumentList - super().__init__(self.message, *args, **kwargs) - - -class SessionCreationError(Exception): - def __init__(self, *args, **kwargs) -> None: # type:ignore - # noinspection PyArgumentList - super().__init__(*args, **kwargs) diff --git a/src/coherence/local_cache.py b/src/coherence/local_cache.py new file mode 100644 index 0000000..bca53f2 --- /dev/null +++ b/src/coherence/local_cache.py @@ -0,0 +1,335 @@ +# Copyright (c) 2024, Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at +# https://oss.oracle.com/licenses/upl. + +import asyncio +import time +from datetime import datetime, timezone +from typing import Generic, Optional, Tuple, TypeVar + +from pympler import asizeof + +from . import MapEntry +from .client import NearCacheOptions + +K = TypeVar("K") +V = TypeVar("V") + + +class LocalEntry(MapEntry[K, V]): + + def __init__(self, key: K, value: V, ttl: int): + super().__init__(key, value) + self._ttl_orig: int = ttl + self._ttl: int = ttl * 1_000_000 + self._insert_time = time.time_ns() + self._last_access = self._insert_time + self._size = asizeof.asizeof(self) + + @property + def bytes(self) -> int: + return self._size + + @property + def ttl(self) -> int: + return self._ttl_orig + + @property + def insert_time(self) -> int: + return self._insert_time + + @property + def last_access(self) -> int: + return self._last_access + + def touch(self) -> None: + self._last_access = time.time_ns() + + def expired(self, now: int) -> bool: + return 0 < self._ttl < now - self._insert_time + + def _nanos_format_date(self, nanos: int) -> str: + dt = datetime.fromtimestamp(nanos / 1e9, timezone.utc) + return "{}{:03.0f}".format(dt.strftime("%Y-%m-%dT%H:%M:%S.%f"), nanos % 1e3) + + def __str__(self) -> str: + return ( + f"LocalEntry(key={self._key}, value={self._value}," + f" ttl={self._ttl // 1_000_000}ms, insert-time={self._nanos_format_date(self._insert_time)}" + f" last-access={self._nanos_format_date(self._last_access)}," + f" expired={self.expired(time.time_ns())})" + ) + + +class CacheStats: + + def __init__(self, local_cache: "LocalCache[K, V]"): + self._local_cache: "LocalCache[K, V]" = local_cache + self._hits: int = 0 + self._misses: int = 0 + self._puts: int = 0 + self._gets: int = 0 + self._memory: int = 0 + self._prunes: int = 0 + self._expires: int = 0 + self._expires_nanos: int = 0 + self._prunes_nanos: int = 0 + self._misses_nanos: int = 0 + + @property + def hits(self) -> int: + return self._hits + + @property + def misses(self) -> int: + return self._misses + + @property + def misses_duration(self) -> int: + return self._misses_nanos + + @property + def hit_rate(self) -> float: + hits: int = self.hits + misses: int = self.misses + total = hits + misses + + return 0.0 if total == 0 else round((float(hits) / float(misses)), 2) + + @property + def puts(self) -> int: + return self._puts + + @property + def gets(self) -> int: + return self.hits + self.misses + + @property + def prunes(self) -> int: + return self._prunes + + @property + def expires(self) -> int: + return self._expires + + @property + def prunes_duration(self) -> int: + return self._prunes_nanos + + @property + def expires_duration(self) -> int: + return self._expires_nanos + + @property + def size(self) -> int: + return len(self._local_cache.storage) + + @property + def bytes(self) -> int: + return self._memory + + def reset(self) -> None: + self._prunes = 0 + self._prunes_nanos = 0 + self._misses = 0 + self._misses_nanos = 0 + self._hits = 0 + self._puts = 0 + self._expires = 0 + self._expires_nanos = 0 + + def _register_hit(self) -> None: + self._hits += 1 + + def _register_miss(self) -> None: + self._misses += 1 + + def _register_put(self) -> None: + self._puts += 1 + + def _register_get(self) -> None: + self._gets += 1 + + def _update_memory(self, size: int) -> None: + self._memory += size + + def _register_prune_nanos(self, nanos: int) -> None: + self._prunes += 1 + self._prunes_nanos += nanos + + def _register_misses_nanos(self, nanos: int) -> None: + self._misses_nanos += nanos + + def _register_expires(self, count: int, nanos: int) -> None: + self._expires += count + self._expires_nanos += nanos + + def __str__(self) -> str: + return ( + f"CacheStats(puts={self._puts}, gets={self._gets}, hits={self._hits}" + f", misses={self._misses}, misses-duration={self._misses_nanos}ns" + f", hit-rate={self.hit_rate}, prunes={self._prunes}" + f", prunes-duration={self._prunes_nanos}ns, size={self.size}" + f", expires={self._expires}, expires-duration={self._expires_nanos}ns" + f", memory-bytes={self._memory})" + ) + + +# noinspection PyProtectedMember +class LocalCache(Generic[K, V]): + + def __init__(self, name: str, options: NearCacheOptions): + self._name: str = name + self._options: NearCacheOptions = options + self._stats: CacheStats = CacheStats(self) + self._data: dict[K, Optional[LocalEntry[K, V]]] = dict() + self._lock: asyncio.Lock = asyncio.Lock() + + async def put(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]: + async with self._lock: + stats: CacheStats = self.stats + storage: dict[K, Optional[LocalEntry[K, V]]] = self.storage + stats._register_put() + self._prune() + + old_entry: Optional[LocalEntry[K, V]] = storage.get(key, None) + if old_entry is not None: + stats._update_memory(-old_entry.bytes) + + entry: LocalEntry[K, V] = LocalEntry(key, value, ttl if ttl is not None else self.options.ttl) + stats._update_memory(entry.bytes) + + storage[key] = entry + + return None if old_entry is None else old_entry.value + + async def get(self, key: K) -> Optional[V]: + async with self._lock: + storage: dict[K, Optional[LocalEntry[K, V]]] = self.storage + stats: CacheStats = self.stats + self._expire() + + entry: Optional[LocalEntry[K, V]] = storage.get(key, None) + if entry is None: + stats._register_miss() + return None + + stats._register_hit() + entry._last_access = time.time_ns() + + return entry.value + + async def get_all(self, keys: set[K]) -> dict[K, V]: + async with self._lock: + self._expire() + + stats: CacheStats = self.stats + results: dict[K, V] = dict() + + for key in keys: + entry: Optional[LocalEntry[K, V]] = self.storage.get(key, None) + if entry is None: + stats._register_miss() + continue + + stats._register_hit() + entry._last_access = time.time_ns() + + results[key] = entry.value + + return results + + async def remove(self, key: K) -> Optional[V]: + async with self._lock: + self._expire() + + entry: Optional[LocalEntry[K, V]] = self.storage.get(key, None) + + if entry is None: + return None + + self.stats._update_memory(-entry.bytes) + return entry.value + + async def size(self) -> int: + async with self._lock: + self._expire() + + return len(self.storage) + + async def clear(self) -> None: + async with self._lock: + self._data = dict() + + self.stats._memory = 0 + + async def release(self) -> None: + await self.clear() + + @property + def stats(self) -> CacheStats: + return self._stats + + @property + def name(self) -> str: + return self._name + + @property + def storage(self) -> dict[K, Optional[LocalEntry[K, V]]]: + return self._data + + @property + def options(self) -> NearCacheOptions: + return self._options + + def _prune(self) -> None: + self._expire() + + storage: dict[K, Optional[LocalEntry[K, V]]] = self.storage + options: NearCacheOptions = self.options + prune_factor: float = options.prune_factor + high_units: int = options.high_units + high_units_used: bool = high_units > 0 + high_units_mem: int = options.high_unit_memory + mem_units_used: bool = high_units_mem > 0 + cur_size = len(storage) + start = time.time_ns() + + if (high_units_used and high_units < cur_size + 1) or (mem_units_used and high_units_mem < self.stats.bytes): + stats: CacheStats = self.stats + + to_sort: list[Tuple[int, K]] = [] + for key, value in storage.items(): + to_sort.append((value.last_access, key)) # type: ignore + + sorted(to_sort, key=lambda x: x[0]) + + target_size: int = int(round(float((cur_size if high_units_used else stats.bytes) * prune_factor))) + + for item in to_sort: + entry: Optional[LocalEntry[K, V]] = storage.pop(item[1]) + stats._update_memory(-entry.bytes) # type: ignore + + if (len(storage) if high_units_used else stats._memory) <= target_size: + break + + end = time.time_ns() + stats._register_prune_nanos(end - start) + + def _expire(self) -> None: + storage: dict[K, Optional[LocalEntry[K, V]]] = self.storage + start = time.time_ns() + keys: list[K] = [key for key, entry in storage.items() if entry.expired(start)] # type: ignore + stats: CacheStats = self.stats + + for key in keys: + entry: Optional[LocalEntry[K, V]] = storage.pop(key) + stats._update_memory(-entry.bytes) # type: ignore + + size: int = len(keys) + if size > 0: + end = time.time_ns() + stats._register_expires(size, end - start) + + def __str__(self) -> str: + return f"LocalCache(name={self.name}, options={self.options}" f", stats={self.stats})" diff --git a/tests/e2e/test_client.py b/tests/e2e/test_client.py index 273c904..e650b21 100644 --- a/tests/e2e/test_client.py +++ b/tests/e2e/test_client.py @@ -13,6 +13,7 @@ import tests from coherence import Aggregators, Filters, MapEntry, NamedCache, Session, request_timeout +from coherence.client import CacheOptions from coherence.event import MapLifecycleEvent from coherence.extractor import ChainedExtractor, Extractors, UniversalExtractor from coherence.processor import ExtractorProcessor @@ -603,3 +604,44 @@ async def test_unary_request_timeout(test_session: Session) -> None: end = time() assert e.code() == StatusCode.DEADLINE_EXCEEDED assert pytest.approx((end - start), 0.5) == 5.0 + + +# noinspection PyUnresolvedReferences +@pytest.mark.asyncio +async def test_ttl_configuration(test_session: Session) -> None: + cache: NamedCache[str, str] = await test_session.get_cache("none") + assert cache._default_expiry == 0 + await cache.destroy() + + options: CacheOptions = CacheOptions() + cache = await test_session.get_cache("default", options) + assert cache._default_expiry == options.default_expiry + await cache.destroy() + + options = CacheOptions(default_expiry=2000) + cache = await test_session.get_cache("defined", options) + assert cache._default_expiry == options.default_expiry + + await cache.put("a", "b") + assert await cache.size() == 1 + + sleep(2.5) + assert await cache.size() == 0 + await cache.destroy() + + options = CacheOptions(default_expiry=2000) + cache = await test_session.get_cache("override", options) + + await cache.put("a", "b", 5000) + + assert await cache.size() == 1 + + sleep(2.5) + assert await cache.size() == 1 + + sleep(1) + assert await cache.size() == 1 + + sleep(3) + assert await cache.size() == 0 + await cache.destroy() diff --git a/tests/unit/test_cache_options.py b/tests/unit/test_cache_options.py new file mode 100644 index 0000000..ba8a2a3 --- /dev/null +++ b/tests/unit/test_cache_options.py @@ -0,0 +1,93 @@ +import pytest + +from coherence.client import CacheOptions, NearCacheOptions + + +def test_cache_options_default_expiry() -> None: + options: CacheOptions = CacheOptions(-10) + assert options.default_expiry == -1 + + options = CacheOptions(10000) + assert options.default_expiry == 10000 + + +def test_near_cache_options_no_explicit_params() -> None: + with pytest.raises(ValueError) as err: + NearCacheOptions() + + assert str(err.value) == "at least one option must be specified and non-zero" + + +def test_near_cache_options_negative_units() -> None: + message: str = "values for high_units and high_units_memory must be positive" + + with pytest.raises(ValueError) as err: + NearCacheOptions(high_units=-1) + + assert str(err.value) == message + + with pytest.raises(ValueError) as err: + NearCacheOptions(high_units_memory=-1) + + assert str(err.value) == message + + +def test_near_cache_options_both_units() -> None: + message: str = "high_units and high_units_memory cannot be used together; specify one or the other" + + with pytest.raises(ValueError) as err: + NearCacheOptions(high_units=1000, high_units_memory=10000) + + assert str(err.value) == message + + with pytest.raises(ValueError) as err: + NearCacheOptions(ttl=10000, high_units=1000, high_units_memory=10000) + + assert str(err.value) == message + + +def test_near_cache_options_ttl() -> None: + options: NearCacheOptions = NearCacheOptions(ttl=-10) + assert options.ttl == -1 + + options = NearCacheOptions(ttl=10) + assert options.ttl == 10 + + +def test_near_cache_options_high_units() -> None: + options: NearCacheOptions = NearCacheOptions(high_units=10000) + assert options.high_units == 10000 + + +def test_near_cache_options_high_units_memory() -> None: + options: NearCacheOptions = NearCacheOptions(high_units_memory=10000) + assert options._high_units_memory == 10000 + + +def test_cache_options_str() -> None: + options: CacheOptions = CacheOptions(10000) + assert str(options) == "CacheOptions(default_expiry=10000)" + + options = CacheOptions(5000, NearCacheOptions(high_units=10000)) + assert ( + str(options) == "CacheOptions(default_expiry=5000, near_cache_options=NearCacheOptions(ttl=0," + " high_units=10000, high_units_memory=0))" + ) + + +def test_cache_options_eq() -> None: + options: CacheOptions = CacheOptions(10000) + options2: CacheOptions = CacheOptions(10000) + options3: CacheOptions = CacheOptions(1000) + + assert options == options + assert options == options2 + assert options != options3 + + options = CacheOptions(10000, NearCacheOptions(high_units=10000)) + options2 = CacheOptions(10000, NearCacheOptions(high_units=10000)) + options3 = CacheOptions(10000, NearCacheOptions(high_units=1000)) + + assert options == options + assert options == options2 + assert options != options3 diff --git a/tests/unit/test_local_cache.py b/tests/unit/test_local_cache.py new file mode 100644 index 0000000..bac9fee --- /dev/null +++ b/tests/unit/test_local_cache.py @@ -0,0 +1,228 @@ +# Copyright (c) 2024, Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at +# https://oss.oracle.com/licenses/upl. + +import time +from typing import Optional + +import pytest + +from coherence.client import NearCacheOptions +from coherence.local_cache import CacheStats, LocalCache, LocalEntry + + +def test_local_entry() -> None: + start: int = time.time_ns() + entry: LocalEntry[str, str] = LocalEntry("a", "b", 100) + + # check initial state after creation + assert entry.key == "a" + assert entry.value == "b" + assert entry.ttl == 100 + assert entry.insert_time > start + assert entry.last_access == entry.insert_time + assert entry.bytes > 0 + + # touch the entry and ensure the last_access has increased + # over the insert time + entry.touch() + assert entry.last_access > entry.insert_time + + # ensure the entry hasn't expired, then wait + # for a period of time beyond the expiry time + # and ensure it has expired + assert entry.expired(time.time_ns()) is False + time.sleep(0.2) + assert entry.expired(time.time_ns()) is True + + +@pytest.mark.asyncio +async def test_basic_put_get_remove() -> None: + options: NearCacheOptions = NearCacheOptions(high_units=10) + cache: LocalCache[str, str] = LocalCache("test", options) + result: Optional[str] = await cache.get("a") + + assert result is None + + stats: CacheStats = cache.stats + + # validate stats with a get against an empty cache + assert stats.misses == 1 + assert stats.gets == 1 + assert stats.bytes == 0 + + # check stats after a single put + result = await cache.put("a", "b") + assert result is None + assert stats.puts == 1 + assert stats.gets == 1 + + # check stats after a get for the value previously inserted + result = await cache.get("a") + assert result == "b" + assert stats.misses == 1 + assert stats.gets == 2 + assert stats.bytes > 0 + + # update the value + result = await cache.put("a", "c") + + # snapshot the current size for validation later + stats_bytes: int = stats.bytes + + # ensure previous value returned after update and stats + # are accurate + assert result == "b" + assert stats.puts == 2 + assert stats.misses == 1 + assert stats.gets == 2 + + # insert new value and validate stats + result = await cache.put("b", "d") + assert result is None + assert stats.puts == 3 + assert stats.misses == 1 + assert stats.gets == 2 + assert stats.bytes > stats_bytes + + # issue a series of gets for a non-existent key + for _ in range(10): + await cache.get("c") + + # validate the stats including the hit-rate + assert stats.gets == 12 + assert stats.hits == 1 + assert stats.misses == 11 + assert stats.hit_rate == 0.09 + assert stats.size == 2 + + # remove a value from the cache + # ensure the returned value is what was associated + # with the key. Ensure the bytes has decreased + # back to the snapshot taken earlier + result = await cache.remove("b") + assert result == "d" + assert stats.bytes == stats_bytes + + +@pytest.mark.asyncio +async def test_expiry() -> None: + options: NearCacheOptions = NearCacheOptions(high_units=1000) + cache: LocalCache[str, str] = LocalCache("test", options) + stats: CacheStats = cache.stats + + for i in range(5): + await cache.put(str(i), str(i), 1000) + + for i in range(5, 10): + await cache.put(str(i), str(i), 2000) + + for i in range(10): + await cache.get(str(i)) + + assert await cache.size() == 10 + + time.sleep(1.05) + + for i in range(5): + assert await cache.get(str(i)) is None + + for i in range(5, 10): + assert await cache.get(str(i)) == str(i) + + duration: int = stats.expires_duration + assert stats.expires == 5 + assert duration > 0 + + time.sleep(1.05) + + for i in range(10): + assert await cache.get(str(i)) is None + + # assert correct expires count and + # the duration has increased + assert stats.expires == 10 + assert stats.expires_duration > duration + + +@pytest.mark.asyncio +async def test_pruning_units() -> None: + options: NearCacheOptions = NearCacheOptions(high_units=100) + cache: LocalCache[str, str] = LocalCache("test", options) + stats: CacheStats = cache.stats + + for i in range(210): + key_value: str = str(i) + await cache.put(key_value, key_value, 0) + + # todo validate oldest entries pruned + + assert await cache.size() < 100 + assert stats.prunes == 6 + assert stats.prunes_duration > 0 + + +@pytest.mark.asyncio +async def test_pruning_memory() -> None: + upper_bound_mem: int = 110 * 1024 # 110KB + options: NearCacheOptions = NearCacheOptions(high_units_memory=upper_bound_mem) + cache: LocalCache[str, str] = LocalCache("test", options) + stats: CacheStats = cache.stats + + for i in range(210): + key_value: str = str(i) + await cache.put(key_value, key_value, 0) + + # todo validate oldest entries pruned + + assert stats.prunes > 0 + assert stats.prunes_duration > 0 + + assert stats.bytes < upper_bound_mem + + +@pytest.mark.asyncio +async def test_stats_reset() -> None: + upper_bound_mem: int = 110 * 1024 # 110KB + options: NearCacheOptions = NearCacheOptions(high_units_memory=upper_bound_mem) + cache: LocalCache[str, str] = LocalCache("test", options) + stats: CacheStats = cache.stats + + for i in range(210): + key_value: str = str(i) + await cache.put(key_value, key_value, 100) + await cache.get(key_value) + + await cache.get("none") + await cache.put("A", "B", 0) + + time.sleep(0.5) + + assert await cache.size() == 1 + + print(str(stats)) + + memory: int = stats.bytes + assert stats.puts == 211 + assert stats.gets == 211 + assert stats.prunes > 0 + assert stats.prunes_duration > 0 + assert stats.expires > 0 + assert stats.expires_duration > 0 + assert stats.hits > 0 + assert stats.hit_rate > 0 + assert stats.misses > 0 + assert memory > 0 + + stats.reset() + + assert stats.puts == 0 + assert stats.gets == 0 + assert stats.prunes == 0 + assert stats.prunes_duration == 0 + assert stats.expires == 0 + assert stats.expires_duration == 0 + assert stats.hits == 0 + assert stats.hit_rate == 0.0 + assert stats.misses == 0 + assert stats.bytes == memory