Skip to content

Commit

Permalink
Merge pull request #5237 from hjoliver/workflow-state-back-compat
Browse files Browse the repository at this point in the history
workflow-state command back compat.
  • Loading branch information
hjoliver authored Aug 24, 2023
2 parents 96ddc35 + 2ac41ea commit 17d6012
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 8 deletions.
2 changes: 2 additions & 0 deletions changes.d/5237.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Back-compat: allow workflow-state xtriggers (and the `cylc workflow-state`
command) to read Cylc 7 databases.
31 changes: 26 additions & 5 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ class CylcWorkflowDBChecker:
],
}

def __init__(self, rund, workflow):
db_path = expand_path(
rund, workflow, "log", CylcWorkflowDAO.DB_FILE_BASE_NAME
)
def __init__(self, rund, workflow, db_path=None):
# (Explicit dp_path arg is to make testing easier).
if db_path is None:
# Infer DB path from workflow name and run dir.
db_path = expand_path(
rund, workflow, "log", CylcWorkflowDAO.DB_FILE_BASE_NAME
)
if not os.path.exists(db_path):
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), db_path)
self.conn = sqlite3.connect(db_path, timeout=10.0)
Expand All @@ -73,7 +76,7 @@ def display_maps(res):
sys.stdout.write((", ").join(row) + "\n")

def get_remote_point_format(self):
"""Query a remote workflow database for a 'cycle point format' entry"""
"""Query a workflow database for a 'cycle point format' entry"""
for row in self.conn.execute(
rf'''
SELECT
Expand All @@ -87,6 +90,24 @@ def get_remote_point_format(self):
):
return row[0]

def get_remote_point_format_compat(self):
"""Query a Cylc 7 suite database for a 'cycle point format' entry.
Back compat for Cylc 8 workflow state triggers targeting Cylc 7 DBs.
"""
for row in self.conn.execute(
rf'''
SELECT
value
FROM
{CylcWorkflowDAO.TABLE_SUITE_PARAMS}
WHERE
key==?
''', # nosec (table name is code constant)
['cycle_point_format']
):
return row[0]

def state_lookup(self, state):
"""allows for multiple states to be searched via a status alias"""
if state in self.STATE_ALIASES:
Expand Down
7 changes: 7 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ class CylcWorkflowDAO:
TABLE_BROADCAST_STATES = "broadcast_states"
TABLE_INHERITANCE = "inheritance"
TABLE_WORKFLOW_PARAMS = "workflow_params"
# BACK COMPAT: suite_params
# This Cylc 7 DB table is needed to allow workflow-state
# xtriggers (and the `cylc workflow-state` command) to
# work with Cylc 7 workflows.
# url: https://github.com/cylc/cylc-flow/issues/5236
# remove at: 8.x
TABLE_SUITE_PARAMS = "suite_params"
TABLE_WORKFLOW_FLOWS = "workflow_flows"
TABLE_WORKFLOW_TEMPLATE_VARS = "workflow_template_vars"
TABLE_TASK_JOBS = "task_jobs"
Expand Down
8 changes: 7 additions & 1 deletion cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ def connect(self):
sys.stderr.write('\n')

if connected and self.args['cycle']:
fmt = self.checker.get_remote_point_format()
try:
fmt = self.checker.get_remote_point_format()
except sqlite3.OperationalError as exc:
try:
fmt = self.checker.get_remote_point_format_compat()
except sqlite3.OperationalError:
raise exc # original error

Check warning on line 124 in cylc/flow/scripts/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scripts/workflow_state.py#L120-L124

Added lines #L120 - L124 were not covered by tests
if fmt:
my_parser = TimePointParser()
my_point = my_parser.parse(self.args['cycle'], dump_format=fmt)
Expand Down
8 changes: 7 additions & 1 deletion cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,13 @@ def workflow_state(
except (OSError, sqlite3.Error):
# Failed to connect to DB; target workflow may not be started.
return (False, None)
fmt = checker.get_remote_point_format()
try:
fmt = checker.get_remote_point_format()
except sqlite3.OperationalError as exc:
try:
fmt = checker.get_remote_point_format_compat()
except sqlite3.OperationalError:
raise exc # original error

Check warning on line 99 in cylc/flow/xtriggers/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/workflow_state.py#L98-L99

Added lines #L98 - L99 were not covered by tests
if fmt:
my_parser = TimePointParser()
point = str(my_parser.parse(point, dump_format=fmt))
Expand Down
20 changes: 20 additions & 0 deletions tests/unit/test_workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
CylcWorkflowDAO,
WorkflowDatabaseManager,
)
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker


@pytest.fixture
Expand Down Expand Up @@ -116,3 +117,22 @@ def test_check_workflow_db_compat(_setup_db, capsys):

with pytest.raises(ServiceFileError, match='99.99'):
WorkflowDatabaseManager.check_db_compatibility(pri_path)


def test_cylc_7_db_wflow_params_table(_setup_db):
"""Test back-compat needed by workflow state xtrigger for Cylc 7 DBs."""
ptformat = "CCYY"
create = r'CREATE TABLE suite_params(key TEXT, value TEXT)'
insert = (
r'INSERT INTO suite_params VALUES'
rf'("cycle_point_format", "{ptformat}")'
)
db_file_name = _setup_db([create, insert])
checker = CylcWorkflowDBChecker('foo', 'bar', db_path=db_file_name)

with pytest.raises(
sqlite3.OperationalError, match="no such table: workflow_params"
):
checker.get_remote_point_format()

assert checker.get_remote_point_format_compat() == ptformat
48 changes: 47 additions & 1 deletion tests/unit/xtriggers/test_workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import sqlite3
from typing import Callable
from unittest.mock import Mock


from cylc.flow.workflow_files import WorkflowFiles
from cylc.flow.xtriggers.workflow_state import workflow_state
from ..conftest import MonkeyMock

Expand All @@ -38,3 +40,47 @@ def test_inferred_run(tmp_run_dir: Callable, monkeymock: MonkeyMock):
_, results = workflow_state(id_, task='precious', point='3000')
mock_db_checker.assert_called_once_with(cylc_run_dir, expected_workflow_id)
assert results['workflow'] == expected_workflow_id


def test_back_compat(tmp_run_dir):
"""Test workflow_state xtrigger backwards compatibility with Cylc 7
database."""
id_ = 'celebrimbor'
c7_run_dir: Path = tmp_run_dir(id_)
(c7_run_dir / WorkflowFiles.FLOW_FILE).rename(
c7_run_dir / WorkflowFiles.SUITE_RC
)
db_file = c7_run_dir / 'log' / 'db'
db_file.parent.mkdir(exist_ok=True)
# Note: cannot use CylcWorkflowDAO here as creating outdated DB
conn = sqlite3.connect(str(db_file))
try:
conn.execute(r"""
CREATE TABLE suite_params(key TEXT, value TEXT, PRIMARY KEY(key));
""")
conn.execute(r"""
CREATE TABLE task_states(
name TEXT, cycle TEXT, time_created TEXT, time_updated TEXT,
submit_num INTEGER, status TEXT, PRIMARY KEY(name, cycle)
);
""")
conn.executemany(
r'INSERT INTO "suite_params" VALUES(?,?);',
[('cylc_version', '7.8.12'),
('cycle_point_format', '%Y'),
('cycle_point_tz', 'Z')]
)
conn.execute(r"""
INSERT INTO "task_states" VALUES(
'mithril','2012','2023-01-30T18:19:15Z','2023-01-30T18:19:15Z',
0,'succeeded'
);
""")
conn.commit()
finally:
conn.close()

satisfied, _ = workflow_state(id_, task='mithril', point='2012')
assert satisfied
satisfied, _ = workflow_state(id_, task='arkenstone', point='2012')
assert not satisfied

0 comments on commit 17d6012

Please sign in to comment.