Fix job submit time delta. #4553

Merged · 5 commits · Dec 17, 2021
6 changes: 6 additions & 0 deletions CHANGES.md
@@ -83,6 +83,12 @@ are now sparse, i.e. they will no longer be fleshed-out with defaults.

### Fixes

[#4553](https://github.com/cylc/cylc-flow/pull/4553) - Add job submit time
to the datastore.

[#4526](https://github.com/cylc/cylc-flow/pull/4526) - Prevent `runN` and
`run<number>` being allowed as installation target names.

[#4526](https://github.com/cylc/cylc-flow/pull/4526),
[#4549](https://github.com/cylc/cylc-flow/pull/4549) - Prevent installing
workflows with directory names that include reserved filenames such as
19 changes: 9 additions & 10 deletions cylc/flow/task_events_mgr.py
@@ -1030,20 +1030,22 @@ def _process_message_submitted(self, itask, event_time, submit_num):
f"[{summary['submit_method_id']}]"
)

# Register the newly submitted job with the database and datastore.
self._insert_task_job(itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG)
job_d = get_task_job_id(
itask.point, itask.tdef.name, itask.submit_num)

itask.set_summary_time('submitted', event_time)
self.data_store_mgr.delta_job_time(job_d, 'submitted', event_time)
if itask.tdef.run_mode == 'simulation':
# Simulate job execution at this point.
itask.set_summary_time('submitted', event_time)
# Simulate job started as well.
itask.set_summary_time('started', event_time)
self.data_store_mgr.delta_job_time(job_d, 'started', event_time)
if itask.state_reset(TASK_STATUS_RUNNING):
self.data_store_mgr.delta_task_state(itask)
itask.state.outputs.set_completion(TASK_OUTPUT_STARTED, True)
self.data_store_mgr.delta_task_output(itask, TASK_OUTPUT_STARTED)

else:
itask.set_summary_time('submitted', event_time)
job_d = get_task_job_id(
itask.point, itask.tdef.name, itask.submit_num)
self.data_store_mgr.delta_job_time(job_d, 'submitted', event_time)
self.data_store_mgr.delta_job_state(job_d, TASK_STATUS_SUBMITTED)
# Unset started and finished times in case of resubmission.
itask.set_summary_time('started')
@@ -1065,9 +1067,6 @@ def _process_message_submitted(self, itask, event_time, submit_num):
self.data_store_mgr.delta_task_queued(itask)
self._reset_job_timers(itask)

# Register the newly submitted job with the database and datastore.
self._insert_task_job(itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG)

def _insert_task_job(
self,
itask: 'TaskProxy',
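
The hunks above appear to move the `self._insert_task_job(...)` call, previously at the end of `_process_message_submitted`, up to before the simulation-mode branch, and to emit the `'submitted'` time delta for `job_d` unconditionally. That way the job is registered with the database and datastore before any job deltas refer to it. A minimal sketch of why the ordering matters, using hypothetical stand-in names (this is not cylc-flow's implementation, and it assumes for illustration that a delta for an unregistered job is dropped):

```python
class FakeDataStore:
    """Hypothetical stand-in for a data store manager; not cylc-flow code."""

    def __init__(self):
        self.jobs = {}

    def insert_job(self, job_id):
        # Registering the job creates the node that deltas attach to.
        self.jobs[job_id] = {}

    def delta_job_time(self, job_id, field, value):
        # Assumption for this sketch: a delta for an unknown job is dropped.
        if job_id in self.jobs:
            self.jobs[job_id][field] = value


store = FakeDataStore()
job_id = "1/foo/01"

# Delta emitted before the job is registered: the submit time is lost.
store.delta_job_time(job_id, "submitted", "2021-12-17T12:00:00Z")
assert "submitted" not in store.jobs.get(job_id, {})

# Register first, then emit the delta: the submit time is recorded.
store.insert_job(job_id)
store.delta_job_time(job_id, "submitted", "2021-12-17T12:00:00Z")
assert store.jobs[job_id]["submitted"] == "2021-12-17T12:00:00Z"
```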
1 change: 1 addition & 0 deletions cylc/flow/task_proxy.py
@@ -262,6 +262,7 @@ def copy_to_reload_successor(self, reload_successor):
reload_successor.state.is_runahead = self.state.is_runahead
reload_successor.state.is_updated = self.state.is_updated
reload_successor.state.prerequisites = self.state.prerequisites
reload_successor.jobs = self.jobs

@staticmethod
def get_offset_as_seconds(offset):
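
The one-line `task_proxy.py` change carries the accumulated `jobs` list over to the replacement proxy in `copy_to_reload_successor`, alongside the state attributes already being copied, so job records are not lost when a workflow is reloaded. A small sketch of the idea, with hypothetical classes rather than cylc-flow's `TaskProxy`:

```python
class Proxy:
    """Hypothetical stand-in for a task proxy; not cylc-flow's TaskProxy."""

    def __init__(self):
        self.jobs = []           # job records accumulated for this task
        self.prerequisites = []

    def copy_to_reload_successor(self, successor):
        successor.prerequisites = self.prerequisites
        # The addition in this PR, in spirit: keep the job history on reload.
        successor.jobs = self.jobs


old_proxy = Proxy()
old_proxy.jobs.append({"submit_num": 1, "submitted_time": "2021-12-17T12:00:00Z"})

new_proxy = Proxy()                      # proxy rebuilt on workflow reload
old_proxy.copy_to_reload_successor(new_proxy)
assert new_proxy.jobs == old_proxy.jobs  # job records survive the reload
```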
24 changes: 21 additions & 3 deletions tests/functional/graphql/03-is-held-arg.t
@@ -49,8 +49,12 @@ query {
workflows {
name
isHeldTotal
taskProxies(isHeld: false) {
taskProxies(isHeld: true) {
id
jobs {
submittedTime
startedTime
}
}
familyProxies(exids: [\"root\"], isHeld: true) {
id
@@ -69,14 +73,28 @@ run_ok "${TEST_NAME_BASE}-contact" cylc get-contact "${WORKFLOW_NAME}"
# stop workflow
cylc stop --max-polls=10 --interval=2 --kill "${WORKFLOW_NAME}"

RESPONSE="${TEST_NAME_BASE}-is-held-arg.stdout"
perl -pi -e 's/("submittedTime":).*$/${1} "blargh",/' "${RESPONSE}"
perl -pi -e 's/("startedTime":).*$/${1} "blargh"/' "${RESPONSE}"

# compare to expectation
cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-is-held-arg.stdout" << __HERE__
cmp_json "${TEST_NAME}-out" "$RESPONSE" << __HERE__
{
"workflows": [
{
"name": "${WORKFLOW_NAME}",
"isHeldTotal": 1,
"taskProxies": [],
"taskProxies": [
{
"id": "${USER}${ID_DELIM}${WORKFLOW_NAME}${ID_DELIM}1${ID_DELIM}foo",
"jobs": [
{
"submittedTime": "blargh",
"startedTime": "blargh"
}
]
}
],
"familyProxies": [
{
"id": "${USER}${ID_DELIM}${WORKFLOW_NAME}${ID_DELIM}1${ID_DELIM}BAZ"
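
The functional test now queries the held task proxy (`isHeld: true`) together with its job `submittedTime`/`startedTime`, and the `perl -pi -e` substitutions overwrite those volatile values with `"blargh"` so the response can be compared against a fixed expectation. The same normalisation idea, sketched in Python (the field names come from the test query; the helper itself is illustrative, not part of the test suite):

```python
import json


def scrub_times(obj, fields=("submittedTime", "startedTime"), placeholder="blargh"):
    """Recursively replace volatile time fields with a fixed placeholder."""
    if isinstance(obj, dict):
        return {
            key: placeholder if key in fields else scrub_times(value, fields, placeholder)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [scrub_times(item, fields, placeholder) for item in obj]
    return obj


response = json.loads(
    '{"jobs": [{"submittedTime": "2021-12-17T12:00:00Z", "startedTime": null}]}'
)
assert scrub_times(response) == {
    "jobs": [{"submittedTime": "blargh", "startedTime": "blargh"}]
}
```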
10 changes: 4 additions & 6 deletions tests/functional/lib/python/diffr.py
@@ -224,18 +224,16 @@ def load_json(file1, file2=None):
"""
try:
this = json.loads(file1)
except json.decoder.JSONDecodeError:
sys.exit('Syntax error in file1')
raise
except json.decoder.JSONDecodeError as exc:
sys.exit(f'Syntax error in file1: {exc}')

try:
if file2:
that = json.loads(file2)
else:
that = json.load(sys.stdin)
except json.decoder.JSONDecodeError:
sys.exit('Syntax error in file2')
raise
except json.decoder.JSONDecodeError as exc:
sys.exit(f'Syntax error in file2: {exc}')

return this, that

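
In `diffr.py`, the `raise` placed after `sys.exit(...)` could never run (because `sys.exit()` itself raises `SystemExit`), and the original decode error was discarded. The fix drops the unreachable `raise` and folds the caught `JSONDecodeError` into the exit message, so a malformed input reports where parsing failed. A short sketch of the fixed pattern (the `load_or_exit` helper is illustrative, not the module's API):

```python
import json
import sys


def load_or_exit(text, label):
    """Parse JSON text, exiting with a readable message on bad input."""
    try:
        return json.loads(text)
    except json.decoder.JSONDecodeError as exc:
        # exc carries line/column detail, which now reaches the user.
        sys.exit(f'Syntax error in {label}: {exc}')


try:
    load_or_exit('{not valid json}', 'file1')
except SystemExit as err:
    print(err)  # e.g. "Syntax error in file1: Expecting property name ..."
```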