Skip to content

Commit

Permalink
NearCache checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
rlubke committed Dec 4, 2024
1 parent e71c988 commit 657829f
Show file tree
Hide file tree
Showing 10 changed files with 857 additions and 61 deletions.
8 changes: 8 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,11 @@ omit =
src/coherence/messages_pb2_grpc.py
src/coherence/services_pb2.py
src/coherence/services_pb2_grpc.py
src/coherence/cache_service_messages_v1_pb2.py
src/coherence/cache_service_messages_v1_pb2_grpc.py
src/coherence/common_messages_v1_pb2.py
src/coherence/common_messages_v1_pb2_grpc.py
src/coherence/proxy_service_messages_v1_pb2.py
src/coherence/proxy_service_messages_v1_pb2_grpc.py
src/coherence/proxy_service_v1_pb2.py
src/coherence/proxy_service_v1_pb2_grpc.py
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ $(info COMPOSE = $(COMPOSE))
# ----------------------------------------------------------------------------------------------------------------------
# List of unit tests
# ----------------------------------------------------------------------------------------------------------------------
UNIT_TESTS := tests/unit/test_environment.py \
UNIT_TESTS := tests/unit/test_cache_options.py \
tests/unit/test_environment.py \
tests/unit/test_serialization.py \
tests/unit/test_extractors.py

Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ classifiers = [
]

[tool.poetry.dependencies]
python = "^3.8"
python = "^3.9"
protobuf = "5.28.3"
grpcio = "1.68.0"
grpcio-tools = "1.68.0"
jsonpickle = ">=3.0,<4.1"
pymitter = ">=0.4,<0.6"
typing-extensions = ">=4.11,<4.13"
types-protobuf = "5.27.0.20240626"
pympler = "1.1"

[tool.poetry.dev-dependencies]
pytest = "~8.3"
Expand All @@ -41,6 +42,7 @@ sphinx-rtd-theme = "~2.0"
sphinxcontrib-napoleon = "~0.7"
m2r = "~0.3"
third-party-license-file-generator = "~2024.8"
pyinstrument="5.0.0"

[tool.pytest.ini_options]
pythonpath = ["src"]
Expand Down
162 changes: 136 additions & 26 deletions src/coherence/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,94 @@ async def inner_async(self, *args, **kwargs):
return inner


class NearCacheOptions:

def __init__(
self, ttl: int = 0, high_units: int = 0, high_units_memory: int = 0, prune_factor: float = 0.80
) -> None:
super().__init__()
if high_units < 0 or high_units_memory < 0:
raise ValueError("values for high_units and high_units_memory must be positive")
if ttl == 0 and high_units == 0 and high_units_memory == 0:
raise ValueError("at least one option must be specified and non-zero")
if high_units != 0 and high_units_memory != 0:
raise ValueError("high_units and high_units_memory cannot be used together; specify one or the other")
if 0.01 < prune_factor > 1:
raise ValueError("prune_factor must be between .01 and 1")

self._ttl = ttl if ttl >= 0 else -1
self._high_units = high_units
self._high_units_memory = high_units_memory
self._prune_factor = prune_factor

def __str__(self) -> str:
return (
f"NearCacheOptions(ttl={self.ttl}, high_units={self.high_units}, high_units_memory={self.high_unit_memory})"
)

def __eq__(self, other: Any) -> bool:
if self is other:
return True

if isinstance(other, NearCacheOptions):
return (
self.ttl == other.ttl
and self.high_units == other.high_units
and self.high_unit_memory == other.high_unit_memory
)

return False

@property
def ttl(self) -> int:
return self._ttl

@property
def high_units(self) -> int:
return self._high_units

@property
def high_unit_memory(self) -> int:
return self._high_units_memory

@property
def prune_factor(self) -> float:
return self._prune_factor


class CacheOptions:
def __init__(self, default_expiry: int = 0, near_cache_options: Optional[NearCacheOptions] = None):
super().__init__()
self._default_expiry: int = default_expiry if default_expiry >= 0 else -1
self._near_cache_options = near_cache_options

def __str__(self) -> str:
result: str = f"CacheOptions(default_expiry={self._default_expiry}"
result += ")" if self.near_cache_options is None else f", near_cache_options={self._near_cache_options})"
return result

def __eq__(self, other: Any) -> bool:
if self is other:
return True

if isinstance(other, CacheOptions):
return (
self.default_expiry == other.default_expiry and True
if self.near_cache_options is None
else self.near_cache_options == other.near_cache_options
)

return False

@property
def default_expiry(self) -> int:
return self._default_expiry

@property
def near_cache_options(self) -> Optional[NearCacheOptions]:
return self._near_cache_options


