
data-store n > 0 window DB loads #4581

Merged 9 commits into cylc:master on Feb 10, 2022

Conversation

@dwsutherland (Member) commented Jan 17, 2022

These changes partially address #4036
(prerequisites were always available, but satisfaction not set)

At present, history is only populated via the evolution/forward march of the active pool.
History is not recovered on restart, nor when the window appears/reappears around a trigger/reflow.

This pull request aims to fill in the history of those n>0 window nodes:

  • Load DB task status and other information of the latest submit.
  • Load DB prerequisite satisfaction for the flow (flow_nums) of the respective tasks.
  • Load DB jobs of the respective tasks.
  • Set reflow future window task state to waiting (if not in active-pool/data-store).

This PR also fixes a bug where triggered tasks would hang around after completion, and never get pruned.
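
Roughly, the loading flow looks something like the sketch below (heavily simplified and illustrative only; apart from select_tasks_for_datastore and select_prereqs_for_datastore, which appear later in this thread, the names are made up):

def load_db_history(data_store, flow_db, task_ids, prereq_ids):
    """Illustrative sketch: fill in DB history for n>0 window tasks."""
    # 1. Latest submit of each task: status, submit number and recorded outputs.
    for cycle, name, flow_nums, status, submit_num, outputs in (
            flow_db.select_tasks_for_datastore(task_ids)):
        data_store.update_task_proxy(
            cycle, name, flow_nums, status, submit_num, outputs)
    # 2. Prerequisite satisfaction, keyed by the task's flow numbers.
    for cycle, name, p_name, p_cycle, p_output, satisfied in (
            flow_db.select_prereqs_for_datastore(prereq_ids)):
        data_store.set_prereq_satisfaction(
            cycle, name, p_cycle, p_name, p_output, satisfied)
    # 3. Jobs of those tasks are loaded the same way; reflow future-window
    #    tasks with no DB history are simply set to the waiting state.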

Note: Prerequisite DB history has been taken out (only restart prereqs are available and loaded), so the prereq satisfaction shown with GraphiQL below will no longer be correct for historical tasks.
(However, this is just a one-line change, so I've left it in.)

As an example:

[scheduling]
    initial cycle point = 20210101T00
    [[special tasks]]
        clock-trigger = foo(PT0H)
    [[graph]]
        P1M = """
foo => bar => qux => qaz
foo[-P1M] => foo
"""

We can't visualize prerequisites at the moment, but they can be confirmed with GraphiQL:

query {
  workflows (ids: ["linear/run1"]) {
    id
    taskProxies {
      id
      flowNums
      state
      prerequisites {
        satisfied
        conditions {
          taskId
          satisfied
        }
      }
    }
  }
}

Trigger reflow:
$ cylc trigger --reflow linear //20210201T00/foo
(animated demo: n-gt-zero-loads-reflow)
Data at start:

{
  "data": {
    "workflows": [
      {
        "id": "~sutherlander/linear/run1",
        "taskProxies": [
          {
            "id": "~sutherlander/linear/run1//20220101T00/foo",
            "flowNums": "[1]",
            "state": "succeeded",
            "prerequisites": [
              {
                "satisfied": true,
                "conditions": [
                  {
                    "taskId": "20211201T00/foo",
                    "satisfied": true
                  }
                ]
              }
            ]
          },
          {
            "id": "~sutherlander/linear/run1//20210201T00/bar",
            "flowNums": "[2]",
            "state": "waiting",
            "prerequisites": [
              {
                "satisfied": false,
                "conditions": [
                  {
                    "taskId": "20210201T00/foo",
                    "satisfied": false
                  }
                ]
              }
            ]
          },
          {
            "id": "~sutherlander/linear/run1//20210301T00/foo",
            "flowNums": "[2]",
            "state": "waiting",
            "prerequisites": [
              {
                "satisfied": false,
                "conditions": [
                  {
                    "taskId": "20210201T00/foo",
                    "satisfied": false
                  }
                ]
              }
            ]
          },
          {
            "id": "~sutherlander/linear/run1//20220301T00/foo",
            "flowNums": "[1]",
            "state": "waiting",
            "prerequisites": [
              {
                "satisfied": false,
                "conditions": [
                  {
                    "taskId": "20220201T00/foo",
                    "satisfied": false
                  }
                ]
              }
            ]
          },
          {
            "id": "~sutherlander/linear/run1//20210201T00/foo",
            "flowNums": "[2]",
            "state": "preparing",
            "prerequisites": [
              {
                "satisfied": false,
                "conditions": [
                  {
                    "taskId": "20210101T00/foo",
                    "satisfied": false
                  }
                ]
              }
            ]
          },
          {
            "id": "~sutherlander/linear/run1//20210101T00/foo",
            "flowNums": "[1]",
            "state": "succeeded",
            "prerequisites": [
              {
                "satisfied": true,
                "conditions": [
                  {
                    "taskId": "20201201T00/foo",
                    "satisfied": true
                  }
                ]
              }
            ]
          },
          .
          .
          .
        ]
      }
    ]
  }
}

(Note: the past task is loaded in its previous state, while the future tasks are set to waiting.)

Trigger (noflow):
$ cylc trigger linear //20210201T00/bar
(animated demo: n-gt-zero-loads-noflow)
Data at start:

{
  "data": {
    "workflows": [
      {
        "id": "~sutherlander/linear/run1",
        "taskProxies": [
          {
            "id": "~sutherlander/linear/run1//20220101T00/foo",
            "flowNums": "[1]",
            "state": "succeeded",
            "prerequisites": [
              {
                "satisfied": true,
                "conditions": [
                  {
                    "taskId": "20211201T00/foo",
                    "satisfied": true
                  }
                ]
              }
            ]
          },
          {
            "id": "~sutherlander/linear/run1//20210201T00/bar",
            "flowNums": "[]",
            "state": "submitted",
            "prerequisites": [
              {
                "satisfied": false,
                "conditions": [
                  {
                    "taskId": "20210201T00/foo",
                    "satisfied": false
                  }
                ]
              }
            ]
          },
          {
            "id": "~sutherlander/linear/run1//20220301T00/foo",
            "flowNums": "[1]",
            "state": "waiting",
            "prerequisites": [
              {
                "satisfied": false,
                "conditions": [
                  {
                    "taskId": "20220201T00/foo",
                    "satisfied": false
                  }
                ]
              }
            ]
          },
          {
            "id": "~sutherlander/linear/run1//20210201T00/qux",
            "flowNums": "[1]",
            "state": "succeeded",
            "prerequisites": [
              {
                "satisfied": true,
                "conditions": [
                  {
                    "taskId": "20210201T00/bar",
                    "satisfied": true
                  }
                ]
              }
            ]
          },
          {
            "id": "~sutherlander/linear/run1//20210201T00/foo",
            "flowNums": "[1]",
            "state": "succeeded",
            "prerequisites": [
              {
                "satisfied": true,
                "conditions": [
                  {
                    "taskId": "20210101T00/foo",
                    "satisfied": true
                  }
                ]
              }
            ]
          },
          .
          .
          .
        ]
      }
    ]
  }
}

Requirements check-list

  • I have read CONTRIBUTING.md and added my name as a Code Contributor.
  • Contains logically grouped changes (else tidy your branch by rebase).
  • Does not contain off-topic changes (use other PRs for other changes).
  • Applied any dependency changes to both setup.cfg and conda-environment.yml.
  • Already covered by existing tests.
  • Appropriate change log entry included.
  • No documentation update required (non-interactive changes).

@MetRonnie (Member) left a comment:

Code looks good. However, I don't think I'm knowledgeable enough about reflows and the data store to give a proper sign-off; might need to wait for @hjoliver.

(Outdated, resolved review thread on cylc/flow/workflow_db_mgr.py)
@MetRonnie (Member) left a comment:

Ah, actually, 1 change that is needed:

diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py
index 71a932577..abf4ec0e7 100644
--- a/cylc/flow/rundb.py
+++ b/cylc/flow/rundb.py
@@ -168,7 +168,7 @@ class CylcWorkflowDAO:
     CONN_TIMEOUT = 0.2
     DB_FILE_BASE_NAME = "db"
     MAX_TRIES = 100
-    RESTART_INCOMPAT_VERSION = "8.0b1"  # Can't restart if <= this version
+    RESTART_INCOMPAT_VERSION = "8.0rc1"  # Can't restart if <= this version
     TABLE_BROADCAST_EVENTS = "broadcast_events"
     TABLE_BROADCAST_STATES = "broadcast_states"
     TABLE_INHERITANCE = "inheritance"
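
For context, a minimal sketch of the kind of gate this constant feeds (illustrative only, assuming a packaging-style version comparison; the helper name is made up, and the message text mirrors the log shown further down):

from packaging.version import Version

RESTART_INCOMPAT_VERSION = "8.0b3"  # can't restart if last run with <= this
CYLC_VERSION = "8.0rc1.dev"         # current version, hard-coded for illustration

def check_restart_compatibility(last_run_version: str) -> None:
    """Refuse a restart if the DB was last written by an incompatible Cylc."""
    if Version(last_run_version) <= Version(RESTART_INCOMPAT_VERSION):
        raise RuntimeError(
            "Cannot restart - Workflow database is incompatible with "
            f"Cylc {CYLC_VERSION} (workflow last run with Cylc {last_run_version})"
        )

check_restart_compatibility("8.0rc1.dev0")  # passes: newer than 8.0b3
check_restart_compatibility("8.0b3")        # raises: 8.0b3 <= 8.0b3

With the constant set to "8.0rc1", the current 8.0rc1.dev0 would compare as <= it and restart would be refused, which is the chicken-and-egg problem hit below.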

@MetRonnie added the "db change" label (Change to the workflow database structure) on Jan 21, 2022
@dwsutherland (Member, Author) commented Jan 21, 2022

Ah, actually, 1 change that is needed:

Done, thanks.

Actually, I'm not sure it worked, because we're at 8.0rc1.dev0:

ERROR    cylc:scheduler.py:1679 Workflow shutting down - ServiceFileError: Cannot restart - Workflow database is incompatible with Cylc 8.0rc1.dev (workflow last run with Cylc 8.0rc1.dev0)

And I don't think this helps:

(flow) sutherlander@graphic-vbox:cylc-flow$ grep -r "8.0rc1" *
CHANGES.md:## __cylc-8.0rc1 (<span actions:bind='release-date'>Upcoming</span>)__
Binary file cylc/flow/__pycache__/__init__.cpython-38.pyc matches
cylc/flow/__init__.py:__version__ = '8.0rc1.dev'
cylc_flow.egg-info/PKG-INFO:Version: 8.0rc1.dev0

(same thing after reinstalling)

So I've reverted it; it seems we have some kind of "chicken and egg" situation.
We may have to change the DB version as part of the 8.0rc1 release, because even though the change belongs in this PR, making it here renders Cylc non-functional. Yet the change needs to be in for 8.0rc1.

Is there a way to change how the validation assesses this sort of thing? Or is there something else I can do?

@MetRonnie (Member):

D'oh, I should have said RESTART_INCOMPAT_VERSION = '8.0b3' instead of 8.0rc1

@dwsutherland (Member, Author) commented Jan 24, 2022

D'oh, I should have said RESTART_INCOMPAT_VERSION = '8.0b3' instead of 8.0rc1

Done. (I should've seen that too >.< )

@hjoliver (Member) commented Jan 25, 2022

One thing from an earlier conversation with @oliver-sanders (it can probably be a follow-up optimization though):

We need to avoid unbounded DB growth as far as possible (although #4608 could solve that for endless operational workflows) because that will eventually impact scheduler performance.

We should be able to avoid unbounded growth of the prerequisites table by storing only partially-satisfied prerequisites (and perhaps those of any force-triggered tasks too).

If a task started running, its prerequisites can be inferred as satisfied.

Otherwise, if a task doesn't have partially satisfied prerequisites, they can be inferred as entirely unsatisfied.

For conditional prerequisites we can't infer the individual upstream task status like that, but we can infer that the conditional prerequisites expression was satisfied, which is good enough (as we can query the upstream tasks as well if needed).

(We would still have to store the base prerequisites of each task definition, but that's finite).
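
A minimal sketch of that inference (hypothetical helper, not cylc-flow code; `stored` stands for whatever partially-satisfied prerequisite rows were kept in the DB):

def infer_prereq_satisfaction(task_status, stored, prereq_keys):
    """Infer prerequisite satisfaction from task state plus any stored
    partially-satisfied prerequisites (illustrative only)."""
    if stored:
        # Partially satisfied: use exactly what was recorded.
        return stored
    if task_status in ("submitted", "running", "succeeded", "failed"):
        # The task got past waiting, so its prerequisite expression must have
        # been satisfied (for conditionals this doesn't say which branch).
        return {key: True for key in prereq_keys}
    # Nothing stored and the task never ran: entirely unsatisfied.
    return {key: False for key in prereq_keys}

# e.g. a succeeded task with no stored rows:
infer_prereq_satisfaction("succeeded", {}, ["1/a:succeeded"])  # {'1/a:succeeded': True}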

@hjoliver (Member):

Hmmm. I must be doing something wrong, as all the tests passed...

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    [[graph]]
        P1 = "foo => bar"
[runtime]
    [[foo, bar]]

In my environment this runs on master, but on this branch I get:

...
2022-01-25T17:45:52+13:00 INFO - [1/foo waiting job:00 flows:1] => preparing
2022-01-25T17:45:52+13:00 INFO - [2/foo waiting job:00 flows:1] => preparing
2022-01-25T17:45:52+13:00 INFO - [3/foo waiting job:00 flows:1] => preparing
2022-01-25T17:45:52+13:00 INFO - [4/foo waiting job:00 flows:1] => preparing
2022-01-25T17:45:52+13:00 INFO - [5/foo waiting job:00 flows:1] => preparing
2022-01-25T17:45:52+13:00 INFO - [1/foo preparing job:01 flows:1] host=niwa-1007885.niwa.local
2022-01-25T17:45:52+13:00 INFO - [2/foo preparing job:01 flows:1] host=niwa-1007885.niwa.local
2022-01-25T17:45:52+13:00 INFO - [3/foo preparing job:01 flows:1] host=niwa-1007885.niwa.local
2022-01-25T17:45:52+13:00 INFO - [4/foo preparing job:01 flows:1] host=niwa-1007885.niwa.local
2022-01-25T17:45:52+13:00 INFO - [5/foo preparing job:01 flows:1] host=niwa-1007885.niwa.local
2022-01-25T17:45:52+13:00 ERROR - near ",": syntax error
    Traceback (most recent call last):
      File "/home/oliverh/cylc/cylc-flow/cylc/flow/scheduler.py", line 615, in start_scheduler
        await self.main_loop()
      File "/home/oliverh/cylc/cylc-flow/cylc/flow/scheduler.py", line 1538, in main_loop
        has_updated = await self.update_data_structure()
      File "/home/oliverh/cylc/cylc-flow/cylc/flow/scheduler.py", line 1599, in update_data_structure
        self.data_store_mgr.update_data_structure(
      File "/home/oliverh/cylc/cylc-flow/cylc/flow/data_store_mgr.py", line 1238, in update_data_structure
        self.apply_task_proxy_db_history()
      File "/home/oliverh/cylc/cylc-flow/cylc/flow/data_store_mgr.py", line 1048, in apply_task_proxy_db_history
        ) in flow_db.select_tasks_for_datastore(cycle_name_pairs):
      File "/home/oliverh/cylc/cylc-flow/cylc/flow/rundb.py", line 914, in select_tasks_for_datastore
        return list(self.connect().execute(stmt))
    sqlite3.OperationalError: near ",": syntax error
2022-01-25T17:45:52+13:00 CRITICAL - Workflow shutting down - near ",": syntax error
2022-01-25T17:45:53+13:00 INFO - DONE

Any ideas?

@dwsutherland (Member, Author) commented Jan 25, 2022

Any ideas?

Something to do with an empty request, I think. Will put a fix in soon.

Will respond to the DB stuff soon. I think the job and task states tables also grow unbounded(?), so that's not unique to prerequisites.
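
(For reference, the error is SQLite rejecting an `IN ()` clause built from an empty id list; a guard along these lines, illustrative rather than the actual fix, avoids it:)

import sqlite3

def select_tasks_for_datastore(conn: sqlite3.Connection, task_ids: list):
    """Illustrative only: return task rows for the given 'cycle/name' ids."""
    if not task_ids:
        # An empty "IN ()" is a syntax error in SQLite, so short-circuit.
        return []
    placeholders = ", ".join("?" * len(task_ids))
    stmt = f"""
        SELECT cycle, name, status, submit_num
        FROM task_states
        WHERE cycle || '/' || name IN ({placeholders})
    """
    return list(conn.execute(stmt, task_ids))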

@hjoliver (Member):

Yes, but job and task states are essential history, whereas prerequisites aren’t (the latter can be derived from the former)

@dwsutherland (Member, Author) commented Jan 25, 2022

Yes, but job and task states are essential history, whereas prerequisites aren’t (the latter can be derived from the former)

OK, well, if you're happy to hold a finite history of them, I can jettison them based on some criterion. If there were a timestamp we could cull based on that, because we need some history beyond the pool or window (otherwise what's the point of loading prereq history at all?).

Do you want me to go back to no prereq history?

@hjoliver (Member):

I’m only talking about the database. We do have to store partially satisfied prereqs (actually already did that for SOD!), but otherwise we can infer prereq satisfaction from task state. So if you have to go to the database for task data then (a) check for partially satisfied prereqs, else (b) infer from the state recorded in the DB.

@dwsutherland (Member, Author):

I’m only talking about the database. We do have to store partially satisfied prereqs (actually already did that for SOD!), but otherwise we can infer prereq satisfaction from task state. So if you have to go to the database for task data then (a) check for partially satisfied prereqs, else (b) infer from the state recorded in the DB.

Even before this PR the prerequisites were available and updated by the active tasks.

Conditionals could be hard to infer from the task state.

@oliver-sanders (Member) commented Feb 9, 2022

Testing is going well: I've tried various nasty triggers and reflows, and the information has all been dragged in from the DB correctly 👍.

We could do with a nicer error message for incompatible databases as there are now a few folks out there using cylc-8.0b3. Nothing fancy needed, just something to make it clear this is a purposeful error not a random traceback.

@MetRonnie (Member):

I can tackle the DB message in a separate PR

@MetRonnie (Member):

Actually, not sure what could be improved here?

INFO - Extracting job.sh to /net/home/h04/rdutta/cylc-run/linear/.service/etc/job.sh
INFO - Workflow: linear
ERROR - Workflow shutting down - ServiceFileError: Cannot restart - Workflow database is
    incompatible with Cylc 8.0rc1.dev (workflow last run with Cylc 8.0b3)
INFO - DONE

@oliver-sanders (Member):

Actually, not sure what could be improved here?

That's not what I get:

$ git checkout upstream/master
$ cylc play <id>
$ cylc stop <id>   # let it stop with tasks in the pool
$ git checkout dwsutherland/n-gt-zero-db-loads
$ cylc play <id>
...
2022-02-10T10:16:54Z ERROR - no such column: flow_nums
    Traceback (most recent call last):
      File "~/cylc-flow/cylc/flow/scheduler.py", line 657, in run
        await self.configure()
      File "~/cylc-flow/cylc/flow/scheduler.py", line 481, in configure
        self._load_pool_from_db()
      File "~/cylc-flow/cylc/flow/scheduler.py", line 721, in _load_pool_from_db
        self.pool.load_db_task_pool_for_restart)
      File "~/cylc-flow/cylc/flow/rundb.py", line 824, in select_task_pool_for_restart
        callback(row_idx, list(row))
      File "~/cylc-flow/cylc/flow/task_pool.py", line 439, in load_db_task_pool_for_restart
        flow_nums,
      File "~/cylc-flow/cylc/flow/rundb.py", line 844, in select_task_prerequisites
        return list(self.connect().execute(stmt, stmt_args))
    sqlite3.OperationalError: no such column: flow_nums
2022-02-10T10:16:54Z CRITICAL - Workflow shutting down - no such column: flow_nums

@MetRonnie (Member) commented Feb 10, 2022

That'll only happen for workflows run with 8.0rc1.dev prior to this PR being merged. If you check out 8.0b3 and start the workflow, then check out this branch, you should get the one-line message when trying to restart.

@dwsutherland (Member, Author) commented Feb 10, 2022

Ya, the old DB is incompatible for restart.. So I guess that should be handled better?

Probably a follow-on PR though? Do you want it in for rc1?

@oliver-sanders (Member):

Ah, OK, if it won't happen for the released code.

@MetRonnie (Member):

Ya, the old DB is incompatible for restart.. So I guess that should be handled better?

Probably a follow-on PR though? Do you want it in for rc1?

I don't think we need to do anything more. It was decided a while ago that DB incompatibility between pre-release versions is ok.

for message in json.loads(outputs_str).values():
    itask.state.outputs.set_completion(message, True)
# Gather tasks with flow id.
prereq_ids.add(f'{tokens.relative_id}/{flow_nums_str}')
Review comment (Member):

Because flow_nums are defined as a set on the TaskProxy, there is no guarantee of order when they are written to the DB, which could mess with this comparison logic.

We should push serialisation/deserialisation through a sort to make it safe. Seems to work now so we can do this later - #4672
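
For example, something like this (hypothetical helper name) would make the stored value order-independent:

import json

def serialise_flow_nums(flow_nums: set) -> str:
    """Serialise flow numbers in sorted order so the DB string is stable
    and comparable regardless of set iteration order."""
    return json.dumps(sorted(flow_nums))

# Both give '[1, 2]', so keys like '2/a/[1, 2]' always match.
serialise_flow_nums({1, 2})
serialise_flow_nums({2, 1})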

Comment on lines +1087 to +1090
for (
    cycle, name, prereq_name,
    prereq_cycle, prereq_output, satisfied
) in flow_db.select_prereqs_for_datastore(prereq_ids):
Review comment (Member):

FYI: we have ditched pylint, which is the tool that used to tell us to double-indent these statements.

@dwsutherland (Member, Author):

Oh right, what's your preference? This:

        for (
            cycle, name,
            prereq_name,
            prereq_cycle,
            prereq_output,
            satisfied
        ) in flow_db.select_prereqs_for_datastore(prereq_ids):

Review comment (Member):

Yep. (Indent-wise; not necessarily each variable on a separate line).

@oliver-sanders (Member) left a comment:

👍

From experimentation when I trigger a historical task I get four DB queries:

  1. Fetches the triggered task & its jobs.
  2. Fetches the n=1 tasks.
  3. Fetches the n=1 prerequisites (specifying the flow numbers).
  4. Fetches the n=1 jobs.

Note: queries (1) & (2) don't specify the flow numbers because they are loading historical tasks.

Here are the DB calls for triggering task 2/b in the graph:

a => b => c
b[-P1] => b
# 1. Fetches the triggered task & its jobs.
                SELECT
                    task_states.cycle,
                    task_states.name,
                    task_states.status,
                    task_states.submit_num,
                    task_jobs.time_submit,
                    task_jobs.time_run,
                    task_jobs.time_run_exit,
                    task_jobs.job_runner_name,
                    task_jobs.job_id,
                    task_jobs.platform_name
                FROM
                    task_jobs
                JOIN
                    task_states
                ON  task_jobs.cycle == task_states.cycle AND
                    task_jobs.name == task_states.name AND
                    task_jobs.submit_num == task_states.submit_num
                WHERE
                    task_states.cycle || '/' || task_states.name IN (
                        '2/b'
                    )
                ORDER BY
                    task_states.submit_num DESC

# 2. Fetches the n=1 tasks.
                SELECT
                    task_states.cycle,
                    task_states.name,
                    task_states.flow_nums,
                    task_states.status,
                    MAX(task_states.submit_num),
                    task_outputs.outputs
                FROM
                    task_states
                LEFT OUTER JOIN
                    task_outputs
                ON  task_states.cycle == task_outputs.cycle AND
                    task_states.name == task_outputs.name
                WHERE
                    task_states.cycle || '/' || task_states.name IN (
                        '1/b', '2/a', '2/c', '3/b'
                    )
                GROUP BY
                    task_states.cycle, task_states.name

# 3. Fetches the n=1 prerequisites (specifying the flow numbers).
                SELECT
                    cycle,
                    name,
                    prereq_name,
                    prereq_cycle,
                    prereq_output,
                    satisfied
                FROM
                    task_prerequisites
                WHERE
                    cycle || '/' || name || '/' || flow_nums IN (
                        '2/c/[1]', '2/a/[1]', '3/b/[1]', '1/b/[1]'
                    )

# 4. Fetches the n=1 jobs.
                SELECT
                    task_states.cycle,
                    task_states.name,
                    task_states.status,
                    task_states.submit_num,
                    task_jobs.time_submit,
                    task_jobs.time_run,
                    task_jobs.time_run_exit,
                    task_jobs.job_runner_name,
                    task_jobs.job_id,
                    task_jobs.platform_name
                FROM
                    task_jobs
                JOIN
                    task_states
                ON  task_jobs.cycle == task_states.cycle AND
                    task_jobs.name == task_states.name AND
                    task_jobs.submit_num == task_states.submit_num
                WHERE
                    task_states.cycle || '/' || task_states.name IN (
                        '1/b', '2/a', '2/c', '3/b'
                    )
                ORDER BY
                    task_states.submit_num DESC

Checked some GraphiQL queries, which show the flow_nums are being loaded correctly. All good, thanks @dwsutherland.

@oliver-sanders (Member):

(leaving the PR up for a short while in case any other reviewers are still working on this)

@hjoliver (Member):

Two approvals, and no one has raised any unresolved problems ... merging. Thanks @dwsutherland

@hjoliver hjoliver merged commit 2dfb12a into cylc:master Feb 10, 2022
@MetRonnie linked an issue on May 16, 2022 that may be closed by this pull request
Labels: db change (Change to the workflow database structure)
Development

Successfully merging this pull request may close these issues.

compute prerequisites for tasks outside n=0
5 participants