Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Use flow numbers to map jobs to tasks in 8.1.x workflows. #5309

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/cylc/cylc-review/static/css/cylc-review.css
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,8 @@ th,
.livestamp {
white-space: nowrap;
}

.warning {
background-color: #FFBF00;
border-radius: 4px;
}
10 changes: 10 additions & 0 deletions lib/cylc/cylc-review/template/taskjobs.html
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@

<div class="container-fluid">
<div class="row">

{% if eight_point_zero == True %}
<div class="warning col-md-11">
<b>WARNING:</b> Cylc Review cannot display all jobs for Cylc 8.0.x Workflows, some jobs
may be hidden.
<br>
Upgrading to 8.1.x will fix this problem, but may not be possible for workflows with multiple "flows".
<br>
</div>
{% endif %}
<div class="col-md-11">
{% include "suite-state.html" -%}
</div>
Expand Down
11 changes: 8 additions & 3 deletions lib/cylc/review.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,14 @@ def taskjobs(
self.suite_dao.get_suite_state_summary(user, suite))
data["states"]["last_activity_time"] = (
self.get_last_activity_time(user, suite))
entries, of_n_entries = self.suite_dao.get_suite_job_entries(
user, suite, cycles, tasks, task_status, job_status, order,
per_page, (page - 1) * per_page)
(
entries, of_n_entries, eight_point_zero
) = self.suite_dao.get_suite_job_entries(
user, suite, cycles, tasks,
task_status, job_status, order,
per_page, (page - 1) * per_page
)
data["eight_point_zero"] = eight_point_zero
data["entries"] = entries
data["of_n_entries"] = of_n_entries
if per_page:
Expand Down
37 changes: 31 additions & 6 deletions lib/cylc/review_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import tarfile
import re
from glob import glob
from sqlite3 import OperationalError

from cylc.rundb import CylcSuiteDAO
from cylc.task_state import TASK_STATUS_GROUPS
Expand Down Expand Up @@ -95,6 +96,10 @@ class CylcReviewDAO(object):

REC_CYCLE_QUERY_OP = re.compile(r"\A(before |after |[<>]=?)(.+)\Z")
REC_SEQ_LOG = re.compile(r"\A(.+\.)([^\.]+)(\.[^\.]+)\Z")
CANNOT_JOIN_FLOW_NUMS = (
'cannot join using column flow_nums - '
'column not present in both tables'
)

def __init__(self):
self.daos = {}
Expand Down Expand Up @@ -169,7 +174,7 @@ def get_suite_broadcast_events(self, user_name, suite_name):

def get_suite_job_entries(
self, user_name, suite_name, cycles, tasks, task_status,
job_status, order, limit, offset):
job_status, order, limit, offset, flow_nums='flow_nums'):
"""Query suite runtime database to return a listing of task jobs.
user -- A string containing a valid user ID
suite -- A string containing a valid suite ID
Expand All @@ -189,6 +194,8 @@ def get_suite_job_entries(
the keys in CylcReviewDAO.ORDERS.
limit -- Limit number of returned entries
offset -- Offset entry number
flow_nums -- whether to use flow_nums

Return (entries, of_n_entries) where:
entries -- A list of matching entries
of_n_entries -- Total number of entries matching query
Expand All @@ -201,6 +208,8 @@ def get_suite_job_entries(
"out": {...},
"err": {...},
...}}
eight_zero_warning - boolean flag indicating that the database is
a Cylc 8.0 database and we can only get the latest task job.
"""
where_expr, where_args = self._get_suite_job_entries_where(
cycles, tasks, task_status, job_status)
Expand Down Expand Up @@ -236,7 +245,7 @@ def get_suite_job_entries(
" time_run, time_run_exit, run_signal, run_status," +
" platform_name, job_runner_name, job_id" +
" FROM task_states LEFT JOIN task_jobs USING " +
"(cycle, name, submit_num)" +
"(cycle, name, " + flow_nums + ") " +
where_expr +
" ORDER BY " +
self.JOB_ORDERS.get(order, self.JOB_ORDERS["time_desc"])
Expand Down Expand Up @@ -266,9 +275,25 @@ def get_suite_job_entries(
stmt += " LIMIT ? OFFSET ?"
limit_args = [limit, offset]

db_data = self._db_exec(
user_name, suite_name, stmt, where_args + limit_args
)
# Try except loop deals with case (Cylc 8.0) where the database
# doesn't contain enough information to identify multiple jobs
# belonging to the same task:
# https://github.com/cylc/cylc-flow/issues/5247
eight_zero_warning = False
try:
db_data = self._db_exec(
user_name, suite_name, stmt, where_args + limit_args
)
except OperationalError as exc:
if exc.message == self.CANNOT_JOIN_FLOW_NUMS:
stmt = stmt.replace('flow_nums', 'submit_num')
db_data = self._db_exec(
user_name, suite_name, stmt, where_args + limit_args
)
eight_zero_warning = True
else:
raise exc

for row in db_data:
(
cycle, name, submit_num, submit_num_max, task_status,
Expand Down Expand Up @@ -296,7 +321,7 @@ def get_suite_job_entries(
self._db_close(user_name, suite_name)
if entries:
self._get_job_logs(user_name, suite_name, entries, entry_of)
return (entries, of_n_entries)
return (entries, of_n_entries, eight_zero_warning)

def _get_suite_job_entries_where(
self, cycles, tasks, task_status, job_status):
Expand Down