Skip to content

Commit

Permalink
Updated the error counter to clear errors if they haven't occurred re…
Browse files Browse the repository at this point in the history
…cently and added event handling to ensure that functions are called when too many errors are hit. Fixed the parameters for the cleanup function and the return code for interrupt exits, and added extra handling for keyboard interrupts that weren't being respected.
  • Loading branch information
christophertubbs authored and robertbartel committed Oct 7, 2024
1 parent 125b38f commit 9dfb420
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 69 deletions.
131 changes: 79 additions & 52 deletions python/services/evaluationservice/dmod/evaluationservice/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def get_concurrency_executor_type(**kwargs) -> typing.Callable[[], futures.Execu

def signal_handler(signum: int, frame):
"""
Handles cleanup operations for
Handles cleanup operations for the runner in case of an unexpected signal
Args:
signum: The type of signal currently being handled
frame: The frame explaining where the code was when the signal was triggered
Expand All @@ -67,14 +68,10 @@ def signal_handler(signum: int, frame):

if REDIS_PARAMETERS_FOR_PROCESS.is_valid:
service.error("Cleaning up redis resources...")
cleanup(
stream_name=REDIS_PARAMETERS_FOR_PROCESS.stream_name,
group_name=REDIS_PARAMETERS_FOR_PROCESS.group_name,
redis_parameters=REDIS_PARAMETERS_FOR_PROCESS.kvstore_parameters
)
cleanup(redis_parameters=REDIS_PARAMETERS_FOR_PROCESS)

service.error("Now exiting...")
sys.exit(1)
sys.exit(signum)


class Arguments:
Expand Down Expand Up @@ -151,8 +148,6 @@ def limit(self) -> typing.Optional[int]:
def __parse_command_line(self, *args):
parser = ArgumentParser("Starts a series of processes that will listen and launch evaluations")

# Add options

parser.add_argument(
'--redis-host',
help='Set the host value for making Redis connections',
Expand Down Expand Up @@ -284,6 +279,11 @@ def kwargs(self):


REDIS_PARAMETERS_FOR_PROCESS = streams.StreamParameters()
"""
Process-local parameters describing how to access a stream.
Held at the process level so that the signal handler may use it for cleanup operations
"""


@dataclasses.dataclass
Expand Down Expand Up @@ -434,7 +434,18 @@ def monitor_running_jobs(
"""
potentially_complete_job: typing.Optional[ApplyResult] = None

monitor_errors = ErrorCounter(limit=EXCEPTION_LIMIT)
encountered_errors = ErrorCounter(limit=EXCEPTION_LIMIT)
"""
A collection of errors that may bear repeats of of individual types of errors.
Collected errors are only finally raised if and when they have occurred over a given amount of times
This ensures that the polling loop is not interrupted on rare remote errors yet still throws errors when stuck
in an infinite loop of failing code
"""

# Tell this loop and the runner to halt due to the error
# Failure to do this will allow the runner to continue with an offline monitor
encountered_errors.on_exceedance(lambda _: stop_signal.set())

while not stop_signal.is_set():
try:
Expand Down Expand Up @@ -463,7 +474,7 @@ def monitor_running_jobs(
# There are plenty of times when this might be empty and that's fine. In this case we just want
pass
except Exception as exception:
monitor_errors.add_error(error=exception)
encountered_errors.add_error(error=exception)
service.error(exception)

potentially_complete_job = None
Expand Down Expand Up @@ -657,53 +668,66 @@ def listen(
redis_connection=connection
)

monitoring_thread = threading.Thread(target=monitor_running_jobs, args=(connection, active_jobs, stop_signal))
monitoring_thread.setDaemon(True)
monitoring_thread = threading.Thread(
target=monitor_running_jobs,
name=f"{service.application_values.APPLICATION_NAME}: Monitor for {consumer_name}",
kwargs={
"connection": connection,
"active_job_queue": active_jobs,
"stop_signal": stop_signal,
},
daemon=True
)
monitoring_thread.start()

error_counter = ErrorCounter(limit=EXCEPTION_LIMIT)

with multiprocessing.Pool(processes=job_limit) as worker_pool: # type: multiprocessing.pool.Pool
while not stop_signal.is_set():
if already_listening:
service.info("Starting to listen for evaluation jobs again")
else:
already_listening = True

try:
# Form the generator used to receive messages
message_stream = streams.StreamMessage.listen(
connection,
stream_parameters.group_name,
consumer_name,
stop_signal,
stream_parameters.stream_name
)

for message in message_stream:
possible_record = interpret_message(message, worker_pool, stop_signal)

if possible_record:
# This will block until another entry may be put into the queue - this will prevent one
# instance of the runner from trying to hoard all of the messages and allow other
# instances to try and carry the load
active_jobs.put(possible_record)
else:
# Since this message isn't considered one for the runner, acknowledge that it's been seen
# and move on so something else may view the message
connection.xack(
stream_parameters.stream_name,
stream_parameters.group_name,
message.message_id
)
try:
with multiprocessing.Pool(processes=job_limit) as worker_pool: # type: multiprocessing.pool.Pool
while not stop_signal.is_set():
if already_listening:
service.info(f"{consumer_name}: Starting to listen for evaluation jobs again")
else:
already_listening = True

if stop_signal.is_set():
break
except Exception as exception:
error_counter.add_error(error=exception)
service.error(message="An error occured while listening for evaluation jobs", exception=exception)
try:
# Form the generator used to receive messages
message_stream = streams.StreamMessage.listen(
connection,
stream_parameters.group_name,
consumer_name,
stop_signal,
stream_parameters.stream_name
)

monitoring_thread.join(timeout=5)
for message in message_stream:
possible_record = interpret_message(message, worker_pool, stop_signal)

if possible_record:
# This will block until another entry may be put into the queue - this will prevent one
# instance of the runner from trying to hoard all of the messages and allow other
# instances to try and carry the load
active_jobs.put(possible_record)
else:
# Since this message isn't considered one for the runner, acknowledge that it's been seen
# and move on so something else may view the message
connection.xack(
stream_parameters.stream_name,
stream_parameters.group_name,
message.message_id
)

if stop_signal.is_set():
break
except KeyboardInterrupt:
stop_signal.set()
break
except Exception as exception:
error_counter.add_error(error=exception)
service.error(message="An error occured while listening for evaluation jobs", exception=exception)
finally:
if monitoring_thread:
monitoring_thread.join(timeout=5)


# TODO: Switch from pubsub to a redis stream
Expand Down Expand Up @@ -866,6 +890,9 @@ def main(*args):
try:
listen(stream_parameters=redis_parameters, job_limit=arguments.limit)
exit_code = 0
except KeyboardInterrupt:
exit_code = os.EX_OK
cleanup(redis_parameters=REDIS_PARAMETERS_FOR_PROCESS)
except Exception as exception:
service.error(exception)
exit_code = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
import traceback

from datetime import datetime
from collections import Counter
from datetime import timedelta

import dateutil
from exceptiongroup import ExceptionGroup

ErrorIdentifier = typing.Tuple[typing.Union[typing.Type[BaseException], typing.Tuple[str, int, str]], ...]

def get_error_identifier(
error: BaseException
) -> typing.Tuple[typing.Union[typing.Type[BaseException], typing.Tuple[str, int, str]], ...]:

def get_error_identifier(error: BaseException) -> ErrorIdentifier:
"""
Create a pickleable common identifier for an error
Expand Down Expand Up @@ -42,7 +42,7 @@ def get_error_identifier(
except:
pass

error_identifier: typing.Tuple[typing.Union[type, typing.Tuple[str, int, str]], ...] = tuple(error_details)
error_identifier: ErrorIdentifier = tuple(error_details)
return error_identifier


Expand All @@ -52,20 +52,34 @@ class ErrorCounter:
If a certain type of error occurs too many times, that error will be thrown to stop the containing control structure
"""
def __init__(self, limit: int):
def __init__(self, limit: int, lifetime: timedelta = None):
"""
Args:
limit: The maximum amount of a specific failure that may occur before the error is raised
limit: The maximum amount of a specific failure that may occur before the error is raised.
A specific failure is identified via its code path
lifetime: The period over which an error must occur multiple times
"""
self.__error_limit = limit
self.__counter = Counter()
self.__exeedance_handlers: typing.List[typing.Callable[[BaseException], typing.Any]] = []
self.__occurrences: typing.Dict[ErrorIdentifier, typing.List[datetime]] = {}
self.__lifetime = lifetime or timedelta(minutes=1)

def on_exceedance(self, handler: typing.Callable[[BaseException], typing.Any]):
"""
Add an action to perform when an exception has occurred too many times. Duplicate handlers will not be called
Args:
handler: A function that will be called when an exception has occurred
"""
if handler not in self.__exeedance_handlers:
self.__exeedance_handlers.append(handler)

@property
def error_count(self):
def total(self):
"""
The total number of errors encountered
"""
return sum(count for count in self.__counter.values())
return sum(len(occurrences) for occurrences in self.__occurrences.values())

@property
def error_limit(self):
Expand All @@ -74,7 +88,7 @@ def error_limit(self):
"""
return self.__error_limit

def occurrences(self, error: BaseException) -> int:
def count(self, error: typing.Union[BaseException, ErrorIdentifier]) -> int:
"""
Get the number of times a certain type of error has been thrown
Expand All @@ -84,8 +98,60 @@ def occurrences(self, error: BaseException) -> int:
Returns:
The number of times that the error has been checked
"""
error_identifier = get_error_identifier(error)
return self.__counter[error_identifier]
if isinstance(error, BaseException):
error_identifier = get_error_identifier(error)
else:
error_identifier = error

return len(self.__occurrences.get(error_identifier, []))

def _handle_exceedance(self, error: BaseException) -> None:
"""
Call functions on a type of error that has occurred too many times
Args:
error: The error the was deemed to be too much
"""
handler_errors: typing.List[Exception] = []

for error_handler in self.__exeedance_handlers:
try:
error_handler(error)
except Exception as e:
traceback.print_exc()
handler_identifier = getattr(error_handler, getattr(error_handler, '__name__', str(error_handler)))
e.args = tuple(arg if index > 0 else f"{handler_identifier}: {arg}" for index, arg in enumerate(e.args))
handler_errors.append(e)

if handler_errors:
raise ExceptionGroup(
"Exceptions were encountered when calling handlers for when exceptions were called too many times",
handler_errors
) from error

def __purge_old_errors(self, encounter_time: datetime) -> None:
"""
Remove records of errors that occurred outside the time of interest
Args:
encounter_time: The time of the most recent error
"""
# Filter out all of the old occurrences
self.__occurrences = {
error_identifier: [
occurrence_time
for occurrence_time in occurrence_times
if occurrence_time > encounter_time - self.__lifetime
]
for error_identifier, occurrence_times in self.__occurrences.items()
}

# Remove all records of errors that no longer have occurrences
self.__occurrences = {
error_identifier: occurrence_times
for error_identifier, occurrence_times in self.__occurrences.items()
if len(occurrence_times) > 0
}

def add_error(self, error: BaseException):
"""
Expand All @@ -94,10 +160,16 @@ def add_error(self, error: BaseException):
Args:
error: The error to record
"""
encounter_time = now()

# Remove old errors - this ensures that errors aren't thrown if they occur too far apart
self.__purge_old_errors(encounter_time=encounter_time)

error_identifier = get_error_identifier(error=error)
self.__counter[error_identifier] += 1
self.__occurrences.setdefault(error_identifier, []).append(encounter_time)

if self.__counter[error_identifier] > self.__error_limit:
if self.count(error_identifier) > self.__error_limit:
self._handle_exceedance(error)
raise error


Expand Down Expand Up @@ -200,6 +272,10 @@ def now(local: bool = True) -> datetime:
if local is None:
local = True

timezone = dateutil.tz.tzlocal() if local else dateutil.tz.tzutc()
if local:
timezone = None
else:
from datetime import timezone as tz
timezone = tz.utc

return datetime.now(tz=timezone)

0 comments on commit 9dfb420

Please sign in to comment.