From cfa06fe1b6a32f0ae9dd18e1b39ba856fc4823e1 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 13 Jul 2023 13:57:43 +0100 Subject: [PATCH 1/9] move setting of UUID on restart --- cylc/flow/scheduler.py | 10 ++++++---- tests/integration/test_scheduler.py | 30 +++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index a36e935c9d6..dbf5ca41cdb 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -281,7 +281,6 @@ def __init__(self, reg: str, options: Values) -> None: workflow=self.workflow, ) self.id = self.tokens.id - self.uuid_str = str(uuid4()) self.options = options self.template_vars = get_template_vars(self.options) @@ -306,6 +305,9 @@ def __init__(self, reg: str, options: Values) -> None: pub_d=os.path.join(self.workflow_run_dir, 'log') ) self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file() + if not self.is_restart: + self.uuid_str = str(uuid4()) + # Map used to track incomplete remote inits for restart # {install_target: platform} self.incomplete_ri_map: Dict[str, Dict] = {} @@ -393,7 +395,6 @@ async def initialise(self): self.bad_hosts, self.reset_inactivity_timer ) - self.task_events_mgr.uuid_str = self.uuid_str self.task_job_mgr = TaskJobManager( self.workflow, @@ -403,7 +404,6 @@ async def initialise(self): self.data_store_mgr, self.bad_hosts ) - self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str self.profiler = Profiler(self, self.options.profile_mode) @@ -701,8 +701,10 @@ async def start(self): self.server.thread.start() barrier.wait() - self._configure_contact() await self.configure() + self.task_events_mgr.uuid_str = self.uuid_str + self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str + self._configure_contact() except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc: await self.handle_exception(exc) diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index 5330e516758..a742bc8a80d 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -16,7 +16,9 @@ import asyncio import logging +from pathlib import Path import pytest +import re from typing import Any, Callable from cylc.flow.exceptions import CylcError @@ -292,3 +294,31 @@ def mock_auto_restart(*a, **k): assert log_filter(log, level=logging.ERROR, contains=err_msg) assert TRACEBACK_MSG in log.text + + +async def test_uuid_unchanged_on_restart( + one: Scheduler, + scheduler: Callable, + start: Callable, +): + """Restart gets UUID from Database: + + See https://github.com/cylc/cylc-flow/issues/5615 + + Process: + * Create a scheduler then shut it down. + * Create a new scheduler for the same workflow and check that it has + retrieved the UUID from the Daatabase. + """ + uuid_re = re.compile('CYLC_WORKFLOW_UUID=(.*)') + contact_file = Path(one.workflow_run_dir) / '.service/contact' + + async with start(one): + pass + + schd = scheduler(one.workflow_name, paused_start=True) + async with start(schd): + # UUID in contact file should be the same as that set in the database + # and the scheduler. + cf_uuid = uuid_re.findall(contact_file.read_text())[0] + assert schd.uuid_str == cf_uuid From 522c74a02e0acf60fb50cf57361e4af02e927d23 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 13 Jul 2023 15:32:04 +0100 Subject: [PATCH 2/9] test fix --- tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 index d1138aafba8..d3dcf24f339 100644 --- a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 +++ b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 @@ -7,6 +7,7 @@ INSERT INTO inheritance VALUES('root','["root"]'); INSERT INTO inheritance VALUES('foo','["foo", "root"]'); CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key)); INSERT INTO workflow_params VALUES('cylc_version', '8.0.0'); +INSERT INTO workflow_params VALUES('uuid_str', 'Something'); CREATE TABLE workflow_template_vars(key TEXT, value TEXT, PRIMARY KEY(key)); CREATE TABLE task_action_timers(cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT, num INTEGER, delay TEXT, timeout TEXT, PRIMARY KEY(cycle, name, ctx_key)); INSERT INTO task_action_timers VALUES('1','foo','"poll_timer"','["tuple", [[99, "running"]]]','[]',0,NULL,NULL); From c9e2a3030130e9ec873cd8707fe1c629fc2cc8fb Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Fri, 14 Jul 2023 14:41:27 +0100 Subject: [PATCH 3/9] UUID followup (#56) - Set Scheduler UUID at same place regardless of restart or not - Fix references to "uuid_str" -> "uuid" in docs - Tidy --- cylc/flow/scheduler.py | 5 ++--- cylc/flow/task_events_mgr.py | 2 +- cylc/flow/task_job_mgr.py | 4 ++-- cylc/flow/task_remote_mgr.py | 1 - cylc/flow/workflow_events.py | 2 +- tests/integration/test_examples.py | 4 ++-- tests/integration/test_workflow_files.py | 2 ++ 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index dbf5ca41cdb..139d0541e38 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -305,8 +305,6 @@ def __init__(self, reg: str, options: Values) -> None: pub_d=os.path.join(self.workflow_run_dir, 'log') ) self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file() - if not self.is_restart: - self.uuid_str = str(uuid4()) # Map used to track incomplete remote inits for restart # {install_target: platform} @@ -426,6 +424,8 @@ async def configure(self): if self.is_restart: self.load_workflow_params_and_tmpl_vars() + else: + self.uuid_str = str(uuid4()) self.profiler.log_memory("scheduler.py: before load_flow_file") try: @@ -703,7 +703,6 @@ async def start(self): await self.configure() self.task_events_mgr.uuid_str = self.uuid_str - self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str self._configure_contact() except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc: await self.handle_exception(exc) diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 010181ab3af..852b28f848e 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -186,7 +186,7 @@ class EventData(Enum): .. deprecated:: 8.0.0 - Use 'uuid_str'. + Use 'uuid'. """ CyclePoint = 'point' diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 524ac6d5b93..20ee7379d27 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -1324,7 +1324,7 @@ def get_job_conf( 'workflow_name': workflow, 'task_id': itask.identity, 'try_num': itask.get_try_num(), - 'uuid_str': self.task_remote_mgr.uuid_str, + 'uuid_str': self.task_events_mgr.uuid_str, 'work_d': rtconfig['work sub-directory'], # this field is populated retrospectively for regular job subs 'logfiles': [], @@ -1357,7 +1357,7 @@ def get_simulation_job_conf(self, itask, workflow): 'workflow_name': workflow, 'task_id': itask.identity, 'try_num': itask.get_try_num(), - 'uuid_str': self.task_remote_mgr.uuid_str, + 'uuid_str': self.task_events_mgr.uuid_str, 'work_d': 'SIMULATION', # this field is populated retrospectively for regular job subs 'logfiles': [], diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 780ba70fa07..541bc1265a0 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -103,7 +103,6 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr): self.remote_command_map = {} # self.remote_init_map = {(install target): status, ...} self.remote_init_map = {} - self.uuid_str = None # This flag is turned on when a host init/select command completes self.ready = False self.rsync_includes = None diff --git a/cylc/flow/workflow_events.py b/cylc/flow/workflow_events.py index adf2abfed8c..c70df043d99 100644 --- a/cylc/flow/workflow_events.py +++ b/cylc/flow/workflow_events.py @@ -107,7 +107,7 @@ class EventData(Enum): .. deprecated:: 8.0.0 - Use "uuid_str". + Use "uuid". """ # BACK COMPAT: "suite_url" deprecated diff --git a/tests/integration/test_examples.py b/tests/integration/test_examples.py index c5e3439afe0..c9aa48a3430 100644 --- a/tests/integration/test_examples.py +++ b/tests/integration/test_examples.py @@ -199,13 +199,13 @@ async def myflow(mod_flow, mod_scheduler, mod_one_conf): def test_module_scoped_fixture(myflow): - """Ensure the uuid is set on __init__. + """Ensure the host is set on __init__. The myflow fixture will be shared between all test functions within this Python module. """ - assert myflow.uuid_str + assert myflow.host async def test_db_select(one, start, db_select): diff --git a/tests/integration/test_workflow_files.py b/tests/integration/test_workflow_files.py index 91933bd7074..10497fe0ed2 100644 --- a/tests/integration/test_workflow_files.py +++ b/tests/integration/test_workflow_files.py @@ -19,6 +19,7 @@ from os import unlink from pathlib import Path from textwrap import dedent +from uuid import uuid4 import pytest @@ -69,6 +70,7 @@ async def workflow(flow, scheduler, one_conf, run_dir): from collections import namedtuple Server = namedtuple('Server', ['port', 'pub_port']) schd.server = Server(1234, pub_port=2345) + schd.uuid_str = str(uuid4()) contact_data = schd.get_contact_data() contact_file = Path( From 958736db532204b253efbd646061ae3de38be2dd Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 25 Jul 2023 09:01:06 +0100 Subject: [PATCH 4/9] Update tests/integration/test_scheduler.py Co-authored-by: Oliver Sanders --- tests/integration/test_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index 0581b2abe8a..6befe2b34d1 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -322,8 +322,8 @@ async def test_uuid_unchanged_on_restart( async with start(schd): # UUID in contact file should be the same as that set in the database # and the scheduler. - cf_uuid = uuid_re.findall(contact_file.read_text())[0] - assert schd.uuid_str == cf_uuid + cf_uuid = uuid_re.findall(contact_file.read_text()) + assert cf_uuid == [schd.uuid_str] async def test_restart_timeout( From d95772b80873f497b63fce07c1c530e14f20d555 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 25 Jul 2023 09:01:19 +0100 Subject: [PATCH 5/9] . --- cylc/flow/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index eb025996549..1643b17e60c 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -716,9 +716,9 @@ async def start(self): self.server.thread.start() barrier.wait() + self._configure_contact() await self.configure() self.task_events_mgr.uuid_str = self.uuid_str - self._configure_contact() except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc: await self.handle_exception(exc) From fb8aa8b2441eb3597764a9cf9276e9ec1797bdce Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Wed, 26 Jul 2023 12:38:17 +0100 Subject: [PATCH 6/9] Load workflow parameters into memory early in the startup process --- cylc/flow/scheduler.py | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 1643b17e60c..cb864c7f47a 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -406,7 +406,7 @@ async def initialise(self): self.profiler = Profiler(self, self.options.profile_mode) - async def configure(self): + async def configure(self, params): """Configure the scheduler. * Load the flow configuration. @@ -424,9 +424,7 @@ async def configure(self): self._check_startup_opts() if self.is_restart: - self.load_workflow_params_and_tmpl_vars() - else: - self.uuid_str = str(uuid4()) + self._parse_workflow_params(params) self.profiler.log_memory("scheduler.py: before load_flow_file") try: @@ -549,15 +547,6 @@ async def configure(self): self.profiler.log_memory("scheduler.py: end configure") - def load_workflow_params_and_tmpl_vars(self) -> None: - """Load workflow params and template variables""" - with self.workflow_db_mgr.get_pri_dao() as pri_dao: - # This logic handles lack of initial cycle point in flow.cylc and - # things that can't change on workflow restart/reload. - self._load_workflow_params(pri_dao.select_workflow_params()) - pri_dao.select_workflow_template_vars(self._load_template_vars) - pri_dao.execute_queued_items() - def log_start(self) -> None: """Log headers, that also get logged on each rollover. @@ -696,12 +685,23 @@ async def run_scheduler(self) -> None: finally: self.profiler.stop() + def load_workflow_params_and_tmpl_vars(self) -> None: + """Load workflow params and template variables""" + with self.workflow_db_mgr.get_pri_dao() as pri_dao: + # This logic handles lack of initial cycle point in flow.cylc and + # things that can't change on workflow restart/reload. + pri_dao.select_workflow_template_vars(self._load_template_vars) + pri_dao.execute_queued_items() + return list(pri_dao.select_workflow_params()) + async def start(self): """Run the startup sequence but don't set the main loop running. Lightweight wrapper for testing convenience. """ + params = self.load_workflow_params_and_tmpl_vars() + try: await self.initialise() @@ -716,8 +716,14 @@ async def start(self): self.server.thread.start() barrier.wait() + # Get UUID now: + if self.is_restart: + self.uuid_str = dict(params)['uuid_str'] + else: + self.uuid_str = str(uuid4()) + self._configure_contact() - await self.configure() + await self.configure(params) self.task_events_mgr.uuid_str = self.uuid_str except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc: await self.handle_exception(exc) @@ -1125,7 +1131,7 @@ async def command_reload_workflow(self) -> None: self.reload_pending = 'applying the new config' old_tasks = set(self.config.get_task_name_list()) # Things that can't change on workflow reload: - self._load_workflow_params( + self._parse_workflow_params( self.workflow_db_mgr.pri_dao.select_workflow_params() ) self.apply_new_config(config, is_reload=True) @@ -1306,7 +1312,7 @@ def apply_new_config(self, config, is_reload=False): 'CYLC_WORKFLOW_FINAL_CYCLE_POINT': str(self.config.final_point), }) - def _load_workflow_params( + def _parse_workflow_params( self, params: Iterable[Tuple[str, Optional[str]]] ) -> None: """Load a row in the "workflow_params" table in a restart/reload. From ebe5d8034e22348be997461e420bc8d86c2db199 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Wed, 26 Jul 2023 12:49:08 +0100 Subject: [PATCH 7/9] mypyfix --- cylc/flow/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index cb864c7f47a..d04f63db09c 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -685,7 +685,7 @@ async def run_scheduler(self) -> None: finally: self.profiler.stop() - def load_workflow_params_and_tmpl_vars(self) -> None: + def load_workflow_params_and_tmpl_vars(self) -> List[Tuple[str, str]]: """Load workflow params and template variables""" with self.workflow_db_mgr.get_pri_dao() as pri_dao: # This logic handles lack of initial cycle point in flow.cylc and From f11a3044c97ab10cd5ee67d99a0aa375f9af42ad Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Wed, 26 Jul 2023 13:33:49 +0100 Subject: [PATCH 8/9] don't create an un-needed db --- cylc/flow/scheduler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index d04f63db09c..f15cab66195 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -700,7 +700,10 @@ async def start(self): Lightweight wrapper for testing convenience. """ - params = self.load_workflow_params_and_tmpl_vars() + if self.is_restart: + params = self.load_workflow_params_and_tmpl_vars() + else: + params = [] try: await self.initialise() From c7f554cbd46c744f1ba0983ea985e0b47b7d2944 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 1 Aug 2023 09:13:49 +0100 Subject: [PATCH 9/9] =?UTF-8?q?=5Fparse=5Fworkflow=5Fparams=20=E2=86=92=20?= =?UTF-8?q?=5Fset=5Fworkflow=5Fparams?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Oliver Sanders --- cylc/flow/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index f15cab66195..657099aeabc 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -424,7 +424,7 @@ async def configure(self, params): self._check_startup_opts() if self.is_restart: - self._parse_workflow_params(params) + self._set_workflow_params(params) self.profiler.log_memory("scheduler.py: before load_flow_file") try: @@ -1134,7 +1134,7 @@ async def command_reload_workflow(self) -> None: self.reload_pending = 'applying the new config' old_tasks = set(self.config.get_task_name_list()) # Things that can't change on workflow reload: - self._parse_workflow_params( + self._set_workflow_params( self.workflow_db_mgr.pri_dao.select_workflow_params() ) self.apply_new_config(config, is_reload=True) @@ -1315,10 +1315,10 @@ def apply_new_config(self, config, is_reload=False): 'CYLC_WORKFLOW_FINAL_CYCLE_POINT': str(self.config.final_point), }) - def _parse_workflow_params( + def _set_workflow_params( self, params: Iterable[Tuple[str, Optional[str]]] ) -> None: - """Load a row in the "workflow_params" table in a restart/reload. + """Set workflow params on restart/reload. This currently includes: * Initial/Final cycle points.