Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: add flow nums to task jobs table #5259

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ ones in. -->
-------------------------------------------------------------------------------
## __cylc-8.1.0 (<span actions:bind='release-date'>Upcoming</span>)__

### Breaking Changes

* Workflows started with Cylc 8.0 which contain multiple "flows" cannot be
restarted with Cylc 8.1 due to database changes.

### Enhancements

[#5184](https://github.com/cylc/cylc-flow/pull/5184) - scan for active
Expand All @@ -33,6 +38,9 @@ command to validate, install and play a workflow.
[#5081](https://github.com/cylc/cylc-flow/pull/5081) - Reduced amount that
gets logged at "INFO" level in scheduler logs.

[#5259](https://github.com/cylc/cylc-flow/pull/5259) - Add flow_nums
to task_jobs table in the workflow database.

-------------------------------------------------------------------------------
## __cylc-8.0.4 (<span actions:bind='release-date'>Pending YYYY-MM-DD</span>)__

Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def construct_rsync_over_ssh_cmd(

Developer Warning:
The Cylc Subprocess Pool method ``rsync_255_fail`` relies on
``rsync_cmd[0] == 'rsync'``. Please check that changes to this funtion
``rsync_cmd[0] == 'rsync'``. Please check that changes to this function
do not break ``rsync_255_fail``.
"""
dst_path = dst_path.replace('$HOME/', '')
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class CylcWorkflowDAO:
["cycle", {"is_primary_key": True}],
["name", {"is_primary_key": True}],
["submit_num", {"datatype": "INTEGER", "is_primary_key": True}],
["flow_nums"],
["is_manual_submit", {"datatype": "INTEGER"}],
["try_num", {"datatype": "INTEGER"}],
["time_submit"],
Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
get_utc_mode
)
from cylc.flow.cfgspec.globalcfg import SYSPATH
from cylc.flow.util import serialise

if TYPE_CHECKING:
from cylc.flow.task_proxy import TaskProxy
Expand Down Expand Up @@ -440,6 +441,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
# Log and persist
LOG.debug(f"[{itask}] host={host}")
self.workflow_db_mgr.put_insert_task_jobs(itask, {
'flow_nums': serialise(itask.flow_nums),
'is_manual_submit': itask.is_manual_submit,
'try_num': itask.get_try_num(),
'time_submit': get_current_time_string(),
Expand Down Expand Up @@ -1224,6 +1226,7 @@ def _prep_submit_task_job_error(self, workflow, itask, action, exc):
self.workflow_db_mgr.put_insert_task_jobs(
itask,
{
'flow_nums': serialise(itask.flow_nums),
'job_id': itask.summary.get('submit_method_id'),
'is_manual_submit': itask.is_manual_submit,
'try_num': itask.get_try_num(),
Expand Down
40 changes: 39 additions & 1 deletion cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.wallclock import get_current_time_string, get_utc_mode
from cylc.flow.exceptions import ServiceFileError
from cylc.flow.exceptions import CylcError, ServiceFileError
from cylc.flow.util import serialise

if TYPE_CHECKING:
Expand Down Expand Up @@ -722,9 +722,46 @@ def upgrade_pre_803(self, pri_dao: CylcWorkflowDAO) -> None:
)
conn.commit()

@staticmethod
def upgrade_pre_810(pri_dao: CylcWorkflowDAO) -> None:
"""Upgrade on restart from a pre-8.1.0 database.

Add "flow_nums" column to the "task_jobs".
See GitHub cylc/cylc-flow#5252.

This is only possible if we have single item in the list
represented by flow_nums, else we have to raise an error
"""
conn = pri_dao.connect()
c_name = "flow_nums"
LOG.info(
f"DB upgrade (pre-8.1.0): "
f"add {c_name} column to {CylcWorkflowDAO.TABLE_TASK_JOBS}"
)

# We can't upgrade if the flow_nums in task_states are not
# distinct.
from cylc.flow.util import deserialise
flow_nums = deserialise(conn.execute(
'SELECT DISTINCT flow_nums FROM task_states;').fetchall()[0][0])
if len(flow_nums) != 1:
raise CylcError(
'Cannot upgrade-restart from 8.0.x to 8.1.0 IF'
' multiple flows have been used.'
)

conn.execute(
rf"ALTER TABLE {CylcWorkflowDAO.TABLE_TASK_JOBS} "
rf"ADD COLUMN {c_name} "
r"DEFAULT '[1]'"
)
conn.commit()

def upgrade(self, last_run_ver, pri_dao):
if last_run_ver < parse_version("8.0.3.dev"):
self.upgrade_pre_803(pri_dao)
if last_run_ver < parse_version("8.1.0.dev"):
self.upgrade_pre_810(pri_dao)

def check_workflow_db_compatibility(self):
"""Raises ServiceFileError if the existing workflow database is
Expand Down Expand Up @@ -755,6 +792,7 @@ def check_workflow_db_compatibility(self):
f"Cylc {last_run_ver})."
f"\n{manual_rm_msg}"
)

self.upgrade(last_run_ver, pri_dao)

return last_run_ver
2 changes: 1 addition & 1 deletion tests/flakyfunctional/database/00-simple/schema.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key));
CREATE TABLE workflow_template_vars(key TEXT, value TEXT, PRIMARY KEY(key));
CREATE TABLE task_action_timers(cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT, num INTEGER, delay TEXT, timeout TEXT, PRIMARY KEY(cycle, name, ctx_key));
CREATE TABLE task_events(name TEXT, cycle TEXT, time TEXT, submit_num INTEGER, event TEXT, message TEXT);
CREATE TABLE task_jobs(cycle TEXT, name TEXT, submit_num INTEGER, is_manual_submit INTEGER, try_num INTEGER, time_submit TEXT, time_submit_exit TEXT, submit_status INTEGER, time_run TEXT, time_run_exit TEXT, run_signal TEXT, run_status INTEGER, platform_name TEXT, job_runner_name TEXT, job_id TEXT, PRIMARY KEY(cycle, name, submit_num));
CREATE TABLE task_jobs(cycle TEXT, name TEXT, submit_num INTEGER, flow_nums TEXT, is_manual_submit INTEGER, try_num INTEGER, time_submit TEXT, time_submit_exit TEXT, submit_status INTEGER, time_run TEXT, time_run_exit TEXT, run_signal TEXT, run_status INTEGER, platform_name TEXT, job_runner_name TEXT, job_id TEXT, PRIMARY KEY(cycle, name, submit_num));
CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(cycle, name));
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ CREATE TABLE task_jobs(cycle TEXT, name TEXT, submit_num INTEGER, is_manual_subm
CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(cycle, name));
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
INSERT INTO task_pool VALUES('1','foo','["1", "2"]','waiting', 0);
INSERT INTO task_pool VALUES('1','foo','["1"]','waiting', 0);
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0');
INSERT INTO task_states VALUES('foo','1','["1"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0');
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE xtriggers(signature TEXT, results TEXT, PRIMARY KEY(signature));
Expand Down
1 change: 1 addition & 0 deletions tests/functional/restart/57-ghost-job.t
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ poll_workflow_stopped
# Job should have been replaced in DB with same submit num:
TEST_NAME="${TEST_NAME_BASE}-db-query-2"
query_db_task_jobs "$TEST_NAME"

