Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove: add integration tests #49

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 76 additions & 4 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,12 @@ async def test_remove_by_suicide(
start,
log_filter
):
"""Test task removal by suicide trigger."""
"""Test task removal by suicide trigger.

* Suicide triggers should remove tasks from the pool.
* It should be possible to bring them back by manually triggering them.
* Removing a task manually (cylc remove) should work the same.
"""
id_ = flow({
'scheduler': {'allow implicit tasks': 'True'},
'scheduling': {
Expand All @@ -1938,16 +1943,83 @@ async def test_remove_by_suicide(
},
}
})
schd = scheduler(id_)
schd: 'Scheduler' = scheduler(id_)
async with start(schd) as log:
# it should start up with 1/a and 1/b
assert pool_get_task_ids(schd.pool) == ["1/a", "1/b"]

a = schd.pool.get_task(IntegerPoint("1"), "a")

# mark 1/a as failed and ensure 1/b is removed by suicide trigger
schd.pool.spawn_on_output(a, TASK_OUTPUT_FAILED)
assert log_filter(
log,
contains="removed from active task pool: suicide trigger"
regex="1/b.*removed from active task pool: suicide trigger"
)
assert pool_get_task_ids(schd.pool) == ["1/a"]

# ensure that we are able to bring 1/b back by triggering it
log.clear()
schd.pool.force_trigger_tasks(['1/b'], ['1'])
assert log_filter(
log,
regex='1/b.*added to active task pool',
)

# remove 1/b by request (cylc remove)
schd.command_remove_tasks(['1/b'])
assert log_filter(
log,
regex='1/b.*removed from active task pool: request',
)

# ensure that we are able to bring 1/b back by triggering it
log.clear()
schd.pool.force_trigger_tasks(['1/b'], ['1'])
assert log_filter(
log,
regex='1/b.*added to active task pool',
)


async def test_remove_no_respawn(flow, scheduler, start, log_filter):
"""Ensure that removed tasks stay removed.

If a task is removed by suicide trigger or "cylc remove", then it should
not be automatically spawned at a later time.
"""
id_ = flow({
'scheduling': {
'graph': {
'R1': 'a & b => z',
},
},
})
schd: 'Scheduler' = scheduler(id_)
async with start(schd) as log:
a1 = schd.pool.get_task(IntegerPoint("1"), "a")
b1 = schd.pool.get_task(IntegerPoint("1"), "b")
assert a1, '1/a should have been spawned on startup'
assert b1, '1/b should have been spawned on startup'

# mark one of the upstream tasks as succeeded, 1/z should spawn
schd.pool.spawn_on_output(a1, TASK_OUTPUT_SUCCEEDED)
schd.workflow_db_mgr.process_queued_ops()
z1 = schd.pool.get_task(IntegerPoint("1"), "z")
assert z1, '1/z should have been spawned after 1/a succeeded'

# manually remove 1/z, it should be removed from the pool
schd.command_remove_tasks(['1/z'])
schd.workflow_db_mgr.process_queued_ops()
z1 = schd.pool.get_task(IntegerPoint("1"), "z")
assert z1 is None, '1/z should have been removed (by request)'

# mark the other upstream task as succeeded, 1/z should not be
# respawned as a result
schd.pool.spawn_on_output(b1, TASK_OUTPUT_SUCCEEDED)
assert log_filter(
log, contains='Not spawning 1/z: already used in this flow'
)
z1 = schd.pool.get_task(IntegerPoint("1"), "z")
assert (
z1 is None
), '1/z should have stayed removed (but has been added back into the pool'