Skip to content

Commit

Permalink
Merged with the master branch, implemented suggestions such as the re…
Browse files Browse the repository at this point in the history
…moval of an unneeded RLock, added more messaging and error handling, removed the now unncessary TimedOccurenceWatcher, removed unneeded logging configurations that caused extra logging, added handling to the evaluation service logger to ensure that logged messages don't propagate to the root logger, wrote documentation for the runner, and updated the runner to use redis streams instead of redis pubsub.
  • Loading branch information
christophertubbs authored and robertbartel committed Oct 7, 2024
1 parent 9dfb420 commit 29cda6c
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 500 deletions.
165 changes: 0 additions & 165 deletions python/lib/core/dmod/core/common/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,171 +238,6 @@ def __contains__(self, obj: object) -> bool:
return obj in self.__data


class _OccurrenceTracker(typing.Generic[_T]):
"""
Keeps track of occurrences of a type of value that have been encountered within a duration
"""
def __init__(self, key: _T, duration: timedelta, threshold: int, on_filled: typing.Callable[[_T], typing.Any]):
self.__key = key
self.__duration = duration
self.__threshold = threshold
self.__occurences: typing.List[datetime] = []
self.__on_filled = on_filled

def value_encountered(self):
"""
Inform the tracker that the value has been encountered again
"""
self.update_occurrences()
self.__occurences.append(datetime.now())
if len(self.__occurences) >= self.__threshold:
self.__on_filled(self.__key)

def update_occurrences(self) -> int:
"""
Update the list of occurrences to include only those within the current duration
Returns:
The number of occurrences still being tracked
"""
cutoff: datetime = datetime.now() - self.__duration
self.__occurences = [
occurrence
for occurrence in self.__occurences
if occurrence > cutoff
]
return len(self.__occurences)

@property
def key(self):
"""
The identifier that is being tracked
"""
return self.__key

def __len__(self):
return len(self.__occurences)

def __str__(self):
if len(self.__occurences) == 0:
occurrences_details = f"No Occurences within the last {self.__duration.total_seconds()} seconds."
else:
occurrences_details = (f"{len(self.__occurences)} occurrences since "
f"{self.__occurences[0].strftime('%Y-%m-%d %H:%M:%S')}")
return f"{self.key}: {occurrences_details}"


class TimedOccurrenceWatcher:
"""
Keeps track of the amount of occurrences of items within a range of time
"""
MINIMUM_TRACKING_SECONDS: typing.Final[float] = 0.1
"""
The lowest number of seconds to watch for multiple occurrences. Only acting when multiple occurrences are tracked
in under 100ms would create a scenario where the watcher will most likely never trigger an action, rendering
this the wrong tool for the job.
"""

@staticmethod
def default_key_function(obj: object) -> type:
"""
The function used to find a common identifier for an object if one is not provided
"""
return type(obj)

def __init__(
self,
duration: timedelta,
threshold: int,
on_filled: typing.Callable[[_T], typing.Any],
key_function: typing.Callable[[_VT], _KT] = None
):
if not isinstance(duration, timedelta):
raise ValueError(f"Cannot create a {self.__class__.__name__} - {duration} is not a timedelta object")

if duration.total_seconds() < self.MINIMUM_TRACKING_SECONDS:
raise ValueError(
f"Cannot create a {self.__class__.__name__} - the duration is too short ({duration.total_seconds()}s)"
)

self.__duration = duration

if not isinstance(key_function, typing.Callable):
key_function = self.default_key_function

self.__key_function = key_function
self.__entries: typing.Dict[uuid.UUID, _OccurrenceTracker] = {}
self.__threshold = threshold
self.__on_filled = on_filled

def value_encountered(self, value: _T):
"""
Add an occurrence of an object to track
Args:
value: The item to track
"""
self.__update_trackers()
self._get_tracker(value).value_encountered()

def _get_tracker(self, value: _T) -> _OccurrenceTracker[_T]:
"""
Get an occurrence tracker for the given value
Args:
value: The value to track
Returns:
A tracker for the value
"""
key = self.__key_function(value)

for tracker in self.__entries.values():
if tracker.key == key:
return tracker

