Skip to content

Commit

Permalink
Fixes an issue with custom_directives being added letter by letter
Browse files Browse the repository at this point in the history
  • Loading branch information
dbeltrankyl committed Jan 7, 2025
1 parent 0231bce commit f52c43f
Showing 1 changed file with 45 additions and 48 deletions.
93 changes: 45 additions & 48 deletions autosubmit/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,9 @@ def update_current_parameters(self, as_conf: AutosubmitConfig, parameters: dict)
parameters[f"CURRENT_{key.upper()}"] = value
for key, value in as_conf.jobs_data[self.section].items():
parameters[f"CURRENT_{key.upper()}"] = value

return parameters

def update_platform_parameters(self,as_conf,parameters,job_platform):
if not job_platform:
submitter = job_utils._get_submitter(as_conf)
Expand All @@ -1458,7 +1460,7 @@ def update_platform_parameters(self,as_conf,parameters,job_platform):
parameters['CURRENT_LOGDIR'] = job_platform.get_files_path()
return parameters

def process_scheduler_parameters(self,as_conf,parameters,job_platform,chunk):
def process_scheduler_parameters(self, job_platform, chunk):
"""
Parsers yaml data stored in the dictionary
and calculates the components of the heterogeneous job if any
Expand Down Expand Up @@ -1687,40 +1689,35 @@ def process_scheduler_parameters(self,as_conf,parameters,job_platform,chunk):
self.wallclock = increase_wallclock_by_chunk(
self.wallclock, self.wchunkinc, chunk)

def update_platform_associated_parameters(self,as_conf, parameters, job_platform, chunk):
job_data = as_conf.jobs_data[self.section]
platform_data = as_conf.platforms_data.get(job_platform.name,{})
self.x11_options = str(as_conf.jobs_data[self.section].get("X11_OPTIONS", as_conf.platforms_data.get(job_platform.name,{}).get("X11_OPTIONS","")))

self.ec_queue = str(job_data.get("EC_QUEUE", platform_data.get("EC_QUEUE","")))
self.executable = job_data.get("EXECUTABLE", platform_data.get("EXECUTABLE",""))
self.total_jobs = job_data.get("TOTALJOBS",job_data.get("TOTAL_JOBS", job_platform.total_jobs))
self.max_waiting_jobs = job_data.get("MAXWAITINGJOBS",job_data.get("MAX_WAITING_JOBS", job_platform.max_waiting_jobs))
self.processors = job_data.get("PROCESSORS",platform_data.get("PROCESSORS","1"))
self.shape = job_data.get("SHAPE",platform_data.get("SHAPE",""))
self.processors_per_node = job_data.get("PROCESSORS_PER_NODE",as_conf.platforms_data.get(job_platform.name,{}).get("PROCESSORS_PER_NODE","1"))
self.nodes = job_data.get("NODES",platform_data.get("NODES",""))
self.exclusive = job_data.get("EXCLUSIVE",platform_data.get("EXCLUSIVE",False))
self.threads = job_data.get("THREADS",platform_data.get("THREADS","1"))
self.tasks = job_data.get("TASKS",platform_data.get("TASKS","0"))
self.reservation = job_data.get("RESERVATION",as_conf.platforms_data.get(job_platform.name, {}).get("RESERVATION", ""))
self.hyperthreading = job_data.get("HYPERTHREADING",platform_data.get("HYPERTHREADING","none"))
self.queue = job_data.get("QUEUE",platform_data.get("QUEUE",""))
self.partition = job_data.get("PARTITION",platform_data.get("PARTITION",""))
self.scratch_free_space = int(job_data.get("SCRATCH_FREE_SPACE",platform_data.get("SCRATCH_FREE_SPACE",0)))

self.memory = job_data.get("MEMORY",platform_data.get("MEMORY",""))
self.memory_per_task = job_data.get("MEMORY_PER_TASK",platform_data.get("MEMORY_PER_TASK",""))
self.wallclock = job_data.get("WALLCLOCK",
as_conf.platforms_data.get(self.platform_name, {}).get(
"MAX_WALLCLOCK", None))
self.custom_directives = job_data.get("CUSTOM_DIRECTIVES", "")

self.process_scheduler_parameters(as_conf,parameters,job_platform,chunk)
if self.het.get('HETSIZE',1) > 1:
def update_platform_associated_parameters(self, as_conf, parameters, job_platform, chunk):
self.x11_options = str(parameters.get("CURRENT_X11_OPTIONS", ""))
self.ec_queue = str(parameters.get("CURRENT_EC_QUEUE", ""))
self.executable = parameters.get("CURRENT_EXECUTABLE", "")
self.total_jobs = parameters.get("CURRENT_TOTALJOBS",
parameters.get("CURRENT_TOTAL_JOBS", job_platform.total_jobs))
self.max_waiting_jobs = parameters.get("CURRENT_MAXWAITINGJOBS", parameters.get("CURRENT_MAX_WAITING_JOBS",
job_platform.max_waiting_jobs))
self.processors = parameters.get("CURRENT_PROCESSORS", "1")
self.shape = parameters.get("CURRENT_SHAPE", "")
self.processors_per_node = parameters.get("CURRENT_PROCESSORS_PER_NODE", "1")
self.nodes = parameters.get("CURRENT_NODES", "")
self.exclusive = parameters.get("CURRENT_EXCLUSIVE", False)
self.threads = parameters.get("CURRENT_THREADS", "1")
self.tasks = parameters.get("CURRENT_TASKS", "0")
self.reservation = parameters.get("CURRENT_RESERVATION", "")
self.hyperthreading = parameters.get("CURRENT_HYPERTHREADING", "none")
self.queue = parameters.get("CURRENT_QUEUE", "")
self.partition = parameters.get("CURRENT_PARTITION", "")
self.scratch_free_space = int(parameters.get("CURRENT_SCRATCH_FREE_SPACE", 0))
self.memory = parameters.get("CURRENT_MEMORY", "")
self.memory_per_task = parameters.get("CURRENT_MEMORY_PER_TASK", parameters.get("CURRENT_MEMORY_PER_TASK", ""))
self.wallclock = parameters.get("CURRENT_WALLCLOCK", parameters.get("CURRENT_MAX_WALLCLOCK", None))
self.custom_directives = parameters.get("CURRENT_CUSTOM_DIRECTIVES", "")
self.process_scheduler_parameters(job_platform, chunk)
if self.het.get('HETSIZE', 1) > 1:
for name, components_value in self.het.items():
if name != "HETSIZE":
for indx,component in enumerate(components_value):
for indx, component in enumerate(components_value):
if indx == 0:
parameters[name.upper()] = component
parameters[f'{name.upper()}_{indx}'] = component
Expand Down Expand Up @@ -1752,7 +1749,8 @@ def update_platform_associated_parameters(self,as_conf, parameters, job_platform
parameters['EXTENDED_HEADER'] = self.read_header_tailer_script(self.ext_header_path, as_conf, True)
parameters['EXTENDED_TAILER'] = self.read_header_tailer_script(self.ext_tailer_path, as_conf, False)
elif self.ext_header_path or self.ext_tailer_path:
Log.warning("An extended header or tailer is defined in {0}, but it is ignored in dummy projects.", self._section)
Log.warning("An extended header or tailer is defined in {0}, but it is ignored in dummy projects.",
self._section)
else:
parameters['EXTENDED_HEADER'] = ""
parameters['EXTENDED_TAILER'] = ""
Expand All @@ -1761,7 +1759,6 @@ def update_platform_associated_parameters(self,as_conf, parameters, job_platform
parameters['CURRENT_EC_QUEUE'] = self.ec_queue
parameters['PARTITION'] = self.partition


return parameters

def update_wrapper_parameters(self,as_conf, parameters):
Expand Down Expand Up @@ -1963,16 +1960,16 @@ def calendar_chunk(self, parameters):
parameters['CHUNK_LAST'] = 'FALSE'
return parameters

def update_job_parameters(self,as_conf, parameters):
def update_job_parameters(self, as_conf, parameters):
if self.splits == "auto":
self.splits = as_conf.jobs_data[self.section].get("SPLITS", None)
self.delete_when_edgeless = as_conf.jobs_data[self.section].get("DELETE_WHEN_EDGELESS", True)
self.check = as_conf.jobs_data[self.section].get("CHECK", False)
self.check_warnings = as_conf.jobs_data[self.section].get("CHECK_WARNINGS", False)
self.shape = as_conf.jobs_data[self.section].get("SHAPE", "")
self.script = as_conf.jobs_data[self.section].get("SCRIPT", "")
self.x11 = False if str(as_conf.jobs_data[self.section].get("X11", False)).lower() == "false" else True
self.notify_on = as_conf.jobs_data[self.section].get("NOTIFY_ON", [])
self.splits = parameters.get("CURRENT_SPLITS", None)
self.delete_when_edgeless = parameters.get("CURRENT_DELETE_WHEN_EDGELESS", True)
self.check = parameters.get("CURRENT_CHECK", False)
self.check_warnings = parameters.get("CURRENT_CHECK_WARNINGS", False)
self.shape = parameters.get("CURRENT_SHAPE", "")
self.script = parameters.get("CURRENT_SCRIPT", "")
self.x11 = False if str(parameters.get("CURRENT_X11", False)).lower() == "false" else True
self.notify_on = parameters.get("CURRENT_NOTIFY_ON", [])
self.update_stat_file()
if self.checkpoint: # To activate placeholder sustitution per <empty> in the template
parameters["AS_CHECKPOINT"] = self.checkpoint
Expand All @@ -1994,7 +1991,7 @@ def update_job_parameters(self,as_conf, parameters):
parameters = self.calendar_chunk(parameters)
parameters = self.calendar_split(as_conf,parameters)
parameters['NUMMEMBERS'] = len(as_conf.get_member_list())
self.dependencies = as_conf.jobs_data[self.section].get("DEPENDENCIES", "")
self.dependencies = parameters.get("CURRENT_DEPENDENCIES", "")
self.dependencies = str(self.dependencies)
parameters['JOB_DEPENDENCIES'] = self.dependencies
parameters['EXPORT'] = self.export
Expand Down Expand Up @@ -2065,13 +2062,13 @@ def update_parameters(self, as_conf, parameters,
parameters['PROJDIR'] = as_conf.get_project_dir()
# Set parameters dictionary
# Set final value
parameters = self.update_platform_parameters(as_conf, parameters, self._platform)
parameters = self.update_current_parameters(as_conf, parameters)
parameters = as_conf.deep_read_loops(parameters)
parameters = as_conf.substitute_dynamic_variables(parameters,80)
parameters = self.update_job_parameters(as_conf, parameters)
parameters = self.update_platform_parameters(as_conf, parameters, self._platform)
parameters = self.update_platform_associated_parameters(as_conf, parameters, self._platform, parameters['CHUNK'])
parameters = self.update_wrapper_parameters(as_conf, parameters)
parameters = as_conf.deep_read_loops(parameters)
parameters = as_conf.substitute_dynamic_variables(parameters,80)
self.update_job_variables_final_values(parameters)
# For some reason, there is return but the assignee is also necessary
self.parameters = parameters
Expand Down

0 comments on commit f52c43f

Please sign in to comment.