Skip to content

Commit

Permalink
Task cancelation on job timeout
Browse files Browse the repository at this point in the history
This commit adds a mechanism for terminating running tasks after the job
reaches its timeout.

Reference: avocado-framework#5295
Signed-off-by: Jan Richter <jarichte@redhat.com>
  • Loading branch information
richtja authored and maramsmurthy committed May 25, 2023
1 parent 145bcbb commit eb29abf
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 40 deletions.
81 changes: 72 additions & 9 deletions avocado/core/task/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import collections
import logging
import multiprocessing
import sys
import time

from avocado.core.exceptions import TestFailFast
Expand All @@ -20,6 +21,7 @@ def __init__(self, tasks, status_repo):
self._triaging = []
self._ready = []
self._started = []
self._monitored = []
self._finished = []
self._lock = asyncio.Lock()
self._cache_lock = asyncio.Lock()
Expand All @@ -40,6 +42,10 @@ def ready(self):
def started(self):
return self._started

@property
def monitored(self):
    """Return the list of runtime tasks currently being monitored.

    Workers append a runtime task here while they wait on it and remove
    it again once that wait is over.
    """
    return self._monitored

@property
def finished(self):
    """Return the internal list of finished tasks."""
    return self._finished
Expand Down Expand Up @@ -155,6 +161,46 @@ def __repr__(self):
self._spawner, self._max_triaging, self._max_running, self._task_timeout
)

async def _send_timeout_message(self, terminate_tasks):
    """Sends messages related to timeout to status repository.

    When the task is terminated, it is necessary to send a finish message
    to status repository to close logging. For every given task that has
    an entry in the status repository and is not already finished, this
    method sends a log message with the timeout information followed by a
    finish message with the right fail reason.

    Tasks without an entry in the status repository, and tasks already
    marked as finished, are skipped individually; the remaining tasks are
    still processed.

    :param terminate_tasks: runtime_tasks which were terminated
    :type terminate_tasks: list
    """
    # One value for both the actual and the declared encoding of the log
    # payload, so the two can never disagree.
    encoding = "utf-8"
    status_repo = self._state_machine._status_repo
    for terminated_task in terminate_tasks:
        task_id = str(terminated_task.task.identifier)
        try:
            current_status, _ = status_repo._status[task_id]
        except KeyError:
            # Nothing recorded for this task: skip it, but keep going so
            # the other terminated tasks still get their finish message.
            continue
        if current_status == "finished":
            continue

        job_id = terminated_task.task.job_id
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        log_message = {
            "status": "running",
            "type": "log",
            "log": f"{timestamp} | Runner error occurred: Timeout reached".encode(
                encoding
            ),
            "encoding": encoding,
            "time": time.monotonic(),
            "id": task_id,
            "job_id": job_id,
        }
        finish_message = {
            "status": "finished",
            "result": "interrupted",
            "fail_reason": "Test interrupted: Timeout reached",
            "time": time.monotonic(),
            "id": task_id,
            "job_id": job_id,
        }
        # The log entry goes first so it is recorded before the task is
        # marked as finished.
        status_repo.process_message(log_message)
        status_repo.process_message(finish_message)

async def bootstrap(self):
"""Reads from requested, moves into triaging."""
try:
Expand Down Expand Up @@ -291,6 +337,8 @@ async def monitor(self):
return

if self._spawner.is_task_alive(runtime_task):
async with self._state_machine.lock:
self._state_machine._monitored.append(runtime_task)
try:
if runtime_task.execution_timeout is None:
remaining = None
Expand All @@ -300,15 +348,13 @@ async def monitor(self):
except asyncio.TimeoutError:
runtime_task.status = RuntimeTaskStatus.TIMEOUT
await self._spawner.terminate_task(runtime_task)
message = {
"status": "finished",
"result": "interrupted",
"fail_reason": "Test interrupted: Timeout reached",
"time": time.monotonic(),
"id": str(runtime_task.task.identifier),
"job_id": runtime_task.task.job_id,
}
self._state_machine._status_repo.process_message(message)
await self._send_timeout_message([runtime_task])
async with self._state_machine.lock:
try:
self._state_machine.monitored.remove(runtime_task)
except ValueError:
pass

# from here, this `task` ran, so, let's check
# its latest data in the status repo
latest_task_data = (
Expand Down Expand Up @@ -341,6 +387,23 @@ async def monitor(self):

await self._state_machine.finish_task(runtime_task, RuntimeTaskStatus.FINISHED)

async def terminate_tasks_timeout(self):
    """Terminate all running tasks with timeout message.

    Aborts the state machine with the TIMEOUT status, then keeps popping
    runtime tasks from the state machine's ``monitored`` list. Each popped
    task is marked as TIMEOUT, terminated through the spawner and
    collected. The loop ends once ``monitored`` is empty and the state
    machine reported itself complete; the timeout messages for all
    collected tasks are sent afterwards.
    """
    await self._state_machine.abort(RuntimeTaskStatus.TIMEOUT)
    terminated = []
    while True:
        is_complete = await self._state_machine.complete
        async with self._state_machine.lock:
            try:
                runtime_task = self._state_machine.monitored.pop(0)
            except IndexError:
                runtime_task = None
            else:
                # Only act on a task that was actually popped in this
                # iteration; an empty list must never fall through to a
                # stale (or not yet bound) ``runtime_task``.
                runtime_task.status = RuntimeTaskStatus.TIMEOUT
                await self._spawner.terminate_task(runtime_task)
                terminated.append(runtime_task)
        # NOTE(review): while nothing is monitored and the state machine
        # is not complete yet, this loop simply retries; it relies on the
        # awaits above to give other coroutines a chance to run - confirm.
        if runtime_task is None and is_complete:
            break
    await self._send_timeout_message(terminated)

async def run(self):
"""Pushes Tasks forward and makes them do something with their lives."""
while True:
Expand Down
21 changes: 18 additions & 3 deletions avocado/plugins/runner_nrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,24 @@ def run_suite(self, job, test_suite):
asyncio.ensure_future(self._update_status(job))
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
asyncio.wait_for(asyncio.gather(*workers), job.timeout or None)
)
try:
loop.run_until_complete(
asyncio.wait_for(
asyncio.shield(asyncio.gather(*workers)), job.timeout or None
)
)
except (KeyboardInterrupt, asyncio.TimeoutError):
terminate_worker = Worker(
state_machine=tsm,
spawner=spawner,
max_running=max_running,
task_timeout=timeout,
failfast=failfast,
)
loop.run_until_complete(
asyncio.wait_for(terminate_worker.terminate_tasks_timeout(), None)
)
raise
except (KeyboardInterrupt, asyncio.TimeoutError, TestFailFast) as ex:
LOG_JOB.info(str(ex))
job.interrupted_reason = str(ex)
Expand Down
54 changes: 26 additions & 28 deletions selftests/functional/test_job_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ def setUp(self):
)
self.py.save()

