From bed26afa7926cb4fb0e6c9db87ca549036e8a09a Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 25 Jun 2024 20:32:16 -0700 Subject: [PATCH 1/7] Don't immediately virtualize (may want to remove monkeypatching entirely) --- src/toil/wdl/wdltoil.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 53d10c6ebf..fe49f4abf4 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -887,7 +887,9 @@ def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFil logger.debug("Virtualized file %s is already a local path", filename) if not os.path.exists(result): - raise RuntimeError(f"Virtualized file {filename} looks like a local file but isn't!") + # Devirtualizing an unvirtualized file means the file is coerced from a string and never used/virtualized + raise FileNotFoundError(f"File {filename} was not found!") + # raise RuntimeError(f"Virtualized file {filename} looks like a local file but isn't!") return result @@ -1450,7 +1452,12 @@ def get_file_paths_in_bindings(environment: WDLBindings) -> List[str]: """ paths = [] - map_over_files_in_bindings(environment, lambda x: paths.append(x)) + + def append_to_paths(path: str) -> str: + # Append element and return the element. This is to avoid a logger warning inside map_over_typed_files_in_value() + paths.append(path) + return path + map_over_files_in_bindings(environment, append_to_paths) return paths def map_over_typed_files_in_bindings(environment: WDLBindings, transform: Callable[[WDL.Type.Base, str], Optional[str]]) -> WDLBindings: @@ -1503,7 +1510,6 @@ def map_over_typed_files_in_value(value: WDL.Value.Base, transform: Callable[[WD actually be used, to allow for scans. So error checking needs to be part of the transform itself. """ - if isinstance(value, WDL.Value.File): # This is a file so we need to process it new_path = transform(value.type, value.value) @@ -1804,7 +1810,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: runtime_accelerators = [accelerator_requirement] # Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized. - run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) + run_job = WDLTaskJob(self._task, bindings, runtime_bindings, self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) # Run that as a child self.addChild(run_job) @@ -3276,14 +3282,17 @@ def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, N # conversion is to just store the local filepath. Toil needs to virtualize the file into the jobstore so until # there is an internal entrypoint, monkeypatch it. def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: - if isinstance(desired_type, WDL.Type.File): - self.value = standard_library._virtualize_filename(self.value) - return self + # if isinstance(desired_type, WDL.Type.File): + # self.value = standard_library._virtualize_filename(self.value) + # return self return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: # Sometimes string coerce is called instead, so monkeypatch this one as well - if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Type.File): - return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) + # if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Type.File): + # try: + # return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) + # except FileNotFoundError: + # return old_str_coerce(self, desired_type) return old_str_coerce(self, desired_type) old_base_coerce = WDL.Value.Base.coerce From b536cba115b289b8051eef58a6ee54d53df1569c Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 27 Jun 2024 17:47:07 -0700 Subject: [PATCH 2/7] Try sentinel implementation --- src/toil/wdl/wdltoil.py | 51 ++++++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index fe49f4abf4..f3a3abefe5 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -46,6 +46,7 @@ Union, cast) from urllib.parse import quote, unquote, urljoin, urlsplit +from functools import partial import WDL.Error import WDL.runtime.config @@ -621,6 +622,8 @@ def evaluate_output_decls(output_decls: List[WDL.Tree.Decl], all_bindings: WDL.E output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() for output_decl in output_decls: output_value = evaluate_decl(output_decl, all_bindings, standard_library) + drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=standard_library._execution_dir) + output_value = map_over_typed_files_in_value(output_value, drop_if_missing_with_workdir) all_bindings = all_bindings.bind(output_decl.name, output_value) output_bindings = output_bindings.bind(output_decl.name, output_value) return output_bindings @@ -717,7 +720,7 @@ class ToilWDLStdLibBase(WDL.StdLib.Base): """ Standard library implementation for WDL as run on Toil. """ - def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None): + def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None, enforce_nonexistence: bool = True): """ Set up the standard library. @@ -748,6 +751,8 @@ def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = self._execution_dir = execution_dir + self._enforce_nonexistence = enforce_nonexistence + def get_local_paths(self) -> List[str]: """ Get all the local paths of files devirtualized (or virtualized) through the stdlib. @@ -781,12 +786,13 @@ def _devirtualize_filename(self, filename: str) -> str: """ result = self.devirtualize_to(filename, self._file_store.localTempDir, self._file_store, self._execution_dir, - self._devirtualized_to_virtualized, self._virtualized_to_devirtualized) + self._devirtualized_to_virtualized, self._virtualized_to_devirtualized, self._enforce_nonexistence) return result @staticmethod def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFileStore, Toil], execution_dir: Optional[str], - devirtualized_to_virtualized: Optional[Dict[str, str]] = None, virtualized_to_devirtualized: Optional[Dict[str, str]] = None) -> str: + devirtualized_to_virtualized: Optional[Dict[str, str]] = None, virtualized_to_devirtualized: Optional[Dict[str, str]] = None, + enforce_nonexistence: bool = True) -> str: """ Download or export a WDL virtualized filename/URL to the given directory. @@ -828,7 +834,11 @@ def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFil # Put the UUID in the destination path in order for tasks to # see where to put files depending on their parents. dir_path = os.path.join(dest_dir, parent_id) - + elif filename.startswith(TOIL_NONEXISTENT_URI_SCHEME): + if enforce_nonexistence: + raise FileNotFoundError(f"File {filename[len(TOIL_NONEXISTENT_URI_SCHEME):]} was not available when virtualized!") + else: + return filename else: # Parse the URL and extract the basename file_basename = os.path.basename(urlsplit(filename).path) @@ -1453,10 +1463,12 @@ def get_file_paths_in_bindings(environment: WDLBindings) -> List[str]: paths = [] - def append_to_paths(path: str) -> str: + def append_to_paths(path: str) -> Optional[str]: # Append element and return the element. This is to avoid a logger warning inside map_over_typed_files_in_value() - paths.append(path) - return path + # But don't process nonexistent files + if not path.startswith(TOIL_NONEXISTENT_URI_SCHEME): + paths.append(path) + return path map_over_files_in_bindings(environment, append_to_paths) return paths @@ -2045,7 +2057,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Set up the WDL standard library # UUID to use for virtualizing files - standard_library = ToilWDLStdLibBase(file_store) + # Since we process nonexistent files in WDLTaskWrapperJob as those must be run locally, don't try to devirtualize them + standard_library = ToilWDLStdLibBase(file_store, enforce_nonexistence=False) # Get the bindings from after the input section bindings = unwrap(self._task_internal_bindings) @@ -2222,7 +2235,11 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]: # Replace everything with in-container paths for the command. # TODO: MiniWDL deals with directory paths specially here. - contained_bindings = map_over_files_in_bindings(bindings, lambda path: task_container.input_path_map[path]) + def get_path_in_container(path: str) -> Optional[str]: + if path.startswith(TOIL_NONEXISTENT_URI_SCHEME): + return None + return task_container.input_path_map[path] + contained_bindings = map_over_files_in_bindings(bindings, get_path_in_container) # Make a new standard library for evaluating the command specifically, which only deals with in-container paths and out-of-container paths. command_library = ToilWDLStdLibTaskCommand(file_store, task_container) @@ -2311,7 +2328,8 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]: outputs_library = ToilWDLStdLibTaskOutputs(file_store, host_stdout_txt, host_stderr_txt, task_container.input_path_map, current_directory_override=workdir_in_container) # Make sure files downloaded as inputs get re-used if we re-upload them. outputs_library.share_files(standard_library) - output_bindings = evaluate_output_decls(self._task.outputs, bindings, outputs_library) + with monkeypatch_coerce(outputs_library): + output_bindings = evaluate_output_decls(self._task.outputs, bindings, outputs_library) # Now we know if the standard output and error were sent somewhere by # the workflow. If not, we should report them to the leader. @@ -3219,7 +3237,8 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # Output section is declared and is nonempty, so evaluate normally # Combine the bindings from the previous job - output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) + with monkeypatch_coerce(standard_library): + output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) finally: # We don't actually know when all our files are downloaded since # anything we evaluate might devirtualize inside any expression. @@ -3288,11 +3307,11 @@ def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = No return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: # Sometimes string coerce is called instead, so monkeypatch this one as well - # if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Type.File): - # try: - # return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) - # except FileNotFoundError: - # return old_str_coerce(self, desired_type) + if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Type.File): + if os.path.isfile(os.path.join(standard_library._execution_dir or ".", self.value)): + return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) + else: + return WDL.Value.File(TOIL_NONEXISTENT_URI_SCHEME + self.value, self.expr) return old_str_coerce(self, desired_type) old_base_coerce = WDL.Value.Base.coerce From c6195906a8f1ddfa6a1ba51007b1b4be5a90300d Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 27 Jun 2024 16:18:47 -0700 Subject: [PATCH 3/7] add abstracted function and remove duplicated execution_dir in standard library for task outputs --- src/toil/wdl/wdltoil.py | 122 +++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 59 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index f3a3abefe5..3e012d3fd7 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -574,6 +574,7 @@ def recursive_dependencies(root: WDL.Tree.WorkflowNode) -> Set[str]: # in the same destination directory, when dealing with basename conflicts. TOIL_URI_SCHEME = 'toilfile:' +TOIL_NONEXISTENT_URI_SCHEME = 'nonexistent:' def pack_toil_uri(file_id: FileID, dir_id: uuid.UUID, file_basename: str) -> str: """ @@ -682,7 +683,7 @@ def _call_eager(self, expr: "WDL.Expr.Apply", arguments: List[WDL.Value.Base]) - # Return the result as a WDL float value return WDL.Value.Float(total_size) -def is_url(filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool: +def is_url(filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME, TOIL_NONEXISTENT_URI_SCHEME]) -> bool: """ Decide if a filename is a known kind of URL """ @@ -898,8 +899,7 @@ def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFil if not os.path.exists(result): # Devirtualizing an unvirtualized file means the file is coerced from a string and never used/virtualized - raise FileNotFoundError(f"File {filename} was not found!") - # raise RuntimeError(f"Virtualized file {filename} looks like a local file but isn't!") + raise RuntimeError(f"Virtualized file {filename} looks like a local file but isn't!") return result @@ -953,13 +953,13 @@ class ToilWDLStdLibTaskCommand(ToilWDLStdLibBase): are host-side paths. """ - def __init__(self, file_store: AbstractFileStore, container: TaskContainer): + def __init__(self, file_store: AbstractFileStore, container: TaskContainer, execution_dir: Optional[str] = None): """ Set up the standard library for the task command section. """ # TODO: Don't we want to make sure we don't actually use the file store? - super().__init__(file_store) + super().__init__(file_store, execution_dir) self.container = container @memoize @@ -1020,7 +1020,7 @@ def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path: # Just set up as ToilWDLStdLibBase, but it will call into # WDL.StdLib.TaskOutputs next. - super().__init__(file_store) + super().__init__(file_store, current_directory_override) # Remember task output files self._stdout_path = stdout_path @@ -1033,9 +1033,6 @@ def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path: # Reverse and store the file mount dict self._mountpoint_to_file = {v: k for k, v in file_to_mountpoint.items()} - # Remember current directory - self._current_directory_override = current_directory_override - # We need to attach implementations for WDL's stdout(), stderr(), and glob(). # TODO: Can we use the fancy decorators instead of this wizardry? setattr( @@ -1098,7 +1095,7 @@ def _glob(self, pattern: WDL.Value.String) -> WDL.Value.Array: # So we send a little Bash script that can delimit the files with something, and assume the Bash really is a Bash. # This needs to run in the work directory that the container used, if any. - work_dir = '.' if not self._current_directory_override else self._current_directory_override + work_dir = '.' if not self._execution_dir else self._execution_dir # TODO: get this to run in the right container if there is one # Bash (now?) has a compgen builtin for shell completion that can evaluate a glob where the glob is in a quoted string that might have spaces in it. See . @@ -1134,7 +1131,7 @@ def _devirtualize_filename(self, filename: str) -> str: if not is_url(filename) and not filename.startswith('/'): # We are getting a bare relative path from the WDL side. # Find a real path to it relative to the current directory override. - work_dir = '.' if not self._current_directory_override else self._current_directory_override + work_dir = '.' if not self._exeuction_dir else self._exeuction_dir filename = os.path.join(work_dir, filename) return super()._devirtualize_filename(filename) @@ -1152,7 +1149,7 @@ def _virtualize_filename(self, filename: str) -> str: if not is_url(filename) and not filename.startswith('/'): # We are getting a bare relative path on the supposedly devirtualized side. # Find a real path to it relative to the current directory override. - work_dir = '.' if not self._current_directory_override else self._current_directory_override + work_dir = '.' if not self._execution_dir else self._execution_dir filename = os.path.join(work_dir, filename) if filename in self._devirtualized_to_virtualized: @@ -1416,6 +1413,31 @@ def import_file_from_uri(uri: str) -> str: return map_over_files_in_bindings(environment, import_file_from_uri) + +def drop_if_missing(value_type: WDL.Type.Base, filename: str, work_dir: str) -> Optional[str]: + """ + Return None if a file doesn't exist, or its path if it does. + """ + logger.debug("Consider file %s", filename) + + if is_url(filename): + if (not filename.startswith(TOIL_NONEXISTENT_URI_SCHEME) + and (filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(filename))): + # We assume anything in the filestore actually exists. + return filename + else: + logger.warning('File %s with type %s does not actually exist at its URI', filename, value_type) + return None + else: + # Get the absolute path, not resolving symlinks + effective_path = os.path.abspath(os.path.join(work_dir, filename)) + if os.path.islink(effective_path) or os.path.exists(effective_path): + # This is a broken symlink or a working symlink or a file. + return filename + else: + logger.warning('File %s with type %s does not actually exist at %s', filename, value_type, effective_path) + return None + def drop_missing_files(environment: WDLBindings, current_directory_override: Optional[str] = None) -> WDLBindings: """ Make sure all the File values embedded in the given bindings point to files @@ -1427,30 +1449,8 @@ def drop_missing_files(environment: WDLBindings, current_directory_override: Opt # Determine where to evaluate relative paths relative to work_dir = '.' if not current_directory_override else current_directory_override - def drop_if_missing(value_type: WDL.Type.Base, filename: str) -> Optional[str]: - """ - Return None if a file doesn't exist, or its path if it does. - """ - logger.debug("Consider file %s", filename) - - if is_url(filename): - if filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(filename): - # We assume anything in the filestore actually exists. - return filename - else: - logger.warning('File %s with type %s does not actually exist at its URI', filename, value_type) - return None - else: - # Get the absolute path, not resolving symlinks - effective_path = os.path.abspath(os.path.join(work_dir, filename)) - if os.path.islink(effective_path) or os.path.exists(effective_path): - # This is a broken symlink or a working symlink or a file. - return filename - else: - logger.warning('File %s with type %s does not actually exist at %s', filename, value_type, effective_path) - return None - - return map_over_typed_files_in_bindings(environment, drop_if_missing) + drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=work_dir) + return map_over_typed_files_in_bindings(environment, drop_if_missing_with_workdir) def get_file_paths_in_bindings(environment: WDLBindings) -> List[str]: """ @@ -1731,20 +1731,20 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Set up the WDL standard library # UUID to use for virtualizing files standard_library = ToilWDLStdLibBase(file_store) - - if self._task.inputs: - logger.debug("Evaluating task code") - for input_decl in self._task.inputs: - # Evaluate all the inputs that aren't pre-set - bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) - for postinput_decl in self._task.postinputs: - # Evaluate all the postinput decls. - # We need these in order to evaluate the runtime. - # TODO: What if they wanted resources from the runtime? - bindings = bindings.bind(postinput_decl.name, evaluate_defaultable_decl(postinput_decl, bindings, standard_library)) - - # Evaluate the runtime section - runtime_bindings = evaluate_call_inputs(self._task, self._task.runtime, bindings, standard_library) + with monkeypatch_coerce(standard_library): + if self._task.inputs: + logger.debug("Evaluating task code") + for input_decl in self._task.inputs: + # Evaluate all the inputs that aren't pre-set + bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) + for postinput_decl in self._task.postinputs: + # Evaluate all the postinput decls. + # We need these in order to evaluate the runtime. + # TODO: What if they wanted resources from the runtime? + bindings = bindings.bind(postinput_decl.name, evaluate_defaultable_decl(postinput_decl, bindings, standard_library)) + + # Evaluate the runtime section + runtime_bindings = evaluate_call_inputs(self._task, self._task.runtime, bindings, standard_library) # Fill these in with not-None if the workflow asks for each resource. runtime_memory: Optional[int] = None @@ -2242,7 +2242,7 @@ def get_path_in_container(path: str) -> Optional[str]: contained_bindings = map_over_files_in_bindings(bindings, get_path_in_container) # Make a new standard library for evaluating the command specifically, which only deals with in-container paths and out-of-container paths. - command_library = ToilWDLStdLibTaskCommand(file_store, task_container) + command_library = ToilWDLStdLibTaskCommand(file_store, task_container, workdir_in_container) # Work out the command string, and unwrap it command_string: str = evaluate_named_expression(self._task, "command", WDL.Type.String(), remove_common_leading_whitespace(self._task.command), contained_bindings, command_library).coerce(WDL.Type.String()).value @@ -2355,10 +2355,11 @@ def get_path_in_container(path: str) -> Optional[str]: output_bindings = drop_missing_files(output_bindings, current_directory_override=workdir_in_container) for decl in self._task.outputs: if not decl.type.optional and output_bindings[decl.name].value is None: - # We have an unacceptable null value. This can happen if a file - # is missing but not optional. Don't let it out to annoy the - # next task. - raise WDL.Error.EvalError(decl, f"non-optional value {decl.name} = {decl.expr} is missing") + # todo: make recursive + # We have an unacceptable null value. This can happen if a file + # is missing but not optional. Don't let it out to annoy the + # next task. + raise WDL.Error.EvalError(decl, f"non-optional value {decl.name} = {decl.expr} is missing") # Upload any files in the outputs if not uploaded already. Accounts for how relative paths may still need to be container-relative. output_bindings = virtualize_files(output_bindings, outputs_library) @@ -3250,6 +3251,9 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # were mounted into a container at their actual paths. self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) + # Null nonexistent optional values and error on the rest + output_bindings = drop_missing_files(output_bindings, self._wdl_options.get("execution_dir")) + return self.postprocess(output_bindings) class WDLRootJob(WDLSectionJob): @@ -3290,7 +3294,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: @contextmanager -def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, None, None]: +def monkeypatch_coerce(standard_library: ToilWDLStdLibBase, null_nonexistent_files: bool = False) -> Generator[None, None, None]: """ Monkeypatch miniwdl's WDL.Value.Base.coerce() function to virtualize files when they are represented as Strings. Calls _virtualize_filename from a given standard library object. @@ -3301,9 +3305,9 @@ def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, N # conversion is to just store the local filepath. Toil needs to virtualize the file into the jobstore so until # there is an internal entrypoint, monkeypatch it. def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: - # if isinstance(desired_type, WDL.Type.File): - # self.value = standard_library._virtualize_filename(self.value) - # return self + if isinstance(desired_type, WDL.Type.File): + self.value = standard_library._virtualize_filename(self.value) + return self return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: # Sometimes string coerce is called instead, so monkeypatch this one as well From fa62076bacb8b2297c6a614e3a6c5542e313f6db Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 27 Jun 2024 18:18:38 -0700 Subject: [PATCH 4/7] fix typo and satisfy mypy with getattr --- src/toil/wdl/wdltoil.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 3e012d3fd7..6ea125fdad 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -623,7 +623,7 @@ def evaluate_output_decls(output_decls: List[WDL.Tree.Decl], all_bindings: WDL.E output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() for output_decl in output_decls: output_value = evaluate_decl(output_decl, all_bindings, standard_library) - drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=standard_library._execution_dir) + drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=getattr(standard_library, "_execution_dir")) output_value = map_over_typed_files_in_value(output_value, drop_if_missing_with_workdir) all_bindings = all_bindings.bind(output_decl.name, output_value) output_bindings = output_bindings.bind(output_decl.name, output_value) @@ -1131,7 +1131,7 @@ def _devirtualize_filename(self, filename: str) -> str: if not is_url(filename) and not filename.startswith('/'): # We are getting a bare relative path from the WDL side. # Find a real path to it relative to the current directory override. - work_dir = '.' if not self._exeuction_dir else self._exeuction_dir + work_dir = '.' if not self._execution_dir else self._execution_dir filename = os.path.join(work_dir, filename) return super()._devirtualize_filename(filename) From 18bfd8bd63f3f0594318b0da4c434690ad295c88 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 27 Jun 2024 19:04:13 -0700 Subject: [PATCH 5/7] Fix instance type checks as we were checking WDL types to WDL values --- src/toil/wdl/wdltoil.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 6ea125fdad..da847de4d5 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1257,7 +1257,7 @@ def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, std try: if node.name in environment and not isinstance(environment[node.name], WDL.Value.Null): logger.debug('Name %s is already defined with a non-null value, not using default', node.name) - if not isinstance(environment[node.name], type(node.type)): + if not isinstance(environment[node.name].type, type(node.type)): return environment[node.name].coerce(node.type) else: return environment[node.name] @@ -3311,7 +3311,7 @@ def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = No return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: # Sometimes string coerce is called instead, so monkeypatch this one as well - if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Type.File): + if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Value.File): if os.path.isfile(os.path.join(standard_library._execution_dir or ".", self.value)): return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) else: From 4b621bf48df155ec2142b407937923bf3cb7a5ea Mon Sep 17 00:00:00 2001 From: stxue1 Date: Wed, 3 Jul 2024 12:44:54 -0700 Subject: [PATCH 6/7] Add documentation, fix variable names, remove unused --- src/toil/wdl/wdltoil.py | 60 +++++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index da847de4d5..80ce307938 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -12,6 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import asyncio import errno import io @@ -51,8 +53,8 @@ import WDL.Error import WDL.runtime.config from configargparse import ArgParser -from WDL._util import byte_size_units, strip_leading_whitespace -from WDL.CLI import print_error, runner_exe +from WDL._util import byte_size_units +from WDL.CLI import print_error from WDL.runtime.backend.docker_swarm import SwarmContainer from WDL.runtime.backend.singularity import SingularityContainer from WDL.runtime.task_container import TaskContainer @@ -71,8 +73,8 @@ unwrap_all) from toil.jobStores.abstractJobStore import (AbstractJobStore, UnimplementedURLException, InvalidImportExportUrlException, LocatorException) -from toil.lib.accelerators import count_nvidia_gpus, get_individual_local_accelerators -from toil.lib.conversions import convert_units, human2bytes, strtobool +from toil.lib.accelerators import get_individual_local_accelerators +from toil.lib.conversions import convert_units, human2bytes from toil.lib.io import mkdtemp from toil.lib.memoize import memoize from toil.lib.misc import get_user_name @@ -574,6 +576,19 @@ def recursive_dependencies(root: WDL.Tree.WorkflowNode) -> Set[str]: # in the same destination directory, when dealing with basename conflicts. TOIL_URI_SCHEME = 'toilfile:' + +# We always virtualize any file into a URI. However, when coercing from string to file, +# it is not necessary that the file needs to exist. See https://github.com/openwdl/wdl/issues/667 +# So use a sentinel to indicate nonexistent files instead of immediately raising an error +# This is done instead of not virtualizing, using the string as a filepath, and coercing to None/null at use. +# This is because the File must represent some location on its corresponding machine. +# If a task runs on a node where a file does not exist, and passes that file as an input into another task, +# we need to remember that the file does not exist from the original node +# ex: +# Task T1 runs on node N1 with file F at path P, but P does not exist on node N1 +# Task T1 passes file F to task T2 to run on node N2 +# Task T2 runs on node N2, P exists on node N2, but file F cannot exist +# We also want to store the filename even if it does not exist, so use a sentinel URI scheme (can be useful in error messages) TOIL_NONEXISTENT_URI_SCHEME = 'nonexistent:' def pack_toil_uri(file_id: FileID, dir_id: uuid.UUID, file_basename: str) -> str: @@ -608,7 +623,7 @@ def unpack_toil_uri(toil_uri: str) -> Tuple[FileID, str, str]: return file_id, parent_id, file_basename -def evaluate_output_decls(output_decls: List[WDL.Tree.Decl], all_bindings: WDL.Env.Bindings[WDL.Value.Base], standard_library: WDL.StdLib.Base) -> WDL.Env.Bindings[WDL.Value.Base]: +def evaluate_output_decls(output_decls: List[WDL.Tree.Decl], all_bindings: WDL.Env.Bindings[WDL.Value.Base], standard_library: ToilWDLStdLibBase) -> WDL.Env.Bindings[WDL.Value.Base]: """ Evaluate output decls with a given bindings environment and standard library. Creates a new bindings object that only contains the bindings from the given decls. @@ -623,7 +638,7 @@ def evaluate_output_decls(output_decls: List[WDL.Tree.Decl], all_bindings: WDL.E output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() for output_decl in output_decls: output_value = evaluate_decl(output_decl, all_bindings, standard_library) - drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=getattr(standard_library, "_execution_dir")) + drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=standard_library.execution_dir) output_value = map_over_typed_files_in_value(output_value, drop_if_missing_with_workdir) all_bindings = all_bindings.bind(output_decl.name, output_value) output_bindings = output_bindings.bind(output_decl.name, output_value) @@ -721,11 +736,12 @@ class ToilWDLStdLibBase(WDL.StdLib.Base): """ Standard library implementation for WDL as run on Toil. """ - def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None, enforce_nonexistence: bool = True): + def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None, enforce_existence: bool = True): """ Set up the standard library. :param execution_dir: Directory to use as the working directory for workflow code. + :param enforce_existence: If true, then if a file is detected as nonexistent, raise an error. Else, let it pass through """ # TODO: Just always be the 1.2 standard library. wdl_version = "1.2" @@ -752,7 +768,11 @@ def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = self._execution_dir = execution_dir - self._enforce_nonexistence = enforce_nonexistence + self._enforce_existence = enforce_existence + + @property + def execution_dir(self): + return self._execution_dir def get_local_paths(self) -> List[str]: """ @@ -787,13 +807,13 @@ def _devirtualize_filename(self, filename: str) -> str: """ result = self.devirtualize_to(filename, self._file_store.localTempDir, self._file_store, self._execution_dir, - self._devirtualized_to_virtualized, self._virtualized_to_devirtualized, self._enforce_nonexistence) + self._devirtualized_to_virtualized, self._virtualized_to_devirtualized, self._enforce_existence) return result @staticmethod def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFileStore, Toil], execution_dir: Optional[str], devirtualized_to_virtualized: Optional[Dict[str, str]] = None, virtualized_to_devirtualized: Optional[Dict[str, str]] = None, - enforce_nonexistence: bool = True) -> str: + enforce_existence: bool = True) -> str: """ Download or export a WDL virtualized filename/URL to the given directory. @@ -808,6 +828,8 @@ def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFil The input filename could already be devirtualized. In this case, the filename should not be added to the cache + + :param enforce_existence: Raise an error if the file is nonexistent. Else, let it pass through. """ if not os.path.isdir(dest_dir): @@ -836,7 +858,7 @@ def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFil # see where to put files depending on their parents. dir_path = os.path.join(dest_dir, parent_id) elif filename.startswith(TOIL_NONEXISTENT_URI_SCHEME): - if enforce_nonexistence: + if enforce_existence: raise FileNotFoundError(f"File {filename[len(TOIL_NONEXISTENT_URI_SCHEME):]} was not available when virtualized!") else: return filename @@ -898,7 +920,7 @@ def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFil logger.debug("Virtualized file %s is already a local path", filename) if not os.path.exists(result): - # Devirtualizing an unvirtualized file means the file is coerced from a string and never used/virtualized + # Catch if something made it through without going through the proper virtualization/devirtualization steps raise RuntimeError(f"Virtualized file {filename} looks like a local file but isn't!") return result @@ -1417,6 +1439,9 @@ def import_file_from_uri(uri: str) -> str: def drop_if_missing(value_type: WDL.Type.Base, filename: str, work_dir: str) -> Optional[str]: """ Return None if a file doesn't exist, or its path if it does. + + filename represents a URI or file name belonging to a WDL value of type value_type. work_dir represents + the current working directory of the job and is where all relative paths will be interpreted from """ logger.debug("Consider file %s", filename) @@ -1822,7 +1847,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: runtime_accelerators = [accelerator_requirement] # Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized. - run_job = WDLTaskJob(self._task, bindings, runtime_bindings, self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) + run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) # Run that as a child self.addChild(run_job) @@ -2057,8 +2082,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Set up the WDL standard library # UUID to use for virtualizing files - # Since we process nonexistent files in WDLTaskWrapperJob as those must be run locally, don't try to devirtualize them - standard_library = ToilWDLStdLibBase(file_store, enforce_nonexistence=False) + # We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them + standard_library = ToilWDLStdLibBase(file_store, enforce_existence=False) # Get the bindings from after the input section bindings = unwrap(self._task_internal_bindings) @@ -3294,7 +3319,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: @contextmanager -def monkeypatch_coerce(standard_library: ToilWDLStdLibBase, null_nonexistent_files: bool = False) -> Generator[None, None, None]: +def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, None, None]: """ Monkeypatch miniwdl's WDL.Value.Base.coerce() function to virtualize files when they are represented as Strings. Calls _virtualize_filename from a given standard library object. @@ -3309,10 +3334,11 @@ def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = No self.value = standard_library._virtualize_filename(self.value) return self return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce + def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: # Sometimes string coerce is called instead, so monkeypatch this one as well if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Value.File): - if os.path.isfile(os.path.join(standard_library._execution_dir or ".", self.value)): + if os.path.isfile(os.path.join(standard_library.execution_dir or ".", self.value)): return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) else: return WDL.Value.File(TOIL_NONEXISTENT_URI_SCHEME + self.value, self.expr) From 614f8515468726a2a96c6bda79f634b14108ae9c Mon Sep 17 00:00:00 2001 From: stxue1 Date: Wed, 3 Jul 2024 12:46:07 -0700 Subject: [PATCH 7/7] Missing type annotation --- src/toil/wdl/wdltoil.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 80ce307938..ffba7f76af 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -771,7 +771,7 @@ def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = self._enforce_existence = enforce_existence @property - def execution_dir(self): + def execution_dir(self) -> Optional[str]: return self._execution_dir def get_local_paths(self) -> List[str]: