Merge pull request #5193 from oliver-sanders/8.0.x.patch
8.0.x.patch
hjoliver authored Oct 13, 2022
2 parents 7b7e484 + ac196cd commit 7972114
Showing 13 changed files with 197 additions and 73 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test_functional.yml
@@ -193,7 +193,7 @@ jobs:
timeout-minutes: 35
continue-on-error: true
run: |
echo "::set-output name=finished::false"
echo "finished=false" >> $GITHUB_OUTPUT
if [[ '${{ matrix.test-base }}' == 'tests/k' ]]; then
NPROC=4
else
@@ -204,7 +204,7 @@ jobs:
-j "${NPROC}" \
--state=save \
$(cat test-file) \
|| (echo "::set-output name=finished::true" && false)
|| (echo "finished=true" >> $GITHUB_OUTPUT && false)
- name: Time Out
if: steps.test.outcome == 'failure' && steps.test.outputs.finished != 'true'
@@ -253,7 +253,7 @@ jobs:
run: |
# artifact name cannot contain '/' characters
CID="$(sed 's|/|-|g' <<< "${{ matrix.name || matrix.chunk }}")"
echo "::set-output name=uploadname::$CID"
echo "uploadname=$CID" >> $GITHUB_OUTPUT
- name: Upload artifact
if: failure() && steps.test.outcome == 'failure'
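
These workflow steps migrate from the deprecated `::set-output` command to the `$GITHUB_OUTPUT` environment file, where a step output is declared by appending a `name=value` line. A minimal sketch of the same mechanism, assuming a Python step for illustration (the `set_step_output` helper is invented; only the `finished` key and the `$GITHUB_OUTPUT` file come from the workflow):

```python
# Sketch only: declare a GitHub Actions step output the way the updated
# workflow does, by appending "name=value" to the file named in GITHUB_OUTPUT.
import os

def set_step_output(name: str, value: str) -> None:
    """Append a key=value pair to the Actions output file."""
    output_file = os.environ["GITHUB_OUTPUT"]  # set by the Actions runner
    with open(output_file, "a") as handle:
        handle.write(f"{name}={value}\n")

# Equivalent to `echo "finished=false" >> $GITHUB_OUTPUT` above.
set_step_output("finished", "false")
```
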
11 changes: 8 additions & 3 deletions CHANGES.md
@@ -35,10 +35,15 @@ Maintenance release.

### Fixes

[#5125](https://github.com/cylc/cylc-flow/pull/5125) - Allow rose-suite.conf
changes to be considered by ``cylc reinstall``.
[#5192](https://github.com/cylc/cylc-flow/pull/5192) -
Recompute runahead limit after use of `cylc remove`.

[#5023](https://github.com/cylc/cylc-flow/pull/5023) - tasks force-triggered
[#5125](https://github.com/cylc/cylc-flow/pull/5125) -
Allow rose-suite.conf changes to be considered by ``cylc reinstall``.

[#5023](https://github.com/cylc/cylc-flow/pull/5023),
[#5187](https://github.com/cylc/cylc-flow/pull/5187) -
tasks force-triggered
after a shutdown was ordered should submit to run immediately on restart.

[#5137](https://github.com/cylc/cylc-flow/pull/5137) -
2 changes: 1 addition & 1 deletion cylc/flow/rundb.py
@@ -170,7 +170,7 @@ class CylcWorkflowDAO:
CONN_TIMEOUT = 0.2
DB_FILE_BASE_NAME = "db"
MAX_TRIES = 100
RESTART_INCOMPAT_VERSION = "8.0b3" # Can't restart if <= this version
RESTART_INCOMPAT_VERSION = "8.0rc2" # Can't restart if <= this version
TABLE_BROADCAST_EVENTS = "broadcast_events"
TABLE_BROADCAST_STATES = "broadcast_states"
TABLE_INHERITANCE = "inheritance"
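
Raising `RESTART_INCOMPAT_VERSION` from `8.0b3` to `8.0rc2` means a workflow database last written by Cylc 8.0rc2 or earlier can no longer be restarted. A sketch of the comparison this constant feeds into, mirroring `check_workflow_db_compatibility` further down (the `restart_allowed` helper is invented; the version strings and `parse_version` usage come from the changed code):

```python
# Illustrative only: pre-release versions sort before the final release,
# so a DB last run with 8.0b3 or 8.0rc2 is refused, while 8.0.0 is fine.
from pkg_resources import parse_version

RESTART_INCOMPAT_VERSION = "8.0rc2"  # can't restart if <= this version

def restart_allowed(last_run_version: str) -> bool:
    """Return True if a DB written by this Cylc version may be restarted."""
    return parse_version(last_run_version) > parse_version(
        RESTART_INCOMPAT_VERSION
    )

assert not restart_allowed("8.0b3")   # beta, older than 8.0rc2
assert not restart_allowed("8.0rc2")  # equal: still incompatible
assert restart_allowed("8.0.0")       # final release sorts after 8.0rc2
```
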
8 changes: 2 additions & 6 deletions cylc/flow/scheduler.py
@@ -523,17 +523,13 @@ async def configure(self):

def load_workflow_params_and_tmpl_vars(self) -> None:
"""Load workflow params and template variables"""
pri_dao = self.workflow_db_mgr.get_pri_dao()
try:
with self.workflow_db_mgr.get_pri_dao() as pri_dao:
# This logic handles lack of initial cycle point in flow.cylc and
# things that can't change on workflow restart/reload.
pri_dao.select_workflow_params(self._load_workflow_params)
pri_dao.select_workflow_template_vars(self._load_template_vars)
pri_dao.execute_queued_items()

finally:
pri_dao.close()

def log_start(self) -> None:
"""Log headers, that also get logged on each rollover.
@@ -1006,7 +1002,7 @@ def command_reload_workflow(self) -> None:
LOG.info("Reloading the workflow definition.")
old_tasks = set(self.config.get_task_name_list())
# Things that can't change on workflow reload:
pri_dao = self.workflow_db_mgr.get_pri_dao()
pri_dao = self.workflow_db_mgr._get_pri_dao()
pri_dao.select_workflow_params(self._load_workflow_params)

try:
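
The scheduler now obtains the primary DAO through the `get_pri_dao()` context manager added in `workflow_db_mgr.py` below, so the connection is closed even if a query raises, instead of relying on hand-written `try`/`finally` blocks. A generic sketch of the pattern with stand-in names (only `get_pri_dao` and `close` come from the diff):

```python
# Generic sketch: a context manager guarantees close() runs whether or not
# the body raises, replacing the old try/finally at each call site.
from contextlib import contextmanager
from typing import Generator

class FakeDAO:
    """Stand-in for CylcWorkflowDAO; only close() matters to the pattern."""
    def query(self) -> str:
        return "some result"

    def close(self) -> None:
        print("DAO closed")

@contextmanager
def get_pri_dao() -> Generator[FakeDAO, None, None]:
    dao = FakeDAO()
    try:
        yield dao
    finally:
        dao.close()  # always runs

with get_pri_dao() as dao:
    print(dao.query())  # "DAO closed" is printed on exit, error or not
```
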
24 changes: 14 additions & 10 deletions cylc/flow/task_pool.py
@@ -111,8 +111,8 @@ def __init__(

self.main_pool: Pool = {}
self.hidden_pool: Pool = {}
self.main_pool_list: List[TaskProxy] = []
self.hidden_pool_list: List[TaskProxy] = []
self._main_pool_list: List[TaskProxy] = []
self._hidden_pool_list: List[TaskProxy] = []
self.main_pool_changed = False
self.hidden_pool_changed = False

@@ -709,22 +709,24 @@ def get_all_tasks(self) -> List[TaskProxy]:

def get_tasks(self) -> List[TaskProxy]:
"""Return a list of task proxies in the main pool."""
# Cached list only for use internally in this method.
if self.main_pool_changed:
self.main_pool_changed = False
self.main_pool_list = []
self._main_pool_list = []
for _, itask_id_map in self.main_pool.items():
for __, itask in itask_id_map.items():
self.main_pool_list.append(itask)
return self.main_pool_list
self._main_pool_list.append(itask)
return self._main_pool_list

def get_hidden_tasks(self) -> List[TaskProxy]:
"""Return a list of task proxies in the hidden pool."""
# Cached list only for use internally in this method.
if self.hidden_pool_changed:
self.hidden_pool_changed = False
self.hidden_pool_list = []
self._hidden_pool_list = []
for itask_id_maps in self.hidden_pool.values():
self.hidden_pool_list.extend(list(itask_id_maps.values()))
return self.hidden_pool_list
self._hidden_pool_list.extend(list(itask_id_maps.values()))
return self._hidden_pool_list

def get_tasks_by_point(self):
"""Return a map of task proxies by cycle point."""
@@ -1540,6 +1542,8 @@ def remove_tasks(self, items):
itasks, _, bad_items = self.filter_task_proxies(items)
for itask in itasks:
self.remove(itask, 'request')
if self.compute_runahead():
self.release_runahead_tasks()
return len(bad_items)

def force_trigger_tasks(
@@ -1732,8 +1736,8 @@ def stop_flow(self, flow_num):
def log_task_pool(self, log_lvl=logging.DEBUG):
"""Log content of task and prerequisite pools in debug mode."""
for pool, name in [
(self.main_pool_list, "Main"),
(self.hidden_pool_list, "Hidden")
(self.get_tasks(), "Main"),
(self.get_hidden_tasks(), "Hidden")
]:
if pool:
LOG.log(
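
The pool list caches are renamed to `_main_pool_list` and `_hidden_pool_list` because they are only rebuilt when the corresponding `*_pool_changed` flag is set, so callers (including `log_task_pool`) must go through `get_tasks()` / `get_hidden_tasks()` rather than reading the cache directly. A stripped-down sketch of that cache-on-read pattern (the `MiniPool` class and its names are invented; the dirty flag and accessor idea come from the diff):

```python
# Minimal sketch of a lazily rebuilt list cache: the accessor refreshes the
# cache when the pool has changed; reading the private attribute directly
# could return stale data, hence the underscore rename in the real code.
from typing import Dict, List

class MiniPool:
    def __init__(self) -> None:
        self._pool: Dict[str, List[str]] = {}
        self._pool_list: List[str] = []  # private cached list
        self.pool_changed = False        # dirty flag

    def add(self, point: str, task: str) -> None:
        self._pool.setdefault(point, []).append(task)
        self.pool_changed = True

    def get_tasks(self) -> List[str]:
        """Return all tasks, rebuilding the cached list only if needed."""
        if self.pool_changed:
            self.pool_changed = False
            self._pool_list = [
                task for tasks in self._pool.values() for task in tasks
            ]
        return self._pool_list

pool = MiniPool()
pool.add("1", "foo")
print(pool.get_tasks())  # ['foo']
```
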
107 changes: 72 additions & 35 deletions cylc/flow/workflow_db_mgr.py
@@ -23,12 +23,15 @@
* Manage existing run database files on restart.
"""

from contextlib import contextmanager
import json
import os
from pkg_resources import parse_version
from shutil import copy, rmtree
from tempfile import mkstemp
from typing import Any, AnyStr, Dict, List, Set, TYPE_CHECKING, Tuple, Union
from typing import (
Any, AnyStr, Dict, Generator, List, Set, TYPE_CHECKING, Tuple, Union
)

from cylc.flow import LOG
from cylc.flow.broadcast_report import get_broadcast_change_iter
@@ -188,10 +191,24 @@ def delete_workflow_stop_task(self):
"""Delete workflow stop task from workflow_params table."""
self.delete_workflow_params(self.KEY_STOP_TASK)

def get_pri_dao(self):
"""Return the primary DAO."""
def _get_pri_dao(self) -> CylcWorkflowDAO:
"""Return the primary DAO.
Note: the DAO should be closed after use. It is better to use the
context manager method below, which handles this for you.
"""
return CylcWorkflowDAO(self.pri_path)

@contextmanager
def get_pri_dao(self) -> Generator[CylcWorkflowDAO, None, None]:
"""Return the primary DAO and close it after the context manager
exits."""
pri_dao = self._get_pri_dao()
try:
yield pri_dao
finally:
pri_dao.close()

@staticmethod
def _namedtuple2json(obj):
"""Convert nametuple obj to a JSON string.
@@ -223,7 +240,7 @@ def on_workflow_start(self, is_restart):
# ... however, in case there is a directory at the path for
# some bizarre reason:
rmtree(self.pri_path, ignore_errors=True)
self.pri_dao = self.get_pri_dao()
self.pri_dao = self._get_pri_dao()
os.chmod(self.pri_path, PERM_PRIVATE)
self.pub_dao = CylcWorkflowDAO(self.pub_path, is_public=True)
self.copy_pri_to_pub()
@@ -673,14 +690,43 @@ def restart_check(self) -> None:
self.check_workflow_db_compatibility()
except ServiceFileError as exc:
raise ServiceFileError(f"Cannot restart - {exc}")
pri_dao = self.get_pri_dao()
try:
with self.get_pri_dao() as pri_dao:
pri_dao.vacuum()
self.n_restart = pri_dao.select_workflow_params_restart_count() + 1
self.put_workflow_params_1(self.KEY_RESTART_COUNT, self.n_restart)
self.process_queued_ops()
finally:
pri_dao.close()

def _get_last_run_version(self, pri_dao: CylcWorkflowDAO) -> str:
return pri_dao.connect().execute(
rf'''
SELECT
value
FROM
{self.TABLE_WORKFLOW_PARAMS}
WHERE
key == ?
''', # nosec (table name is a code constant)
[self.KEY_CYLC_VERSION]
).fetchone()[0]

def upgrade_pre_803(self, pri_dao: CylcWorkflowDAO) -> None:
"""Upgrade on restart from a pre-8.0.3 database.
Add "is_manual_submit" column to the task states table.
See GitHub cylc/cylc-flow#5023 and #5187.
"""
conn = pri_dao.connect()
c_name = "is_manual_submit"
LOG.info(
f"DB upgrade (pre-8.0.3): "
f"add {c_name} column to {self.TABLE_TASK_STATES}"
)
conn.execute(
rf"ALTER TABLE {self.TABLE_TASK_STATES} "
rf"ADD COLUMN {c_name} INTEGER "
r"DEFAULT 0 NOT NULL"
)
conn.commit()

def check_workflow_db_compatibility(self):
"""Raises ServiceFileError if the existing workflow database is
@@ -694,31 +740,22 @@ def check_workflow_db_compatibility(self):
"If you are sure you want to operate on this workflow, please "
f"delete the database file at {self.pri_path}"
)
pri_dao = self.get_pri_dao()
try:
last_run_ver = pri_dao.connect().execute(
rf'''
SELECT
value
FROM
{self.TABLE_WORKFLOW_PARAMS}
WHERE
key == ?
''', # nosec (table name is a code constant)
[self.KEY_CYLC_VERSION]
).fetchone()[0]
except TypeError:
raise ServiceFileError(
f"{incompat_msg}, or is corrupted.\n{manual_rm_msg}"
)
finally:
pri_dao.close()
last_run_ver = parse_version(last_run_ver)
restart_incompat_ver = parse_version(
CylcWorkflowDAO.RESTART_INCOMPAT_VERSION
)
if last_run_ver <= restart_incompat_ver:
raise ServiceFileError(
f"{incompat_msg} (workflow last run with Cylc {last_run_ver})."
f"\n{manual_rm_msg}"
with self.get_pri_dao() as pri_dao:
try:
last_run_ver = self._get_last_run_version(pri_dao)
except TypeError:
raise ServiceFileError(
f"{incompat_msg}, or is corrupted.\n{manual_rm_msg}"
)
last_run_ver = parse_version(last_run_ver)
restart_incompat_ver = parse_version(
CylcWorkflowDAO.RESTART_INCOMPAT_VERSION
)
if last_run_ver <= restart_incompat_ver:
raise ServiceFileError(
f"{incompat_msg} (workflow last run with "
f"Cylc {last_run_ver})."
f"\n{manual_rm_msg}"
)
if last_run_ver < parse_version("8.0.3.dev"):
self.upgrade_pre_803(pri_dao)
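
The new `upgrade_pre_803` hook adds the `is_manual_submit` column to `task_states` when restarting from a database written before `8.0.3.dev`, so force-triggered tasks recorded by older releases restore correctly (see #5023 and #5187). A self-contained sketch of the same `ALTER TABLE` against a throwaway in-memory database, assuming a cut-down schema (only the table name, column name, and upgrade statement come from the diff):

```python
# Sketch only: the effect of the pre-8.0.3 upgrade on an in-memory SQLite
# database. Existing rows pick up the default of 0 for the new column.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_states("
    "name TEXT, cycle TEXT, flow_nums TEXT, submit_num INTEGER, status TEXT)"
)
conn.execute(
    """INSERT INTO task_states VALUES ('foo', '1', '["1"]', 1, 'waiting')"""
)

# The upgrade statement, as issued by upgrade_pre_803:
conn.execute(
    "ALTER TABLE task_states "
    "ADD COLUMN is_manual_submit INTEGER DEFAULT 0 NOT NULL"
)
conn.commit()

print(conn.execute(
    "SELECT name, is_manual_submit FROM task_states"
).fetchall())  # [('foo', 0)]
```
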
7 changes: 2 additions & 5 deletions cylc/flow/workflow_files.py
@@ -1185,12 +1185,9 @@ def get_platforms_from_db(run_dir):
workflow_db_mgr = WorkflowDatabaseManager(
os.path.join(run_dir, WorkflowFiles.Service.DIRNAME))
workflow_db_mgr.check_workflow_db_compatibility()
try:
pri_dao = workflow_db_mgr.get_pri_dao()
with workflow_db_mgr.get_pri_dao() as pri_dao:
platform_names = pri_dao.select_task_job_platforms()
return platform_names
finally:
pri_dao.close()
return platform_names


def check_deprecation(path, warn=True):
@@ -6,7 +6,7 @@ CREATE TABLE inheritance(namespace TEXT, inheritance TEXT, PRIMARY KEY(namespace
INSERT INTO inheritance VALUES('root','["root"]');
INSERT INTO inheritance VALUES('foo','["foo", "root"]');
CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key));
INSERT INTO workflow_params VALUES('cylc_version', '8.0rc1.dev0');
INSERT INTO workflow_params VALUES('cylc_version', '8.0.0');
CREATE TABLE workflow_template_vars(key TEXT, value TEXT, PRIMARY KEY(key));
CREATE TABLE task_action_timers(cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT, num INTEGER, delay TEXT, timeout TEXT, PRIMARY KEY(cycle, name, ctx_key));
INSERT INTO task_action_timers VALUES('1','foo','"poll_timer"','["tuple", [[99, "running"]]]','[]',0,NULL,NULL);
@@ -18,8 +18,8 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
INSERT INTO task_pool VALUES('1','foo','["1", "2"]','waiting', 0);
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0', '0');
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0');
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE xtriggers(signature TEXT, results TEXT, PRIMARY KEY(signature));
2 changes: 1 addition & 1 deletion tests/functional/restart/57-ghost-job/db.sqlite3
@@ -27,7 +27,7 @@ CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT,
INSERT INTO workflow_flows VALUES(1,'2022-07-25 16:18:23','original flow from 1');
CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key));
INSERT INTO workflow_params VALUES('uuid_str','4972bc10-a016-46b0-b313-b10f3cb63bf5');
INSERT INTO workflow_params VALUES('cylc_version','8.0rc4.dev');
INSERT INTO workflow_params VALUES('cylc_version','8.0.3.dev');
INSERT INTO workflow_params VALUES('UTC_mode','0');
INSERT INTO workflow_params VALUES('n_restart','0');
INSERT INTO workflow_params VALUES('cycle_point_tz','Z');
5 changes: 3 additions & 2 deletions tests/functional/retries/01-submission-retry.t
@@ -31,6 +31,8 @@ workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play --reference-test --debug --no-detach "${WORKFLOW_NAME}"
#-------------------------------------------------------------------------------
if ! command -v 'sqlite3' >'/dev/null'; then
skip 1 'sqlite3 not installed?'
else
sqlite3 \
"$RUN_DIR/${WORKFLOW_NAME}/log/db" \
'SELECT try_num, submit_num FROM task_jobs' >'select.out'
@@ -40,8 +42,7 @@ if ! command -v 'sqlite3' >'/dev/null'; then
1|3
1|4
__OUT__
else
skip 1 'sqlite3 not installed?'

fi
#-------------------------------------------------------------------------------
purge
5 changes: 1 addition & 4 deletions tests/integration/conftest.py
@@ -303,11 +303,8 @@ def _inner(
stmt += f' WHERE {where_stmt}'
stmt_args = list(where.values())

pri_dao = schd.workflow_db_mgr.get_pri_dao()
try:
with schd.workflow_db_mgr.get_pri_dao() as pri_dao:
return list(pri_dao.connect().execute(stmt, stmt_args))
finally:
pri_dao.close()

return _inner

18 changes: 18 additions & 0 deletions tests/integration/test_task_pool.py
@@ -559,3 +559,21 @@ async def test_reload_stopcp(
assert str(schd.pool.stop_point) == '2020'
schd.command_reload_workflow()
assert str(schd.pool.stop_point) == '2020'


async def test_runahead_after_remove(
example_flow: Scheduler
) -> None:
"""The runahead limit should be recomputed after tasks are removed.
"""
task_pool = example_flow.pool
assert int(task_pool.runahead_limit_point) == 4

# No change after removing an intermediate cycle.
task_pool.remove_tasks(['3/*'])
assert int(task_pool.runahead_limit_point) == 4

# Should update after removing the first point.
task_pool.remove_tasks(['1/*'])
assert int(task_pool.runahead_limit_point) == 5
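
This test exercises the `remove_tasks` change above: the runahead limit is relative to the earliest remaining cycle point, so it only moves when the first point is emptied. A conceptual sketch, not cylc code, assuming for illustration a limit of three points beyond the earliest remaining cycle (which reproduces the numbers asserted in the test):

```python
# Conceptual sketch: removing an intermediate cycle leaves the runahead
# limit alone; removing the earliest cycle shifts it forward.
from typing import Set

def runahead_limit(active_points: Set[int], limit: int = 3) -> int:
    return min(active_points) + limit

points = {1, 2, 3, 4}
assert runahead_limit(points) == 4  # initial limit, as in the test

points.discard(3)                   # remove an intermediate cycle
assert runahead_limit(points) == 4  # unchanged

points.discard(1)                   # remove the first cycle point
assert runahead_limit(points) == 5  # recomputed: limit moves to 5
```
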