def run_and_check(self, cmd_line, e_rc, e_ntests, e_nerrors, e_nfailures, e_nskip):
def run_and_check(self, cmd_line, e_rc, e_ntests, terminated_tests):
os.chdir(BASEDIR)
result = process.run(cmd_line, ignore_status=True)
xml_output = result.stdout
output = result.stdout_text
xml_output = os.path.join(self.tmpdir.name, "latest", "results.xml")
self.assertEqual(
result.exit_status, e_rc, f"Avocado did not return rc {e_rc}:\n{result}"
)
try:
xunit_doc = xml.dom.minidom.parseString(xml_output)
xunit_doc = xml.dom.minidom.parse(xml_output)
except Exception as detail:
raise ParseXMLError(f"Failed to parse content: {detail}\n" f"{xml_output}")

Expand All @@ -70,26 +71,26 @@ def run_and_check(self, cmd_line, e_rc, e_ntests, e_nerrors, e_nfailures, e_nski
(f"Unexpected number of executed tests, XML:\n" f"{xml_output}"),
)

n_errors = int(testsuite_tag.attributes["errors"].value)
self.assertEqual(
n_errors,
e_nerrors,
(f"Unexpected number of test errors, XML:\n" f"{xml_output}"),
)

n_failures = int(testsuite_tag.attributes["failures"].value)
self.assertEqual(
n_failures,
e_nfailures,
0,
(f"Unexpected number of test failures, XML:\n" f"{xml_output}"),
)

e_skip = e_ntests - output.count("STARTED")
n_skip = int(testsuite_tag.attributes["skipped"].value)
self.assertEqual(
n_skip,
e_nskip,
e_skip,
(f"Unexpected number of test skips, XML:\n" f"{xml_output}"),
)
for terminated_test in terminated_tests:
self.assertTrue(
f"{terminated_test}: INTERRUPTED: Test interrupted: Timeout reached"
in output,
f"Test {terminated_test} was not in {output}.",
)

def _check_timeout_msg(self, idx):
res_dir = os.path.join(self.tmpdir.name, "latest", "test-results")
Expand All @@ -103,45 +104,42 @@ def _check_timeout_msg(self, idx):
f"in the {idx}st test's debug.log:\n{debug_log}"
),
)
self.assertIn(
"Traceback (most recent call last)",
debug_log,
(
f"Traceback not present in the {idx}st test's "
f"debug.log:\n{debug_log}"
),
)

@skipOnLevelsInferiorThan(1)
def test_sleep_longer_timeout(self):
""":avocado: tags=parallel:1"""
cmd_line = (
f"{AVOCADO} run --job-results-dir {self.tmpdir.name} "
f"--disable-sysinfo --xunit - "
f"--disable-sysinfo "
f"--job-timeout=5 {self.script.path} "
f"examples/tests/passtest.py"
)
self.run_and_check(cmd_line, 0, 2, 0, 0, 0)
self.run_and_check(cmd_line, 0, 2, [])

@unittest.skip("Job timeout is failing with nrunner, until we fix: #5295")
def test_sleep_short_timeout(self):
cmd_line = (
f"{AVOCADO} run --job-results-dir {self.tmpdir.name} "
f"--disable-sysinfo --xunit - "
f"--disable-sysinfo "
f"--job-timeout=1 {self.script.path} "
f"examples/tests/passtest.py"
)
self.run_and_check(cmd_line, exit_codes.AVOCADO_JOB_INTERRUPTED, 2, 1, 0, 1)
self.run_and_check(
cmd_line, exit_codes.AVOCADO_JOB_INTERRUPTED, 2, [self.script.path]
)
self._check_timeout_msg(1)

@unittest.skip("Job timeout is failing with nrunner, until we fix: #5295")
def test_sleep_short_timeout_with_test_methods(self):
cmd_line = (
f"{AVOCADO} run --job-results-dir {self.tmpdir.name} "
f"--disable-sysinfo --xunit - "
f"--disable-sysinfo "
f"--job-timeout=1 {self.py.path}"
)
self.run_and_check(cmd_line, exit_codes.AVOCADO_JOB_INTERRUPTED, 3, 1, 0, 2)
self.run_and_check(
cmd_line,
exit_codes.AVOCADO_JOB_INTERRUPTED,
3,
[f"{self.py.path}:Dummy.test00sleep"],
)
self._check_timeout_msg(1)

def test_invalid_values(self):
Expand Down

0 comments on commit eb29abf

Please sign in to comment.