From 47e465bbd3936b67ca57fe174b168d908c859de4 Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 16:22:40 +0200 Subject: [PATCH 01/11] Add watchdog for loop interactivity. --- asab/application.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/asab/application.py b/asab/application.py index 47ff7439..76b6086c 100644 --- a/asab/application.py +++ b/asab/application.py @@ -130,6 +130,12 @@ def __init__(self): self.LaunchTime = time.time() self.BaseTime = self.LaunchTime - self.Loop.time() + # Last time the on_tick/10! was registered + self.LastOnTick60 = self.LaunchTime + + # Variable that is set when the exit time is happening + self.Exiting = False + self.Modules: list[asab.Module] = [] """ A list of modules that has been added to the application. @@ -185,6 +191,10 @@ def handler(type): self.TaskService = TaskService(self) + # Proactor is mandatory for subsequent interactivity check + if asab.proactor.Module not in modules: + modules.append(asab.proactor.Module) + for module in modules: self.add_module(module) @@ -194,6 +204,27 @@ def handler(type): # Every 10 minutes listen for housekeeping self.PubSub.subscribe("Application.tick/600!", self._on_housekeeping_tick) + # Obtain proactor service to register interactivity check on the asyncio loop + self.ProactorService = self.get_service("asab.ProactorService") + self.ProactorService.schedule(self._run_watchdog_for_event_loop) + + + def _run_watchdog_for_event_loop(self): + """ + Periodically checks the loop interactivity. + """ + + while not self.Exiting: + current_time = time.time() + + if (current_time - self.LastOnTick60) < (10 * 60): # Ten minute threshold (10 * expected cycle) + time.sleep(60) # Sleep one minute (one cycle) + continue + + # The loop lost its interactivity + L.critical("The event loop lost its interactivity. Stopping the application!") + os._exit(5) # Cannot use sys.exit inside the thread + def create_argument_parser( self, @@ -596,7 +627,9 @@ async def _run_time_governor(self): self.PubSub.publish("Application.tick/10!") if (cycle_no % 60) == 0: # Rebase a Loop time - self.BaseTime = time.time() - self.Loop.time() + current_time = time.time() + self.LastOnTick60 = current_time + self.BaseTime = current_time - self.Loop.time() self.PubSub.publish("Application.tick/60!") if (cycle_no % 300) == 0: self.PubSub.publish("Application.tick/300!") @@ -614,6 +647,7 @@ async def _run_time_governor(self): async def _exit_time_governor(self): + self.Exiting = True self.PubSub.publish("Application.exit!") # Finalize services From 215407cca9e8cbc5bf1b59f0507e94b7335c1e1e Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 16:23:37 +0200 Subject: [PATCH 02/11] Update application.py --- asab/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asab/application.py b/asab/application.py index 76b6086c..34890685 100644 --- a/asab/application.py +++ b/asab/application.py @@ -130,7 +130,7 @@ def __init__(self): self.LaunchTime = time.time() self.BaseTime = self.LaunchTime - self.Loop.time() - # Last time the on_tick/10! was registered + # Last time the on_tick/60! was registered self.LastOnTick60 = self.LaunchTime # Variable that is set when the exit time is happening From 8670742fb80d25d77caf61c0490d2572f3249aee Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 16:45:10 +0200 Subject: [PATCH 03/11] Add a missing import --- asab/application.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/asab/application.py b/asab/application.py index 34890685..b3731a22 100644 --- a/asab/application.py +++ b/asab/application.py @@ -192,6 +192,8 @@ def handler(type): self.TaskService = TaskService(self) # Proactor is mandatory for subsequent interactivity check + import asab.proactor + if asab.proactor.Module not in modules: modules.append(asab.proactor.Module) From bc40093cb4b12b19e5d6d19b1db9f3ffaa83dac7 Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 17:00:18 +0200 Subject: [PATCH 04/11] Use native Python threading. --- asab/application.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/asab/application.py b/asab/application.py index b3731a22..b5725db8 100644 --- a/asab/application.py +++ b/asab/application.py @@ -11,6 +11,8 @@ import datetime import typing +import threading + import asab try: @@ -133,9 +135,6 @@ def __init__(self): # Last time the on_tick/60! was registered self.LastOnTick60 = self.LaunchTime - # Variable that is set when the exit time is happening - self.Exiting = False - self.Modules: list[asab.Module] = [] """ A list of modules that has been added to the application. @@ -206,9 +205,11 @@ def handler(type): # Every 10 minutes listen for housekeeping self.PubSub.subscribe("Application.tick/600!", self._on_housekeeping_tick) - # Obtain proactor service to register interactivity check on the asyncio loop - self.ProactorService = self.get_service("asab.ProactorService") - self.ProactorService.schedule(self._run_watchdog_for_event_loop) + # Run the watchdog to detect lost interactivity on the event loop + # Start the thread that checks if the loop is still interactive + thread = threading.Thread(target=self._run_watchdog_for_event_loop) + thread.daemon = True # Daemonize the thread to ensure it exits with the main program + thread.start() def _run_watchdog_for_event_loop(self): @@ -216,11 +217,11 @@ def _run_watchdog_for_event_loop(self): Periodically checks the loop interactivity. """ - while not self.Exiting: + while True: current_time = time.time() - if (current_time - self.LastOnTick60) < (10 * 60): # Ten minute threshold (10 * expected cycle) - time.sleep(60) # Sleep one minute (one cycle) + if (current_time - self.LastOnTick60) < (15 * 60): # Fifteen minute threshold (15 * expected cycle) + time.sleep(5) # Sleep one minute (one cycle) continue # The loop lost its interactivity @@ -649,7 +650,6 @@ async def _run_time_governor(self): async def _exit_time_governor(self): - self.Exiting = True self.PubSub.publish("Application.exit!") # Finalize services From 5ff7f483df7605f2636461dfb9c437fab36e11be Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 17:00:56 +0200 Subject: [PATCH 05/11] Update application.py --- asab/application.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/asab/application.py b/asab/application.py index b5725db8..00c908d5 100644 --- a/asab/application.py +++ b/asab/application.py @@ -190,12 +190,6 @@ def handler(type): self.TaskService = TaskService(self) - # Proactor is mandatory for subsequent interactivity check - import asab.proactor - - if asab.proactor.Module not in modules: - modules.append(asab.proactor.Module) - for module in modules: self.add_module(module) From c36437aad612042dd1029ba4abc2f9ffe7174b8f Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 17:01:23 +0200 Subject: [PATCH 06/11] Update application.py --- asab/application.py | 1 - 1 file changed, 1 deletion(-) diff --git a/asab/application.py b/asab/application.py index 00c908d5..0dd27e75 100644 --- a/asab/application.py +++ b/asab/application.py @@ -200,7 +200,6 @@ def handler(type): self.PubSub.subscribe("Application.tick/600!", self._on_housekeeping_tick) # Run the watchdog to detect lost interactivity on the event loop - # Start the thread that checks if the loop is still interactive thread = threading.Thread(target=self._run_watchdog_for_event_loop) thread.daemon = True # Daemonize the thread to ensure it exits with the main program thread.start() From e12558d1820f9edd061c61f59ff95fdb599e9bc0 Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 17:04:40 +0200 Subject: [PATCH 07/11] Update application.py --- asab/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asab/application.py b/asab/application.py index 0dd27e75..e597fb29 100644 --- a/asab/application.py +++ b/asab/application.py @@ -214,7 +214,7 @@ def _run_watchdog_for_event_loop(self): current_time = time.time() if (current_time - self.LastOnTick60) < (15 * 60): # Fifteen minute threshold (15 * expected cycle) - time.sleep(5) # Sleep one minute (one cycle) + time.sleep(60) # Sleep one minute (one cycle) continue # The loop lost its interactivity From 6e819aeb006b5a93c742a351c88190da93165014 Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 17:31:46 +0200 Subject: [PATCH 08/11] Change naming. --- asab/application.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/asab/application.py b/asab/application.py index e597fb29..48c0ef1b 100644 --- a/asab/application.py +++ b/asab/application.py @@ -200,12 +200,12 @@ def handler(type): self.PubSub.subscribe("Application.tick/600!", self._on_housekeeping_tick) # Run the watchdog to detect lost interactivity on the event loop - thread = threading.Thread(target=self._run_watchdog_for_event_loop) - thread.daemon = True # Daemonize the thread to ensure it exits with the main program - thread.start() + watchdog_thread = threading.Thread(target=self._watchdog) + watchdog_thread.daemon = True # Daemonize the thread to ensure it exits with the main program + watchdog_thread.start() - def _run_watchdog_for_event_loop(self): + def _watchdog(self): """ Periodically checks the loop interactivity. """ From fbda873552f080f40fe3e55e91ac37c295676c8a Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 18:33:32 +0200 Subject: [PATCH 09/11] Add watchdog_threshold configuration. --- asab/application.py | 13 +++++++++---- asab/config.py | 5 +++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/asab/application.py b/asab/application.py index 48c0ef1b..b366a81b 100644 --- a/asab/application.py +++ b/asab/application.py @@ -200,9 +200,12 @@ def handler(type): self.PubSub.subscribe("Application.tick/600!", self._on_housekeeping_tick) # Run the watchdog to detect lost interactivity on the event loop - watchdog_thread = threading.Thread(target=self._watchdog) - watchdog_thread.daemon = True # Daemonize the thread to ensure it exits with the main program - watchdog_thread.start() + self.WatchdogThreshold = Config["general"].getseconds("watchdog_threshold") + + if self.WatchdogThreshold > 0: + watchdog_thread = threading.Thread(target=self._watchdog) + watchdog_thread.daemon = True # Daemonize the thread to ensure it exits with the main program + watchdog_thread.start() def _watchdog(self): @@ -213,12 +216,14 @@ def _watchdog(self): while True: current_time = time.time() - if (current_time - self.LastOnTick60) < (15 * 60): # Fifteen minute threshold (15 * expected cycle) + # Check if the configured threshold passed + if (current_time - self.LastOnTick60) < self.WatchdogThreshold: time.sleep(60) # Sleep one minute (one cycle) continue # The loop lost its interactivity L.critical("The event loop lost its interactivity. Stopping the application!") + os.kill(os.getpid(), signal.SIGKILL) # Works only on Linux os._exit(5) # Cannot use sys.exit inside the thread diff --git a/asab/config.py b/asab/config.py index 6d91d679..2b18f212 100644 --- a/asab/config.py +++ b/asab/config.py @@ -43,6 +43,11 @@ class ConfigParser(configparser.ConfigParser): 'working_dir': '.', 'uid': '', 'gid': '', + + # Watchdog (event loop interactivity check) + # If the threshold (in seconds) since the last tick/60! passed, the application is killed + # If 0, the watchdog is not started + 'watchdog_threshold': 15 * 60, # 15 minutes }, "asab:metrics": { From d869dc663e3e71e9592a5cc2d067a82ef32c75a0 Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 18:34:57 +0200 Subject: [PATCH 10/11] Add comment. --- asab/application.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/asab/application.py b/asab/application.py index b366a81b..c7937943 100644 --- a/asab/application.py +++ b/asab/application.py @@ -210,7 +210,9 @@ def handler(type): def _watchdog(self): """ - Periodically checks the loop interactivity. + Periodically checks the loop interactivity + and if the configured threshold is passed, + the application is killed. """ while True: From f223c9727c078c4e7b147ac0e8d2c243de35bdbb Mon Sep 17 00:00:00 2001 From: Premysl Cerny Date: Thu, 8 Aug 2024 18:35:13 +0200 Subject: [PATCH 11/11] Update application.py --- asab/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asab/application.py b/asab/application.py index c7937943..2fd47ef1 100644 --- a/asab/application.py +++ b/asab/application.py @@ -212,7 +212,7 @@ def _watchdog(self): """ Periodically checks the loop interactivity and if the configured threshold is passed, - the application is killed. + the application is killed by a signal. """ while True: