Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that UUID is retrieved from database on workflow restart. #5623

Merged
merged 12 commits into from
Aug 3, 2023
46 changes: 28 additions & 18 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ def __init__(self, id_: str, options: Values) -> None:
workflow=self.workflow,
)
self.id = self.tokens.id
self.uuid_str = str(uuid4())
self.options = options
self.template_vars = get_template_vars(self.options)

Expand All @@ -307,6 +306,7 @@ def __init__(self, id_: str, options: Values) -> None:
pub_d=os.path.join(self.workflow_run_dir, 'log')
)
self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file()

# Map used to track incomplete remote inits for restart
# {install_target: platform}
self.incomplete_ri_map: Dict[str, Dict] = {}
Expand Down Expand Up @@ -394,7 +394,6 @@ async def initialise(self):
self.bad_hosts,
self.reset_inactivity_timer
)
self.task_events_mgr.uuid_str = self.uuid_str

self.task_job_mgr = TaskJobManager(
self.workflow,
Expand All @@ -404,11 +403,10 @@ async def initialise(self):
self.data_store_mgr,
self.bad_hosts
)
self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str

self.profiler = Profiler(self, self.options.profile_mode)

async def configure(self):
async def configure(self, params):
"""Configure the scheduler.

* Load the flow configuration.
Expand All @@ -426,7 +424,7 @@ async def configure(self):
self._check_startup_opts()

if self.is_restart:
self.load_workflow_params_and_tmpl_vars()
self._set_workflow_params(params)

self.profiler.log_memory("scheduler.py: before load_flow_file")
try:
Expand Down Expand Up @@ -549,15 +547,6 @@ async def configure(self):

self.profiler.log_memory("scheduler.py: end configure")

def load_workflow_params_and_tmpl_vars(self) -> None:
"""Load workflow params and template variables"""
with self.workflow_db_mgr.get_pri_dao() as pri_dao:
# This logic handles lack of initial cycle point in flow.cylc and
# things that can't change on workflow restart/reload.
self._load_workflow_params(pri_dao.select_workflow_params())
pri_dao.select_workflow_template_vars(self._load_template_vars)
pri_dao.execute_queued_items()

def log_start(self) -> None:
"""Log headers, that also get logged on each rollover.

Expand Down Expand Up @@ -696,12 +685,26 @@ async def run_scheduler(self) -> None:
finally:
self.profiler.stop()

def load_workflow_params_and_tmpl_vars(self) -> List[Tuple[str, str]]:
"""Load workflow params and template variables"""
with self.workflow_db_mgr.get_pri_dao() as pri_dao:
# This logic handles lack of initial cycle point in flow.cylc and
# things that can't change on workflow restart/reload.
pri_dao.select_workflow_template_vars(self._load_template_vars)
pri_dao.execute_queued_items()
return list(pri_dao.select_workflow_params())

async def start(self):
"""Run the startup sequence but don't set the main loop running.

Lightweight wrapper for testing convenience.

"""
if self.is_restart:
params = self.load_workflow_params_and_tmpl_vars()
else:
params = []

try:
await self.initialise()

Expand All @@ -716,8 +719,15 @@ async def start(self):
self.server.thread.start()
barrier.wait()

# Get UUID now:
if self.is_restart:
self.uuid_str = dict(params)['uuid_str']
else:
self.uuid_str = str(uuid4())

self._configure_contact()
await self.configure()
await self.configure(params)
self.task_events_mgr.uuid_str = self.uuid_str
except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc:
await self.handle_exception(exc)

Expand Down Expand Up @@ -1124,7 +1134,7 @@ async def command_reload_workflow(self) -> None:
self.reload_pending = 'applying the new config'
old_tasks = set(self.config.get_task_name_list())
# Things that can't change on workflow reload:
self._load_workflow_params(
self._set_workflow_params(
self.workflow_db_mgr.pri_dao.select_workflow_params()
)
self.apply_new_config(config, is_reload=True)
Expand Down Expand Up @@ -1305,10 +1315,10 @@ def apply_new_config(self, config, is_reload=False):
'CYLC_WORKFLOW_FINAL_CYCLE_POINT': str(self.config.final_point),
})

