Skip to content

Commit

Permalink
job: increment the submission number at preparation time
Browse files Browse the repository at this point in the history
* Addresses #4974
* Job submission number used to be incremented *after* submission
  (i.e. only once there is a "submission" of which to speak).
* However, we also incremented the submission number if submission
  (or preparation) failed (in which cases there isn't really a
  "submission" but we need one for internal purposes).
* Now the submission number is incremented when tasks enter the
  "preparing" state.
* This resolves an issue where jobs which were going through the
  submission pipeline during a reload got badly broken in the scheduler
  (until restarted).
  • Loading branch information
oliver-sanders committed Jul 15, 2022
1 parent d4938dd commit 5c1cbfe
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
prepared_tasks = []
bad_tasks = []
for itask in itasks:
if itask.state_reset(TASK_STATUS_PREPARING):
if not itask.state(TASK_STATUS_PREPARING):
# bump the submit_num *before* resetting the state so that the
# state transition message reflects the correct submit_num
itask.submit_num += 1
itask.state_reset(TASK_STATUS_PREPARING)
self.data_store_mgr.delta_task_state(itask)
self.workflow_db_mgr.put_update_task_state(itask)
prep_task = self._prep_submit_task_job(
Expand All @@ -255,6 +259,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
Return (list): list of tasks that attempted submission.
"""

if is_simulation:
return self._simulation_submit_task_jobs(itasks, workflow)
# Prepare tasks for job submission
Expand Down Expand Up @@ -996,6 +1001,7 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
"""Simulation mode task jobs submission."""
for itask in itasks:
itask.waiting_on_job_prep = False
itask.submit_num += 1
self._set_retry_timers(itask)
itask.platform = {'name': 'SIMULATION'}
itask.summary['job_runner_name'] = 'SIMULATION'
Expand Down Expand Up @@ -1037,7 +1043,6 @@ def _submit_task_job_callback_255(
self, workflow, itask, cmd_ctx, line
):
"""Helper for _submit_task_jobs_callback, on one task job."""
itask.submit_num -= 1
self.task_events_mgr._retry_task(
itask, time(), submit_retry=True
)
Expand Down Expand Up @@ -1079,7 +1084,10 @@ def _submit_task_job_callback(self, workflow, itask, cmd_ctx, line):
def _prep_submit_task_job(self, workflow, itask, check_syntax=True):
"""Prepare a task job submission.
Return itask on a good preparation.
Returns:
* itask - preparation complete.
* None - preparation in progress.
* False - perparation failed.
"""
if itask.local_job_file_path:
Expand Down Expand Up @@ -1131,9 +1139,7 @@ def _prep_submit_task_job(self, workflow, itask, check_syntax=True):
rtconfig['platform'], PLATFORM_REC_COMMAND
)
except PlatformError as exc:
# Submit number not yet incremented
itask.waiting_on_job_prep = False
itask.submit_num += 1
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
Expand Down Expand Up @@ -1170,9 +1176,7 @@ def _prep_submit_task_job(self, workflow, itask, check_syntax=True):
platform = get_platform(rtconfig, bad_hosts=self.bad_hosts)

except PlatformLookupError as exc:
# Submit number not yet incremented
itask.waiting_on_job_prep = False
itask.submit_num += 1
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
Expand All @@ -1182,8 +1186,6 @@ def _prep_submit_task_job(self, workflow, itask, check_syntax=True):
return False
else:
itask.platform = platform
# Submit number not yet incremented
itask.submit_num += 1
# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)

Expand Down

0 comments on commit 5c1cbfe

Please sign in to comment.