Skip to content

Commit

Permalink
add flow_nums to task_jobs table in database
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Dec 6, 2022
1 parent 2454f8a commit 22ba527
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ command to validate, install and play a workflow.
[#5081](https://github.com/cylc/cylc-flow/pull/5081) - Reduced amount that
gets logged at "INFO" level in scheduler logs.

### Fixes

[#5259](https://github.com/cylc/cylc-flow/pull/5259) - Add flow_nums
to task_jobs table in the workflow database.

-------------------------------------------------------------------------------
## __cylc-8.0.4 (<span actions:bind='release-date'>Pending YYYY-MM-DD</span>)__

Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def construct_rsync_over_ssh_cmd(
Developer Warning:
The Cylc Subprocess Pool method ``rsync_255_fail`` relies on
``rsync_cmd[0] == 'rsync'``. Please check that changes to this funtion
``rsync_cmd[0] == 'rsync'``. Please check that changes to this function
do not break ``rsync_255_fail``.
"""
dst_path = dst_path.replace('$HOME/', '')
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ class CylcWorkflowDAO:
TABLE_TASK_JOBS: [
["cycle", {"is_primary_key": True}],
["name", {"is_primary_key": True}],
["flow_nums"],
["submit_num", {"datatype": "INTEGER", "is_primary_key": True}],
["is_manual_submit", {"datatype": "INTEGER"}],
["try_num", {"datatype": "INTEGER"}],
Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
get_utc_mode
)
from cylc.flow.cfgspec.globalcfg import SYSPATH
from cylc.flow.util import serialise

if TYPE_CHECKING:
from cylc.flow.task_proxy import TaskProxy
Expand Down Expand Up @@ -440,6 +441,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
# Log and persist
LOG.debug(f"[{itask}] host={host}")
self.workflow_db_mgr.put_insert_task_jobs(itask, {
'flow_nums': serialise(itask.flow_nums),
'is_manual_submit': itask.is_manual_submit,
'try_num': itask.get_try_num(),
'time_submit': get_current_time_string(),
Expand Down Expand Up @@ -1224,6 +1226,7 @@ def _prep_submit_task_job_error(self, workflow, itask, action, exc):
self.workflow_db_mgr.put_insert_task_jobs(
itask,
{
'flow_nums': serialise(itask.flow_nums),
'job_id': itask.summary.get('submit_method_id'),
'is_manual_submit': itask.is_manual_submit,
'try_num': itask.get_try_num(),
Expand Down
41 changes: 40 additions & 1 deletion cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.wallclock import get_current_time_string, get_utc_mode
from cylc.flow.exceptions import ServiceFileError
from cylc.flow.exceptions import CylcError, ServiceFileError
from cylc.flow.util import serialise

if TYPE_CHECKING:
Expand Down Expand Up @@ -722,9 +722,47 @@ def upgrade_pre_803(self, pri_dao: CylcWorkflowDAO) -> None:
)
conn.commit()

@staticmethod
def upgrade_pre_810(pri_dao: CylcWorkflowDAO) -> None:
    """Migrate a pre-8.1.0 workflow database on restart.

    Adds a "flow_nums" column to the task_jobs table, with a default
    value of '[1]'.  See GitHub cylc/cylc-flow#5252.

    The migration only goes ahead when '[1]' is the one and only
    distinct flow_nums value stored in the task_states table; in any
    other case the correct flow numbers for existing jobs cannot be
    filled in, so an error is raised instead.

    Args:
        pri_dao: DAO used to obtain the database connection.

    Raises:
        CylcError: if the distinct flow_nums values in task_states are
            anything other than exactly ``'[1]'``.
    """
    conn = pri_dao.connect()
    column = "flow_nums"
    table = CylcWorkflowDAO.TABLE_TASK_JOBS
    LOG.info(f"DB upgrade (pre-8.1.0): add {column} column to {table}")

    # Every recorded task state must belong to flow 1 and nothing else,
    # otherwise a blanket default of '[1]' would be wrong for some jobs.
    recorded_flows = conn.execute(
        'SELECT DISTINCT flow_nums FROM task_states;'
    ).fetchall()
    if recorded_flows != [('[1]',)]:
        raise CylcError(
            'Cannot upgrade-restart from 8.0.x to 8.1.0 IF'
            ' multiple flows have been used.'
        )

    conn.execute(
        f"ALTER TABLE {table} ADD COLUMN {column} DEFAULT '[1]'"
    )
    conn.commit()

def upgrade(self, last_run_ver, pri_dao):
    """Run each database upgrade that the last-run Cylc version requires.

    Upgrades are applied in ascending version order, so a database last
    used by an old enough version passes through every step.

    Args:
        last_run_ver: version of Cylc that last ran the workflow, in a
            form comparable with the result of ``parse_version``.
        pri_dao: the CylcWorkflowDAO handed on to each upgrade step.
    """
    if last_run_ver < parse_version("8.0.3.dev"):
        self.upgrade_pre_803(pri_dao)
    if last_run_ver < parse_version("8.1.0.dev"):
        # The pre-8.1.0 migration is named upgrade_pre_810; calling
        # upgrade_pre_804 here would fail with AttributeError.
        self.upgrade_pre_810(pri_dao)

def check_workflow_db_compatibility(self):
"""Raises ServiceFileError if the existing workflow database is
Expand Down Expand Up @@ -755,6 +793,7 @@ def check_workflow_db_compatibility(self):
f"Cylc {last_run_ver})."
f"\n{manual_rm_msg}"
)

self.upgrade(last_run_ver, pri_dao)

return last_run_ver
92 changes: 92 additions & 0 deletions tests/unit/test_workflow_db_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
Tests for workflow_db_mgr
"""

import pytest
import sqlite3

from cylc.flow.exceptions import CylcError
from cylc.flow.workflow_db_mgr import (
CylcWorkflowDAO,
WorkflowDatabaseManager,
)


@pytest.fixture
def _setup_db(tmp_path):
    """Provide a factory that builds a database lacking task_jobs.flow_nums.

    The returned callable takes one SQL statement (``values``), creates
    ``sql.db`` under ``tmp_path`` with ``task_states`` and ``task_jobs``
    tables (the latter without a flow_nums column), runs ``values``,
    inserts a single task_jobs row, commits, and returns the path of the
    database file.
    """
    create_task_states = (
        r'CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT,'
        r' time_created TEXT, time_updated TEXT, submit_num INTEGER,'
        r' status TEXT, flow_wait INTEGER, is_manual_submit INTEGER,'
        r' PRIMARY KEY(name, cycle, flow_nums));'
    )
    create_task_jobs = (
        r'CREATE TABLE task_jobs(cycle TEXT, name TEXT,'
        r' submit_num INTEGER, is_manual_submit INTEGER,'
        r' try_num INTEGER, time_submit TEXT, time_submit_exit TEXT,'
        r' submit_status INTEGER, time_run TEXT, time_run_exit TEXT,'
        r' run_signal TEXT, run_status INTEGER, platform_name TEXT,'
        r' job_runner_name TEXT, job_id TEXT,'
        r' PRIMARY KEY(cycle, name, submit_num));'
    )
    insert_task_job = (
        r"INSERT INTO task_jobs VALUES"
        r" ('10090101T0000Z', 'foo', 1, 0, 1, '2022-12-05T14:46:06Z',"
        r" '2022-12-05T14:46:07Z', 0, '2022-12-05T14:46:10Z',"
        r" '2022-12-05T14:46:39Z', '', 0, 'localhost', 'background',"
        r" 4377)"
    )

    def _inner(values):
        db_file = tmp_path / 'sql.db'
        conn = sqlite3.connect(str(db_file))
        # Order matters: both tables must exist before any row is added,
        # and the caller's statement runs before the task_jobs insert.
        statements = (
            create_task_states,
            create_task_jobs,
            values,
            insert_task_job,
        )
        for statement in statements:
            conn.execute(statement)
        conn.commit()
        return db_file

    return _inner


def test_upgrade_pre_804_fails_on_multiple_flows(_setup_db):
    """The flow_nums migration must refuse a database with other flows.

    task_states is seeded with flow_nums '[1, 3]', so the distinct
    values are not just '[1]' and the upgrade has to raise CylcError.
    """
    values = (
        r'INSERT INTO task_states VALUES'
        r" ('foo', '10050101T0000Z', '[1, 3]',"
        r" '2022-12-05T14:46:33Z',"
        r" '2022-12-05T14:46:40Z', 1, 'succeeded', 0, 0)"
    )
    db_file_name = _setup_db(values)
    pri_dao = CylcWorkflowDAO(db_file_name)
    # The migration is WorkflowDatabaseManager.upgrade_pre_810 and its
    # message names the "8.0.x to 8.1.0" upgrade; literal dots are
    # escaped so the pattern cannot match unrelated text.
    with pytest.raises(
        CylcError,
        match=r'^Cannot .* 8\.0\.x to 8\.1\.0 .* used\.$'
    ):
        WorkflowDatabaseManager.upgrade_pre_810(pri_dao)


def test_upgrade_pre_804_pass_on_single_flow(_setup_db):
    """The flow_nums migration succeeds when only flow 1 was ever used.

    After the upgrade, the pre-existing task_jobs row must report the
    default flow_nums value '[1]'.
    """
    values = (
        r'INSERT INTO task_states VALUES'
        r" ('foo', '10050101T0000Z', '[1]',"
        r" '2022-12-05T14:46:33Z',"
        r" '2022-12-05T14:46:40Z', 1, 'succeeded', 0, 0)"
    )
    db_file_name = _setup_db(values)
    pri_dao = CylcWorkflowDAO(db_file_name)
    # The migration under test is defined as upgrade_pre_810.
    WorkflowDatabaseManager.upgrade_pre_810(pri_dao)
    conn = sqlite3.connect(db_file_name)
    try:
        result = conn.execute(
            'SELECT flow_nums FROM task_jobs;'
        ).fetchall()[0][0]
    finally:
        # Do not leave the verification connection open.
        conn.close()
    assert result == '[1]'

0 comments on commit 22ba527

Please sign in to comment.