diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 24405b1f75..d08e70fa47 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -12,6 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import asyncio import errno import io @@ -46,6 +48,7 @@ Union, cast) from urllib.parse import quote, unquote, urljoin, urlsplit +from functools import partial import WDL.Error import WDL.runtime.config @@ -71,8 +74,8 @@ unwrap_all) from toil.jobStores.abstractJobStore import (AbstractJobStore, UnimplementedURLException, InvalidImportExportUrlException, LocatorException) -from toil.lib.accelerators import count_nvidia_gpus, get_individual_local_accelerators -from toil.lib.conversions import convert_units, human2bytes, strtobool +from toil.lib.accelerators import get_individual_local_accelerators +from toil.lib.conversions import convert_units, human2bytes from toil.lib.io import mkdtemp from toil.lib.memoize import memoize from toil.lib.misc import get_user_name @@ -575,6 +578,20 @@ def recursive_dependencies(root: WDL.Tree.WorkflowNode) -> Set[str]: TOIL_URI_SCHEME = 'toilfile:' +# We always virtualize any file into a URI. However, when coercing from string to file, +# it is not necessary that the file needs to exist. See https://github.com/openwdl/wdl/issues/667 +# So use a sentinel to indicate nonexistent files instead of immediately raising an error +# This is done instead of not virtualizing, using the string as a filepath, and coercing to None/null at use. +# This is because the File must represent some location on its corresponding machine. +# If a task runs on a node where a file does not exist, and passes that file as an input into another task, +# we need to remember that the file does not exist from the original node +# ex: +# Task T1 runs on node N1 with file F at path P, but P does not exist on node N1 +# Task T1 passes file F to task T2 to run on node N2 +# Task T2 runs on node N2, P exists on node N2, but file F cannot exist +# We also want to store the filename even if it does not exist, so use a sentinel URI scheme (can be useful in error messages) +TOIL_NONEXISTENT_URI_SCHEME = 'nonexistent:' + def pack_toil_uri(file_id: FileID, dir_id: uuid.UUID, file_basename: str) -> str: """ Encode a Toil file ID and its source path in a URI that starts with the scheme in TOIL_URI_SCHEME. @@ -607,7 +624,7 @@ def unpack_toil_uri(toil_uri: str) -> Tuple[FileID, str, str]: return file_id, parent_id, file_basename -def evaluate_output_decls(output_decls: List[WDL.Tree.Decl], all_bindings: WDL.Env.Bindings[WDL.Value.Base], standard_library: WDL.StdLib.Base) -> WDL.Env.Bindings[WDL.Value.Base]: +def evaluate_output_decls(output_decls: List[WDL.Tree.Decl], all_bindings: WDL.Env.Bindings[WDL.Value.Base], standard_library: ToilWDLStdLibBase) -> WDL.Env.Bindings[WDL.Value.Base]: """ Evaluate output decls with a given bindings environment and standard library. Creates a new bindings object that only contains the bindings from the given decls. @@ -622,6 +639,8 @@ def evaluate_output_decls(output_decls: List[WDL.Tree.Decl], all_bindings: WDL.E output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() for output_decl in output_decls: output_value = evaluate_decl(output_decl, all_bindings, standard_library) + drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=standard_library.execution_dir) + output_value = map_over_typed_files_in_value(output_value, drop_if_missing_with_workdir) all_bindings = all_bindings.bind(output_decl.name, output_value) output_bindings = output_bindings.bind(output_decl.name, output_value) return output_bindings @@ -680,7 +699,7 @@ def _call_eager(self, expr: "WDL.Expr.Apply", arguments: List[WDL.Value.Base]) - # Return the result as a WDL float value return WDL.Value.Float(total_size) -def is_url(filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool: +def is_url(filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME, TOIL_NONEXISTENT_URI_SCHEME]) -> bool: """ Decide if a filename is a known kind of URL """ @@ -718,11 +737,12 @@ class ToilWDLStdLibBase(WDL.StdLib.Base): """ Standard library implementation for WDL as run on Toil. """ - def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None): + def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None, enforce_existence: bool = True): """ Set up the standard library. :param execution_dir: Directory to use as the working directory for workflow code. + :param enforce_existence: If true, then if a file is detected as nonexistent, raise an error. Else, let it pass through """ # TODO: Just always be the 1.2 standard library. wdl_version = "1.2" @@ -749,6 +769,12 @@ def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = self._execution_dir = execution_dir + self._enforce_existence = enforce_existence + + @property + def execution_dir(self) -> Optional[str]: + return self._execution_dir + def get_local_paths(self) -> List[str]: """ Get all the local paths of files devirtualized (or virtualized) through the stdlib. @@ -782,12 +808,13 @@ def _devirtualize_filename(self, filename: str) -> str: """ result = self.devirtualize_to(filename, self._file_store.localTempDir, self._file_store, self._execution_dir, - self._devirtualized_to_virtualized, self._virtualized_to_devirtualized) + self._devirtualized_to_virtualized, self._virtualized_to_devirtualized, self._enforce_existence) return result @staticmethod def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFileStore, Toil], execution_dir: Optional[str], - devirtualized_to_virtualized: Optional[Dict[str, str]] = None, virtualized_to_devirtualized: Optional[Dict[str, str]] = None) -> str: + devirtualized_to_virtualized: Optional[Dict[str, str]] = None, virtualized_to_devirtualized: Optional[Dict[str, str]] = None, + enforce_existence: bool = True) -> str: """ Download or export a WDL virtualized filename/URL to the given directory. @@ -802,6 +829,8 @@ def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFil The input filename could already be devirtualized. In this case, the filename should not be added to the cache + + :param enforce_existence: Raise an error if the file is nonexistent. Else, let it pass through. """ if not os.path.isdir(dest_dir): @@ -829,7 +858,11 @@ def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFil # Put the UUID in the destination path in order for tasks to # see where to put files depending on their parents. dir_path = os.path.join(dest_dir, parent_id) - + elif filename.startswith(TOIL_NONEXISTENT_URI_SCHEME): + if enforce_existence: + raise FileNotFoundError(f"File {filename[len(TOIL_NONEXISTENT_URI_SCHEME):]} was not available when virtualized!") + else: + return filename else: # Parse the URL and extract the basename file_basename = os.path.basename(urlsplit(filename).path) @@ -888,6 +921,7 @@ def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFil logger.debug("Virtualized file %s is already a local path", filename) if not os.path.exists(result): + # Catch if something made it through without going through the proper virtualization/devirtualization steps raise RuntimeError(f"Virtualized file {filename} looks like a local file but isn't!") return result @@ -942,13 +976,13 @@ class ToilWDLStdLibTaskCommand(ToilWDLStdLibBase): are host-side paths. """ - def __init__(self, file_store: AbstractFileStore, container: TaskContainer): + def __init__(self, file_store: AbstractFileStore, container: TaskContainer, execution_dir: Optional[str] = None): """ Set up the standard library for the task command section. """ # TODO: Don't we want to make sure we don't actually use the file store? - super().__init__(file_store) + super().__init__(file_store, execution_dir) self.container = container @memoize @@ -1009,7 +1043,7 @@ def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path: # Just set up as ToilWDLStdLibBase, but it will call into # WDL.StdLib.TaskOutputs next. - super().__init__(file_store) + super().__init__(file_store, current_directory_override) # Remember task output files self._stdout_path = stdout_path @@ -1022,9 +1056,6 @@ def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path: # Reverse and store the file mount dict self._mountpoint_to_file = {v: k for k, v in file_to_mountpoint.items()} - # Remember current directory - self._current_directory_override = current_directory_override - # We need to attach implementations for WDL's stdout(), stderr(), and glob(). # TODO: Can we use the fancy decorators instead of this wizardry? setattr( @@ -1087,7 +1118,7 @@ def _glob(self, pattern: WDL.Value.String) -> WDL.Value.Array: # So we send a little Bash script that can delimit the files with something, and assume the Bash really is a Bash. # This needs to run in the work directory that the container used, if any. - work_dir = '.' if not self._current_directory_override else self._current_directory_override + work_dir = '.' if not self._execution_dir else self._execution_dir # TODO: get this to run in the right container if there is one # Bash (now?) has a compgen builtin for shell completion that can evaluate a glob where the glob is in a quoted string that might have spaces in it. See . @@ -1123,7 +1154,7 @@ def _devirtualize_filename(self, filename: str) -> str: if not is_url(filename) and not filename.startswith('/'): # We are getting a bare relative path from the WDL side. # Find a real path to it relative to the current directory override. - work_dir = '.' if not self._current_directory_override else self._current_directory_override + work_dir = '.' if not self._execution_dir else self._execution_dir filename = os.path.join(work_dir, filename) return super()._devirtualize_filename(filename) @@ -1141,7 +1172,7 @@ def _virtualize_filename(self, filename: str) -> str: if not is_url(filename) and not filename.startswith('/'): # We are getting a bare relative path on the supposedly devirtualized side. # Find a real path to it relative to the current directory override. - work_dir = '.' if not self._current_directory_override else self._current_directory_override + work_dir = '.' if not self._execution_dir else self._execution_dir filename = os.path.join(work_dir, filename) if filename in self._devirtualized_to_virtualized: @@ -1250,7 +1281,7 @@ def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, std if ((node.name in environment and not isinstance(environment[node.name], WDL.Value.Null)) or (isinstance(environment.get(node.name), WDL.Value.Null) and node.type.optional)): logger.debug('Name %s is already defined, not using default', node.name) - if not isinstance(environment[node.name], type(node.type)): + if not isinstance(environment[node.name].type, type(node.type)): return environment[node.name].coerce(node.type) else: return environment[node.name] @@ -1406,6 +1437,34 @@ def import_file_from_uri(uri: str) -> str: return map_over_files_in_bindings(environment, import_file_from_uri) + +def drop_if_missing(value_type: WDL.Type.Base, filename: str, work_dir: str) -> Optional[str]: + """ + Return None if a file doesn't exist, or its path if it does. + + filename represents a URI or file name belonging to a WDL value of type value_type. work_dir represents + the current working directory of the job and is where all relative paths will be interpreted from + """ + logger.debug("Consider file %s", filename) + + if is_url(filename): + if (not filename.startswith(TOIL_NONEXISTENT_URI_SCHEME) + and (filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(filename))): + # We assume anything in the filestore actually exists. + return filename + else: + logger.warning('File %s with type %s does not actually exist at its URI', filename, value_type) + return None + else: + # Get the absolute path, not resolving symlinks + effective_path = os.path.abspath(os.path.join(work_dir, filename)) + if os.path.islink(effective_path) or os.path.exists(effective_path): + # This is a broken symlink or a working symlink or a file. + return filename + else: + logger.warning('File %s with type %s does not actually exist at %s', filename, value_type, effective_path) + return None + def drop_missing_files(environment: WDLBindings, current_directory_override: Optional[str] = None) -> WDLBindings: """ Make sure all the File values embedded in the given bindings point to files @@ -1417,30 +1476,8 @@ def drop_missing_files(environment: WDLBindings, current_directory_override: Opt # Determine where to evaluate relative paths relative to work_dir = '.' if not current_directory_override else current_directory_override - def drop_if_missing(value_type: WDL.Type.Base, filename: str) -> Optional[str]: - """ - Return None if a file doesn't exist, or its path if it does. - """ - logger.debug("Consider file %s", filename) - - if is_url(filename): - if filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(filename): - # We assume anything in the filestore actually exists. - return filename - else: - logger.warning('File %s with type %s does not actually exist at its URI', filename, value_type) - return None - else: - # Get the absolute path, not resolving symlinks - effective_path = os.path.abspath(os.path.join(work_dir, filename)) - if os.path.islink(effective_path) or os.path.exists(effective_path): - # This is a broken symlink or a working symlink or a file. - return filename - else: - logger.warning('File %s with type %s does not actually exist at %s', filename, value_type, effective_path) - return None - - return map_over_typed_files_in_bindings(environment, drop_if_missing) + drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=work_dir) + return map_over_typed_files_in_bindings(environment, drop_if_missing_with_workdir) def get_file_paths_in_bindings(environment: WDLBindings) -> List[str]: """ @@ -1452,7 +1489,14 @@ def get_file_paths_in_bindings(environment: WDLBindings) -> List[str]: """ paths = [] - map_over_files_in_bindings(environment, lambda x: paths.append(x)) + + def append_to_paths(path: str) -> Optional[str]: + # Append element and return the element. This is to avoid a logger warning inside map_over_typed_files_in_value() + # But don't process nonexistent files + if not path.startswith(TOIL_NONEXISTENT_URI_SCHEME): + paths.append(path) + return path + map_over_files_in_bindings(environment, append_to_paths) return paths def map_over_typed_files_in_bindings(environment: WDLBindings, transform: Callable[[WDL.Type.Base, str], Optional[str]]) -> WDLBindings: @@ -1505,7 +1549,6 @@ def map_over_typed_files_in_value(value: WDL.Value.Base, transform: Callable[[WD actually be used, to allow for scans. So error checking needs to be part of the transform itself. """ - if isinstance(value, WDL.Value.File): # This is a file so we need to process it new_path = transform(value.type, value.value) @@ -1715,20 +1758,20 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Set up the WDL standard library # UUID to use for virtualizing files standard_library = ToilWDLStdLibBase(file_store) - - if self._task.inputs: - logger.debug("Evaluating task code") - for input_decl in self._task.inputs: - # Evaluate all the inputs that aren't pre-set - bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) - for postinput_decl in self._task.postinputs: - # Evaluate all the postinput decls. - # We need these in order to evaluate the runtime. - # TODO: What if they wanted resources from the runtime? - bindings = bindings.bind(postinput_decl.name, evaluate_defaultable_decl(postinput_decl, bindings, standard_library)) - - # Evaluate the runtime section - runtime_bindings = evaluate_call_inputs(self._task, self._task.runtime, bindings, standard_library) + with monkeypatch_coerce(standard_library): + if self._task.inputs: + logger.debug("Evaluating task code") + for input_decl in self._task.inputs: + # Evaluate all the inputs that aren't pre-set + bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) + for postinput_decl in self._task.postinputs: + # Evaluate all the postinput decls. + # We need these in order to evaluate the runtime. + # TODO: What if they wanted resources from the runtime? + bindings = bindings.bind(postinput_decl.name, evaluate_defaultable_decl(postinput_decl, bindings, standard_library)) + + # Evaluate the runtime section + runtime_bindings = evaluate_call_inputs(self._task, self._task.runtime, bindings, standard_library) # Fill these in with not-None if the workflow asks for each resource. runtime_memory: Optional[int] = None @@ -2041,7 +2084,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Set up the WDL standard library # UUID to use for virtualizing files - standard_library = ToilWDLStdLibBase(file_store) + # We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them + standard_library = ToilWDLStdLibBase(file_store, enforce_existence=False) # Get the bindings from after the input section bindings = unwrap(self._task_internal_bindings) @@ -2218,10 +2262,14 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]: # Replace everything with in-container paths for the command. # TODO: MiniWDL deals with directory paths specially here. - contained_bindings = map_over_files_in_bindings(bindings, lambda path: task_container.input_path_map[path]) + def get_path_in_container(path: str) -> Optional[str]: + if path.startswith(TOIL_NONEXISTENT_URI_SCHEME): + return None + return task_container.input_path_map[path] + contained_bindings = map_over_files_in_bindings(bindings, get_path_in_container) # Make a new standard library for evaluating the command specifically, which only deals with in-container paths and out-of-container paths. - command_library = ToilWDLStdLibTaskCommand(file_store, task_container) + command_library = ToilWDLStdLibTaskCommand(file_store, task_container, workdir_in_container) # Work out the command string, and unwrap it command_string: str = evaluate_named_expression(self._task, "command", WDL.Type.String(), remove_common_leading_whitespace(self._task.command), contained_bindings, command_library).coerce(WDL.Type.String()).value @@ -2307,7 +2355,8 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]: outputs_library = ToilWDLStdLibTaskOutputs(file_store, host_stdout_txt, host_stderr_txt, task_container.input_path_map, current_directory_override=workdir_in_container) # Make sure files downloaded as inputs get re-used if we re-upload them. outputs_library.share_files(standard_library) - output_bindings = evaluate_output_decls(self._task.outputs, bindings, outputs_library) + with monkeypatch_coerce(outputs_library): + output_bindings = evaluate_output_decls(self._task.outputs, bindings, outputs_library) # Now we know if the standard output and error were sent somewhere by # the workflow. If not, we should report them to the leader. @@ -2333,10 +2382,11 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]: output_bindings = drop_missing_files(output_bindings, current_directory_override=workdir_in_container) for decl in self._task.outputs: if not decl.type.optional and output_bindings[decl.name].value is None: - # We have an unacceptable null value. This can happen if a file - # is missing but not optional. Don't let it out to annoy the - # next task. - raise WDL.Error.EvalError(decl, f"non-optional value {decl.name} = {decl.expr} is missing") + # todo: make recursive + # We have an unacceptable null value. This can happen if a file + # is missing but not optional. Don't let it out to annoy the + # next task. + raise WDL.Error.EvalError(decl, f"non-optional value {decl.name} = {decl.expr} is missing") # Upload any files in the outputs if not uploaded already. Accounts for how relative paths may still need to be container-relative. output_bindings = virtualize_files(output_bindings, outputs_library) @@ -3215,7 +3265,8 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # Output section is declared and is nonempty, so evaluate normally # Combine the bindings from the previous job - output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) + with monkeypatch_coerce(standard_library): + output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) finally: # We don't actually know when all our files are downloaded since # anything we evaluate might devirtualize inside any expression. @@ -3227,6 +3278,9 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # were mounted into a container at their actual paths. self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) + # Null nonexistent optional values and error on the rest + output_bindings = drop_missing_files(output_bindings, self._wdl_options.get("execution_dir")) + return self.postprocess(output_bindings) class WDLRootJob(WDLSectionJob): @@ -3282,10 +3336,14 @@ def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = No self.value = standard_library._virtualize_filename(self.value) return self return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce + def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: # Sometimes string coerce is called instead, so monkeypatch this one as well - if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Type.File): - return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) + if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Value.File): + if os.path.isfile(os.path.join(standard_library.execution_dir or ".", self.value)): + return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) + else: + return WDL.Value.File(TOIL_NONEXISTENT_URI_SCHEME + self.value, self.expr) return old_str_coerce(self, desired_type) old_base_coerce = WDL.Value.Base.coerce