cmp_ok "${TEST_NAME}.stdout" << EOF
1|foo|1
EOF
Expand Down
24 changes: 3 additions & 21 deletions tests/integration/test_workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,6 @@ def db_remove_column(schd: Scheduler, table: str, column: str) -> None:
conn.commit()


def upgrade_db_from_version(schd, version):
"""Runs the DB upgrader from the specified version.

Args:
schd:
The Scheduler who's DB you want to upgrade.
version:
The version you want to upgrade from.
(i.e. what version do you want to tell Cylc the workflow ran
with last time).

"""
with schd.workflow_db_mgr.get_pri_dao() as pri_dao:
schd.workflow_db_mgr.upgrade(parse_version(version), pri_dao)


async def test_db_upgrade_pre_803(
flow, one_conf, start, scheduler, log_filter, db_select
):
Expand All @@ -106,13 +90,10 @@ async def test_db_upgrade_pre_803(

# Remove task_states:is_manual_submit to fake a pre-8.0.3 DB.
db_remove_column(schd, "task_states", "is_manual_submit")
db_remove_column(schd, "task_jobs", "flow_nums")

schd: Scheduler = scheduler(reg, paused_start=True)

# Run the DB upgrader for version 8.0.3
# (8.0.3 does not require upgrade so should be skipped)
upgrade_db_from_version(schd, '8.0.3')

# Restart should fail due to the missing column.
with pytest.raises(sqlite3.OperationalError):
async with start(schd):
Expand All @@ -123,7 +104,8 @@ async def test_db_upgrade_pre_803(

# Run the DB upgrader for version 8.0.2
# (8.0.2 requires upgrade)
upgrade_db_from_version(schd, '8.0.2')
with schd.workflow_db_mgr.get_pri_dao() as pri_dao:
schd.workflow_db_mgr.upgrade_pre_803(pri_dao)
# Restart should now succeed.
async with start(schd):
assert ('n_restart', '2') in db_select(schd, False, 'workflow_params')
93 changes: 93 additions & 0 deletions tests/unit/test_workflow_db_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
Tests for worklfow_db_manager
"""

import pytest
import sqlite3

from cylc.flow.exceptions import CylcError
from cylc.flow.workflow_db_mgr import (
CylcWorkflowDAO,
WorkflowDatabaseManager,
)


@pytest.fixture
def _setup_db(tmp_path):
def _inner(values):
db_file = tmp_path / 'sql.db'
conn = sqlite3.connect(str(db_file))
conn.execute((
r'CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT,'
r' time_created TEXT, time_updated TEXT, submit_num INTEGER,'
r' status TEXT, flow_wait INTEGER, is_manual_submit INTEGER,'
r' PRIMARY KEY(name, cycle, flow_nums));')
)
conn.execute((
r'CREATE TABLE task_jobs(cycle TEXT, name TEXT,'
r' submit_num INTEGER, is_manual_submit INTEGER,'
r' try_num INTEGER, time_submit TEXT, time_submit_exit TEXT,'
r' submit_status INTEGER, time_run TEXT, time_run_exit TEXT,'
r' run_signal TEXT, run_status INTEGER, platform_name TEXT,'
r' job_runner_name TEXT, job_id TEXT,'
r' PRIMARY KEY(cycle, name, submit_num));'
))
conn.execute(values)
conn.execute((
r"INSERT INTO task_jobs VALUES"
r" ('10090101T0000Z', 'foo', 1, 0, 1, '2022-12-05T14:46:06Z',"
r" '2022-12-05T14:46:07Z', 0, '2022-12-05T14:46:10Z',"
r" '2022-12-05T14:46:39Z', '', 0, 'localhost', 'background',"
r" 4377)"
))
conn.commit()
return db_file
return _inner


def test_upgrade_pre_810_fails_on_multiple_flows(_setup_db):
values = (
r'INSERT INTO task_states VALUES'
r" ('foo', '10050101T0000Z', '[1, 3]',"
r" '2022-12-05T14:46:33Z',"
r" '2022-12-05T14:46:40Z', 1, 'succeeded', 0, 0)"
)
db_file_name = _setup_db(values)
pri_dao = CylcWorkflowDAO(db_file_name)
with pytest.raises(
CylcError,
match='^Cannot .* 8.0.x to 8.1.0 .* used.$'
):
WorkflowDatabaseManager.upgrade_pre_810(pri_dao)


def test_upgrade_pre_810_pass_on_single_flow(_setup_db):
values = (
r'INSERT INTO task_states VALUES'
r" ('foo', '10050101T0000Z', '[1]',"
r" '2022-12-05T14:46:33Z',"
r" '2022-12-05T14:46:40Z', 1, 'succeeded', 0, 0)"
)
db_file_name = _setup_db(values)
pri_dao = CylcWorkflowDAO(db_file_name)
WorkflowDatabaseManager.upgrade_pre_810(pri_dao)
conn = sqlite3.connect(db_file_name)
result = conn.execute(
'SELECT DISTINCT flow_nums FROM task_jobs;').fetchall()[0][0]
assert result == '[1]'