Skip to content

Commit

Permalink
Set flow counter from DB.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Oct 15, 2021
1 parent 33693c3 commit 740dc07
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 19 deletions.
19 changes: 11 additions & 8 deletions cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ class FlowMgr:
def __init__(self, db_mgr: "WorkflowDatabaseManager") -> None:
"""Initialise the flow manager."""
self.db_mgr = db_mgr
self.counter = 0
self.flows: Dict[int, Dict[str, str]] = {}
self.counter: int = 0

def get_new_flow(self, description: str = "no description") -> int:
def get_new_flow(self, description: str) -> int:
"""Increment flow counter, record flow metadata."""
self.counter += 1
# record start time to nearest second
now = datetime.datetime.now()
now_sec: str = str(
now - datetime.timedelta(microseconds=now.microsecond))
self.flows[self.counter] = {
"description": description,
"description": description or "no description",
"start_time": now_sec
}
LOG.info(
Expand All @@ -52,13 +52,16 @@ def get_new_flow(self, description: str = "no description") -> int:
self.counter,
self.flows[self.counter]
)
self.db_mgr.put_workflow_params_1(
WorkflowDatabaseManager.KEY_FLOW_COUNTER,
self.counter)
return self.counter

def load_flows_db(self, flow_nums: Set[int]) -> None:
"""Load metadata for selected flows from DB - on restart."""
def load_from_db(self, flow_nums: Set[int]) -> None:
"""Load flow data for scheduler restart.
Sets the flow counter to the max flow number in the DB.
Loads metadata for selected flows (those in the task pool at startup).
"""
self.counter = self.db_mgr.pri_dao.select_workflow_flows_max_flow_num()
self.flows = self.db_mgr.pri_dao.select_workflow_flows(flow_nums)
self._log()

Expand Down
10 changes: 10 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,16 @@ def select_workflow_flows(self, flow_nums):
}
return flows

def select_workflow_flows_max_flow_num(self):
"""Return max flow number in the workflow_flows table."""
stmt = rf'''
SELECT
MAX(flow_num)
FROM
{self.TABLE_WORKFLOW_FLOWS}
''' # nosec (table name is code constant)
return self.connect().execute(stmt).fetchone()[0]

def select_workflow_params_restart_count(self):
"""Return number of restarts in workflow_params table."""
stmt = rf"""
Expand Down
8 changes: 2 additions & 6 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ def _load_pool_from_tasks(self):
self.pool.force_trigger_tasks(
self.options.starttask,
reflow=True,
flow_descr=f"original, from {self.options.starttask}"
flow_descr=f"original flow from {self.options.starttask}"
)

def _load_pool_from_point(self):
Expand All @@ -683,7 +683,7 @@ def _load_pool_from_point(self):
LOG.info(f"{start_type} start from {self.config.start_point}")

flow_num = self.flow_mgr.get_new_flow(
f"original from {self.config.start_point}"
f"original flow from {self.config.start_point}"
)
for name in self.config.get_task_name_list():
if self.config.start_point is None:
Expand Down Expand Up @@ -1150,7 +1150,6 @@ def _load_workflow_params(self, row_idx, row):
* Workflow UUID.
* A flag to indicate if the workflow should be paused or not.
* Original workflow run time zone.
* flow counter
"""
if row_idx == 0:
LOG.info('LOADING workflow parameters')
Expand Down Expand Up @@ -1214,9 +1213,6 @@ def _load_workflow_params(self, row_idx, row):
elif key == self.workflow_db_mgr.KEY_CYCLE_POINT_TIME_ZONE:
self.options.cycle_point_tz = value
LOG.info(f"+ cycle point time zone = {value}")
elif key == self.workflow_db_mgr.KEY_FLOW_COUNTER:
self.flow_mgr.counter = int(value)
LOG.info(f"+ flow counter = {value}")

def _load_template_vars(self, _, row):
"""Load workflow start up template variables."""
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def update_flow_mgr(self):
flow_nums_seen = set()
for itask in self.get_all_tasks():
flow_nums_seen.update(itask.flow_nums)
self.flow_mgr.load_flows_db(flow_nums_seen)
self.flow_mgr.load_from_db(flow_nums_seen)

def load_abs_outputs_for_restart(self, row_idx, row):
cycle, name, output = row
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class WorkflowDatabaseManager:
KEY_CYCLE_POINT_FORMAT = 'cycle_point_format'
KEY_CYCLE_POINT_TIME_ZONE = 'cycle_point_tz'
KEY_RESTART_COUNT = 'n_restart'
KEY_FLOW_COUNTER = "flow_counter"

TABLE_BROADCAST_EVENTS = CylcWorkflowDAO.TABLE_BROADCAST_EVENTS
TABLE_BROADCAST_STATES = CylcWorkflowDAO.TABLE_BROADCAST_STATES
Expand Down
1 change: 0 additions & 1 deletion tests/flakyfunctional/database/00-simple.t
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ sed -i "s/$(cylc --version)/<SOME-VERSION>/g" "${NAME}"
cmp_ok "${NAME}" << __EOF__
UTC_mode|0
cylc_version|<SOME-VERSION>
flow_counter|1
__EOF__

NAME='select-task-events.out'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ INSERT INTO inheritance VALUES('root','["root"]');
INSERT INTO inheritance VALUES('foo','["foo", "root"]');
CREATE TABLE workflow_params(key TEXT, value TEXT, PRIMARY KEY(key));
INSERT INTO workflow_params VALUES('cylc_version', '8.0b2.dev');
INSERT INTO workflow_params VALUES('flow_counter', '1');
CREATE TABLE workflow_template_vars(key TEXT, value TEXT, PRIMARY KEY(key));
CREATE TABLE task_action_timers(cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT, num INTEGER, delay TEXT, timeout TEXT, PRIMARY KEY(cycle, name, ctx_key));
INSERT INTO task_action_timers VALUES('1','foo','"poll_timer"','["tuple", [[99, "running"]]]','[]',0,NULL,NULL);
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/restart/50-two-flows.t
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ reftest_run
mv "${WORKFLOW_RUN_DIR}/reference.restart.log" "${WORKFLOW_RUN_DIR}/reference.log"
reftest_run

grep_workflow_log_ok flow-1 "flow: 1 (original from 1)"
grep_workflow_log_ok flow-1 "flow: 1 (original flow from 1)"
grep_workflow_log_ok flow-2 "flow: 2 (cheese wizard)"

purge

0 comments on commit 740dc07

Please sign in to comment.