Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xtriggers: test db / restart interaction #5915

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the behaviour of xtrigger manager.
"""
"""Tests for the behaviour of xtrigger manager."""

from pytest_mock import mocker
import asyncio
from pathlib import Path
from textwrap import dedent

from cylc.flow.pathutil import get_workflow_run_dir

async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
Expand Down Expand Up @@ -118,4 +121,69 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker):
# resulting in two calls to put_xtriggers. This test fails
# on master, but with call count 0 (not 2) because the main
# loop doesn't run in this test.



async def test_xtriggers_restart(flow, start, scheduler, db_select):
"""It should write xtrigger results to the DB and load them on restart."""
# define a workflow which uses a custom xtrigger
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'xtriggers': {
'mytrig': 'mytrig()'
},
'graph': {
'R1': '@mytrig => foo'
},
}
})

# add a custom xtrigger to the workflow
run_dir = Path(get_workflow_run_dir(id_))
xtrig_dir = run_dir / 'lib/python'
xtrig_dir.mkdir(parents=True)
(xtrig_dir / 'mytrig.py').write_text(dedent('''
from random import random

def mytrig(*args, **kwargs):
# return a different random number each time
return True, {"x": str(random())}
'''))

# start the workflow & run the xtrigger
schd = scheduler(id_)
async with start(schd):
# run all xtriggers
for task in schd.pool.get_tasks():
schd.xtrigger_mgr.call_xtriggers_async(task)
# one xtrigger should have been scheduled to run
assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 1
# wait for it to return
for _ in range(50):
await asyncio.sleep(0.1)
schd.proc_pool.process()
if len(schd.proc_pool.runnings) == 0:
break
else:
raise Exception('Process pool did not clear')

# the xtrigger should be written to the DB
db_xtriggers = db_select(schd, True, 'xtriggers')
assert len(db_xtriggers) == 1
assert db_xtriggers[0][0] == 'mytrig()'
assert db_xtriggers[0][1].startswith('{"x":')

# restart the workflow, the xtrigger should *not* run again
schd = scheduler(id_)
async with start(schd):
# run all xtriggers
for task in schd.pool.get_tasks():
schd.xtrigger_mgr.call_xtriggers_async(task)
# the xtrigger should have been loaded from the DB
# (so no xtriggers should be scheduled to run)
assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 0

# check the DB to ensure no additional entries have been created
assert db_select(schd, True, 'xtriggers') == db_xtriggers
Loading