class NamedMap(abc.ABC, Generic[K, V]):
# noinspection PyUnresolvedReferences
"""
Expand All @@ -196,7 +284,12 @@ class NamedMap(abc.ABC, Generic[K, V]):
@property
@abc.abstractmethod
def name(self) -> str:
"""documentation"""
"""Returns the logical name of this NamedMap"""

@property
@abc.abstractmethod
def session(self) -> Session:
"""Returns the Session associated with NamedMap"""

@abc.abstractmethod
def on(self, event: MapLifecycleEvent, callback: Callable[[str], None]) -> None:
Expand Down Expand Up @@ -579,29 +672,31 @@ class NamedCache(NamedMap[K, V]):
"""

@abc.abstractmethod
async def put(self, key: K, value: V, ttl: int = 0) -> Optional[V]:
async def put(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]:
"""
Associates the specified value with the specified key in this map. If the map previously contained a mapping
for this key, the old value is replaced.
:param key: the key with which the specified value is to be associated
:param value: the value to be associated with the specified key
:param ttl: the expiry time in millis (optional)
:param ttl: the expiry time in millis (optional). If not specific, it will default to the default
ttl defined in the cache options provided when the cache was obtained
:return: resolving to the previous value associated with specified key, or `None` if there was no mapping for
key. A `None` return can also indicate that the map previously associated `None` with the specified key
if the implementation supports `None` values
"""

@abc.abstractmethod
async def put_if_absent(self, key: K, value: V, ttl: int = 0) -> Optional[V]:
async def put_if_absent(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]:
"""
If the specified key is not already associated with a value (or is mapped to null) associates it with the
given value and returns `None`, else returns the current value.
:param key: the key with which the specified value is to be associated
:param value: the value to be associated with the specified key
:param ttl: the expiry time in millis (optional)
:param ttl: the expiry time in millis (optional). If not specific, it will default to the default
ttl defined in the cache options provided when the cache was obtained.
:return: resolving to the previous value associated with specified key, or `None` if there was no mapping for
key. A `None` return can also indicate that the map previously associated `None` with the specified key
if the implementation supports `None` values
Expand All @@ -610,7 +705,9 @@ async def put_if_absent(self, key: K, value: V, ttl: int = 0) -> Optional[V]:


class NamedCacheClient(NamedCache[K, V]):
def __init__(self, cache_name: str, session: Session, serializer: Serializer):
def __init__(
self, cache_name: str, session: Session, serializer: Serializer, cache_options: Optional[CacheOptions] = None
) -> None:
self._cache_name: str = cache_name
self._serializer: Serializer = serializer
self._client_stub: NamedCacheServiceStub = NamedCacheServiceStub(session.channel)
Expand All @@ -620,6 +717,7 @@ def __init__(self, cache_name: str, session: Session, serializer: Serializer):
self._destroyed: bool = False
self._released: bool = False
self._session: Session = session
self._default_expiry: int = cache_options.default_expiry if cache_options is not None else 0

self._setup_event_handlers()

Expand All @@ -631,6 +729,10 @@ def __init__(self, cache_name: str, session: Session, serializer: Serializer):
def name(self) -> str:
return self._cache_name

@property
def session(self) -> Session:
return self._session

@property
def destroyed(self) -> bool:
return self._destroyed
Expand Down Expand Up @@ -668,14 +770,14 @@ async def get_all(self, keys: set[K]) -> AsyncIterator[MapEntry[K, V]]:
return _Stream(self._request_factory.serializer, stream, _entry_producer)

@_pre_call_cache
async def put(self, key: K, value: V, ttl: int = 0) -> Optional[V]:
p = self._request_factory.put_request(key, value, ttl)
async def put(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]:
p = self._request_factory.put_request(key, value, ttl if ttl is not None else self._default_expiry)
v = await self._client_stub.put(p)
return self._request_factory.serializer.deserialize(v.value)

@_pre_call_cache
async def put_if_absent(self, key: K, value: V, ttl: int = 0) -> Optional[V]:
p = self._request_factory.put_if_absent_request(key, value, ttl)
async def put_if_absent(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]:
p = self._request_factory.put_if_absent_request(key, value, ttl if ttl is not None else self._default_expiry)
v = await self._client_stub.putIfAbsent(p)
return self._request_factory.serializer.deserialize(v.value)

Expand Down Expand Up @@ -909,7 +1011,9 @@ def __str__(self) -> str:

class NamedCacheClientV1(NamedCache[K, V]):

def __init__(self, cache_name: str, session: Session, serializer: Serializer):
def __init__(
self, cache_name: str, session: Session, serializer: Serializer, cache_options: Optional[CacheOptions] = None
):
self._cache_name: str = cache_name
self._cache_id: int = 0
self._serializer: Serializer = serializer
Expand All @@ -921,6 +1025,7 @@ def __init__(self, cache_name: str, session: Session, serializer: Serializer):
self._destroyed: bool = False
self._released: bool = False
self._session: Session = session
self._default_expiry: int = cache_options.default_expiry if cache_options is not None else 0

self._events_manager: _MapEventsManagerV1[K, V] = _MapEventsManagerV1(
self, session, serializer, self._internal_emitter, self._request_factory
Expand Down Expand Up @@ -973,6 +1078,10 @@ def cache_id(self, cache_id: int) -> None:
def name(self) -> str:
return self._cache_name

@property
def session(self) -> Session:
return self._session

def on(self, event: MapLifecycleEvent, callback: Callable[[str], None]) -> None:
self._emitter.on(str(event.value), callback)

Expand All @@ -998,14 +1107,18 @@ async def get(self, key: K) -> Optional[V]:
return dispatcher.result()

@_pre_call_cache
async def put(self, key: K, value: V, ttl: int = 0) -> Optional[V]:
dispatcher: UnaryDispatcher[Optional[V]] = self._request_factory.put_request(key, value, ttl)
async def put(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]:
dispatcher: UnaryDispatcher[Optional[V]] = self._request_factory.put_request(
key, value, ttl if ttl is not None else self._default_expiry
)
await dispatcher.dispatch(self._stream_handler)
return dispatcher.result()

@_pre_call_cache
async def put_if_absent(self, key: K, value: V, ttl: int = 0) -> Optional[V]:
dispatcher: UnaryDispatcher[Optional[V]] = self._request_factory.put_if_absent_request(key, value, ttl)
async def put_if_absent(self, key: K, value: V, ttl: Optional[int] = None) -> Optional[V]:
dispatcher: UnaryDispatcher[Optional[V]] = self._request_factory.put_if_absent_request(
key, value, ttl if ttl is not None else self._default_expiry
)
await dispatcher.dispatch(self._stream_handler)
return dispatcher.result()

Expand Down Expand Up @@ -1594,9 +1707,6 @@ class Session:
"""

