
Commit

Integration test added, fixed a few bugs
* Fixed DB test

* Now the DB test also checks the stat and job_names in the filesystem

* Addressed review feedback

* Changed test name; added generic functions in the test to expand it with other run options

* Added a few TODOs

* Disabled the exit-code check, as it is not working on the pipeline and not important right now

* Added more chunks to the wrapper success case

* More detailed test

* Add CI and dependabot GitHub actions (#2021)

* Add CI and Dependabot GitHub Actions

* Disable linting temporarily

(cherry picked from commit 8a1b2df)

* Improved test output in case of failure

* more info

* more info

* Test adding +x

* Added chmod after sending

* chmod added

---------

Co-authored-by: dbeltran <daniel.beltran@bsc.es>
Co-authored-by: Luiggi Tenorio <luiggibit@gmail.com>
3 people authored Dec 16, 2024
1 parent 0e943be commit dfaaa06
Showing 11 changed files with 583 additions and 333 deletions.
25 changes: 25 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,25 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file

version: 2
updates:

  # Maintain dependencies for GitHub Actions
  - package-ecosystem: "github-actions"
    # Workflow files stored in the default location of `.github/workflows`. (You don't need to specify `/.github/workflows` for `directory`. You can use `directory: "/"`.)
    directory: "/"
    schedule:
      interval: "daily"
    labels:
      - 'dependencies'

  # Maintain dependencies for PyPI
  - package-ecosystem: "pip" # See documentation for possible values
    directory: "/" # Location of package manifests
    schedule:
      interval: "daily"
    labels:
      - 'dependencies'

95 changes: 95 additions & 0 deletions .github/workflows/ci.yaml
@@ -0,0 +1,95 @@
name: CI

on:
  pull_request:
  workflow_dispatch:
  push:
    branches:
      - main

jobs:
  # lint: # Turn on when linting issues are resolved
  #   runs-on: ubuntu-latest
  #   timeout-minutes: 2

  #   steps:
  #     - name: Checkout code
  #       uses: actions/checkout@v4

  #     - name: Set up Python
  #       uses: actions/setup-python@v5
  #       with:
  #         python-version: "3.9"

  #     - name: Install dependencies
  #       run: |
  #         python -m pip install --upgrade pip
  #         pip install -e .[all]

  #     - name: Lint code
  #       run: |
  #         ruff check .

  test:
    # needs: lint
    runs-on: ubuntu-latest
    timeout-minutes: 5

    strategy:
      matrix:
        python-version: ["3.9", "3.10", "3.11", "3.12"]

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install system dependencies
        run: sudo apt-get install -y graphviz rsync curl

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -e .[all]
      - name: Run tests
        run: |
          pytest
      - name: Coverage report
        run: |
          coverage xml
          coverage report
      - name: Upload coverage artifact
        uses: actions/upload-artifact@v4
        with:
          name: coverage_${{ matrix.os }}_py-${{ matrix.python-version }}
          path: coverage.xml
          retention-days: 7

  coverage:
    needs: test
    runs-on: ubuntu-latest
    timeout-minutes: 2
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Download coverage artifacts
        uses: actions/download-artifact@v4

      - name: Codecov upload
        uses: codecov/codecov-action@v5
        with:
          name: ${{ github.workflow }}
          flags: fast-tests
          fail_ci_if_error: true
          verbose: true
          # Token not required for public repos, but avoids upload failure due
          # to rate-limiting (but not for PRs opened from forks)
          token: ${{ secrets.CODECOV_TOKEN }}
40 changes: 11 additions & 29 deletions autosubmit/autosubmit.py
@@ -1743,27 +1743,6 @@ def generate_scripts_andor_wrappers(as_conf, job_list, jobs_filtered, packages_p
for job in job_list.get_job_list():
job.status = Status.WAITING

@staticmethod
def terminate(all_threads):
# Closing threads on Ctrl+C
Log.info(
"Looking for active threads before closing Autosubmit. Ending the program before these threads finish may result in unexpected behavior. This procedure will last until all threads have finished or the program has waited for more than 30 seconds.")
timeout = 0
active_threads = True
while active_threads and timeout <= 60:
active_threads = False
for thread in all_threads:
if "JOB_" in thread.name:
if thread.is_alive():
active_threads = True
Log.info("{0} is still retrieving outputs, time remaining is {1} seconds.".format(
thread.name, 60 - timeout))
break
if active_threads:
sleep(10)
timeout += 10


@staticmethod
def manage_wrapper_job(as_conf, job_list, platform, wrapper_id, save=False):
check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time()
@@ -2144,7 +2123,12 @@ def get_iteration_info(as_conf,job_list):
Log.debug("Sleep: {0}", safetysleeptime)
Log.debug("Number of retrials: {0}", default_retrials)
return total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime


@staticmethod
def check_logs_status(job_list, as_conf, new_run):
for job in job_list.get_completed_failed_without_logs():
job_list.update_log_status(job, as_conf, new_run)

@staticmethod
def run_experiment(expid, notransitive=False, start_time=None, start_after=None, run_only_members=None, profile=False):
"""
@@ -2203,6 +2187,7 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,

max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # (72h - 122h )
recovery_retrials = 0
Autosubmit.check_logs_status(job_list, as_conf, new_run=True)
while job_list.get_active():
for platform in platforms_to_test: # Send keep_alive signal
platform.work_event.set()
@@ -2211,7 +2196,7 @@
did_run = True
try:
if Autosubmit.exit:
Autosubmit.terminate(threading.enumerate())
Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
if job_list.get_failed():
return 1
return 0
@@ -2281,6 +2266,7 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,
"Couldn't recover the Historical database, AS will continue without it, GUI may be affected")
job_changes_tracker = {}
if Autosubmit.exit:
Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
job_list.save()
as_conf.save()
time.sleep(safetysleeptime)
@@ -2368,8 +2354,6 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,
raise # If this happens, there is a bug in the code or an exception not-well caught
Log.result("No more jobs to run.")
# search hint - finished run
for job in job_list.get_completed_failed_without_logs():
job_list.update_log_status(job, as_conf)
job_list.save()
if not did_run and len(job_list.get_completed_failed_without_logs()) > 0: # Revise if there is any log unrecovered from previous run
Log.info(f"Connecting to the platforms, to recover missing logs")
@@ -2382,11 +2366,9 @@ def run_experiment(expid, notransitive=False, start_time=None, start_after=None,
Log.info("Waiting for all logs to be updated")
for p in platforms_to_test:
if p.log_recovery_process:
p.cleanup_event.set() # Send cleanup event
p.cleanup_event.set() # Send cleanup event
p.log_recovery_process.join()
Log.info(f"Log recovery process for {p.name} has finished")
for job in job_list.get_completed_failed_without_logs(): # update the info gathered from the children
job_list.update_log_status(job, as_conf)
Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
job_list.save()
if len(job_list.get_completed_failed_without_logs()) == 0:
Log.result(f"Autosubmit recovered all job logs.")
4 changes: 1 addition & 3 deletions autosubmit/job/job.py
@@ -275,7 +275,6 @@ def _init_runtime_parameters(self):
self._memory_per_task = ''
self.log_retrieved = False
self.start_time_timestamp = time.time()
self.end_time_placeholder = time.time()
self.processors_per_node = ""
self.stat_file = self.script_name[:-4] + "_STAT_0"

@@ -2783,8 +2782,7 @@ def _check_running_jobs(self):
self._platform.send_file(multiple_checker_inner_jobs, False)
command = f"cd {self._platform.get_files_path()}; {os.path.join(self._platform.get_files_path(), 'inner_jobs_checker.sh')}"
else:
command = os.path.join(
self._platform.get_files_path(), "inner_jobs_checker.sh")
command = f"cd {self._platform.get_files_path()}; ./inner_jobs_checker.sh; cd {os.getcwd()}"
#
wait = 2
retries = 5
7 changes: 2 additions & 5 deletions autosubmit/job/job_list.py
@@ -2666,7 +2666,7 @@ def _count_parents_status(job: Job, target_status: str) -> Tuple[List[Job], List
non_completed_parents_current.append(parent[0])
return non_completed_parents_current, completed_parents

def update_log_status(self, job, as_conf):
def update_log_status(self, job, as_conf, new_run=False):
"""
Updates the log err and log out.
"""
@@ -2681,7 +2681,7 @@ def update_log_status(self, job, as_conf):
if log_recovered:
job.local_logs = (log_recovered.name, log_recovered.name[:-4] + ".err") # we only want the last one
job.updated_log = True
elif not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false":
elif new_run and not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false":
job.platform.add_job_to_log_recover(job)
return log_recovered

@@ -2726,9 +2726,6 @@ def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter
if self.update_from_file(store_change):
save = store_change
Log.debug('Updating FAILED jobs')
write_log_status = False
for job in self.get_completed_failed_without_logs():
save = self.update_log_status(job, as_conf) if not save else save
if not first_time:
for job in self.get_failed():
job.packed = False
2 changes: 1 addition & 1 deletion autosubmit/platforms/locplatform.py
@@ -163,7 +163,7 @@ def send_file(self, filename: str, check: bool = True) -> bool:
Returns:
bool: True if the file was sent successfully.
"""
command = f'{self.put_cmd} {os.path.join(self.tmp_path, Path(filename).name)} {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}'
command = f'{self.put_cmd} {os.path.join(self.tmp_path, Path(filename).name)} {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}; chmod 770 {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}'
try:
subprocess.check_call(command, shell=True)
except subprocess.CalledProcessError:
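As a rough illustration, the extended send_file command now copies the script into the experiment's LOG directory and immediately makes it group-accessible. A standalone sketch with hypothetical values (put_cmd, tmp_path and expid come from the platform configuration in the real code; this is not part of the commit):

import os

put_cmd = "cp -p"                        # hypothetical local put command
tmp_path = "/tmp/autosubmit/a000/tmp"    # hypothetical experiment tmp path
expid = "a000"                           # hypothetical experiment id
filename = "a000_SIM.cmd"

src = os.path.join(tmp_path, filename)
dst = os.path.join(tmp_path, "LOG_" + expid, filename)
# The appended chmod 770 keeps the copied script readable and executable for owner and group.
command = f"{put_cmd} {src} {dst}; chmod 770 {dst}"
print(command)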
75 changes: 43 additions & 32 deletions autosubmit/platforms/platform.py
@@ -1,6 +1,7 @@
import atexit
import multiprocessing
import queue # only for the exception
from copy import copy
from os import _exit
import setproctitle
import locale
@@ -50,7 +51,7 @@ def put(self, job: Any, block: bool = True, timeout: float = None) -> None:
unique_name = job.name+str(job.fail_count) # We gather retrial per retrial
if unique_name not in self.all_items:
self.all_items.add(unique_name)
super().put(job, block, timeout)
super().put(copy(job), block, timeout) # Without copy, the process seems to modify the job for other retrials. My guess is that the object is not serialized until it is retrieved from the queue.


class Platform(object):
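A minimal, standalone sketch of the serialization timing the comment above refers to (hypothetical FakeJob stand-in; not part of the commit). multiprocessing.Queue.put() hands the object to a feeder thread that pickles it slightly later, so a mutation made right after put() can still leak into what the consumer receives; putting a copy freezes the state at put() time:

from copy import copy
from multiprocessing import Queue

class FakeJob:  # hypothetical stand-in for the real Job object
    def __init__(self, name, fail_count=0):
        self.name = name
        self.fail_count = fail_count

q = Queue()
job = FakeJob("a000_SIM")

q.put(job)                  # pickled asynchronously by the queue's feeder thread
job.fail_count = 1          # races with the pickling above
print(q.get().fail_count)   # may print 0 or 1, depending on when pickling happened

q.put(copy(job))            # what the diff does: the copy is unaffected by later mutations of the original
job.fail_count = 2
print(q.get().fail_count)   # prints 1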
@@ -850,7 +851,11 @@ def submit_Script(self, hold=False):
raise NotImplementedError

def add_job_to_log_recover(self, job):
self.recovery_queue.put(job)
if job.id and int(job.id) != 0:
self.recovery_queue.put(job)
else:
Log.warning(f"Job {job.name} and retrial number:{job.fail_count} has no job id. Autosubmit will no record this retrial.")
job.updated_log = True

def connect(self, as_conf, reconnect=False):
raise NotImplementedError
@@ -937,40 +942,46 @@ def recover_job_log(self, identifier: str, jobs_pending_to_process: Set[Any]) ->
Set[Any]: Updated set of jobs pending to process.
"""
job = None
try:
while not self.recovery_queue.empty():

while not self.recovery_queue.empty():
try:
job = self.recovery_queue.get(
timeout=1) # Should be non-empty, but added a timeout for other possible errors.
job.children = set() # Children can't be serialized, so we set it to an empty set for this process.
job.platform = self # Change the original platform to this process platform.
job._log_recovery_retries = 0 # Reset the log recovery retries.
try:
job = self.recovery_queue.get(
timeout=1) # Should be non-empty, but added a timeout for other possible errors.
job.children = set() # Children can't be serialized, so we set it to an empty set for this process.
job.platform = self # Change the original platform to this process platform.
job._log_recovery_retries = 0 # Reset the log recovery retries.
job.retrieve_logfiles(self, raise_error=True)
if job.status == Status.FAILED:
Log.result(f"{identifier} Sucessfully recovered log files for job {job.name} and retrial:{job.fail_count}")
except queue.Empty:
pass

# This second while is to keep retrying the failed jobs.
# With the unique queue, the main process won't send the job again, so we have to store it here.
while len(jobs_pending_to_process) > 0: # jobs that had any issue during the log retrieval
job = jobs_pending_to_process.pop()
job._log_recovery_retries += 1
Log.debug(
f"{identifier} (Retrial number: {job._log_recovery_retries}) Recovering log files for job {job.name}")
job.retrieve_logfiles(self, raise_error=True)
Log.result(f"{identifier} (Retrial) Successfully recovered log files for job {job.name}")
except Exception as e:
Log.info(f"{identifier} Error while recovering logs: {str(e)}")
try:
if job and job._log_recovery_retries < 5: # If log retrieval failed, add it to the pending jobs to process. Avoids retrying the same job forever.
Log.result(
f"{identifier} Successfully recovered log for job '{job.name}' and retry '{job.fail_count}'.")
except:
jobs_pending_to_process.add(job)
self.connected = False
Log.info(f"{identifier} Attempting to restore connection")
self.restore_connection(None) # Always restore the connection on a failure.
Log.result(f"{identifier} Sucessfully reconnected.")
except:
job._log_recovery_retries += 1
Log.warning(f"{identifier} (Retrial) Failed to recover log for job '{job.name}' and retry:'{job.fail_count}'.")
except queue.Empty:
pass

if len(jobs_pending_to_process) > 0: # Restore the connection if there was an issue with one or more jobs.
self.restore_connection(None)

# This second while is to keep retrying the failed jobs.
# With the unique queue, the main process won't send the job again, so we have to store it here.
while len(jobs_pending_to_process) > 0: # jobs that had any issue during the log retrieval
job = jobs_pending_to_process.pop()
job._log_recovery_retries += 1
try:
job.retrieve_logfiles(self, raise_error=True)
job._log_recovery_retries += 1
except:
if job._log_recovery_retries < 5:
jobs_pending_to_process.add(job)
Log.warning(
f"{identifier} (Retrial) Failed to recover log for job '{job.name}' and retry '{job.fail_count}'.")
Log.result(
f"{identifier} (Retrial) Successfully recovered log for job '{job.name}' and retry '{job.fail_count}'.")
if len(jobs_pending_to_process) > 0:
self.restore_connection(None) # Restore the connection if there was an issue with one or more jobs.

return jobs_pending_to_process

def recover_platform_job_logs(self) -> None:
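The drain-then-retry structure introduced in recover_job_log, reduced to a standalone sketch (fetch_log and the item names are hypothetical; the real code calls job.retrieve_logfiles and restores the platform connection between passes; not part of the commit):

MAX_RETRIES = 5  # mirrors the _log_recovery_retries < 5 guard in the diff

def drain_and_retry(items, fetch_log):
    pending = set()
    # First pass: one attempt per queued item; a failure is remembered instead of aborting the loop.
    for item in items:
        try:
            fetch_log(item)
        except Exception:
            pending.add(item)
    # Second pass: keep retrying only the failed items, bounded per item, so one bad job
    # cannot stall the recovery process forever.
    attempts = {item: 0 for item in pending}
    while pending:
        item = pending.pop()
        attempts[item] += 1
        try:
            fetch_log(item)
        except Exception:
            if attempts[item] < MAX_RETRIES:
                pending.add(item)
    return attempts

# Example: the second item fails twice, then succeeds on the third attempt.
calls = {"job_a": 0, "job_b": 0}
def flaky_fetch(name):
    calls[name] += 1
    if name == "job_b" and calls[name] < 3:
        raise IOError("log not there yet")

print(drain_and_retry(["job_a", "job_b"], flaky_fetch))  # {'job_b': 2}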
4 changes: 2 additions & 2 deletions autosubmit/platforms/wrappers/wrapper_builder.py
@@ -544,7 +544,7 @@ def run(self):
out_path = os.path.join(os.getcwd(), out)
err_path = os.path.join(os.getcwd(), err)
template_path = os.path.join(os.getcwd(), self.template)
command = f"timeout {0} {{template_path}} > {{out_path}} 2> {{err_path}}"
command = f"chmod +x {{template_path}}; timeout {0} {{template_path}} > {{out_path}} 2> {{err_path}}"
print(command)
getstatusoutput(command)
@@ -1011,4 +1011,4 @@ def build_srun_launcher(self, jobs_list, footer=True):
def build_main(self):
nodelist = self.build_nodes_list()
srun_launcher = self.build_srun_launcher("scripts_list")
return nodelist, srun_launcher
return nodelist, srun_launcher
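For context, roughly what the generated wrapper thread executes with the new chmod prefix (hypothetical paths and timeout; in the committed template the timeout value is substituted for {0} by the builder; not part of the commit):

import os

template_path = os.path.join(os.getcwd(), "a000_20200101_fc0_1_SIM.cmd")  # hypothetical job script
out_path = template_path[:-4] + ".out"
err_path = template_path[:-4] + ".err"
wallclock_seconds = 300  # hypothetical value substituted for {0}

# chmod +x guards against the script arriving on the platform without its execute bit,
# which is what the "Test adding +x" / chmod commits above address.
# In the generated wrapper this string is passed to subprocess.getstatusoutput().
command = f"chmod +x {template_path}; timeout {wallclock_seconds} {template_path} > {out_path} 2> {err_path}"
print(command)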