new_tracker = _OccurrenceTracker(
key=key,
duration=self.__duration,
threshold=self.__threshold,
on_filled=self.__on_filled
)
self.__entries[uuid.uuid1()] = new_tracker
return new_tracker

def __update_trackers(self):
"""
Update the amount of items in each tracker
If a tracker becomes empty it will be removed
"""
for tracker_id, tracker in self.__entries.items():
amount_left = tracker.update_occurrences()
if amount_left == 0:
del self.__entries[tracker_id]

@property
def size(self) -> int:
"""
The number of items encountered within the duration
"""
self.__update_trackers()
return sum(len(tracker) for tracker in self.__entries.values())

@property
def duration(self) -> timedelta:
"""
The amount of time to track items for
"""
return self.__duration

def __str__(self):
return f"{self.__class__.__name__}: {self.size} items within the last {self.duration.total_seconds()} Seconds"

def __repr__(self):
return self.__str__()


class EventfulMap(abc.ABC, typing.MutableMapping[_KT, _VT], typing.Generic[_KT, _VT]):
@abc.abstractmethod
def get_handlers(self) -> typing.Dict[CollectionEvent, typing.MutableSequence[typing.Callable]]:
Expand Down
1 change: 1 addition & 0 deletions python/lib/core/dmod/core/context/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def end_scope(self):
"""
Override to add extra logic for when this scope is supposed to reach its end
"""
self.logger.warning(f"Ending scope '{self.__scope_id}' for: {self}")
self.drop_references()
self.__scope_closed()

Expand Down
121 changes: 79 additions & 42 deletions python/lib/core/dmod/core/context/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from multiprocessing import managers
from multiprocessing import context
from multiprocessing import RLock

from .base import ObjectManagerScope
from .base import T
Expand All @@ -23,8 +22,6 @@
TypeOfRemoteObject = typing.Union[typing.Type[managers.BaseProxy], type]
"""A wrapper object that is used to communicate to objects created by Managers"""

_PREPARATION_LOCK: RLock = RLock()


class DMODObjectManager(managers.BaseManager):
"""
Expand Down Expand Up @@ -132,45 +129,46 @@ def prepare(
additional_proxy_types: A mapping between class types and the type of proxies used to operate
upon them remotely
"""
with _PREPARATION_LOCK:
if not cls.__initialized:
if not isinstance(additional_proxy_types, typing.Mapping):
additional_proxy_types = {}

already_registered_items: typing.List[str] = list(getattr(cls, "_registry").keys())

for real_class, proxy_class in additional_proxy_types.items():
name = real_class.__name__ if hasattr(real_class, "__name__") else None

if name is None:
raise TypeError(f"Cannot add a proxy for {real_class} - {real_class} is not a standard type")

if name in already_registered_items:
print(f"'{name}' is already registered to {cls.__name__}")
continue

cls.register_class(class_type=real_class, type_of_proxy=proxy_class)
already_registered_items.append(name)

# Now find all proxies attached to the SyncManager and attach those
# This will ensure that this manager has proxies for objects and structures like dictionaries
registry_initialization_arguments = (
{
"typeid": typeid,
"callable": attributes[0],
"exposed": attributes[1],
"method_to_typeid": attributes[2],
"proxytype": attributes[3]
}
for typeid, attributes in getattr(managers.SyncManager, "_registry").items()
if typeid not in already_registered_items
)
if not cls.__initialized:
if not isinstance(additional_proxy_types, typing.Mapping):
additional_proxy_types = {}

already_registered_items: typing.List[str] = list(getattr(cls, "_registry").keys())

for real_class, proxy_class in additional_proxy_types.items():
name = real_class.__name__ if hasattr(real_class, "__name__") else None

if name is None:
raise TypeError(f"Cannot add a proxy for {real_class} - {real_class} is not a standard type")

if name in already_registered_items:
print(f"'{name}' is already registered to {cls.__name__}")
continue

cls.register_class(class_type=real_class, type_of_proxy=proxy_class)
already_registered_items.append(name)

# Now find all proxies attached to the SyncManager and attach those
# This will ensure that this manager has proxies for objects and structures like dictionaries
registry_initialization_arguments = (
{
"typeid": typeid,
"callable": attributes[0],
"exposed": attributes[1],
"method_to_typeid": attributes[2],
"proxytype": attributes[3]
}
for typeid, attributes in getattr(managers.SyncManager, "_registry").items()
if typeid not in already_registered_items
)

