Skip to content

Commit

Permalink
Merge pull request #5915 from oliver-sanders/xtrigger-restart-test
Browse files Browse the repository at this point in the history
xtriggers: test db / restart interaction
  • Loading branch information
hjoliver authored Jan 16, 2024
2 parents faf3c66 + 3b0f5ab commit b20a39c
Showing 1 changed file with 72 additions and 4 deletions.
76 changes: 72 additions & 4 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the behaviour of xtrigger manager.
"""
"""Tests for the behaviour of xtrigger manager."""

from pytest_mock import mocker
import asyncio
from pathlib import Path
from textwrap import dedent

from cylc.flow.pathutil import get_workflow_run_dir

async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
Expand Down Expand Up @@ -118,4 +121,69 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker):
# resulting in two calls to put_xtriggers. This test fails
# on master, but with call count 0 (not 2) because the main
# loop doesn't run in this test.



async def test_xtriggers_restart(flow, start, scheduler, db_select):
"""It should write xtrigger results to the DB and load them on restart."""
# define a workflow which uses a custom xtrigger
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'xtriggers': {
'mytrig': 'mytrig()'
},
'graph': {
'R1': '@mytrig => foo'
},
}
})

# add a custom xtrigger to the workflow
run_dir = Path(get_workflow_run_dir(id_))
xtrig_dir = run_dir / 'lib/python'
xtrig_dir.mkdir(parents=True)
(xtrig_dir / 'mytrig.py').write_text(dedent('''
from random import random
def mytrig(*args, **kwargs):
# return a different random number each time
return True, {"x": str(random())}
'''))

# start the workflow & run the xtrigger
schd = scheduler(id_)
async with start(schd):
# run all xtriggers
for task in schd.pool.get_tasks():
schd.xtrigger_mgr.call_xtriggers_async(task)
# one xtrigger should have been scheduled to run
assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 1
# wait for it to return
for _ in range(50):
await asyncio.sleep(0.1)
schd.proc_pool.process()
if len(schd.proc_pool.runnings) == 0:
break
else:
raise Exception('Process pool did not clear')

# the xtrigger should be written to the DB
db_xtriggers = db_select(schd, True, 'xtriggers')
assert len(db_xtriggers) == 1
assert db_xtriggers[0][0] == 'mytrig()'
assert db_xtriggers[0][1].startswith('{"x":')

# restart the workflow, the xtrigger should *not* run again
schd = scheduler(id_)
async with start(schd):
# run all xtriggers
for task in schd.pool.get_tasks():
schd.xtrigger_mgr.call_xtriggers_async(task)
# the xtrigger should have been loaded from the DB
# (so no xtriggers should be scheduled to run)
assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 0

# check the DB to ensure no additional entries have been created
assert db_select(schd, True, 'xtriggers') == db_xtriggers

0 comments on commit b20a39c

Please sign in to comment.