DEFAULT_FORMAT: Final[str] = "json"
"""The default serialization format"""

def __init__(self, session_options: Optional[Options] = None):
"""
Construct a new `Session` based on the provided :func:`coherence.client.Options`
Expand Down Expand Up @@ -1771,22 +1881,22 @@ def __str__(self) -> str:

# noinspection PyProtectedMember
@_pre_call_session
async def get_cache(self, name: str, ser_format: str = DEFAULT_FORMAT) -> NamedCache[K, V]:
async def get_cache(self, name: str, cache_options: Optional[CacheOptions] = None) -> NamedCache[K, V]:
"""
Returns a :func:`coherence.client.NamedCache` for the specified cache name.
:param name: the cache name
:param ser_format: the serialization format for keys and values stored within the cache
:param cache_options: a :class:`coherence.client.CacheOptions`
:return: Returns a :func:`coherence.client.NamedCache` for the specified cache name.
"""
serializer = SerializerRegistry.serializer(ser_format)
serializer = SerializerRegistry.serializer(self._session_options.format)

if self._protocol_version == 0:
with self._lock:
c = self._caches.get(name)
if c is None:
c = NamedCacheClient(name, self, serializer)
c = NamedCacheClient(name, self, serializer, cache_options)
# initialize the event stream now to ensure lifecycle listeners will work as expected
await c._events_manager._ensure_stream()
self._setup_event_handlers(c)
Expand All @@ -1796,24 +1906,24 @@ async def get_cache(self, name: str, ser_format: str = DEFAULT_FORMAT) -> NamedC
with self._lock:
c = self._caches.get(name)
if c is None:
c = NamedCacheClientV1(name, self, serializer)
c = NamedCacheClientV1(name, self, serializer, cache_options)
await c._ensure_cache()
self._setup_event_handlers(c)
self._caches.update({name: c})
return c

# noinspection PyProtectedMember
@_pre_call_session
async def get_map(self, name: str, ser_format: str = DEFAULT_FORMAT) -> NamedMap[K, V]:
async def get_map(self, name: str, cache_options: Optional[CacheOptions] = None) -> NamedMap[K, V]:
"""
Returns a :func:`coherence.client.NameMap` for the specified cache name.
:param name: the map name
:param ser_format: the serialization format for keys and values stored within the cache
:param cache_options: a :class:`coherence.client.CacheOptions`
:return: Returns a :func:`coherence.client.NamedMap` for the specified cache name.
"""
return cast(NamedMap[K, V], await self.get_cache(name, ser_format))
return cast(NamedMap[K, V], await self.get_cache(name, cache_options))

def is_ready(self) -> bool:
"""
Expand Down
12 changes: 10 additions & 2 deletions src/coherence/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,13 @@ class MapEntry(Generic[K, V]):
"""

def __init__(self, key: K, value: V):
self.key = key
self.value = value
self._key = key
self._value = value

@property
def key(self) -> K:
return self._key

@property
def value(self) -> V:
return self._value
Loading

0 comments on commit 657829f

Please sign in to comment.