Clean up log handling in remote workflows.
riga committed Nov 7, 2024
1 parent 8aa3ffe commit 530e26a
Showing 4 changed files with 47 additions and 42 deletions.
law/contrib/arc/workflow.py (11 additions, 5 deletions)
@@ -20,7 +20,7 @@
 from law.task.proxy import ProxyCommand
 from law.target.file import get_path
 from law.parameter import CSVParameter
-from law.util import law_src_path, merge_dicts, DotDict
+from law.util import no_value, law_src_path, merge_dicts, DotDict
 from law.logger import get_logger
 
 from law.contrib.arc.job import ARCJobManager, ARCJobFileFactory
@@ -114,10 +114,10 @@ def create_job_file(self, job_num, branches):
         if dashboard_file:
             c.input_files["dashboard_file"] = dashboard_file
 
-        # log files
-        c.log = None
-        c.stdout = None
-        c.stderr = None
+        # initialize logs with empty values and defer to defaults later
+        c.log = no_value
+        c.stdout = no_value
+        c.stderr = no_value
         if task.transfer_logs:
             log_file = "stdall.txt"
             c.stdout = log_file
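Before this change, `None` served double duty: it was both the initial placeholder and a value the job file factory might meaningfully receive. Switching the initial assignments to a `no_value` sentinel keeps "not set yet" distinguishable until the defaults are applied after the factory call. A minimal, self-contained sketch of the pattern (this `NoValue` class is illustrative only, not law's actual implementation):

```python
class NoValue(object):
    """Falsy singleton that marks 'not set', as opposed to an explicit None."""
    def __bool__(self):
        return False
    __nonzero__ = __bool__  # python 2 spelling, kept for illustration

no_value = NoValue()

# initialize with the sentinel, then let optional steps fill in values ...
log, stdout = no_value, no_value
stdout = "stdall.txt"  # e.g. when transfer_logs is set

# ... and finally normalize anything still unset to None
log = log or None
stdout = stdout or None

assert log is None
assert stdout == "stdall.txt"
```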
@@ -134,6 +134,12 @@ def create_job_file(self, job_num, branches):
         # build the job file and get the sanitized config
         job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__)
 
+        # logging defaults
+        c.log = c.log or None
+        c.stdout = c.stdout or None
+        c.stderr = c.stderr or None
+        c.custom_log_file = c.custom_log_file or None
+
         # determine the custom log file uri if set
         abs_log_file = None
         if c.custom_log_file:
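The defaults are applied only after `self.job_file_factory(...)` has returned the sanitized config, so values the factory filled in survive while leftovers collapse to `None`. Note that the `x or None` idiom folds every falsy value, not just the sentinel; reusing the `NoValue` sketch from above:

```python
# anything falsy (the sentinel, an explicit None, an empty string) becomes None
for value in (no_value, None, ""):
    assert (value or None) is None

# real values pass through untouched
assert ("stdall.txt" or None) == "stdall.txt"
```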
law/contrib/cms/workflow.py (1 addition, 2 deletions)
@@ -159,8 +159,7 @@ def create_job_file(self, submit_jobs):
 
         # log file
         if task.transfer_logs:
-            log_file = "stdall.txt"
-            c.custom_log_file = log_file
+            c.custom_log_file = "stdall.txt"
 
         # task hook
         c = task.crab_job_config(c, list(submit_jobs.keys()), list(submit_jobs.values()))
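For context, the `task.crab_job_config` hook visible above is the task-level escape hatch for adjusting the sanitized config before submission. A hedged sketch of a task using it to set the combined log file itself (the hook signature is inferred from the call site above; the task name is hypothetical and its remaining required methods are omitted):

```python
import law
law.contrib.load("cms")

class MyCrabTask(law.cms.CrabWorkflow):  # hypothetical task, required hooks omitted

    def crab_job_config(self, config, job_nums, branches_list):
        # always write the combined log file, independent of transfer_logs
        config.custom_log_file = "stdall.txt"
        return config
```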
law/contrib/glite/workflow.py (9 additions, 4 deletions)
@@ -23,7 +23,7 @@
 from law.task.proxy import ProxyCommand
 from law.target.file import get_path
 from law.parameter import CSVParameter
-from law.util import law_src_path, merge_dicts, DotDict
+from law.util import no_value, law_src_path, merge_dicts, DotDict
 from law.logger import get_logger
 
 from law.contrib.glite.job import GLiteJobManager, GLiteJobFileFactory
@@ -132,9 +132,9 @@ def create_job_file(self, job_num, branches):
         if dashboard_file:
             c.input_files["dashboard_file"] = dashboard_file
 
-        # log file
-        c.stdout = None
-        c.stderr = None
+        # initialize logs with empty values and defer to defaults later
+        c.stdout = no_value
+        c.stderr = no_value
         if task.transfer_logs:
             log_file = "stdall.txt"
             c.stdout = log_file
@@ -150,6 +150,11 @@ def create_job_file(self, job_num, branches):
         # build the job file and get the sanitized config
         job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__)
 
+        # logging defaults
+        c.stdout = c.stdout or None
+        c.stderr = c.stderr or None
+        c.custom_log_file = c.custom_log_file or None
+
         # determine the custom log file uri if set
         abs_log_file = None
         if c.custom_log_file:
law/contrib/lsf/workflow.py (26 additions, 31 deletions)
@@ -7,7 +7,6 @@
 __all__ = ["LSFWorkflow"]
 
 
-import os
 import contextlib
 from abc import abstractmethod
 from collections import OrderedDict
@@ -22,7 +21,7 @@
 from law.target.file import get_path, get_scheme, FileSystemDirectoryTarget
 from law.target.local import LocalDirectoryTarget
 from law.parameter import NO_STR
-from law.util import law_src_path, merge_dicts, DotDict
+from law.util import no_value, law_src_path, merge_dicts, DotDict
 from law.logger import get_logger
 
 from law.contrib.lsf.job import LSFJobManager, LSFJobFileFactory
@@ -109,25 +108,26 @@ def create_job_file(self, job_num, branches):
         if dashboard_file:
             c.input_files["dashboard_file"] = dashboard_file
 
-        # logging
-        # we do not use lsf's logging mechanism since it might require that the submission
-        # directory is present when it retrieves logs, and therefore we use a custom log file
-        c.stdout = None
-        c.stderr = None
+        # initialize logs with empty values and defer to defaults later
+        c.stdout = no_value
+        c.stderr = no_value
         if task.transfer_logs:
             c.custom_log_file = "stdall.txt"
 
-        # we can use lsf's file stageout only when the output directory is local
-        # otherwise, one should use the stageout_file and stageout manually
-        def cast_output_dir(output_dir):
-            if isinstance(output_dir, FileSystemDirectoryTarget):
-                return output_dir
-            path = get_path(output_dir)
-            if get_scheme(path) in (None, "file"):
-                return LocalDirectoryTarget(path)
-            return output_dir
+        # helper to cast directory paths to local directory targets if possible
+        def cast_dir(output_dir, touch=True):
+            if not isinstance(output_dir, FileSystemDirectoryTarget):
+                path = get_path(output_dir)
+                if get_scheme(path) not in (None, "file"):
+                    return output_dir
+                output_dir = LocalDirectoryTarget(path)
+            if touch:
+                output_dir.touch()
+            return output_dir
 
-        output_dir = cast_output_dir(task.lsf_output_directory())
+        # when the output dir is local, we can run within this directory for easier output file
+        # handling and use absolute paths for input files
+        output_dir = cast_dir(task.lsf_output_directory())
         output_dir_is_local = isinstance(output_dir, LocalDirectoryTarget)
         if output_dir_is_local:
             c.absolute_paths = True
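The new `cast_dir` helper behaves like the old `cast_output_dir`, but additionally creates local directories on the fly via `touch()`. A usage sketch with the helper copied out of the method for illustration (paths are hypothetical):

```python
from law.target.file import get_path, get_scheme, FileSystemDirectoryTarget
from law.target.local import LocalDirectoryTarget

def cast_dir(output_dir, touch=True):
    # same logic as the helper in the diff above, pulled out for illustration
    if not isinstance(output_dir, FileSystemDirectoryTarget):
        path = get_path(output_dir)
        if get_scheme(path) not in (None, "file"):
            return output_dir
        output_dir = LocalDirectoryTarget(path)
    if touch:
        output_dir.touch()
    return output_dir

# a plain local path is cast to a target and the directory is created
local = cast_dir("/tmp/law_lsf_demo")  # hypothetical path
print(type(local).__name__)  # LocalDirectoryTarget

# a remote uri is returned unchanged and nothing is created
remote = cast_dir("root://some.server//store/user/output")  # hypothetical uri
print(remote)  # the string itself, untouched
```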
@@ -146,13 +146,17 @@ def cast_output_dir(output_dir):
         # build the job file and get the sanitized config
         job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__)
 
+        # logging defaults
+        # we do not use lsf's logging mechanism since it might require that the submission
+        # directory is present when it retrieves logs, and therefore we use a custom log file
+        c.stdout = c.stdout or None
+        c.stderr = c.stderr or None
+        c.custom_log_file = c.custom_log_file or None
+
         # get the location of the custom local log file if any
         abs_log_file = None
-        log_dir = task.lsf_log_directory()
-        log_dir = cast_output_dir(log_dir) if log_dir else output_dir
-        log_dir_is_local = isinstance(log_dir, LocalDirectoryTarget)
-        if log_dir_is_local and c.custom_log_file:
-            abs_log_file = os.path.join(log_dir.abspath, c.custom_log_file)
+        if output_dir_is_local and c.custom_log_file:
+            abs_log_file = output_dir.child(c.custom_log_file, type="f").abspath
 
         # return job and log files
         return {"job": job_file, "config": c, "log": abs_log_file}
@@ -206,20 +210,11 @@ def lsf_workflow_run_context(self):
     def lsf_output_directory(self):
         """
         Hook to define the location of submission output files, such as the json files containing
-        job data, and optional log files (in case :py:meth:`lsf_log_directory` is not defined).
+        job data, and optional log files.
         This method should return a :py:class:`FileSystemDirectoryTarget`.
         """
         return None
 
-    def lsf_log_directory(self):
-        """
-        Hook to define the location of log files if any are written. When set, it has precedence
-        over :py:meth:`lsf_output_directory` for log files.
-        This method should return a :py:class:`FileSystemDirectoryTarget` or a value that evaluates
-        to *False* in case no custom log directory is desired.
-        """
-        return None
-
     def lsf_bootstrap_file(self):
         return None
 
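Since `lsf_log_directory` is gone, the custom log file is now always resolved relative to the submission output directory, so tasks define a single hook. A hedged sketch of a user task (the task name and path are hypothetical; parameters and other required methods are omitted):

```python
import law
law.contrib.load("lsf")

class MyLSFTask(law.lsf.LSFWorkflow):  # hypothetical task, required methods omitted

    def lsf_output_directory(self):
        # with log transfer enabled, submission data and the custom log
        # file now both end up in this one directory
        return law.LocalDirectoryTarget("/tmp/law_lsf_demo")  # hypothetical path
```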
