Skip to content

Commit

Permalink
Merge pull request #49 from oliver-sanders/tweak-suicide-extra-tests
Browse files Browse the repository at this point in the history
remove: add integration tests
  • Loading branch information
hjoliver authored Apr 22, 2024
2 parents 2d3df62 + b32c6e0 commit 7e8a096
Showing 1 changed file with 76 additions and 4 deletions.
80 changes: 76 additions & 4 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,12 @@ async def test_remove_by_suicide(
start,
log_filter
):
"""Test task removal by suicide trigger."""
"""Test task removal by suicide trigger.
* Suicide triggers should remove tasks from the pool.
* It should be possible to bring them back by manually triggering them.
* Removing a task manually (cylc remove) should work the same.
"""
id_ = flow({
'scheduler': {'allow implicit tasks': 'True'},
'scheduling': {
Expand All @@ -1938,16 +1943,83 @@ async def test_remove_by_suicide(
},
}
})
schd = scheduler(id_)
schd: 'Scheduler' = scheduler(id_)
async with start(schd) as log:
# it should start up with 1/a and 1/b
assert pool_get_task_ids(schd.pool) == ["1/a", "1/b"]

a = schd.pool.get_task(IntegerPoint("1"), "a")

# mark 1/a as failed and ensure 1/b is removed by suicide trigger
schd.pool.spawn_on_output(a, TASK_OUTPUT_FAILED)
assert log_filter(
log,
contains="removed from active task pool: suicide trigger"
regex="1/b.*removed from active task pool: suicide trigger"
)
assert pool_get_task_ids(schd.pool) == ["1/a"]

# ensure that we are able to bring 1/b back by triggering it
log.clear()
schd.pool.force_trigger_tasks(['1/b'], ['1'])
assert log_filter(
log,
regex='1/b.*added to active task pool',
)

# remove 1/b by request (cylc remove)
schd.command_remove_tasks(['1/b'])
assert log_filter(
log,
regex='1/b.*removed from active task pool: request',
)

# ensure that we are able to bring 1/b back by triggering it
log.clear()
schd.pool.force_trigger_tasks(['1/b'], ['1'])
assert log_filter(
log,
regex='1/b.*added to active task pool',
)


async def test_remove_no_respawn(flow, scheduler, start, log_filter):
"""Ensure that removed tasks stay removed.
If a task is removed by suicide trigger or "cylc remove", then it should
not be automatically spawned at a later time.
"""
id_ = flow({
'scheduling': {
'graph': {
'R1': 'a & b => z',
},
},
})
schd: 'Scheduler' = scheduler(id_)
async with start(schd) as log:
a1 = schd.pool.get_task(IntegerPoint("1"), "a")
b1 = schd.pool.get_task(IntegerPoint("1"), "b")
assert a1, '1/a should have been spawned on startup'
assert b1, '1/b should have been spawned on startup'

# mark one of the upstream tasks as succeeded, 1/z should spawn
schd.pool.spawn_on_output(a1, TASK_OUTPUT_SUCCEEDED)
schd.workflow_db_mgr.process_queued_ops()
z1 = schd.pool.get_task(IntegerPoint("1"), "z")
assert z1, '1/z should have been spawned after 1/a succeeded'

# manually remove 1/z, it should be removed from the pool
schd.command_remove_tasks(['1/z'])
schd.workflow_db_mgr.process_queued_ops()
z1 = schd.pool.get_task(IntegerPoint("1"), "z")
assert z1 is None, '1/z should have been removed (by request)'

# mark the other upstream task as succeeded, 1/z should not be
# respawned as a result
schd.pool.spawn_on_output(b1, TASK_OUTPUT_SUCCEEDED)
assert log_filter(
log, contains='Not spawning 1/z: already used in this flow'
)
z1 = schd.pool.get_task(IntegerPoint("1"), "z")
assert (
z1 is None
), '1/z should have stayed removed (but has been added back into the pool'

0 comments on commit 7e8a096

Please sign in to comment.