Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kickoff: Add watchdog for loop interactivity. #596

Merged
merged 11 commits into from
Aug 14, 2024
38 changes: 37 additions & 1 deletion asab/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ def __init__(self):
self.LaunchTime = time.time()
self.BaseTime = self.LaunchTime - self.Loop.time()

# Last time the on_tick/60! was registered
self.LastOnTick60 = self.LaunchTime

# Variable that is set when the exit time is happening
self.Exiting = False
PremyslCerny marked this conversation as resolved.
Show resolved Hide resolved

self.Modules: list[asab.Module] = []
"""
A list of modules that has been added to the application.
Expand Down Expand Up @@ -185,6 +191,12 @@ def handler(type):

self.TaskService = TaskService(self)

# Proactor is mandatory for subsequent interactivity check
import asab.proactor
PremyslCerny marked this conversation as resolved.
Show resolved Hide resolved

if asab.proactor.Module not in modules:
modules.append(asab.proactor.Module)

for module in modules:
self.add_module(module)

Expand All @@ -194,6 +206,27 @@ def handler(type):
# Every 10 minutes listen for housekeeping
self.PubSub.subscribe("Application.tick/600!", self._on_housekeeping_tick)

# Obtain proactor service to register interactivity check on the asyncio loop
self.ProactorService = self.get_service("asab.ProactorService")
self.ProactorService.schedule(self._run_watchdog_for_event_loop)


def _run_watchdog_for_event_loop(self):
PremyslCerny marked this conversation as resolved.
Show resolved Hide resolved
"""
Periodically checks the loop interactivity.
PremyslCerny marked this conversation as resolved.
Show resolved Hide resolved
"""

while not self.Exiting:
current_time = time.time()

if (current_time - self.LastOnTick60) < (10 * 60): # Ten minute threshold (10 * expected cycle)
time.sleep(60) # Sleep one minute (one cycle)
continue

# The loop lost its interactivity
L.critical("The event loop lost its interactivity. Stopping the application!")
os._exit(5) # Cannot use sys.exit inside the thread
PremyslCerny marked this conversation as resolved.
Show resolved Hide resolved


def create_argument_parser(
self,
Expand Down Expand Up @@ -596,7 +629,9 @@ async def _run_time_governor(self):
self.PubSub.publish("Application.tick/10!")
if (cycle_no % 60) == 0:
# Rebase a Loop time
self.BaseTime = time.time() - self.Loop.time()
current_time = time.time()
self.LastOnTick60 = current_time
self.BaseTime = current_time - self.Loop.time()
self.PubSub.publish("Application.tick/60!")
if (cycle_no % 300) == 0:
self.PubSub.publish("Application.tick/300!")
Expand All @@ -614,6 +649,7 @@ async def _run_time_governor(self):


async def _exit_time_governor(self):
self.Exiting = True
self.PubSub.publish("Application.exit!")

# Finalize services
Expand Down
Loading