for arguments in registry_initialization_arguments:
cls.register(**arguments)
for arguments in registry_initialization_arguments:
cls.register(**arguments)
else:
logging.warning(f"The '{cls.__name__}' cannot be prepared - it has already been done so.")

cls.__initialized = True
return cls
cls.__initialized = True
return cls

def create_and_track_object(self, __class_name: str, __scope_name: str, /, *args, **kwargs) -> T:
"""
Expand All @@ -197,6 +195,10 @@ def create_and_track_object(self, __class_name: str, __scope_name: str, /, *args
)

if __scope_name not in self.__scopes:
self.logger.warning(
f"Cannot track a '{__class_name}' object in the '{__scope_name}' "
f"scope if it has not been established. Establishing it now."
)
self.establish_scope(__scope_name)

new_instance = self.create_object(__class_name, *args, **kwargs)
Expand Down Expand Up @@ -228,12 +230,13 @@ def create_object(self, __class_name, /, *args, **kwargs) -> T:

return value

def free(self, scope_name: str):
def free(self, scope_name: str, fail_on_missing_scope: bool = True):
"""
Remove all items associated with a given tracking key from the object manager
Args:
scope_name: The key used to keep track of like items
fail_on_missing_scope: Throw an exception if the scope doesn't exist
Returns:
The number of items that were deleted
Expand All @@ -247,9 +250,13 @@ def free(self, scope_name: str):
f"Received '{scope_name}' ({type(scope_name)}"
)

if scope_name not in self.__scopes:
if scope_name not in self.__scopes and fail_on_missing_scope:
raise KeyError(f"Cannot free objects from {self} - no items are tracked by the key {scope_name}")

if scope_name not in self.__scopes:
self.logger.warning(f"Cannot remove scope by the name '{scope_name}' - it is not currently stored")
return

del self.__scopes[scope_name]

def __inject_scope(self, scope: ObjectManagerScope):
Expand All @@ -259,6 +266,9 @@ def __inject_scope(self, scope: ObjectManagerScope):
Args:
scope: The scope object to add
"""
if scope is None:
raise ValueError(f"A scope for {self.__class__.__name__} cannot be added - it is None")

if scope.name in self.__scopes:
raise KeyError(
f"Cannot add a scope object '{scope.name}' to {self} - there is already a scope by that name. "
Expand Down Expand Up @@ -297,7 +307,19 @@ def monitor_operation(self, scope: typing.Union[ObjectManagerScope, str, bytes],
scope: A scope object containing references to shared objects that need to be kept alive
operation: The operation using the shared objects
"""
if self.__monitor_scope and not self.__scope_monitor:
if self.__scope_monitor is None:
self.__scope_monitor = FutureMonitor(logger=self.__logger)
self.__scope_monitor.start()

if not self.__monitor_scope or not self.__scope_monitor:
if not self.__monitor_scope and not self.__scope_monitor:
logging.error("This logger was not set to monitor scopes and it does not have a monitoring thread")
elif not self.__monitor_scope:
logging.error("This logger was told not to monitor scope")
else:
logging.error("There is no thread available to monitor scopes")

if isinstance(scope, ObjectManagerScope):
scope_name = scope.name
elif isinstance(scope, bytes):
Expand Down Expand Up @@ -339,6 +361,9 @@ def monitor_operation(self, scope: typing.Union[ObjectManagerScope, str, bytes],

@property
def logger(self) -> LoggerProtocol:
"""
The logger made specifically for this manager
"""
return self.__logger

@logger.setter
Expand All @@ -360,3 +385,15 @@ def logger(self, logger: LoggerProtocol):
# the logger on the monitor is kept up to speed.
if self.__scope_monitor:
self.__scope_monitor.logger = logger

def __exit__(self, exc_type, exc_val, exc_tb):
if self.__scope_monitor:
self.__scope_monitor.stop()

if self.__monitor_scope:
self.__scope_monitor.kill()

for scope_name, scope in self.__scopes.items():
scope.end_scope()

super().__exit__(exc_type, exc_val, exc_tb)
Loading

0 comments on commit 29cda6c

Please sign in to comment.