Skip to content

Commit

Permalink
Fixed issues with the db and logs not being recovered in different ru…
Browse files Browse the repository at this point in the history
…n scenarios

* Update VERSION

* added process_jobs_to_submit fixes runtime issue

* Added a few TODOs that require changing critical parts of the code

* Fix runtime issue, recovery now shows a better output

* Improved package test

* Improved package test

* Closing file descriptors on reconnect.

* Fixes an issue with logs with this sequence of commands: `create + run + create + recovery + run`

* Integration test added, fixed a few bugs

* Fixed DB test

* Now the db test also checks the stat and job_names in the filesystem

* feedback

* Changed test name,
Added generic functions in the test to expand it with other run options

* Added few todos

* disabled the check exit code as it is not working on the pipeline and not important right now

* Adding more chunks to the wrapper success

* More detailed test

* Add CI and dependabot GitHub actions (#2021)

* add CI and dependabot gh actions

* disable linting temporarily

(cherry picked from commit 8a1b2df)

* Improved test output in case of failure

* more info

* more info

* Test adding +x

* added chmod after sending

* chmod added

---------

Co-authored-by: dbeltran <daniel.beltran@bsc.es>
Co-authored-by: Luiggi Tenorio <luiggibit@gmail.com>

---------

Co-authored-by: dbeltran <daniel.beltran@bsc.es>
Co-authored-by: Luiggi Tenorio <luiggibit@gmail.com>
  • Loading branch information
3 people committed Dec 16, 2024
1 parent 371767b commit 9733679
Show file tree
Hide file tree
Showing 19 changed files with 836 additions and 370 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.1.11
4.1.12
58 changes: 17 additions & 41 deletions autosubmit/autosubmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -1743,27 +1743,6 @@ def generate_scripts_andor_wrappers(as_conf, job_list, jobs_filtered, packages_p
for job in job_list.get_job_list():
job.status = Status.WAITING

@staticmethod
def terminate(all_threads):
    """Wait for active output-retrieval threads to finish before Autosubmit exits.

    Called on Ctrl+C: any thread whose name contains "JOB_" is still retrieving
    job outputs, so shutdown is delayed — polling every 10 seconds — until those
    threads finish or a 60-second budget is exhausted.

    :param all_threads: threads to inspect, e.g. ``threading.enumerate()``
    """
    # Closing threads on Ctrl+C
    Log.info(
        "Looking for active threads before closing Autosubmit. Ending the program before these threads finish may result in unexpected behavior. This procedure will last until all threads have finished or the program has waited for more than 60 seconds.")
    timeout = 0
    active_threads = True
    # strict < so the total wait is capped at the announced 60 seconds
    # (with <= an extra 10-second sleep could run at timeout == 60)
    while active_threads and timeout < 60:
        active_threads = False
        for thread in all_threads:
            if "JOB_" in thread.name:
                if thread.is_alive():
                    active_threads = True
                    Log.info("{0} is still retrieving outputs, time remaining is {1} seconds.".format(
                        thread.name, 60 - timeout))
                    break
        if active_threads:
            sleep(10)
        timeout += 10


@staticmethod
def manage_wrapper_job(as_conf, job_list, platform, wrapper_id, save=False):
check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time()
Expand Down Expand Up @@ -2144,7 +2123,12 @@ def get_iteration_info(as_conf,job_list):
Log.debug("Sleep: {0}", safetysleeptime)
Log.debug("Number of retrials: {0}", default_retrials)
return total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime


@staticmethod
def check_logs_status(job_list, as_conf, new_run):
    """Refresh the log status of every completed/failed job whose logs are still missing.

    :param job_list: the experiment's job list
    :param as_conf: Autosubmit configuration object
    :param new_run: whether this check happens at the start of a fresh run
    """
    pending = job_list.get_completed_failed_without_logs()
    refresh = job_list.update_log_status
    for pending_job in pending:
        refresh(pending_job, as_conf, new_run)

@staticmethod
def run_experiment(expid, notransitive=False, start_time=None, start_after=None, run_only_members=None, profile=False):
"""
Expand Down Expand Up @@ -2203,6 +2187,7 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,

max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # (72h - 122h )
recovery_retrials = 0
Autosubmit.check_logs_status(job_list, as_conf, new_run=True)
while job_list.get_active():
for platform in platforms_to_test: # Send keep_alive signal
platform.work_event.set()
Expand All @@ -2211,7 +2196,7 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,
did_run = True
try:
if Autosubmit.exit:
Autosubmit.terminate(threading.enumerate())
Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
if job_list.get_failed():
return 1
return 0
Expand Down Expand Up @@ -2281,6 +2266,7 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,
"Couldn't recover the Historical database, AS will continue without it, GUI may be affected")
job_changes_tracker = {}
if Autosubmit.exit:
Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
job_list.save()
as_conf.save()
time.sleep(safetysleeptime)
Expand Down Expand Up @@ -2368,8 +2354,6 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,
raise # If this happens, there is a bug in the code or an exception not-well caught
Log.result("No more jobs to run.")
# search hint - finished run
for job in job_list.get_completed_failed_without_logs():
job_list.update_log_status(job, as_conf)
job_list.save()
if not did_run and len(job_list.get_completed_failed_without_logs()) > 0: # Revise if there is any log unrecovered from previous run
Log.info(f"Connecting to the platforms, to recover missing logs")
Expand All @@ -2382,10 +2366,9 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,
Log.info("Waiting for all logs to be updated")
for p in platforms_to_test:
if p.log_recovery_process:
p.cleanup_event.set() # Send cleanup event
p.cleanup_event.set() # Send cleanup event
p.log_recovery_process.join()
for job in job_list.get_completed_failed_without_logs(): # update the info gathered from the childs
job_list.update_log_status(job, as_conf)
Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
job_list.save()
if len(job_list.get_completed_failed_without_logs()) == 0:
Log.result(f"Autosubmit recovered all job logs.")
Expand All @@ -2401,8 +2384,8 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,
Autosubmit.database_fix(expid)
except Exception as e:
pass
for platform in platforms_to_test:
platform.closeConnection()
for p in platforms_to_test:
p.closeConnection()
if len(job_list.get_failed()) > 0:
Log.info("Some jobs have failed and reached maximum retrials")
else:
Expand Down Expand Up @@ -2573,7 +2556,6 @@ def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence
raise
except BaseException as e:
raise
raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e))

@staticmethod
def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False,
Expand Down Expand Up @@ -2981,9 +2963,8 @@ def recovery(expid, noplot, save, all_jobs, hide, group_by=None, expand=list(),
"Experiment can't be recovered due being {0} active jobs in your experiment, If you want to recover the experiment, please use the flag -f and all active jobs will be cancelled".format(
len(current_active_jobs)), 7000)
Log.debug("Job list restored from {0} files", pkl_dir)
except BaseException as e:
raise AutosubmitCritical(
"Couldn't restore the job_list or packages, check if the filesystem is having issues", 7040, str(e))
except Exception:
raise
Log.info('Recovering experiment {0}'.format(expid))
try:
for job in job_list.get_job_list():
Expand Down Expand Up @@ -3017,13 +2998,8 @@ def recovery(expid, noplot, save, all_jobs, hide, group_by=None, expand=list(),
job.status = Status.COMPLETED
Log.info(
"CHANGED job '{0}' status to COMPLETED".format(job.name))
# Log.status("CHANGED job '{0}' status to COMPLETED".format(job.name))

if not no_recover_logs:
try:
job.platform.get_logs_files(expid, job.remote_logs)
except Exception as e:
pass
job.recover_last_ready_date()
job.recover_last_log_name()
elif job.status != Status.SUSPENDED:
job.status = Status.WAITING
job._fail_count = 0
Expand Down
50 changes: 47 additions & 3 deletions autosubmit/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def __init__(self, name, job_id, status, priority):
self.parameters = None
self._tmp_path = os.path.join(
BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR)
self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}")
self.write_start = False
self._platform = None
self.check = 'true'
Expand Down Expand Up @@ -274,7 +275,6 @@ def _init_runtime_parameters(self):
self._memory_per_task = ''
self.log_retrieved = False
self.start_time_timestamp = time.time()
self.end_time_placeholder = time.time()
self.processors_per_node = ""
self.stat_file = self.script_name[:-4] + "_STAT_0"

Expand Down Expand Up @@ -2501,6 +2501,51 @@ def synchronize_logs(self, platform, remote_logs, local_logs, last = True):
self.local_logs = local_logs
self.remote_logs = copy.deepcopy(local_logs)

def _recover_last_log_name_from_filesystem(self) -> bool:
"""
Recovers the log name for the job from the filesystem.
:return: True if the log name was already recovered, False otherwise
:rtype: bool
"""
log_name = sorted(list(self._log_path.glob(f"{self.name}*")), key=lambda x: x.stat().st_mtime)
log_name = log_name[-1] if log_name else None
if log_name:
file_timestamp = int(datetime.datetime.fromtimestamp(log_name.stat().st_mtime).strftime("%Y%m%d%H%M%S"))
if self.ready_date and file_timestamp >= int(self.ready_date):
self.local_logs = (log_name.with_suffix(".out").name, log_name.with_suffix(".err").name)
self.remote_logs = copy.deepcopy(self.local_logs)
return True
self.local_logs = (f"{self.name}.out.{self._fail_count}", f"{self.name}.err.{self._fail_count}")
self.remote_logs = copy.deepcopy(self.local_logs)
return False

def recover_last_log_name(self):
    """
    Recover the last log name for this job, unless it was already updated
    during the current run.
    """
    if self.updated_log:
        return
    self.updated_log = self._recover_last_log_name_from_filesystem()
    # TODO: After PostgreSQL migration, implement _recover_last_log_from_db() to retrieve the last log from the database.

def recover_last_ready_date(self) -> None:
    """
    Recover the last ready date for this job from its ``<name>_TOTAL_STATS`` file.

    If the file's last line starts with a numeric timestamp, that value becomes
    the ready date. Otherwise (empty file or non-numeric first field) the file's
    modification time is used as a fallback. If the job already has a ready date
    or the stats file does not exist, nothing changes.
    """
    if self.ready_date:
        return
    stat_file = Path(f"{self._tmp_path}/{self.name}_TOTAL_STATS")
    if not stat_file.exists():
        return
    lines = stat_file.read_text().splitlines()
    # first whitespace-separated field of the last line is the ready timestamp
    last_fields = lines[-1].split(" ") if lines else []
    if last_fields and last_fields[0].isdigit():
        self.ready_date = last_fields[0]
    else:
        # Default to the stats file's last modification time (single, shared fallback)
        self.ready_date = datetime.datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S')
        Log.debug(f"Failed to recover ready date for the job {self.name}")


class WrapperJob(Job):
"""
Expand Down Expand Up @@ -2737,8 +2782,7 @@ def _check_running_jobs(self):
self._platform.send_file(multiple_checker_inner_jobs, False)
command = f"cd {self._platform.get_files_path()}; {os.path.join(self._platform.get_files_path(), 'inner_jobs_checker.sh')}"
else:
command = os.path.join(
self._platform.get_files_path(), "inner_jobs_checker.sh")
command = f"cd {self._platform.get_files_path()}; ./inner_jobs_checker.sh; cd {os.getcwd()}"
#
wait = 2
retries = 5
Expand Down
7 changes: 2 additions & 5 deletions autosubmit/job/job_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -2666,7 +2666,7 @@ def _count_parents_status(job: Job, target_status: str) -> Tuple[List[Job], List
non_completed_parents_current.append(parent[0])
return non_completed_parents_current, completed_parents

def update_log_status(self, job, as_conf):
def update_log_status(self, job, as_conf, new_run=False):
"""
Updates the log err and log out.
"""
Expand All @@ -2681,7 +2681,7 @@ def update_log_status(self, job, as_conf):
if log_recovered:
job.local_logs = (log_recovered.name, log_recovered.name[:-4] + ".err") # we only want the last one
job.updated_log = True
elif not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false":
elif new_run and not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false":
job.platform.add_job_to_log_recover(job)
return log_recovered

Expand Down Expand Up @@ -2726,9 +2726,6 @@ def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter
if self.update_from_file(store_change):
save = store_change
Log.debug('Updating FAILED jobs')
write_log_status = False
for job in self.get_completed_failed_without_logs():
save = self.update_log_status(job, as_conf) if not save else save
if not first_time:
for job in self.get_failed():
job.packed = False
Expand Down
9 changes: 8 additions & 1 deletion autosubmit/job/job_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,13 @@ def _send_files(self):
def _do_submission(self,job_scripts=None, hold=False):
""" Submit package to the platform. """


def process_jobs_to_submit(self, job_id: str, hold: bool = False) -> None:
    """
    Mark every job in this package as submitted under the given platform job id.

    Sets hold flag, platform job id, and SUBMITTED status on each job; when the
    package is a wrapper (has a ``name``), the wrapper name is propagated too.

    :param job_id: platform-assigned id for the submitted package
    :param hold: whether the jobs were submitted on hold
    """
    # TODO change this check for a property that checks if it is a wrapper or not, the same change has to be done in other parts of the code
    is_wrapper = hasattr(self, "name")  # hoisted: invariant across the loop
    for job in self.jobs:
        job.hold = hold
        job.id = str(job_id)
        job.status = Status.SUBMITTED
        if is_wrapper:
            job.wrapper_name = self.name

class JobPackageSimple(JobPackageBase):
"""
Expand All @@ -230,6 +236,7 @@ def __init__(self, jobs):
super(JobPackageSimple, self).__init__(jobs)
self._job_scripts = {}
self.export = jobs[0].export
# self.name = "simple_package" TODO this should be possible, but it crashes accross the code. Add a property that defines what is a package with wrappers

def _create_scripts(self, configuration):
for job in self.jobs:
Expand Down
2 changes: 1 addition & 1 deletion autosubmit/platforms/locplatform.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def send_file(self, filename: str, check: bool = True) -> bool:
Returns:
bool: True if the file was sent successfully.
"""
command = f'{self.put_cmd} {os.path.join(self.tmp_path, Path(filename).name)} {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}'
command = f'{self.put_cmd} {os.path.join(self.tmp_path, Path(filename).name)} {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}; chmod 770 {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}'
try:
subprocess.check_call(command, shell=True)
except subprocess.CalledProcessError:
Expand Down
1 change: 1 addition & 0 deletions autosubmit/platforms/paramiko_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def wrapper(self):
return self._wrapper

def reset(self):
self.closeConnection()
self.connected = False
self._ssh = None
self._ssh_config = None
Expand Down
7 changes: 1 addition & 6 deletions autosubmit/platforms/pjmplatform.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,7 @@ def process_batch_ready_jobs(self,valid_packages_to_submit,failed_packages,error
sleep(10)

for package in valid_packages_to_submit:
for job in package.jobs:
job.hold = hold
job.id = str(jobs_id[i])
job.status = Status.SUBMITTED
job.wrapper_name = package.name

package.process_jobs_to_submit(jobs_id[i], hold)
i += 1
save = True
except AutosubmitError as e:
Expand Down
Loading

0 comments on commit 9733679

Please sign in to comment.