Skip to content

Commit

Permalink
Persist satisfied xtriggers to the DB.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 11, 2017
1 parent 1adb6b4 commit 8a1ec86
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 4 deletions.
10 changes: 10 additions & 0 deletions lib/cylc/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class CylcSuiteDAO(object):
TABLE_TASK_POOL_CHECKPOINTS = "task_pool_checkpoints"
TABLE_TASK_STATES = "task_states"
TABLE_TASK_TIMEOUT_TIMERS = "task_timeout_timers"
TABLE_XTRIGGERS = "xtriggers"

TABLES_ATTRS = {
TABLE_BROADCAST_EVENTS: [
Expand Down Expand Up @@ -283,6 +284,10 @@ class CylcSuiteDAO(object):
["status"],
["hold_swap"],
],
TABLE_XTRIGGERS: [
["cycle"],
["signature", {"is_primary_key": True}],
],
TABLE_TASK_POOL_CHECKPOINTS: [
["id", {"datatype": "INTEGER", "is_primary_key": True}],
["cycle", {"is_primary_key": True}],
Expand Down Expand Up @@ -628,6 +633,11 @@ def select_submit_nums_for_insert(self, task_ids):
ret[(name, cycle)] = submit_num
return ret

def select_xtriggers_for_restart(self, callback):
stmt = r"SELECT cycle,signature FROM %s" % self.TABLE_XTRIGGERS
for row_idx, row in enumerate(self.connect().execute(stmt, [])):
callback(row_idx, list(row))

def select_task_pool(self, callback, id_key=None):
"""Select from task_pool or task_pool_checkpoints.
Expand Down
4 changes: 4 additions & 0 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ def load_tasks_for_restart(self):
self.pool.load_db_task_pool_for_restart, self.options.checkpoint)
self.suite_db_mgr.pri_dao.select_task_action_timers(
self.pool.load_db_task_action_timers)
self.suite_db_mgr.pri_dao.select_xtriggers_for_restart(
self.xtrigger_mgr.load_xtrigger_for_restart)

# Re-initialise run directory for user@host for each submitted and
# running tasks.
# Note: tasks should all be in the runahead pool at this point.
Expand Down Expand Up @@ -1151,6 +1154,7 @@ def process_task_pool(self):
min_point = self.pool.get_min_point()
self.broadcast_mgr.expire_broadcast(min_point)
self.xtrigger_mgr.housekeep(min_point)
self.suite_db_mgr.put_xtriggers(self.xtrigger_mgr.xtrig_satisfied)
if cylc.flags.debug:
LOG.debug("END TASK PROCESSING (took %s seconds)" %
(time() - time0))
Expand Down
14 changes: 12 additions & 2 deletions lib/cylc/suite_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class SuiteDatabaseManager(object):
TABLE_TASK_OUTPUTS = CylcSuiteDAO.TABLE_TASK_OUTPUTS
TABLE_TASK_STATES = CylcSuiteDAO.TABLE_TASK_STATES
TABLE_TASK_TIMEOUT_TIMERS = CylcSuiteDAO.TABLE_TASK_TIMEOUT_TIMERS
TABLE_XTRIGGERS = CylcSuiteDAO.TABLE_XTRIGGERS

def __init__(self, pri_d=None, pub_d=None):
self.pri_path = None
Expand All @@ -68,7 +69,8 @@ def __init__(self, pri_d=None, pub_d=None):
self.TABLE_TASK_POOL: [],
self.TABLE_TASK_ACTION_TIMERS: [],
self.TABLE_TASK_OUTPUTS: [],
self.TABLE_TASK_TIMEOUT_TIMERS: []}
self.TABLE_TASK_TIMEOUT_TIMERS: [],
self.TABLE_XTRIGGERS: []}
self.db_inserts_map = {
self.TABLE_BROADCAST_EVENTS: [],
self.TABLE_BROADCAST_STATES: [],
Expand All @@ -79,7 +81,8 @@ def __init__(self, pri_d=None, pub_d=None):
self.TABLE_TASK_POOL: [],
self.TABLE_TASK_ACTION_TIMERS: [],
self.TABLE_TASK_OUTPUTS: [],
self.TABLE_TASK_TIMEOUT_TIMERS: []}
self.TABLE_TASK_TIMEOUT_TIMERS: [],
self.TABLE_XTRIGGERS: []}
self.db_updates_map = {}

def checkpoint(self, name):
Expand Down Expand Up @@ -285,6 +288,13 @@ def put_task_event_timers(self, task_events_mgr):
"delay": timer.delay,
"timeout": timer.timeout})

def put_xtriggers(self, xtrig_satisfied):
self.db_deletes_map[self.TABLE_XTRIGGERS].append({})
for cycle, signature in xtrig_satisfied:
self.db_inserts_map[self.TABLE_XTRIGGERS].append({
"cycle": str(cycle),
"signature": signature})

def put_task_pool(self, pool):
"""Put statements to update the task_pool table in runtime database.
Expand Down
10 changes: 8 additions & 2 deletions lib/cylc/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import re
from time import time
from copy import deepcopy
from cylc.suite_logging import LOG

from cylc.suite_logging import LOG, OUT
from cylc.cycling.loader import get_point

# Templates for string replacement in funtion call arg values.
TMPL_POINT = 'cycle_point'
Expand Down Expand Up @@ -101,6 +101,12 @@ def add_trig(self, key, val):
raise ValueError(
"Illegal template in xtrigger %s: %s" % (key, match))

def load_xtrigger_for_restart(self, row_idx, row):
if row_idx == 0:
OUT.info("LOADING satisfied xtriggers")
cycle_str, signature = row
self.xtrig_satisfied.append((get_point(cycle_str), signature))

def housekeep(self, point):
"""Delete satisfied xtriggers and xclocks older than point."""
rem = []
Expand Down
1 change: 1 addition & 0 deletions tests/database/00-simple/schema.out
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ CREATE TABLE task_pool(cycle TEXT, name TEXT, spawned INTEGER, status TEXT, hold
CREATE TABLE task_pool_checkpoints(id INTEGER, cycle TEXT, name TEXT, spawned INTEGER, status TEXT, hold_swap TEXT, PRIMARY KEY(id, cycle, name));
CREATE TABLE task_states(name TEXT, cycle TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, PRIMARY KEY(name, cycle));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE xtriggers(cycle TEXT, signature TEXT, PRIMARY KEY(signature));

0 comments on commit 8a1ec86

Please sign in to comment.