def _load_workflow_params(
def _set_workflow_params(
self, params: Iterable[Tuple[str, Optional[str]]]
) -> None:
"""Load a row in the "workflow_params" table in a restart/reload.
"""Set workflow params on restart/reload.

This currently includes:
* Initial/Final cycle points.
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class EventData(Enum):

.. deprecated:: 8.0.0

Use 'uuid_str'.
Use 'uuid'.
"""

CyclePoint = 'point'
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@ def get_job_conf(
'workflow_name': workflow,
'task_id': itask.identity,
'try_num': itask.get_try_num(),
'uuid_str': self.task_remote_mgr.uuid_str,
'uuid_str': self.task_events_mgr.uuid_str,
'work_d': rtconfig['work sub-directory'],
# this field is populated retrospectively for regular job subs
'logfiles': [],
Expand Down Expand Up @@ -1357,7 +1357,7 @@ def get_simulation_job_conf(self, itask, workflow):
'workflow_name': workflow,
'task_id': itask.identity,
'try_num': itask.get_try_num(),
'uuid_str': self.task_remote_mgr.uuid_str,
'uuid_str': self.task_events_mgr.uuid_str,
'work_d': 'SIMULATION',
# this field is populated retrospectively for regular job subs
'logfiles': [],
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr):
self.remote_command_map = {}
# self.remote_init_map = {(install target): status, ...}
self.remote_init_map = {}
self.uuid_str = None
# This flag is turned on when a host init/select command completes
self.ready = False
self.rsync_includes = None
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class EventData(Enum):

.. deprecated:: 8.0.0

Use "uuid_str".
Use "uuid".
"""

# BACK COMPAT: "suite_url" deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ INSERT INTO inheritance VALUES('root','["root"]');
INSERT INTO inheritance VALUES('foo','["foo", "root"]');
CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key));
INSERT INTO workflow_params VALUES('cylc_version', '8.0.0');
INSERT INTO workflow_params VALUES('uuid_str', 'Something');
CREATE TABLE workflow_template_vars(key TEXT, value TEXT, PRIMARY KEY(key));
CREATE TABLE task_action_timers(cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT, num INTEGER, delay TEXT, timeout TEXT, PRIMARY KEY(cycle, name, ctx_key));
INSERT INTO task_action_timers VALUES('1','foo','"poll_timer"','["tuple", [[99, "running"]]]','[]',0,NULL,NULL);
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ async def myflow(mod_flow, mod_scheduler, mod_one_conf):


def test_module_scoped_fixture(myflow):
"""Ensure the uuid is set on __init__.
"""Ensure the host is set on __init__.

The myflow fixture will be shared between all test functions within this
Python module.

"""
assert myflow.uuid_str
assert myflow.host


async def test_db_select(one, start, db_select):
Expand Down
30 changes: 30 additions & 0 deletions tests/integration/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import asyncio
import logging
from pathlib import Path
import pytest
import re
from typing import Any, Callable

from cylc.flow.exceptions import CylcError
Expand Down Expand Up @@ -296,6 +298,34 @@ def mock_auto_restart(*a, **k):
assert TRACEBACK_MSG in log.text


async def test_uuid_unchanged_on_restart(
one: Scheduler,
scheduler: Callable,
start: Callable,
):
"""Restart gets UUID from Database:

See https://github.com/cylc/cylc-flow/issues/5615

Process:
* Create a scheduler then shut it down.
* Create a new scheduler for the same workflow and check that it has
retrieved the UUID from the Daatabase.
"""
uuid_re = re.compile('CYLC_WORKFLOW_UUID=(.*)')
contact_file = Path(one.workflow_run_dir) / '.service/contact'

async with start(one):
pass

schd = scheduler(one.workflow_name, paused_start=True)
async with start(schd):
# UUID in contact file should be the same as that set in the database
# and the scheduler.
cf_uuid = uuid_re.findall(contact_file.read_text())
assert cf_uuid == [schd.uuid_str]


async def test_restart_timeout(
flow,
one_conf,
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/test_workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from os import unlink
from pathlib import Path
from textwrap import dedent
from uuid import uuid4

import pytest

Expand Down Expand Up @@ -69,6 +70,7 @@ async def workflow(flow, scheduler, one_conf, run_dir):
from collections import namedtuple
Server = namedtuple('Server', ['port', 'pub_port'])
schd.server = Server(1234, pub_port=2345)
schd.uuid_str = str(uuid4())

contact_data = schd.get_contact_data()
contact_file = Path(
Expand Down