Skip to content

Commit

Permalink
Merge pull request #5187 from hjoliver/db-upgrade-803
Browse files Browse the repository at this point in the history
Db upgrade 8.0.3
  • Loading branch information
hjoliver authored Oct 12, 2022
2 parents 7120803 + 0153153 commit f25ac49
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 56 deletions.
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Maintenance release.
[#5125](https://github.com/cylc/cylc-flow/pull/5125) - Allow rose-suite.conf
changes to be considered by ``cylc reinstall``.

[#5023](https://github.com/cylc/cylc-flow/pull/5023) - tasks force-triggered
[#5023](https://github.com/cylc/cylc-flow/pull/5023),
[#5187](https://github.com/cylc/cylc-flow/pull/5187) -
tasks force-triggered
after a shutdown was ordered should submit to run immediately on restart.

[#5137](https://github.com/cylc/cylc-flow/pull/5137) -
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class CylcWorkflowDAO:
CONN_TIMEOUT = 0.2
DB_FILE_BASE_NAME = "db"
MAX_TRIES = 100
RESTART_INCOMPAT_VERSION = "8.0b3" # Can't restart if <= this version
RESTART_INCOMPAT_VERSION = "8.0rc2" # Can't restart if <= this version
TABLE_BROADCAST_EVENTS = "broadcast_events"
TABLE_BROADCAST_STATES = "broadcast_states"
TABLE_INHERITANCE = "inheritance"
Expand Down
8 changes: 2 additions & 6 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,17 +519,13 @@ async def configure(self):

def load_workflow_params_and_tmpl_vars(self) -> None:
"""Load workflow params and template variables"""
pri_dao = self.workflow_db_mgr.get_pri_dao()
try:
with self.workflow_db_mgr.get_pri_dao() as pri_dao:
# This logic handles lack of initial cycle point in flow.cylc and
# things that can't change on workflow restart/reload.
pri_dao.select_workflow_params(self._load_workflow_params)
pri_dao.select_workflow_template_vars(self._load_template_vars)
pri_dao.execute_queued_items()

finally:
pri_dao.close()

def log_start(self) -> None:
"""Log headers, that also get logged on each rollover.
Expand Down Expand Up @@ -1012,7 +1008,7 @@ def command_reload_workflow(self) -> None:
LOG.info("Reloading the workflow definition.")
old_tasks = set(self.config.get_task_name_list())
# Things that can't change on workflow reload:
pri_dao = self.workflow_db_mgr.get_pri_dao()
pri_dao = self.workflow_db_mgr._get_pri_dao()
pri_dao.select_workflow_params(self._load_workflow_params)

try:
Expand Down
107 changes: 72 additions & 35 deletions cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
* Manage existing run database files on restart.
"""

from contextlib import contextmanager
import json
import os
from pkg_resources import parse_version
from shutil import copy, rmtree
from tempfile import mkstemp
from typing import Any, AnyStr, Dict, List, Set, TYPE_CHECKING, Tuple, Union
from typing import (
Any, AnyStr, Dict, Generator, List, Set, TYPE_CHECKING, Tuple, Union
)

from cylc.flow import LOG
from cylc.flow.broadcast_report import get_broadcast_change_iter
Expand Down Expand Up @@ -188,10 +191,24 @@ def delete_workflow_stop_task(self):
"""Delete workflow stop task from workflow_params table."""
self.delete_workflow_params(self.KEY_STOP_TASK)

def get_pri_dao(self):
"""Return the primary DAO."""
def _get_pri_dao(self) -> CylcWorkflowDAO:
    """Create and return a DAO for the primary database at ``pri_path``.

    The caller is responsible for closing the returned DAO once finished
    with it. Prefer the ``get_pri_dao`` context manager, which closes the
    DAO automatically.
    """
    primary_dao = CylcWorkflowDAO(self.pri_path)
    return primary_dao

@contextmanager
def get_pri_dao(self) -> Generator[CylcWorkflowDAO, None, None]:
    """Context manager that yields the primary DAO.

    The DAO is obtained from ``_get_pri_dao`` and is closed when the
    ``with`` block exits, whether it exits normally or by an exception.
    """
    dao = self._get_pri_dao()
    try:
        yield dao
    finally:
        # Always runs, so the DAO is closed even if the caller's block raised.
        dao.close()

@staticmethod
def _namedtuple2json(obj):
Convert namedtuple obj to a JSON string.
Expand Down Expand Up @@ -223,7 +240,7 @@ def on_workflow_start(self, is_restart):
# ... however, in case there is a directory at the path for
# some bizarre reason:
rmtree(self.pri_path, ignore_errors=True)
self.pri_dao = self.get_pri_dao()
self.pri_dao = self._get_pri_dao()
os.chmod(self.pri_path, PERM_PRIVATE)
self.pub_dao = CylcWorkflowDAO(self.pub_path, is_public=True)
self.copy_pri_to_pub()
Expand Down Expand Up @@ -673,14 +690,43 @@ def restart_check(self) -> None:
self.check_workflow_db_compatibility()
except ServiceFileError as exc:
raise ServiceFileError(f"Cannot restart - {exc}")
pri_dao = self.get_pri_dao()
try:
with self.get_pri_dao() as pri_dao:
pri_dao.vacuum()
self.n_restart = pri_dao.select_workflow_params_restart_count() + 1
self.put_workflow_params_1(self.KEY_RESTART_COUNT, self.n_restart)
self.process_queued_ops()
finally:
pri_dao.close()

def _get_last_run_version(self, pri_dao: CylcWorkflowDAO) -> str:
    """Return the Cylc version string recorded in the workflow database.

    Selects the value stored under ``self.KEY_CYLC_VERSION`` in the
    ``self.TABLE_WORKFLOW_PARAMS`` table, using the given DAO's connection.

    Args:
        pri_dao: DAO whose connection is used to run the query.

    Raises:
        TypeError: if no row matches the key, because ``fetchone()`` then
            returns ``None``, which cannot be subscripted.
    """
    return pri_dao.connect().execute(
        rf'''
SELECT
value
FROM
{self.TABLE_WORKFLOW_PARAMS}
WHERE
key == ?
''', # nosec (table name is a code constant)
        [self.KEY_CYLC_VERSION]
    ).fetchone()[0]

def upgrade_pre_803(self, pri_dao: CylcWorkflowDAO) -> None:
    """Upgrade on restart from a pre-8.0.3 database.

    Add the "is_manual_submit" column to the task states table, unless
    the table already has it.

    The existence check makes this upgrade safe to repeat: nothing here
    changes the Cylc version stored in the database, so the upgrade can be
    requested more than once for the same file, and a second
    ``ALTER TABLE ... ADD COLUMN`` would fail with "duplicate column name".

    See GitHub cylc/cylc-flow#5023 and #5187.

    Args:
        pri_dao: DAO whose connection is used to inspect and alter the
            table.
    """
    conn = pri_dao.connect()
    c_name = "is_manual_submit"
    # Index 1 of each PRAGMA table_info row is the column name.
    existing_columns = {
        row[1]
        for row in conn.execute(
            rf"PRAGMA table_info({self.TABLE_TASK_STATES})"
        )  # nosec (table name is a code constant)
    }
    if c_name in existing_columns:
        # Already upgraded; nothing to do.
        return
    LOG.info(
        f"DB upgrade (pre-8.0.3): "
        f"add {c_name} column to {self.TABLE_TASK_STATES}"
    )
    conn.execute(
        rf"ALTER TABLE {self.TABLE_TASK_STATES} "
        rf"ADD COLUMN {c_name} INTEGER "
        r"DEFAULT 0 NOT NULL"
    )
    conn.commit()

def check_workflow_db_compatibility(self):
"""Raises ServiceFileError if the existing workflow database is
Expand All @@ -694,31 +740,22 @@ def check_workflow_db_compatibility(self):
"If you are sure you want to operate on this workflow, please "
f"delete the database file at {self.pri_path}"
)
pri_dao = self.get_pri_dao()
try:
last_run_ver = pri_dao.connect().execute(
rf'''
SELECT
value
FROM
{self.TABLE_WORKFLOW_PARAMS}
WHERE
key == ?
''', # nosec (table name is a code constant)
[self.KEY_CYLC_VERSION]
).fetchone()[0]
except TypeError:
raise ServiceFileError(
f"{incompat_msg}, or is corrupted.\n{manual_rm_msg}"
)
finally:
pri_dao.close()
last_run_ver = parse_version(last_run_ver)
restart_incompat_ver = parse_version(
CylcWorkflowDAO.RESTART_INCOMPAT_VERSION
)
if last_run_ver <= restart_incompat_ver:
raise ServiceFileError(
f"{incompat_msg} (workflow last run with Cylc {last_run_ver})."
f"\n{manual_rm_msg}"
with self.get_pri_dao() as pri_dao:
try:
last_run_ver = self._get_last_run_version(pri_dao)
except TypeError:
raise ServiceFileError(
f"{incompat_msg}, or is corrupted.\n{manual_rm_msg}"
)
last_run_ver = parse_version(last_run_ver)
restart_incompat_ver = parse_version(
CylcWorkflowDAO.RESTART_INCOMPAT_VERSION
)
if last_run_ver <= restart_incompat_ver:
raise ServiceFileError(
f"{incompat_msg} (workflow last run with "
f"Cylc {last_run_ver})."
f"\n{manual_rm_msg}"
)
if last_run_ver < parse_version("8.0.3.dev"):
self.upgrade_pre_803(pri_dao)
7 changes: 2 additions & 5 deletions cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -1175,12 +1175,9 @@ def get_platforms_from_db(run_dir):
workflow_db_mgr = WorkflowDatabaseManager(
os.path.join(run_dir, WorkflowFiles.Service.DIRNAME))
workflow_db_mgr.check_workflow_db_compatibility()
try:
pri_dao = workflow_db_mgr.get_pri_dao()
with workflow_db_mgr.get_pri_dao() as pri_dao:
platform_names = pri_dao.select_task_job_platforms()
return platform_names
finally:
pri_dao.close()
return platform_names


def check_deprecation(path, warn=True):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ CREATE TABLE inheritance(namespace TEXT, inheritance TEXT, PRIMARY KEY(namespace
INSERT INTO inheritance VALUES('root','["root"]');
INSERT INTO inheritance VALUES('foo','["foo", "root"]');
CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key));
INSERT INTO workflow_params VALUES('cylc_version', '8.0rc1.dev0');
INSERT INTO workflow_params VALUES('cylc_version', '8.0.0');
CREATE TABLE workflow_template_vars(key TEXT, value TEXT, PRIMARY KEY(key));
CREATE TABLE task_action_timers(cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT, num INTEGER, delay TEXT, timeout TEXT, PRIMARY KEY(cycle, name, ctx_key));
INSERT INTO task_action_timers VALUES('1','foo','"poll_timer"','["tuple", [[99, "running"]]]','[]',0,NULL,NULL);
Expand All @@ -18,8 +18,8 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
INSERT INTO task_pool VALUES('1','foo','["1", "2"]','waiting', 0);
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0', '0');
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0');
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE xtriggers(signature TEXT, results TEXT, PRIMARY KEY(signature));
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/restart/57-ghost-job/db.sqlite3
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT,
INSERT INTO workflow_flows VALUES(1,'2022-07-25 16:18:23','original flow from 1');
CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key));
INSERT INTO workflow_params VALUES('uuid_str','4972bc10-a016-46b0-b313-b10f3cb63bf5');
INSERT INTO workflow_params VALUES('cylc_version','8.0rc4.dev');
INSERT INTO workflow_params VALUES('cylc_version','8.0.3.dev');
INSERT INTO workflow_params VALUES('UTC_mode','0');
INSERT INTO workflow_params VALUES('n_restart','0');
INSERT INTO workflow_params VALUES('cycle_point_tz','Z');
Expand Down
5 changes: 1 addition & 4 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,8 @@ def _inner(
stmt += f' WHERE {where_stmt}'
stmt_args = list(where.values())

pri_dao = schd.workflow_db_mgr.get_pri_dao()
try:
with schd.workflow_db_mgr.get_pri_dao() as pri_dao:
return list(pri_dao.connect().execute(stmt, stmt_args))
finally:
pri_dao.close()

return _inner

Expand Down
69 changes: 69 additions & 0 deletions tests/integration/test_workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import pytest
import sqlite3

from cylc.flow.scheduler import Scheduler


Expand Down Expand Up @@ -47,3 +50,69 @@ async def test(expected_restart_num: int, do_reload: bool = False):
await test(expected_restart_num=2, do_reload=True)
# Final restart
await test(expected_restart_num=3)


def db_remove_column(schd: Scheduler, table: str, column: str) -> None:
    """Remove a column from a table in the scheduler's primary DB.

    ALTER TABLE ... DROP COLUMN is not used (the original author notes it
    as unsupported by sqlite). Instead the table is rebuilt: the remaining
    columns are copied into a temporary table, the original table is
    dropped, and the temporary table is renamed to take its place.

    Note that the rebuilt table is declared with column names only.
    """
    with schd.workflow_db_mgr.get_pri_dao() as pri_dao:
        conn = pri_dao.connect()
        # Index 1 of each PRAGMA table_info row is the column name.
        table_info = conn.execute(f'PRAGMA table_info({table})').fetchall()
        kept = [row[1] for row in table_info if row[1] != column]
        c_names = ','.join(kept)
        # Rebuild the table without the unwanted column, in this order.
        rebuild_statements = (
            rf'CREATE TABLE "tmp"({c_names})',
            rf'INSERT INTO "tmp"({c_names}) SELECT {c_names} FROM {table}',
            rf'DROP TABLE "{table}"',
            rf'ALTER TABLE "tmp" RENAME TO "{table}"',
        )
        for statement in rebuild_statements:
            conn.execute(statement)
        conn.commit()


def db_set_workflow_param(schd: Scheduler, param: str, value: str) -> None:
    """Update a value in the scheduler's DB workflow_params table.

    The key and value are passed as bound parameters rather than being
    interpolated into the SQL text. Interpolating them inside double
    quotes makes SQLite treat them as identifiers first (so a ``param`` of
    "key" would match every row), and any quote character in the input
    would break the statement.

    Args:
        schd: scheduler whose primary DB is updated.
        param: the ``key`` of the row to update.
        value: the new ``value`` for that row.
    """
    with schd.workflow_db_mgr.get_pri_dao() as pri_dao:
        conn = pri_dao.connect()
        conn.execute(
            'UPDATE "workflow_params" SET "value" = ? WHERE "key" = ?',
            (value, param),
        )
        conn.commit()


async def test_db_upgrade_pre_803(
    flow, one_conf, start, scheduler, log_filter, db_select
):
    """Test scheduler restart with upgrade of pre-8.0.3 DB.

    Steps:
      1. Start a scheduler so that a DB is created (n_restart == 0).
      2. Drop task_states.is_manual_submit to imitate a pre-8.0.3 schema,
         and check that a restart fails with sqlite3.OperationalError.
      3. Set the stored cylc_version to 8.0.2 and check that the next
         restart succeeds (n_restart == 2).
    """
    reg = flow(one_conf)

    # Run a scheduler to create a DB.
    schd: Scheduler = scheduler(reg, paused_start=True)
    async with start(schd):
        assert ('n_restart', '0') in db_select(schd, False, 'workflow_params')

    # Remove task_states:is_manual_submit to fake a pre-8.0.3 DB.
    db_remove_column(schd, "task_states", "is_manual_submit")

    # Restart should fail due to the missing column.
    schd: Scheduler = scheduler(reg, paused_start=True)
    with pytest.raises(sqlite3.OperationalError):
        async with start(schd):
            pass
    # The failed attempt is still expected to have bumped the restart count.
    assert (
        ('n_restart', '1') in db_select(schd, False, 'workflow_params')
    )

    # Set cylc_version to pre-8.0.3 to cause an upgrade on restart.
    db_set_workflow_param(schd, "cylc_version", "8.0.2")

    # Restart should now upgrade the DB automatically and succeed.
    schd: Scheduler = scheduler(reg, paused_start=True)
    async with start(schd):
        assert ('n_restart', '2') in db_select(schd, False, 'workflow_params')

0 comments on commit f25ac49

Please sign in to comment.