Skip to content

Commit

Permalink
Implement Database upgrader
Browse files Browse the repository at this point in the history
unit test for database upgrader
  • Loading branch information
wxtim committed Dec 5, 2022
1 parent a76632d commit 840d70a
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 1 deletion.
1 change: 1 addition & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ class CylcWorkflowDAO:
TABLE_TASK_JOBS: [
["cycle", {"is_primary_key": True}],
["name", {"is_primary_key": True}],
["flow_nums", {"is_primary_key": True}],
["submit_num", {"datatype": "INTEGER", "is_primary_key": True}],
["is_manual_submit", {"datatype": "INTEGER"}],
["try_num", {"datatype": "INTEGER"}],
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED, TASK_OUTPUT_SUBMIT_FAILED)
from cylc.flow.util import serialise
from cylc.flow.wallclock import (
get_current_time_string,
get_seconds_as_interval_string as intvl_as_str
Expand Down Expand Up @@ -1277,6 +1278,7 @@ def _insert_task_job(
'submit_status': submit_status,
'time_submit_exit': event_time,
'job_id': itask.summary.get('submit_method_id'),
'flow_nums': serialise(itask.flow_nums),
# NOTE: the platform name may have changed since task
# preparation started due to intelligent host (and or
# platform) selection
Expand Down
40 changes: 39 additions & 1 deletion cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.wallclock import get_current_time_string, get_utc_mode
from cylc.flow.exceptions import ServiceFileError
from cylc.flow.exceptions import CylcError, ServiceFileError
from cylc.flow.util import serialise

if TYPE_CHECKING:
Expand Down Expand Up @@ -728,6 +728,42 @@ def upgrade_pre_803(self, pri_dao: CylcWorkflowDAO) -> None:
)
conn.commit()

@staticmethod
def upgrade_pre_804(pri_dao: CylcWorkflowDAO) -> None:
"""Upgrade on restart from a pre-8.0.4 database.
Add "flow_nums" column to the "task_jobs".
See GitHub cylc/cylc-flow#5252.
This is only possible if we have single item in the list
represented by flow_nums, else we have to raise an error
"""
conn = pri_dao.connect()
c_name = "flow_nums"
LOG.info(
f"DB upgrade (pre-8.0.4): "
f"add {c_name} column to {CylcWorkflowDAO.TABLE_TASK_JOBS}"
)

# We can't upgrade if the flow_nums in task_states are not
# distinct.
if (
conn.execute(
'SELECT DISTINCT flow_nums FROM task_states;').fetchall()
!= [('[1]',)]
):
raise CylcError(
'Cannot upgrade-restart from 8.0.3 to 8.0.4 IF'
' multiple flows have been used.'
)

conn.execute(
rf"ALTER TABLE {CylcWorkflowDAO.TABLE_TASK_JOBS} "
rf"ADD COLUMN {c_name} "
r"DEFAULT '[1]'"
)
conn.commit()

def check_workflow_db_compatibility(self):
"""Raises ServiceFileError if the existing workflow database is
incompatible with the current version of Cylc."""
Expand Down Expand Up @@ -759,3 +795,5 @@ def check_workflow_db_compatibility(self):
)
if last_run_ver < parse_version("8.0.3.dev"):
self.upgrade_pre_803(pri_dao)
if last_run_ver < parse_version("8.0.4.dev"):
self.upgrade_pre_804(pri_dao)
92 changes: 92 additions & 0 deletions tests/unit/test_workflow_db_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
Tests for worklfow_db_manager
"""

import pytest
import sqlite3

from cylc.flow.exceptions import CylcError
from cylc.flow.workflow_db_mgr import (
CylcWorkflowDAO,
WorkflowDatabaseManager,
)


@pytest.fixture
def _setup_db(tmp_path):
def _inner(values):
db_file = tmp_path / 'sql.db'
conn = sqlite3.connect(str(db_file))
conn.execute((
r'CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT,'
r' time_created TEXT, time_updated TEXT, submit_num INTEGER,'
r' status TEXT, flow_wait INTEGER, is_manual_submit INTEGER,'
r' PRIMARY KEY(name, cycle, flow_nums));')
)
conn.execute((
r'CREATE TABLE task_jobs(cycle TEXT, name TEXT,'
r' submit_num INTEGER, is_manual_submit INTEGER,'
r' try_num INTEGER, time_submit TEXT, time_submit_exit TEXT,'
r' submit_status INTEGER, time_run TEXT, time_run_exit TEXT,'
r' run_signal TEXT, run_status INTEGER, platform_name TEXT,'
r' job_runner_name TEXT, job_id TEXT,'
r' PRIMARY KEY(cycle, name, submit_num));'
))
conn.execute(values)
conn.execute((
r"INSERT INTO task_jobs VALUES"
r" ('10090101T0000Z', 'foo', 1, 0, 1, '2022-12-05T14:46:06Z',"
r" '2022-12-05T14:46:07Z', 0, '2022-12-05T14:46:10Z',"
r" '2022-12-05T14:46:39Z', '', 0, 'localhost', 'background',"
r" 4377)"
))
conn.commit()
return db_file
return _inner


def test_upgrade_pre_804_fails_on_multiple_flows(_setup_db):
values = (
r'INSERT INTO task_states VALUES'
r" ('foo', '10050101T0000Z', '[1, 3]',"
r" '2022-12-05T14:46:33Z',"
r" '2022-12-05T14:46:40Z', 1, 'succeeded', 0, 0)"
)
db_file_name = _setup_db(values)
pri_dao = CylcWorkflowDAO(db_file_name)
with pytest.raises(
CylcError,
match='^Cannot .* 8.0.3 to 8.0.4 .* used.$'
):
WorkflowDatabaseManager.upgrade_pre_804(pri_dao)


def test_upgrade_pre_804_pass_on_single_flow(_setup_db):
values = (
r'INSERT INTO task_states VALUES'
r" ('foo', '10050101T0000Z', '[1]',"
r" '2022-12-05T14:46:33Z',"
r" '2022-12-05T14:46:40Z', 1, 'succeeded', 0, 0)"
)
db_file_name = _setup_db(values)
pri_dao = CylcWorkflowDAO(db_file_name)
WorkflowDatabaseManager.upgrade_pre_804(pri_dao)
conn = sqlite3.connect(db_file_name)
result = conn.execute('SELECT flow_nums FROM task_jobs;').fetchall()[0][0]
assert result == '[1]'

0 comments on commit 840d70a

Please sign in to comment.