diff --git a/src/toil/common.py b/src/toil/common.py index 62a052be9f..056196e088 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -1121,6 +1121,9 @@ def _setupAutoDeployment( logger.debug('Injecting user script %s into batch system.', userScriptResource) self._batchSystem.setUserScript(userScriptResource) + def url_exists(self, src_uri: str) -> bool: + return self._jobStore.url_exists(self.normalize_uri(src_uri)) + # Importing a file with a shared file name returns None, but without one it # returns a file ID. Explain this to MyPy. diff --git a/src/toil/jobStores/googleJobStore.py b/src/toil/jobStores/googleJobStore.py index 381b78fa55..9eaf9ff07e 100644 --- a/src/toil/jobStores/googleJobStore.py +++ b/src/toil/jobStores/googleJobStore.py @@ -383,7 +383,7 @@ def _get_blob_from_url(cls, url, exists=False): if exists: if not blob.exists(): - raise NoSuchFileException + raise NoSuchFileException(fileName) # sync with cloud so info like size is available blob.reload() return blob diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 79a521714f..c7ba29626c 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -6,7 +6,7 @@ import subprocess import unittest from uuid import uuid4 -from typing import Optional +from typing import Optional, Union from unittest.mock import patch from typing import Any, Dict, List, Set @@ -49,11 +49,10 @@ def tearDown(self) -> None: WDL_CONFORMANCE_TEST_COMMIT = "2d617b703a33791f75f30a9db43c3740a499cd89" # These tests are known to require things not implemented by # Toil and will not be run in CI. -WDL_CONFORMANCE_TESTS_UNSUPPORTED_BY_TOIL= [ +WDL_CONFORMANCE_TESTS_UNSUPPORTED_BY_TOIL = [ 16, # Basic object test (deprecated and removed in 1.1); MiniWDL and toil-wdl-runner do not support Objects, so this will fail if ran by them 21, # Parser: expression placeholders in strings in conditional expressions in 1.0, Cromwell style; Fails with MiniWDL and toil-wdl-runner 64, # Legacy test for as_map_as_input; It looks like MiniWDL does not have the function as_map() - 72, # Symlink passthrough; see 77, # Test that array cannot coerce to a string. WDL 1.1 does not allow compound types to coerce into a string. This should return a TypeError. ] diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 612c154566..9d04da92a0 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -27,6 +27,7 @@ import stat import subprocess import sys +import tempfile import textwrap import uuid from contextlib import ExitStack, contextmanager @@ -46,7 +47,14 @@ Type, TypeVar, Union, - cast) + cast, + TypedDict, IO) + +if sys.version_info < (3, 11): + from typing_extensions import NotRequired +else: + # NotRequired is recommended for TypedDicts over Optional but was introduced in Python 3.11 + from typing import NotRequired from mypy_extensions import Arg, DefaultArg from urllib.error import HTTPError from urllib.parse import quote, unquote, urljoin, urlsplit @@ -55,12 +63,13 @@ import WDL.Error import WDL.runtime.config from configargparse import ArgParser -from WDL._util import byte_size_units +from WDL._util import byte_size_units, chmod_R_plus from WDL.Tree import ReadSourceResult from WDL.CLI import print_error from WDL.runtime.backend.docker_swarm import SwarmContainer from WDL.runtime.backend.singularity import SingularityContainer from WDL.runtime.task_container import TaskContainer +from WDL.runtime.error import DownloadFailed from toil.batchSystems.abstractBatchSystem import InsufficientSystemResources from toil.common import Toil, addOptions @@ -89,6 +98,15 @@ logger = logging.getLogger(__name__) +# WDL options to pass into the WDL jobs and standard libraries +# task_path: Dotted WDL name of the part of the workflow this library is working for. +# namespace: namespace of the WDL that the current job is in +# execution_dir: Directory to use as the working directory for workflow code. +# container: The type of container to use when executing a WDL task. Carries through the value of the commandline --container option +# all_call_outputs: whether a job should include all calls outputs +WDLContext = TypedDict('WDLContext', {"execution_dir": NotRequired[str], "container": NotRequired[str], + "task_path": str, "namespace": str, "all_call_outputs": bool}) + @contextmanager def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]: @@ -104,6 +122,7 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] WDL.Error.ImportError, WDL.Error.ValidationError, WDL.Error.MultipleValidationErrors, + DownloadFailed, FileNotFoundError, InsufficientSystemResources, LocatorException, @@ -388,7 +407,6 @@ def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tr for candidate_base in full_path_list: # Try fetching based off each base URI candidate_uri = urljoin(candidate_base, uri) - if candidate_uri in failures: # Already tried this one, maybe we have an absolute uri input. continue @@ -443,7 +461,20 @@ async def toil_read_source(uri: str, path: List[str], importer: Optional[WDL.Tre raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), uri) +def virtualized_equal(value1: WDL.Value.Base, value2: WDL.Value.Base) -> bool: + """ + Check if two WDL values are equal when taking into account file virtualization. + Treats virtualized and non-virtualized Files referring to the same underlying file as equal. + + :param value1: WDL value + :param value2: WDL value + :return: Whether the two values are equal with file virtualization accounted for + """ + def f(file: WDL.Value.File) -> WDL.Value.File: + file.value = getattr(file, "virtualized_value", file.value) + return file + return map_over_typed_files_in_value(value1, f) == map_over_typed_files_in_value(value2, f) # Bindings have a long type name WDLBindings = WDL.Env.Bindings[WDL.Value.Base] @@ -479,7 +510,7 @@ def combine_bindings(all_bindings: Sequence[WDLBindings]) -> WDLBindings: if binding.name in merged: # This is a duplicate existing_value = merged[binding.name] - if existing_value != binding.value: + if not virtualized_equal(existing_value, binding.value): raise RuntimeError('Conflicting bindings for %s with values %s and %s', binding.name, existing_value, binding.value) else: logger.debug('Drop duplicate binding for %s', binding.name) @@ -488,6 +519,7 @@ def combine_bindings(all_bindings: Sequence[WDLBindings]) -> WDLBindings: return merged + # TODO: Develop a Protocol that can match the logging function type more closely def log_bindings(log_function: Callable[..., None], message: str, all_bindings: Sequence[Promised[WDLBindings]]) -> None: """ @@ -571,19 +603,6 @@ def recursive_dependencies(root: WDL.Tree.WorkflowNode) -> Set[str]: TOIL_URI_SCHEME = 'toilfile:' -# We always virtualize any file into a URI. However, when coercing from string to file, -# it is not necessary that the file needs to exist. See https://github.com/openwdl/wdl/issues/667 -# So use a sentinel to indicate nonexistent files instead of immediately raising an error -# This is done instead of not virtualizing, using the string as a filepath, and coercing to None/null at use. -# This is because the File must represent some location on its corresponding machine. -# If a task runs on a node where a file does not exist, and passes that file as an input into another task, -# we need to remember that the file does not exist from the original node -# ex: -# Task T1 runs on node N1 with file F at path P, but P does not exist on node N1 -# Task T1 passes file F to task T2 to run on node N2 -# Task T2 runs on node N2, P exists on node N2, but file F cannot exist -# We also want to store the filename even if it does not exist, so use a sentinel URI scheme (can be useful in error messages) -TOIL_NONEXISTENT_URI_SCHEME = 'nonexistent:' def pack_toil_uri(file_id: FileID, task_path: str, dir_id: uuid.UUID, file_basename: str) -> str: """ @@ -678,26 +697,37 @@ def choose_human_readable_directory(root_dir: str, source_task_path: str, parent return result -def evaluate_output_decls(output_decls: List[WDL.Tree.Decl], all_bindings: WDL.Env.Bindings[WDL.Value.Base], standard_library: ToilWDLStdLibBase) -> WDL.Env.Bindings[WDL.Value.Base]: +def evaluate_decls_to_bindings(decls: List[WDL.Tree.Decl], all_bindings: WDL.Env.Bindings[WDL.Value.Base], standard_library: ToilWDLStdLibBase, + include_previous: bool = False, drop_missing_files: bool = False) -> WDL.Env.Bindings[WDL.Value.Base]: """ - Evaluate output decls with a given bindings environment and standard library. + Evaluate decls with a given bindings environment and standard library. Creates a new bindings object that only contains the bindings from the given decls. - Guarantees that each decl in `output_decls` can access the variables defined by the previous ones. + Guarantees that each decl in `decls` can access the variables defined by the previous ones. :param all_bindings: Environment to use when evaluating decls - :param output_decls: Decls to evaluate + :param decls: Decls to evaluate :param standard_library: Standard library - :return: New bindings object with only the output_decls - """ - # all_bindings contains output + previous bindings so that the output can reference its own declarations - # output_bindings only contains the output bindings themselves so that bindings from sections such as the input aren't included - output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() - for output_decl in output_decls: - output_value = evaluate_decl(output_decl, all_bindings, standard_library) - drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=standard_library.execution_dir) - output_value = map_over_typed_files_in_value(output_value, drop_if_missing_with_workdir) - all_bindings = all_bindings.bind(output_decl.name, output_value) - output_bindings = output_bindings.bind(output_decl.name, output_value) - return output_bindings + :param include_previous: Whether to include the existing environment in the new returned environment. This will be false for outputs where only defined decls should be included + :param drop_missing_files: Whether to coerce nonexistent files to null. The coerced elements will be checked that the transformation is valid. + Currently should only be enabled in output sections, see https://github.com/openwdl/wdl/issues/673#issuecomment-2248828116 + :return: New bindings object + """ + # all_bindings contains current bindings + previous all_bindings + # bindings only contains the decl bindings themselves so that bindings from other sections prior aren't included + bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() + drop_if_missing_with_workdir = partial(drop_if_missing, standard_library=standard_library) + for each_decl in decls: + output_value = evaluate_defaultable_decl(each_decl, all_bindings, standard_library) + if drop_missing_files: + dropped_output_value = map_over_typed_files_in_value(output_value, drop_if_missing_with_workdir) + # Typecheck that the new binding value with dropped files is valid for the declaration's type + # If a dropped file exists where the type is not optional File?, raise FileNotFoundError + # Ideally, map_over_typed_files_in_value should do this check, but that will require retooling the map functions + # to carry through WDL types as well; currently miniwdl's WDL value has a type which we use, but that does not carry the optional flag through + ensure_null_files_are_nullable(dropped_output_value, output_value, each_decl.type) + output_value = dropped_output_value + all_bindings = all_bindings.bind(each_decl.name, output_value) + bindings = bindings.bind(each_decl.name, output_value) + return all_bindings if include_previous else bindings class NonDownloadingSize(WDL.StdLib._Size): """ @@ -716,11 +746,12 @@ def _call_eager(self, expr: "WDL.Expr.Apply", arguments: List[WDL.Value.Base]) - """ # Get all the URIs of files that actually are set. - file_uris: List[str] = [f.value for f in arguments[0].coerce(WDL.Type.Array(WDL.Type.File(optional=True))).value if not isinstance(f, WDL.Value.Null)] + file_objects: List[WDL.Value.File] = [f for f in arguments[0].coerce(WDL.Type.Array(WDL.Type.File(optional=True))).value if not isinstance(f, WDL.Value.Null)] total_size = 0.0 - for uri in file_uris: + for file in file_objects: # Sum up the sizes of all the files, if any. + uri = getattr(file, "virtualized_value", None) or file.value if is_url(uri): if uri.startswith(TOIL_URI_SCHEME): # This is a Toil File ID we encoded; we have the size @@ -753,7 +784,15 @@ def _call_eager(self, expr: "WDL.Expr.Apply", arguments: List[WDL.Value.Base]) - # Return the result as a WDL float value return WDL.Value.Float(total_size) -def is_url(filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME, TOIL_NONEXISTENT_URI_SCHEME]) -> bool: +def is_toil_url(filename: str) -> bool: + return is_url(filename, schemes=[TOIL_URI_SCHEME]) + + +def is_standard_url(filename: str) -> bool: + return is_url(filename, ['http:', 'https:', 's3:', 'gs:']) + + +def is_url(filename: str, schemes: List[str]=['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool: """ Decide if a filename is a known kind of URL """ @@ -762,6 +801,127 @@ def is_url(filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', return True return False +def convert_remote_files(environment: WDLBindings, file_source: Toil, task_path: str, search_paths: Optional[List[str]] = None, import_remote_files: bool = True) -> None: + """ + Resolve relative-URI files in the given environment and import all files. + + Will set the value of the File to the relative-URI. + + :param environment: Bindings to evaluate on + :param file_source: Context to search for files with + :param task_path: Dotted WDL name of the user-level code doing the + importing (probably the workflow name). + :param search_paths: If set, try resolving input location relative to the URLs or + directories in this list. + :param import_remote_files: If set, import files from remote locations. Else leave them as URI references. + """ + path_to_id: Dict[str, uuid.UUID] = {} + @memoize + def import_filename(filename: str) -> Tuple[Optional[str], Optional[str]]: + """ + Detect if any potential URI exists. Will convert a file's value to a URI and import it. + + Separated out from convert_file_to_url in order to properly memoize and avoid importing the same file twice + :param filename: Filename to import + :return: Tuple of the uri the file was found at and the virtualized import + """ + # Search through any input search paths passed in and download it if found + tried = [] + for candidate_uri in potential_absolute_uris(filename, search_paths if search_paths is not None else []): + tried.append(candidate_uri) + try: + if not import_remote_files and is_url(candidate_uri): + # Use remote URIs in place. But we need to find the one that exists. + if not file_source.url_exists(candidate_uri): + # Wasn't found there + continue + + # Now we know this exists, so pass it through + return candidate_uri, None + else: + # Actually import + # Try to import the file. Don't raise if we can't find it, just + # return None! + imported = file_source.import_file(candidate_uri, check_existence=False) + if imported is None: + # Wasn't found there + continue + except UnimplementedURLException as e: + # We can't find anything that can even support this URL scheme. + # Report to the user, they are probably missing an extra. + logger.critical('Error: ' + str(e)) + raise + except HTTPError as e: + # Something went wrong looking for it there. + logger.warning("Checked URL %s but got HTTP status %s", candidate_uri, e.code) + # Try the next location. + continue + except Exception: + # Something went wrong besides the file not being found. Maybe + # we have no auth. + logger.error("Something went wrong when testing for existence of %s", candidate_uri) + raise + + if imported is None: + # Wasn't found there + # Mostly to satisfy mypy + continue + + # Work out what the basename for the file was + file_basename = os.path.basename(urlsplit(candidate_uri).path) + + if file_basename == "": + # We can't have files with no basename because we need to + # download them at that basename later. + raise RuntimeError(f"File {candidate_uri} has no basename and so cannot be a WDL File") + + # Was actually found + if is_url(candidate_uri): + # Might be a file URI or other URI. + # We need to make sure file URIs and local paths that point to + # the same place are treated the same. + parsed = urlsplit(candidate_uri) + if parsed.scheme == "file:": + # This is a local file URI. Convert to a path for source directory tracking. + parent_dir = os.path.dirname(unquote(parsed.path)) + else: + # This is some other URL. Get the URL to the parent directory and use that. + parent_dir = urljoin(candidate_uri, ".") + else: + # Must be a local path + parent_dir = os.path.dirname(candidate_uri) + + # Pack a UUID of the parent directory + dir_id = path_to_id.setdefault(parent_dir, uuid.uuid4()) + + toil_uri = pack_toil_uri(imported, task_path, dir_id, file_basename) + + logger.info('Converting input file path %s to %s', filename, candidate_uri) + + return candidate_uri, toil_uri + # Not found, return None + return None, None + + def convert_file_to_uri(file: WDL.Value.File) -> WDL.Value.File: + """ + Calls import_filename to detect if a potential URI exists and imports it. Will modify the File object value to the new URI and tack on the virtualized file. + """ + candidate_uri, toil_uri = import_filename(file.value) + if candidate_uri is None and toil_uri is None: + # If we get here we tried all the candidates + raise RuntimeError(f"Could not find {file.value} at any of: {list(potential_absolute_uris(file.value, search_paths if search_paths is not None else []))}") + elif candidate_uri is not None and toil_uri is None: + # A candidate exists but importing is disabled because import_remote_files is false + file.value = candidate_uri + else: + # Was actually found and imported + file.value = candidate_uri + setattr(file, "virtualized_value", toil_uri) + return file + + map_over_files_in_bindings(environment, convert_file_to_uri) + + # Both the WDL code itself **and** the commands that it runs will deal in # "virtualized" filenames. @@ -791,23 +951,10 @@ class ToilWDLStdLibBase(WDL.StdLib.Base): """ Standard library implementation for WDL as run on Toil. """ - def __init__( - self, - file_store: AbstractFileStore, - task_path: str, - execution_dir: Optional[str] = None, - enforce_existence: bool = True, - share_files_with: Optional["ToilWDLStdLibBase"] = None - ) -> None: + def __init__(self, file_store: AbstractFileStore, wdl_options: WDLContext, share_files_with: Optional["ToilWDLStdLibBase"] = None): """ Set up the standard library. - - :param task_path: Dotted WDL name of the part of the workflow this library is working for. - :param execution_dir: Directory to use as the working directory for workflow code. - :param enforce_existence: If true, then if a file is detected as - nonexistent, raise an error. Else, let it pass through - :param share_files_with: If set to an existing standard library - instance, use the same file upload and download paths as it. + :param wdl_options: Options to pass into the standard library to use. """ # TODO: Just always be the 1.2 standard library. wdl_version = "1.2" @@ -820,15 +967,10 @@ def __init__( # to always download the file. self.size = NonDownloadingSize(self) - # Save the task path to tag uploads - self._task_path = task_path - # Keep the file store around so we can access files. self._file_store = file_store - self._execution_dir = execution_dir or gettempdir() - - self._enforce_existence = enforce_existence + self._wdl_options: WDLContext = wdl_options if share_files_with is None: # We get fresh file download/upload state @@ -850,8 +992,14 @@ def __init__( self._parent_dir_to_ids = share_files_with._parent_dir_to_ids @property - def execution_dir(self) -> str: - return self._execution_dir + def execution_dir(self) -> Optional[str]: + execution_dir: Optional[str] = self._wdl_options.get("execution_dir") + return execution_dir + + @property + def task_path(self) -> str: + task_path: str = self._wdl_options["task_path"] + return task_path def get_local_paths(self) -> List[str]: """ @@ -860,36 +1008,144 @@ def get_local_paths(self) -> List[str]: return list(self._virtualized_to_devirtualized.values()) + + def _read(self, parse: Callable[[str], WDL.Value.Base]) -> Callable[[WDL.Value.File], WDL.Value.Base]: + # To only virtualize on task/function boundaries, rely on the _read function + # as this is called before every WDL function that takes a file input + # We want to virtualize before any function call so we can control the caching + # and to support all Toil supported formats (ex Google buckets) + # Since we also want to preserve the URL/path *and* store the virtualized URI, use setattr + # I can't think of another way to do this. I still need to remember the original URL/path, + # but I need to virtualize as well, so I can't remove one or the other. + def _f(file: WDL.Value.File) -> WDL.Value.Base: + if getattr(file, "virtualized_value", None) is None: + setattr(file, "virtualized_value", self._virtualize_filename(file.value)) + with open(self._devirtualize_filename(getattr(file, "virtualized_value")), "r") as infile: + return parse(infile.read()) + + return _f + + def _write( + self, serialize: Callable[[WDL.Value.Base, IO[bytes]], None] + ) -> Callable[[WDL.Value.Base], WDL.Value.File]: + "generate write_* function implementation based on serialize" + + def _f( + v: WDL.Value.Base, + ) -> WDL.Value.File: + os.makedirs(self._write_dir, exist_ok=True) + with tempfile.NamedTemporaryFile(dir=self._write_dir, delete=False) as outfile: + serialize(v, outfile) + filename = outfile.name + chmod_R_plus(filename, file_bits=0o660) + return WDL.Value.File(filename) + + return _f + + def _devirtualize_file(self, file: WDL.Value.File) -> WDL.Value.File: + # We track whether files do not exist with the nonexistent flag in order to coerce to Null/error on use + if getattr(file, "nonexistent", False): + return file + virtualized_filename = getattr(file, "virtualized_value", None) + if virtualized_filename is not None: + file.value = self._devirtualize_filename(virtualized_filename) + return file + + def _virtualize_file(self, file: WDL.Value.File, enforce_existence: bool = True) -> WDL.Value.File: + # If enforce_existence is true, then if a file is detected as nonexistent, raise an error. Else, let it pass through + if getattr(file, "virtualized_value", None) is not None: + return file + + if enforce_existence is False: + # We only want to error on a nonexistent file in the output section + # Since we need to virtualize on task boundaries, don't enforce existence if on a boundary + if is_standard_url(file.value): + file_uri = Toil.normalize_uri(file.value) + else: + abs_filepath = os.path.join(self.execution_dir, file.value) if self.execution_dir is not None else os.path.abspath(file.value) + file_uri = Toil.normalize_uri(abs_filepath) + + if not AbstractJobStore.url_exists(file_uri): + setattr(file, "nonexistent", True) + return file + virtualized = self._virtualize_filename(file.value) + setattr(file, "virtualized_value", virtualized) + return file + @memoize def _devirtualize_filename(self, filename: str) -> str: """ 'devirtualize' filename passed to a read_* function: return a filename that can be open()ed on the local host. """ - result = self.devirtualize_to( filename, self._file_store.localTempDir, self._file_store, - self._execution_dir, self._devirtualization_state, + self._wdl_options, self._devirtualized_to_virtualized, self._virtualized_to_devirtualized, - self._enforce_existence ) return result @staticmethod - def devirtualize_to( - filename: str, - dest_dir: str, - file_source: Union[AbstractFileStore, Toil], - execution_dir: Optional[str], - state: DirectoryNamingStateDict, - devirtualized_to_virtualized: Optional[Dict[str, str]] = None, - virtualized_to_devirtualized: Optional[Dict[str, str]] = None, - enforce_existence: bool = True - ) -> str: + def _devirtualize_uri(filename: str, dest_dir: str, file_source: Union[AbstractFileStore, Toil], state: DirectoryNamingStateDict) -> str: + """ + Given a filename, either return the devirtualized path or the filename itself if not a virtualized URI. + """ + if filename.startswith(TOIL_URI_SCHEME): + # This is a reference to the Toil filestore. + # Deserialize the FileID + file_id, task_path, parent_id, file_basename = unpack_toil_uri(filename) + + # Decide where it should be put. + dir_path = choose_human_readable_directory(dest_dir, task_path, parent_id, state) + else: + # Parse the URL and extract the basename + file_basename = os.path.basename(urlsplit(filename).path) + # Get the URL to the directory this thing came from. Remember + # URLs are interpreted relative to the directory the thing is + # in, not relative to the thing. + parent_url = urljoin(filename, ".") + # Turn it into a string we can make a directory for + dir_path = os.path.join(dest_dir, quote(parent_url, safe='')) + + if not os.path.exists(dir_path): + # Make sure the chosen directory exists + os.mkdir(dir_path) + # And decide the file goes in it. + dest_path = os.path.join(dir_path, file_basename) + + if filename.startswith(TOIL_URI_SCHEME): + # Get a local path to the file + if isinstance(file_source, AbstractFileStore): + # Read from the file store. + # File is not allowed to be modified by the task. See + # . + # We try to get away with symlinks and hope the task + # container can mount the destination file. + result = file_source.readGlobalFile(file_id, dest_path, mutable=False, symlink=True) + elif isinstance(file_source, Toil): + # Read from the Toil context + file_source.export_file(file_id, dest_path) + result = dest_path + else: + # Download to a local file with the right name and execute bit. + # Open it exclusively + with open(dest_path, 'xb') as dest_file: + # And save to it + size, executable = AbstractJobStore.read_from_url(filename, dest_file) + if executable: + # Set the execute bit in the file's permissions + os.chmod(dest_path, os.stat(dest_path).st_mode | stat.S_IXUSR) + + result = dest_path + return result + + @staticmethod + def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFileStore, Toil], state: DirectoryNamingStateDict, wdl_options: WDLContext, + devirtualized_to_virtualized: Optional[Dict[str, str]] = None, virtualized_to_devirtualized: Optional[Dict[str, str]] = None) -> str: """ Download or export a WDL virtualized filename/URL to the given directory. @@ -906,9 +1162,8 @@ def devirtualize_to( should not be added to the cache :param state: State dict which must be shared among successive calls into a dest_dir. - :param enforce_existence: Raise an error if the file is nonexistent. Else, let it pass through. + :param wdl_options: WDL options to carry through. """ - if not os.path.isdir(dest_dir): # os.mkdir fails saying the directory *being made* caused a # FileNotFoundError. So check the dest_dir before trying to make @@ -923,58 +1178,7 @@ def devirtualize_to( result = virtualized_to_devirtualized[filename] logger.debug("Found virtualized %s in cache with devirtualized path %s", filename, result) return result - if filename.startswith(TOIL_URI_SCHEME): - # This is a reference to the Toil filestore. - # Deserialize the FileID - file_id, task_path, parent_id, file_basename = unpack_toil_uri(filename) - - # Decide where it should be put. - dir_path = choose_human_readable_directory(dest_dir, task_path, parent_id, state) - elif filename.startswith(TOIL_NONEXISTENT_URI_SCHEME): - if enforce_existence: - raise FileNotFoundError(f"File {filename[len(TOIL_NONEXISTENT_URI_SCHEME):]} was not available when virtualized!") - else: - return filename - else: - # Parse the URL and extract the basename - file_basename = os.path.basename(urlsplit(filename).path) - # Get the URL to the directory this thing came from. Remember - # URLs are interpreted relative to the directory the thing is - # in, not relative to the thing. - parent_url = urljoin(filename, ".") - # Turn it into a string we can make a directory for - dir_path = os.path.join(dest_dir, quote(parent_url, safe='')) - - if not os.path.exists(dir_path): - # Make sure the chosen directory exists - os.mkdir(dir_path) - # And decide the file goes in it. - dest_path = os.path.join(dir_path, file_basename) - - if filename.startswith(TOIL_URI_SCHEME): - # Get a local path to the file - if isinstance(file_source, AbstractFileStore): - # Read from the file store. - # File is not allowed to be modified by the task. See - # . - # We try to get away with symlinks and hope the task - # container can mount the destination file. - result = file_source.readGlobalFile(file_id, dest_path, mutable=False, symlink=True) - elif isinstance(file_source, Toil): - # Read from the Toil context - file_source.export_file(file_id, dest_path) - result = dest_path - else: - # Download to a local file with the right name and execute bit. - # Open it exclusively - with open(dest_path, 'xb') as dest_file: - # And save to it - size, executable = AbstractJobStore.read_from_url(filename, dest_file) - if executable: - # Set the execute bit in the file's permissions - os.chmod(dest_path, os.stat(dest_path).st_mode | stat.S_IXUSR) - - result = dest_path + result = ToilWDLStdLibBase._devirtualize_uri(filename, dest_dir, file_source, state) if devirtualized_to_virtualized is not None: # Store the back mapping devirtualized_to_virtualized[result] = filename @@ -986,6 +1190,7 @@ def devirtualize_to( # This is a local file # To support relative paths, join the execution dir and filename # if filename is already an abs path, join() will do nothing + execution_dir = wdl_options.get("execution_dir") if execution_dir is not None: result = os.path.join(execution_dir, filename) else: @@ -1001,43 +1206,74 @@ def devirtualize_to( @memoize def _virtualize_filename(self, filename: str) -> str: """ - from a local path in write_dir, 'virtualize' into the filename as it should present in a - File value + from a local path in write_dir, 'virtualize' into the filename as it should present in a File value + + :param filename: Can be a local file path, URL (http, https, s3, gs), or toilfile """ - if is_url(filename): + if is_toil_url(filename): # Already virtual logger.debug('Already virtual: %s', filename) return filename - - # Otherwise this is a local file and we want to fake it as a Toil file store file - - # Make it an absolute path - if self._execution_dir is not None: - # To support relative paths from execution directory, join the execution dir and filename - # If filename is already an abs path, join() will not do anything - abs_filename = os.path.join(self._execution_dir, filename) + elif is_standard_url(filename): + # This is a URL (http, s3, etc) that we want to virtualize + # First check the cache + if filename in self._devirtualized_to_virtualized: + # Note: this is a little duplicative with the local file path branch, but the keys are different + result = self._devirtualized_to_virtualized[filename] + logger.debug("Re-using virtualized WDL file %s for %s", result, filename) + return result + try: + imported = self._file_store.import_file(filename) + except FileNotFoundError: + logger.error("File at URL %s does not exist or is inaccessible." % filename) + raise + except HTTPError as e: + # Something went wrong with the connection + logger.error("File %s could not be downloaded due to HTTP error %d", filename, e.code) + raise + if imported is None: + # Satisfy mypy, this should never happen though as we don't pass a shared file name (which is the only way import_file returns None) + raise RuntimeError("Failed to import URL %s into jobstore." % filename) + file_basename = os.path.basename(urlsplit(filename).path) + # Get the URL to the parent directory and use that. + parent_dir = urljoin(filename, ".") + # Pack a UUID of the parent directory + dir_id = self._parent_dir_to_ids.setdefault(parent_dir, uuid.uuid4()) + result = pack_toil_uri(imported, self.task_path, dir_id, file_basename) + logger.debug('Virtualized %s as WDL file %s', filename, result) + # We can't put the Toil URI in the virtualized_to_devirtualized cache because it would point to the URL instead of a + # local file on the machine, so only store the forward mapping + self._devirtualized_to_virtualized[filename] = result + return result else: - abs_filename = os.path.abspath(filename) + # Otherwise this is a local file and we want to fake it as a Toil file store file + # Make it an absolute path + if self.execution_dir is not None: + # To support relative paths from execution directory, join the execution dir and filename + # If filename is already an abs path, join() will not do anything + abs_filename = os.path.join(self.execution_dir, filename) + else: + abs_filename = os.path.abspath(filename) - if abs_filename in self._devirtualized_to_virtualized: - # This is a previously devirtualized thing so we can just use the - # virtual version we remembered instead of reuploading it. - result = self._devirtualized_to_virtualized[abs_filename] - logger.debug("Re-using virtualized WDL file %s for %s", result, filename) - return result + if abs_filename in self._devirtualized_to_virtualized: + # This is a previously devirtualized thing so we can just use the + # virtual version we remembered instead of reuploading it. + result = self._devirtualized_to_virtualized[abs_filename] + logger.debug("Re-using virtualized WDL file %s for %s", result, filename) + return result - file_id = self._file_store.writeGlobalFile(abs_filename) + file_id = self._file_store.writeGlobalFile(abs_filename) - file_dir = os.path.dirname(abs_filename) - parent_id = self._parent_dir_to_ids.setdefault(file_dir, uuid.uuid4()) - result = pack_toil_uri(file_id, self._task_path, parent_id, os.path.basename(abs_filename)) - logger.debug('Virtualized %s as WDL file %s', filename, result) - # Remember the upload in case we share a cache - self._devirtualized_to_virtualized[abs_filename] = result - # And remember the local path in case we want a redownload - self._virtualized_to_devirtualized[result] = abs_filename - return result + file_dir = os.path.dirname(abs_filename) + parent_id = self._parent_dir_to_ids.setdefault(file_dir, uuid.uuid4()) + result = pack_toil_uri(file_id, self.task_path, parent_id, os.path.basename(abs_filename)) + logger.debug('Virtualized %s as WDL file %s', filename, result) + # Remember the upload in case we share a cache + self._devirtualized_to_virtualized[abs_filename] = result + # And remember the local path in case we want a redownload + self._virtualized_to_devirtualized[result] = abs_filename + return result class ToilWDLStdLibTaskCommand(ToilWDLStdLibBase): """ @@ -1048,15 +1284,48 @@ class ToilWDLStdLibTaskCommand(ToilWDLStdLibBase): are host-side paths. """ - def __init__(self, file_store: AbstractFileStore, task_path: str, container: TaskContainer, execution_dir: Optional[str] = None): + def __init__(self, file_store: AbstractFileStore, container: TaskContainer, wdl_options: WDLContext): """ Set up the standard library for the task command section. """ # TODO: Don't we want to make sure we don't actually use the file store? - super().__init__(file_store, task_path, execution_dir=execution_dir) + super().__init__(file_store, wdl_options=wdl_options) self.container = container + # Revert the _read and _write functions to the parent WDL.StdLib.Base implementation + # This is because the task command standard library is used in MiniWDL's internals when executing a task + # which we don't have much control over (miniwdl will create its own file objects that represent files within the container) + # and MiniWDL seems to treat the task standard library and the base standard library different (mainly in how it creates File objects; + # the file values are valid paths in the base standard library but are container paths in the task library) + # In _read, we typically always ensure a file is virtualized before use. Here, we can't virtualize a within-container file because + # MiniWDL created a file representing the in-container path, which does not exist on the host machine + # In _write, we need virtualize to an in-container path from a host machine path because we mount the file through. The ideal spot for this virtualization + # to happen is here before the path injection + def _read(self, parse: Callable[[str], WDL.Value.Base]) -> Callable[[WDL.Value.File], WDL.Value.Base]: + # todo: figure out better way than reoverriding overridden function + def _f(file: WDL.Value.File) -> WDL.Value.Base: + with open(self._devirtualize_filename(file.value), "r") as infile: + return parse(infile.read()) + + return _f + + def _write( + self, serialize: Callable[[WDL.Value.Base, IO[bytes]], None] + ) -> Callable[[WDL.Value.Base], WDL.Value.File]: + def _f( + v: WDL.Value.Base, + ) -> WDL.Value.File: + os.makedirs(self._write_dir, exist_ok=True) + with tempfile.NamedTemporaryFile(dir=self._write_dir, delete=False) as outfile: + serialize(v, outfile) + filename = outfile.name + chmod_R_plus(filename, file_bits=0o660) + vfn = self._virtualize_filename(filename) + return WDL.Value.File(vfn) + + return _f + @memoize def _devirtualize_filename(self, filename: str) -> str: """ @@ -1106,12 +1375,11 @@ class ToilWDLStdLibTaskOutputs(ToilWDLStdLibBase, WDL.StdLib.TaskOutputs): def __init__( self, file_store: AbstractFileStore, - task_path: str, stdout_path: str, stderr_path: str, file_to_mountpoint: Dict[str, str], - current_directory_override: Optional[str] = None, - share_files_with: Optional[ToilWDLStdLibBase] = None + wdl_options: WDLContext, + share_files_with: Optional["ToilWDLStdLibBase"] = None ): """ Set up the standard library for a task output section. Needs to know @@ -1126,12 +1394,7 @@ def __init__( # Just set up as ToilWDLStdLibBase, but it will call into # WDL.StdLib.TaskOutputs next. - super().__init__( - file_store, - task_path, - execution_dir=current_directory_override, - share_files_with=share_files_with - ) + super().__init__(file_store, wdl_options, share_files_with) # Remember task output files self._stdout_path = stdout_path @@ -1206,7 +1469,7 @@ def _glob(self, pattern: WDL.Value.String) -> WDL.Value.Array: # So we send a little Bash script that can delimit the files with something, and assume the Bash really is a Bash. # This needs to run in the work directory that the container used, if any. - work_dir = '.' if not self._execution_dir else self._execution_dir + work_dir = '.' if not self.execution_dir else self.execution_dir # TODO: get this to run in the right container if there is one # We would use compgen -G to resolve the glob but that doesn't output @@ -1256,7 +1519,7 @@ def _devirtualize_filename(self, filename: str) -> str: if not is_url(filename) and not filename.startswith('/'): # We are getting a bare relative path from the WDL side. # Find a real path to it relative to the current directory override. - work_dir = '.' if not self._execution_dir else self._execution_dir + work_dir = '.' if not self.execution_dir else self.execution_dir filename = os.path.join(work_dir, filename) return super()._devirtualize_filename(filename) @@ -1274,7 +1537,7 @@ def _virtualize_filename(self, filename: str) -> str: if not is_url(filename) and not filename.startswith('/'): # We are getting a bare relative path on the supposedly devirtualized side. # Find a real path to it relative to the current directory override. - work_dir = '.' if not self._execution_dir else self._execution_dir + work_dir = '.' if not self.execution_dir else self.execution_dir filename = os.path.join(work_dir, filename) if filename in self._devirtualized_to_virtualized: @@ -1400,21 +1663,21 @@ def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, std raise # TODO: make these stdlib methods??? -def devirtualize_files(environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDLBindings: +def devirtualize_files(environment: WDLBindings, stdlib: ToilWDLStdLibBase) -> WDLBindings: """ Make sure all the File values embedded in the given bindings point to files that are actually available to command line commands. The same virtual file always maps to the same devirtualized filename even with duplicates """ - return map_over_files_in_bindings(environment, stdlib._devirtualize_filename) + return map_over_files_in_bindings(environment, stdlib._devirtualize_file) -def virtualize_files(environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDLBindings: +def virtualize_files(environment: WDLBindings, stdlib: ToilWDLStdLibBase, enforce_existence: bool = True) -> WDLBindings: """ Make sure all the File values embedded in the given bindings point to files that are usable from other machines. """ - - return map_over_files_in_bindings(environment, stdlib._virtualize_filename) + virtualize_func = partial(stdlib._virtualize_file, enforce_existence=enforce_existence) + return map_over_files_in_bindings(environment, virtualize_func) def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None: """ @@ -1449,120 +1712,27 @@ def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None: task_container.input_path_map[host_path] = container_path task_container.input_path_map_rev[container_path] = host_path -def import_files(environment: WDLBindings, task_path: str, toil: Toil, path: Optional[List[str]] = None, skip_remote: bool = False) -> WDLBindings: - """ - Make sure all File values embedded in the given bindings are imported, - using the given Toil object. - - :param task_path: Dotted WDL name of the user-level code doing the - importing (probably the workflow name). - - :param path: If set, try resolving input location relative to the URLs or - directories in this list. - :param skip_remote: If set, don't try to import files from remote - locations. Leave them as URIs. - """ - path_to_id: Dict[str, uuid.UUID] = {} - @memoize - def import_file_from_uri(uri: str) -> str: - """ - Import a file from a URI and return a virtualized filename for it. - """ - - tried = [] - for candidate_uri in potential_absolute_uris(uri, path if path is not None else []): - # Try each place it could be according to WDL finding logic. - tried.append(candidate_uri) - try: - if skip_remote and is_url(candidate_uri): - # Use remote URIs in place. But we need to find the one that exists. - if not AbstractJobStore.url_exists(candidate_uri): - # Wasn't found there - continue - - # Now we know this exists, so pass it through - return candidate_uri - else: - # Actually import - # Try to import the file. Don't raise if we can't find it, just - # return None! - imported = toil.import_file(candidate_uri, check_existence=False) - if imported is None: - # Wasn't found there - continue - logger.info('Imported %s', candidate_uri) - except UnimplementedURLException as e: - # We can't find anything that can even support this URL scheme. - # Report to the user, they are probably missing an extra. - logger.critical('Error: ' + str(e)) - sys.exit(1) - except HTTPError as e: - # Something went wrong looking for it there. - logger.warning("Checked URL %s but got HTTP status %s", candidate_uri, e.code) - # Try the next location. - continue - except Exception: - # Something went wrong besides the file not being found. Maybe - # we have no auth. - logger.error("Something went wrong importing %s", candidate_uri) - raise - - if imported is None: - # Wasn't found there - continue - logger.info('Imported %s', candidate_uri) - - # Work out what the basename for the file was - file_basename = os.path.basename(urlsplit(candidate_uri).path) - - if file_basename == "": - # We can't have files with no basename because we need to - # download them at that basename later. - raise RuntimeError(f"File {candidate_uri} has no basename and so cannot be a WDL File") - - # Was actually found - if is_url(candidate_uri): - # Might be a file URI or other URI. - # We need to make sure file URIs and local paths that point to - # the same place are treated the same. - parsed = urlsplit(candidate_uri) - if parsed.scheme == "file:": - # This is a local file URI. Convert to a path for source directory tracking. - parent_dir = os.path.dirname(unquote(parsed.path)) - else: - # This is some other URL. Get the URL to the parent directory and use that. - parent_dir = urljoin(candidate_uri, ".") - else: - # Must be a local path - parent_dir = os.path.dirname(candidate_uri) - - # Pack a UUID of the parent directory - dir_id = path_to_id.setdefault(parent_dir, uuid.uuid4()) - - return pack_toil_uri(imported, task_path, dir_id, file_basename) - - # If we get here we tried all the candidates - raise RuntimeError(f"Could not find {uri} at any of: {tried}") - - return map_over_files_in_bindings(environment, import_file_from_uri) - - -def drop_if_missing(value_type: WDL.Type.Base, filename: str, work_dir: str) -> Optional[str]: +def drop_if_missing(file: WDL.Value.File, standard_library: ToilWDLStdLibBase) -> Optional[WDL.Value.File]: """ Return None if a file doesn't exist, or its path if it does. filename represents a URI or file name belonging to a WDL value of type value_type. work_dir represents the current working directory of the job and is where all relative paths will be interpreted from """ + work_dir = standard_library.execution_dir + filename = getattr(file, "virtualized_value", None) or file.value + value_type = file.type logger.debug("Consider file %s", filename) - if is_url(filename): + if filename is not None and is_url(filename): try: - if (not filename.startswith(TOIL_NONEXISTENT_URI_SCHEME) - and (filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(filename))): + if filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(filename): # We assume anything in the filestore actually exists. - return filename + devirtualized_filename = standard_library._devirtualize_filename(filename) + file.value = devirtualized_filename + setattr(file, "virtualized_value", filename) + return file else: logger.warning('File %s with type %s does not actually exist at its URI', filename, value_type) return None @@ -1572,15 +1742,15 @@ def drop_if_missing(value_type: WDL.Type.Base, filename: str, work_dir: str) -> raise else: # Get the absolute path, not resolving symlinks - effective_path = os.path.abspath(os.path.join(work_dir, filename)) + effective_path = os.path.abspath(os.path.join(work_dir or os.getcwd(), filename)) if os.path.islink(effective_path) or os.path.exists(effective_path): # This is a broken symlink or a working symlink or a file. - return filename + return file else: logger.warning('File %s with type %s does not actually exist at %s', filename, value_type, effective_path) return None -def drop_missing_files(environment: WDLBindings, current_directory_override: Optional[str] = None) -> WDLBindings: +def drop_missing_files(environment: WDLBindings, standard_library: ToilWDLStdLibBase) -> WDLBindings: """ Make sure all the File values embedded in the given bindings point to files that exist, or are null. @@ -1589,10 +1759,8 @@ def drop_missing_files(environment: WDLBindings, current_directory_override: Opt """ # Determine where to evaluate relative paths relative to - work_dir = '.' if not current_directory_override else current_directory_override - - drop_if_missing_with_workdir = partial(drop_if_missing, work_dir=work_dir) - return map_over_typed_files_in_bindings(environment, drop_if_missing_with_workdir) + drop_if_missing_with_workdir = partial(drop_if_missing, standard_library=standard_library) + return map_over_files_in_bindings(environment, drop_if_missing_with_workdir) def get_file_paths_in_bindings(environment: WDLBindings) -> List[str]: """ @@ -1605,16 +1773,17 @@ def get_file_paths_in_bindings(environment: WDLBindings) -> List[str]: paths = [] - def append_to_paths(path: str) -> Optional[str]: + def append_to_paths(file: WDL.Value.File) -> Optional[WDL.Value.File]: # Append element and return the element. This is to avoid a logger warning inside map_over_typed_files_in_value() # But don't process nonexistent files - if not path.startswith(TOIL_NONEXISTENT_URI_SCHEME): + if getattr(file, "nonexistent", False) is False: + path = file.value paths.append(path) - return path + return file map_over_files_in_bindings(environment, append_to_paths) return paths -def map_over_typed_files_in_bindings(environment: WDLBindings, transform: Callable[[WDL.Type.Base, str], Optional[str]]) -> WDLBindings: +def map_over_files_in_bindings(environment: WDLBindings, transform: Callable[[WDL.Value.File], Optional[WDL.Value.File]]) -> WDLBindings: """ Run all File values embedded in the given bindings through the given transformation function. @@ -1622,20 +1791,10 @@ def map_over_typed_files_in_bindings(environment: WDLBindings, transform: Callab TODO: Replace with WDL.Value.rewrite_env_paths or WDL.Value.rewrite_files """ - return environment.map(lambda b: map_over_typed_files_in_binding(b, transform)) - -def map_over_files_in_bindings(bindings: WDLBindings, transform: Callable[[str], Optional[str]]) -> WDLBindings: - """ - Run all File values' types and values embedded in the given bindings - through the given transformation function. - - TODO: Replace with WDL.Value.rewrite_env_paths or WDL.Value.rewrite_files - """ - - return map_over_typed_files_in_bindings(bindings, lambda _, x: transform(x)) + return environment.map(lambda b: map_over_files_in_binding(b, transform)) -def map_over_typed_files_in_binding(binding: WDL.Env.Binding[WDL.Value.Base], transform: Callable[[WDL.Type.Base, str], Optional[str]]) -> WDL.Env.Binding[WDL.Value.Base]: +def map_over_files_in_binding(binding: WDL.Env.Binding[WDL.Value.Base], transform: Callable[[WDL.Value.File], Optional[WDL.Value.File]]) -> WDL.Env.Binding[WDL.Value.Base]: """ Run all File values' types and values embedded in the given binding's value through the given transformation function. @@ -1650,7 +1809,7 @@ def map_over_typed_files_in_binding(binding: WDL.Env.Binding[WDL.Value.Base], tr # # For now we assume that any types extending the WDL value types will implement # compatible constructors. -def map_over_typed_files_in_value(value: WDL.Value.Base, transform: Callable[[WDL.Type.Base, str], Optional[str]]) -> WDL.Value.Base: +def map_over_typed_files_in_value(value: WDL.Value.Base, transform: Callable[[WDL.Value.File], Optional[WDL.Value.File]]) -> WDL.Value.Base: """ Run all File values embedded in the given value through the given transformation function. @@ -1666,8 +1825,8 @@ def map_over_typed_files_in_value(value: WDL.Value.Base, transform: Callable[[WD """ if isinstance(value, WDL.Value.File): # This is a file so we need to process it - new_path = transform(value.type, value.value) - if new_path is None: + new_file = transform(value) + if new_file is None: # Assume the transform checked types if we actually care about the # result. logger.warning("File %s became Null", value) @@ -1675,7 +1834,7 @@ def map_over_typed_files_in_value(value: WDL.Value.Base, transform: Callable[[WD else: # Make whatever the value is around the new path. # TODO: why does this need casting? - return WDL.Value.File(new_path, value.expr) + return new_file elif isinstance(value, WDL.Value.Array): # This is an array, so recurse on the items return WDL.Value.Array(value.type.item_type, [map_over_typed_files_in_value(v, transform) for v in value.value], value.expr) @@ -1693,6 +1852,46 @@ def map_over_typed_files_in_value(value: WDL.Value.Base, transform: Callable[[WD # All other kinds of value can be passed through unmodified. return value +def ensure_null_files_are_nullable(value: WDL.Value.Base, original_value: WDL.Value.Base, expected_type: WDL.Type.Base) -> None: + """ + Run through all nested values embedded in the given value and check that the null values are valid. + + If a null value is found that does not have a valid corresponding expected_type, raise an error + + (This is currently only used to check that null values arising from File coercion are in locations with a nullable File? type. + If this is to be used elsewhere, the error message should be changed to describe the appropriate types and not just talk about files.) + + For example: + If one of the nested values is null but the equivalent nested expected_type is not optional, a FileNotFoundError will be raised + :param value: WDL base value to check. This is the WDL value that has been transformed and has the null elements + :param original_value: The original WDL base value prior to the transformation. Only used for error messages + :param expected_type: The WDL type of the value + """ + if isinstance(value, WDL.Value.File): + pass + elif isinstance(value, WDL.Value.Array) and isinstance(expected_type, WDL.Type.Array): + for elem, orig_elem in zip(value.value, original_value.value): + ensure_null_files_are_nullable(elem, orig_elem, expected_type.item_type) + elif isinstance(value, WDL.Value.Map) and isinstance(expected_type, WDL.Type.Map): + for pair, orig_pair in zip(value.value, original_value.value): + # The key of the map cannot be optional or else it is not serializable, so we only need to check the value + ensure_null_files_are_nullable(pair[1], orig_pair[1], expected_type.item_type[1]) + elif isinstance(value, WDL.Value.Pair) and isinstance(expected_type, WDL.Type.Pair): + ensure_null_files_are_nullable(value.value[0], original_value.value[0], expected_type.left_type) + ensure_null_files_are_nullable(value.value[1], original_value.value[1], expected_type.right_type) + elif isinstance(value, WDL.Value.Struct) and isinstance(expected_type, WDL.Type.StructInstance): + for (k, v), (_, orig_v) in zip(value.value.items(), original_value.value.items()): + # The parameters method for WDL.Type.StructInstance returns the values rather than the dictionary + # While dictionaries are ordered, this should be more robust; the else branch should never be hit + if expected_type.members is not None: + ensure_null_files_are_nullable(v, orig_v, expected_type.members[k]) + elif isinstance(value, WDL.Value.Null): + if not expected_type.optional: + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), original_value.value) + else: + # Don't check other (unsupported?) types + return + class WDLBaseJob(Job): """ Base job class for all WDL-related jobs. @@ -1705,7 +1904,7 @@ class WDLBaseJob(Job): Also responsible for remembering the Toil WDL configuration keys and values. """ - def __init__(self, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: + def __init__(self, wdl_options: WDLContext, **kwargs: Any) -> None: """ Make a WDL-related job. @@ -1732,7 +1931,7 @@ def __init__(self, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any) # jobs returning other jobs' promised RVs. self._postprocessing_steps: List[Tuple[str, Union[str, Promised[WDLBindings]]]] = [] - self._wdl_options = wdl_options if wdl_options is not None else {} + self._wdl_options = wdl_options assert self._wdl_options.get("container") is not None @@ -1839,25 +2038,21 @@ class WDLTaskWrapperJob(WDLBaseJob): All bindings are in terms of task-internal names. """ - def __init__(self, task: WDL.Tree.Task, prev_node_results: Sequence[Promised[WDLBindings]], task_id: List[str], namespace: str, task_path: str, **kwargs: Any) -> None: + def __init__(self, task: WDL.Tree.Task, prev_node_results: Sequence[Promised[WDLBindings]], task_id: List[str], wdl_options: WDLContext, **kwargs: Any) -> None: """ Make a new job to determine resources and run a task. :param namespace: The namespace that the task's *contents* exist in. The caller has alredy added the task's own name. - - :param task_path: Like the namespace, but including subscript numbers - for scatters. """ - super().__init__(unitName=task_path + ".inputs", displayName=namespace + ".inputs", local=True, **kwargs) + # task_path in wdl_options is like the namespace, but including subscript numbers for scatters + super().__init__(unitName=wdl_options["task_path"] + ".inputs", displayName=wdl_options["namespace"] + ".inputs", wdl_options=wdl_options, **kwargs) - logger.info("Preparing to run task code for %s as %s", task.name, namespace) + logger.info("Preparing to run task code for %s as %s", task.name, wdl_options["namespace"]) self._task = task self._prev_node_results = prev_node_results self._task_id = task_id - self._namespace = namespace - self._task_path = task_path @report_wdl_errors("evaluate task code", exit=True) def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: @@ -1865,28 +2060,27 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: Evaluate inputs and runtime and schedule the task. """ super().run(file_store) - logger.info("Evaluating inputs and runtime for task %s (%s) called as %s", self._task.name, self._task_id, self._namespace) + logger.info("Evaluating inputs and runtime for task %s (%s) called as %s", self._task.name, self._task_id, self._wdl_options["namespace"]) # Combine the bindings we get from previous jobs. # For a task we are only passed the inside-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library # UUID to use for virtualizing files - standard_library = ToilWDLStdLibBase(file_store, self._task_path) - with monkeypatch_coerce(standard_library): - if self._task.inputs: - logger.debug("Evaluating task code") - for input_decl in self._task.inputs: - # Evaluate all the inputs that aren't pre-set - bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) - for postinput_decl in self._task.postinputs: - # Evaluate all the postinput decls. - # We need these in order to evaluate the runtime. - # TODO: What if they wanted resources from the runtime? - bindings = bindings.bind(postinput_decl.name, evaluate_defaultable_decl(postinput_decl, bindings, standard_library)) - - # Evaluate the runtime section - runtime_bindings = evaluate_call_inputs(self._task, self._task.runtime, bindings, standard_library) + standard_library = ToilWDLStdLibBase(file_store, self._wdl_options) + + if self._task.inputs: + logger.debug("Evaluating task code") + # Evaluate all the inputs that aren't pre-set + bindings = evaluate_decls_to_bindings(self._task.inputs, bindings, standard_library, include_previous=True) + if self._task.postinputs: + # Evaluate all the postinput decls. + # We need these in order to evaluate the runtime. + # TODO: What if they wanted resources from the runtime? + bindings = evaluate_decls_to_bindings(self._task.postinputs, bindings, standard_library, include_previous=True) + + # Evaluate the runtime section + runtime_bindings = evaluate_call_inputs(self._task, self._task.runtime, bindings, standard_library) # Fill these in with not-None if the workflow asks for each resource. runtime_memory: Optional[int] = None @@ -1974,8 +2168,15 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: accelerator_requirement = parse_accelerator(accelerator_spec) runtime_accelerators = [accelerator_requirement] + task_wdl_options = self._wdl_options.copy() + # A task is not guaranteed to have access to the current execution directory, so get rid of it. The execution directory also is not needed as all files will be virtualized + task_wdl_options.pop("execution_dir") # Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized. - run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) + run_job = WDLTaskJob(self._task, + virtualize_files(bindings, standard_library, enforce_existence=False), + virtualize_files(runtime_bindings, standard_library, enforce_existence=False), + self._task_id, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, + accelerators=runtime_accelerators or self.accelerators, wdl_options=task_wdl_options) # Run that as a child self.addChild(run_job) @@ -1998,30 +2199,27 @@ class WDLTaskJob(WDLBaseJob): All bindings are in terms of task-internal names. """ - def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, **kwargs: Any) -> None: + def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], wdl_options: WDLContext, **kwargs: Any) -> None: """ Make a new job to run a task. :param namespace: The namespace that the task's *contents* exist in. The caller has alredy added the task's own name. - - :param task_path: Like the namespace, but including subscript numbers - for scatters. """ # This job should not be local because it represents a real workflow task. # TODO: Instead of re-scheduling with more resources, add a local # "wrapper" job like CWL uses to determine the actual requirements. - super().__init__(unitName=task_path + ".command", displayName=namespace + ".command", local=False, **kwargs) - logger.info("Preparing to run task %s as %s", task.name, namespace) + # task_path in wdl_options is like the namespace, but including subscript numbers for scatters + super().__init__(unitName=wdl_options["task_path"] + ".command", displayName=wdl_options["namespace"] + ".command", local=False, wdl_options=wdl_options, **kwargs) + + logger.info("Preparing to run task %s as %s", task.name, wdl_options["namespace"]) self._task = task self._task_internal_bindings = task_internal_bindings self._runtime_bindings = runtime_bindings self._task_id = task_id - self._namespace = namespace - self._task_path = task_path ### # Runtime code injection system @@ -2207,12 +2405,12 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: Actually run the task. """ super().run(file_store) - logger.info("Running task command for %s (%s) called as %s", self._task.name, self._task_id, self._namespace) + logger.info("Running task command for %s (%s) called as %s", self._task.name, self._task_id, self._wdl_options["namespace"]) # Set up the WDL standard library # UUID to use for virtualizing files # We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them - standard_library = ToilWDLStdLibBase(file_store, self._task_path, enforce_existence=False) + standard_library = ToilWDLStdLibBase(file_store, wdl_options=self._wdl_options) # Get the bindings from after the input section bindings = unwrap(self._task_internal_bindings) @@ -2247,7 +2445,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: os.makedirs(os.environ['MINIWDL__SINGULARITY__IMAGE_CACHE'], exist_ok=True) # Run containers with Singularity - TaskContainerImplementation: Type[TaskContainer] = SingularityContainer + TaskContainerImplementation: Type[TaskContainer] = SingularityContainer elif self._wdl_options.get("container") in ["docker", "auto"]: # Run containers with Docker # TODO: Poll if it is available and don't just try and fail. @@ -2285,6 +2483,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # filesystem, so we can interpret file anmes and globs relative to # there. workdir_in_container: Optional[str] = None + task_path = self._wdl_options["task_path"] if self._task.command: # When the command string references a File, we need to get a path @@ -2389,14 +2588,16 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]: # Replace everything with in-container paths for the command. # TODO: MiniWDL deals with directory paths specially here. - def get_path_in_container(path: str) -> Optional[str]: - if path.startswith(TOIL_NONEXISTENT_URI_SCHEME): - return None - return task_container.input_path_map[path] + def get_path_in_container(file: WDL.Value.File) -> Optional[WDL.Value.File]: + if getattr(file, "nonexistent", False) is False: + return WDL.Value.File(task_container.input_path_map[file.value]) contained_bindings = map_over_files_in_bindings(bindings, get_path_in_container) # Make a new standard library for evaluating the command specifically, which only deals with in-container paths and out-of-container paths. - command_library = ToilWDLStdLibTaskCommand(file_store, self._task_path, task_container, workdir_in_container) + command_wdl_options: WDLContext = self._wdl_options.copy() + if workdir_in_container is not None: + command_wdl_options["execution_dir"] = workdir_in_container + command_library = ToilWDLStdLibTaskCommand(file_store, task_container, wdl_options=command_wdl_options) # Work out the command string, and unwrap it command_string: str = evaluate_named_expression(self._task, "command", WDL.Type.String(), remove_common_leading_whitespace(self._task.command), contained_bindings, command_library).coerce(WDL.Type.String()).value @@ -2441,7 +2642,7 @@ def get_path_in_container(path: str) -> Optional[str]: logger.error('Failed task left standard error at %s of %d bytes', host_stderr_txt, size) if size > 0: # Send the whole error stream. - file_store.log_user_stream(self._task_path + '.stderr', open(host_stderr_txt, 'rb')) + file_store.log_user_stream(task_path + '.stderr', open(host_stderr_txt, 'rb')) if logger.isEnabledFor(logging.DEBUG): logger.debug("MiniWDL already logged standard error") else: @@ -2462,7 +2663,7 @@ def get_path_in_container(path: str) -> Optional[str]: # Save the whole output stream. # TODO: We can't tell if this was supposed to be # captured. It might really be huge binary data. - file_store.log_user_stream(self._task_path + '.stdout', open(host_stdout_txt, 'rb')) + file_store.log_user_stream(task_path + '.stdout', open(host_stdout_txt, 'rb')) # Keep crashing raise @@ -2479,17 +2680,11 @@ def get_path_in_container(path: str) -> Optional[str]: # container-determined strings that are absolute paths to WDL File # objects, and like MiniWDL we can say we only support # working-directory-based relative paths for globs. - outputs_library = ToilWDLStdLibTaskOutputs( - file_store, - self._task_path, - host_stdout_txt, - host_stderr_txt, - task_container.input_path_map, - current_directory_override=workdir_in_container, - share_files_with=standard_library - ) - with monkeypatch_coerce(outputs_library): - output_bindings = evaluate_output_decls(self._task.outputs, bindings, outputs_library) + output_wdl_options: WDLContext = self._wdl_options.copy() + if workdir_in_container is not None: + output_wdl_options["execution_dir"] = workdir_in_container + outputs_library = ToilWDLStdLibTaskOutputs(file_store, host_stdout_txt, host_stderr_txt, task_container.input_path_map, wdl_options=output_wdl_options, share_files_with=standard_library) + output_bindings = evaluate_decls_to_bindings(self._task.outputs, bindings, outputs_library, drop_missing_files=True) # Now we know if the standard output and error were sent somewhere by # the workflow. If not, we should report them to the leader. @@ -2499,20 +2694,20 @@ def get_path_in_container(path: str) -> Optional[str]: logger.info('Unused standard error at %s of %d bytes', host_stderr_txt, size) if size > 0: # Save the whole error stream because the workflow didn't capture it. - file_store.log_user_stream(self._task_path + '.stderr', open(host_stderr_txt, 'rb')) + file_store.log_user_stream(task_path + '.stderr', open(host_stderr_txt, 'rb')) if not outputs_library.stdout_used() and os.path.exists(host_stdout_txt): size = os.path.getsize(host_stdout_txt) logger.info('Unused standard output at %s of %d bytes', host_stdout_txt, size) if size > 0: # Save the whole output stream because the workflow didn't capture it. - file_store.log_user_stream(self._task_path + '.stdout', open(host_stdout_txt, 'rb')) + file_store.log_user_stream(task_path + '.stdout', open(host_stdout_txt, 'rb')) # Collect output messages from any code Toil injected into the task. self.handle_injection_messages(outputs_library) # Drop any files from the output which don't actually exist - output_bindings = drop_missing_files(output_bindings, current_directory_override=workdir_in_container) + output_bindings = drop_missing_files(output_bindings, standard_library=outputs_library) for decl in self._task.outputs: if not decl.type.optional and output_bindings[decl.name].value is None: # todo: make recursive @@ -2534,16 +2729,14 @@ class WDLWorkflowNodeJob(WDLBaseJob): Job that evaluates a WDL workflow node. """ - def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, task_path: str, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: + def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], wdl_options: WDLContext, **kwargs: Any) -> None: """ Make a new job to run a workflow node to completion. """ - super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, wdl_options=wdl_options or {}, **kwargs) + super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, wdl_options=wdl_options, **kwargs) self._node = node self._prev_node_results = prev_node_results - self._namespace = namespace - self._task_path = task_path if isinstance(self._node, WDL.Tree.Call): logger.debug("Preparing job for call node %s", self._node.workflow_node_id) @@ -2559,64 +2752,70 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs incoming_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) - with monkeypatch_coerce(standard_library): - if isinstance(self._node, WDL.Tree.Decl): - # This is a variable assignment - logger.info('Setting %s to %s', self._node.name, self._node.expr) - value = evaluate_decl(self._node, incoming_bindings, standard_library) - return self.postprocess(incoming_bindings.bind(self._node.name, value)) - elif isinstance(self._node, WDL.Tree.Call): - # This is a call of a task or workflow - - # Fetch all the inputs we are passing and bind them. - # The call is only allowed to use these. - logger.debug("Evaluating step inputs") - if self._node.callee is None: - # This should never be None, but mypy gets unhappy and this is better than an assert - inputs_mapping = None - else: - inputs_mapping = {e.name: e.type for e in self._node.callee.inputs or []} - input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library, inputs_mapping) - - # Bindings may also be added in from the enclosing workflow inputs - # TODO: this is letting us also inject them from the workflow body. - # TODO: Can this result in picking up non-namespaced values that - # aren't meant to be inputs, by not changing their names? - passed_down_bindings = incoming_bindings.enter_namespace(self._node.name) - - if isinstance(self._node.callee, WDL.Tree.Workflow): - # This is a call of a workflow - subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}', f'{self._task_path}.{self._node.name}', wdl_options=self._wdl_options) - self.addChild(subjob) - elif isinstance(self._node.callee, WDL.Tree.Task): - # This is a call of a task - subjob = WDLTaskWrapperJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}', f'{self._task_path}.{self._node.name}', wdl_options=self._wdl_options) - self.addChild(subjob) - else: - raise WDL.Error.InvalidType(self._node, "Cannot call a " + str(type(self._node.callee))) - - # We need to agregate outputs namespaced with our node name, and existing bindings - subjob.then_namespace(self._node.name) - subjob.then_overlay(incoming_bindings) - self.defer_postprocessing(subjob) - return subjob.rv() - elif isinstance(self._node, WDL.Tree.Scatter): - subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace, self._task_path, wdl_options=self._wdl_options) + standard_library = ToilWDLStdLibBase(file_store, self._wdl_options) + + if isinstance(self._node, WDL.Tree.Decl): + # This is a variable assignment + logger.info('Setting %s to %s', self._node.name, self._node.expr) + value = evaluate_decl(self._node, incoming_bindings, standard_library) + bindings = incoming_bindings.bind(self._node.name, value) + return self.postprocess(bindings) + elif isinstance(self._node, WDL.Tree.Call): + # This is a call of a task or workflow + + # Fetch all the inputs we are passing and bind them. + # The call is only allowed to use these. + logger.debug("Evaluating step inputs") + if self._node.callee is None: + # This should never be None, but mypy gets unhappy and this is better than an assert + inputs_mapping = None + else: + inputs_mapping = {e.name: e.type for e in self._node.callee.inputs or []} + input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library, inputs_mapping) + + # Bindings may also be added in from the enclosing workflow inputs + # TODO: this is letting us also inject them from the workflow body. + # TODO: Can this result in picking up non-namespaced values that + # aren't meant to be inputs, by not changing their names? + passed_down_bindings = incoming_bindings.enter_namespace(self._node.name) + task_path = self._wdl_options.get("task_path") + namespace = self._wdl_options.get("namespace") + wdl_options = self._wdl_options.copy() + wdl_options["task_path"] = f'{task_path}.{self._node.name}' + wdl_options["namespace"] = f'{namespace}.{self._node.name}' + + if isinstance(self._node.callee, WDL.Tree.Workflow): + # This is a call of a workflow + subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, wdl_options=wdl_options, local=True) self.addChild(subjob) - # Scatters don't really make a namespace, just kind of a scope? - # TODO: Let stuff leave scope! - self.defer_postprocessing(subjob) - return subjob.rv() - elif isinstance(self._node, WDL.Tree.Conditional): - subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace, self._task_path, wdl_options=self._wdl_options) + elif isinstance(self._node.callee, WDL.Tree.Task): + # This is a call of a task + subjob = WDLTaskWrapperJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, wdl_options=wdl_options, local=True) self.addChild(subjob) - # Conditionals don't really make a namespace, just kind of a scope? - # TODO: Let stuff leave scope! - self.defer_postprocessing(subjob) - return subjob.rv() else: - raise WDL.Error.InvalidType(self._node, "Unimplemented WorkflowNode: " + str(type(self._node))) + raise WDL.Error.InvalidType(self._node, "Cannot call a " + str(type(self._node.callee))) + + # We need to agregate outputs namespaced with our node name, and existing bindings + subjob.then_namespace(self._node.name) + subjob.then_overlay(incoming_bindings) + self.defer_postprocessing(subjob) + return subjob.rv() + elif isinstance(self._node, WDL.Tree.Scatter): + subjob = WDLScatterJob(self._node, [incoming_bindings], wdl_options=self._wdl_options, local=True) + self.addChild(subjob) + # Scatters don't really make a namespace, just kind of a scope? + # TODO: Let stuff leave scope! + self.defer_postprocessing(subjob) + return subjob.rv() + elif isinstance(self._node, WDL.Tree.Conditional): + subjob = WDLConditionalJob(self._node, [incoming_bindings], wdl_options=self._wdl_options, local=True) + self.addChild(subjob) + # Conditionals don't really make a namespace, just kind of a scope? + # TODO: Let stuff leave scope! + self.defer_postprocessing(subjob) + return subjob.rv() + else: + raise WDL.Error.InvalidType(self._node, "Unimplemented WorkflowNode: " + str(type(self._node))) class WDLWorkflowNodeListJob(WDLBaseJob): """ @@ -2625,7 +2824,7 @@ class WDLWorkflowNodeListJob(WDLBaseJob): workflows or tasks or sections. """ - def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, task_path: str, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: + def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], wdl_options: WDLContext, **kwargs: Any) -> None: """ Make a new job to run a list of workflow nodes to completion. """ @@ -2633,8 +2832,6 @@ def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequen self._nodes = nodes self._prev_node_results = prev_node_results - self._namespace = namespace - self._task_path = task_path for n in self._nodes: if isinstance(n, (WDL.Tree.Call, WDL.Tree.Scatter, WDL.Tree.Conditional)): @@ -2650,17 +2847,16 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs current_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) - - with monkeypatch_coerce(standard_library): - for node in self._nodes: - if isinstance(node, WDL.Tree.Decl): - # This is a variable assignment - logger.info('Setting %s to %s', node.name, node.expr) - value = evaluate_decl(node, current_bindings, standard_library) - current_bindings = current_bindings.bind(node.name, value) - else: - raise WDL.Error.InvalidType(node, "Unimplemented WorkflowNode: " + str(type(node))) + standard_library = ToilWDLStdLibBase(file_store, self._wdl_options) + + for node in self._nodes: + if isinstance(node, WDL.Tree.Decl): + # This is a variable assignment + logger.info('Setting %s to %s', node.name, node.expr) + value = evaluate_decl(node, current_bindings, standard_library) + current_bindings = current_bindings.bind(node.name, value) + else: + raise WDL.Error.InvalidType(node, "Unimplemented WorkflowNode: " + str(type(node))) return self.postprocess(current_bindings) @@ -2690,6 +2886,10 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: """ super().run(file_store) combined = combine_bindings(unwrap_all(self._prev_node_results)) + + # Set up the WDL standard library + standard_library = ToilWDLStdLibBase(file_store, self._wdl_options) + # Make sure to run the universal postprocessing steps return self.postprocess(combined) @@ -2830,14 +3030,12 @@ class WDLSectionJob(WDLBaseJob): Job that can create more graph for a section of the workflow. """ - def __init__(self, namespace: str, task_path: str, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: + def __init__(self, wdl_options: WDLContext, **kwargs: Any) -> None: """ Make a WDLSectionJob where the interior runs in the given namespace, starting with the root workflow. """ super().__init__(wdl_options=wdl_options, **kwargs) - self._namespace = namespace - self._task_path = task_path @staticmethod def coalesce_nodes(order: List[str], section_graph: WDLWorkflowGraph) -> List[List[str]]: @@ -2926,7 +3124,7 @@ def create_subgraph(self, nodes: Sequence[WDL.Tree.WorkflowNode], gather_nodes: """ # Work out what to call what we are working on - task_path = self._task_path + task_path = self._wdl_options["task_path"] if subscript is not None: # We need to include a scatter loop number. task_path += f'.{subscript}' @@ -2990,10 +3188,10 @@ def get_job_set_any(wdl_ids: Set[str]) -> List[WDLBaseJob]: if len(node_ids) == 1: # Make a one-node job - job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, self._namespace, task_path, wdl_options=self._wdl_options) + job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, wdl_options=self._wdl_options, local=True) else: # Make a multi-node job - job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, self._namespace, task_path, wdl_options=self._wdl_options) + job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, wdl_options=self._wdl_options, local=True) for prev_job in prev_jobs: # Connect up the happens-after relationships to make sure the # return values are available. @@ -3023,7 +3221,7 @@ def get_job_set_any(wdl_ids: Set[str]) -> List[WDLBaseJob]: leaf_rvs.append(environment) # And to fill in bindings from code not executed in this instantiation # with Null, and filter out stuff that should leave scope. - sink = WDLCombineBindingsJob(leaf_rvs, wdl_options=self._wdl_options) + sink = WDLCombineBindingsJob(leaf_rvs, wdl_options=self._wdl_options, local=True) # It runs inside us self.addChild(sink) for leaf_job in toil_leaves.values(): @@ -3090,11 +3288,11 @@ class WDLScatterJob(WDLSectionJob): instance of the body. If an instance of the body doesn't create a binding, it gets a null value in the corresponding array. """ - def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, task_path: str, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: + def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], wdl_options: WDLContext, **kwargs: Any) -> None: """ Create a subtree that will run a WDL scatter. The scatter itself and the contents live in the given namespace. """ - super().__init__(namespace, task_path, **kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id, wdl_options=wdl_options) + super().__init__(**kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id, wdl_options=wdl_options) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -3121,15 +3319,14 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # For a task we only see the inside-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path) + standard_library = ToilWDLStdLibBase(file_store, self._wdl_options) # Get what to scatter over - with monkeypatch_coerce(standard_library): - try: - scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) - finally: - # Report all files are downloaded now that all expressions are evaluated. - self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) + try: + scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) + finally: + # Report all files are downloaded now that all expressions are evaluated. + self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) if not isinstance(scatter_value, WDL.Value.Array): raise RuntimeError("The returned value from a scatter is not an Array type.") @@ -3233,11 +3430,11 @@ class WDLConditionalJob(WDLSectionJob): """ Job that evaluates a conditional in a WDL workflow. """ - def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, task_path: str, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: + def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], wdl_options: WDLContext, **kwargs: Any) -> None: """ Create a subtree that will run a WDL conditional. The conditional itself and its contents live in the given namespace. """ - super().__init__(namespace, task_path, **kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id, wdl_options=wdl_options) + super().__init__(**kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id, wdl_options=wdl_options) # Once again we need to ship the whole body template to be instantiated # into Toil jobs only if it will actually run. @@ -3260,15 +3457,14 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # For a task we only see the insode-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path) + standard_library = ToilWDLStdLibBase(file_store, self._wdl_options) # Get the expression value. Fake a name. - with monkeypatch_coerce(standard_library): - try: - expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) - finally: - # Report all files are downloaded now that all expressions are evaluated. - self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) + try: + expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) + finally: + # Report all files are downloaded now that all expressions are evaluated. + self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) if expr_value.value: # Evaluated to true! @@ -3289,7 +3485,7 @@ class WDLWorkflowJob(WDLSectionJob): Job that evaluates an entire WDL workflow. """ - def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], namespace: str, task_path: str, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: + def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], wdl_options: WDLContext, **kwargs: Any) -> None: """ Create a subtree that will run a WDL workflow. The job returns the return value of the workflow. @@ -3297,7 +3493,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Prom :param namespace: the namespace that the workflow's *contents* will be in. Caller has already added the workflow's own name. """ - super().__init__(namespace, task_path, wdl_options=wdl_options, **kwargs) + super().__init__(wdl_options=wdl_options, **kwargs) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -3308,11 +3504,9 @@ def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Prom logger.debug("Preparing to run workflow %s", workflow.name) - self._workflow = workflow self._prev_node_results = prev_node_results self._workflow_id = workflow_id - self._namespace = namespace @report_wdl_errors("run workflow") def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: @@ -3321,31 +3515,29 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ super().run(file_store) - logger.info("Running workflow %s (%s) called as %s", self._workflow.name, self._workflow_id, self._namespace) + logger.info("Running workflow %s (%s) called as %s", self._workflow.name, self._workflow_id, self._wdl_options["namespace"]) # Combine the bindings we get from previous jobs. # For a task we only see the insode-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) + standard_library = ToilWDLStdLibBase(file_store, self._wdl_options) if self._workflow.inputs: - with monkeypatch_coerce(standard_library): - try: - for input_decl in self._workflow.inputs: - # Evaluate all the inputs that aren't pre-set - bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) - finally: - # Report all files are downloaded now that all expressions are evaluated. - self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) + try: + bindings = evaluate_decls_to_bindings(self._workflow.inputs, bindings, standard_library, include_previous=True) + finally: + # Report all files are downloaded now that all expressions are evaluated. + self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) + bindings = virtualize_files(bindings, standard_library, enforce_existence=False) # Make jobs to run all the parts of the workflow sink = self.create_subgraph(self._workflow.body, [], bindings) if self._workflow.outputs != []: # Compare against empty list as None means there should be outputs # Either the output section is declared and nonempty or it is not declared # Add evaluating the outputs after the sink - outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._task_path, wdl_options=self._wdl_options) + outputs_job = WDLOutputsJob(self._workflow, sink.rv(), wdl_options=self._wdl_options, local=True) sink.addFollowOn(outputs_job) # Caller is responsible for making sure namespaces are applied self.defer_postprocessing(outputs_job) @@ -3360,7 +3552,7 @@ class WDLOutputsJob(WDLBaseJob): Returns an environment with just the outputs bound, in no namespace. """ - def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], task_path: str, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any): + def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], wdl_options: WDLContext, **kwargs: Any): """ Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs. """ @@ -3368,7 +3560,6 @@ def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], self._bindings = bindings self._workflow = workflow - self._task_path = task_path @report_wdl_errors("evaluate outputs") def run(self, file_store: AbstractFileStore) -> WDLBindings: @@ -3378,15 +3569,14 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: super().run(file_store) # Evaluate all output expressions in the normal, non-task-outputs library context - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) + standard_library = ToilWDLStdLibBase(file_store, self._wdl_options) try: if self._workflow.outputs is not None: # Output section is declared and is nonempty, so evaluate normally # Combine the bindings from the previous job - with monkeypatch_coerce(standard_library): - output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) + output_bindings = evaluate_decls_to_bindings(self._workflow.outputs, unwrap(self._bindings), standard_library) else: # If no output section is present, start with an empty bindings output_bindings = WDL.Env.Bindings() @@ -3419,6 +3609,11 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: if binding.name in output_set: # The bindings will already be namespaced with the task namespaces output_bindings = output_bindings.bind(binding.name, binding.value) + else: + # Output section is declared and is nonempty, so evaluate normally + + # Combine the bindings from the previous job + output_bindings = evaluate_decls_to_bindings(self._workflow.outputs, unwrap(self._bindings), standard_library, drop_missing_files=True) finally: # We don't actually know when all our files are downloaded since # anything we evaluate might devirtualize inside any expression. @@ -3431,7 +3626,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: self.files_downloaded_hook([(p, p) for p in standard_library.get_local_paths()]) # Null nonexistent optional values and error on the rest - output_bindings = drop_missing_files(output_bindings, self._wdl_options.get("execution_dir")) + output_bindings = drop_missing_files(output_bindings, standard_library=standard_library) return self.postprocess(output_bindings) @@ -3442,13 +3637,13 @@ class WDLRootJob(WDLSectionJob): the workflow name; both forms are accepted. """ - def __init__(self, target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: + def __init__(self, target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, wdl_options: WDLContext, **kwargs: Any) -> None: """ Create a subtree to run the workflow and namespace the outputs. """ # The root workflow names the root namespace and task path. - super().__init__(target.name, target.name, wdl_options=wdl_options, **kwargs) + super().__init__(wdl_options=wdl_options, **kwargs) self._target = target self._inputs = inputs @@ -3461,59 +3656,17 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: if isinstance(self._target, WDL.Tree.Workflow): # Create a workflow job. We rely in this to handle entering the input # namespace if needed, or handling free-floating inputs. - job: WDLBaseJob = WDLWorkflowJob(self._target, [self._inputs], [self._target.name], self._namespace, self._task_path, wdl_options=self._wdl_options) + job: WDLBaseJob = WDLWorkflowJob(self._target, [self._inputs], [self._target.name], wdl_options=self._wdl_options, local=True) else: # There is no workflow. Create a task job. - job = WDLTaskWrapperJob(self._target, [self._inputs], [self._target.name], self._namespace, self._task_path, wdl_options=self._wdl_options) + job = WDLTaskWrapperJob(self._target, [self._inputs], [self._target.name], wdl_options=self._wdl_options, local=True) # Run the task or workflow - job.then_namespace(self._namespace) + job.then_namespace(self._wdl_options["namespace"]) self.addChild(job) self.defer_postprocessing(job) return job.rv() -@contextmanager -def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, None, None]: - """ - Monkeypatch miniwdl's WDL.Value.Base.coerce() function to virtualize files when they are represented as Strings. - Calls _virtualize_filename from a given standard library object. - :param standard_library: a standard library object - :return - """ - # We're doing this because while miniwdl recognizes when a string needs to be converted into a file, its method of - # conversion is to just store the local filepath. Toil needs to virtualize the file into the jobstore so until - # there is a proper hook, monkeypatch it. - - SelfType = TypeVar("SelfType", bound=WDL.Value.Base) - def make_coerce(old_coerce: Callable[[SelfType, Optional[WDL.Type.Base]], WDL.Value.Base]) -> Callable[[Arg(SelfType, 'self'), DefaultArg(Optional[WDL.Type.Base], 'desired_type')], WDL.Value.Base]: - """ - Stamp out a replacement coerce method that calls the given original one. - """ - def coerce(self: SelfType, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: - if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Value.File): - # Coercing something to File. - if not is_url(self.value) and not os.path.isfile(os.path.join(standard_library.execution_dir or ".", self.value)): - # It is a local file that isn't there. - return WDL.Value.File(TOIL_NONEXISTENT_URI_SCHEME + self.value, self.expr) - else: - # Virtualize normally - return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) - return old_coerce(self, desired_type) - - return coerce - - old_base_coerce = WDL.Value.Base.coerce - old_str_coerce = WDL.Value.String.coerce - try: - # Mypy does not like monkeypatching: - # https://github.com/python/mypy/issues/2427#issuecomment-1419206807 - WDL.Value.Base.coerce = make_coerce(old_base_coerce) # type: ignore[method-assign] - WDL.Value.String.coerce = make_coerce(old_str_coerce) # type: ignore[method-assign] - yield - finally: - WDL.Value.Base.coerce = old_base_coerce # type: ignore[method-assign] - WDL.Value.String.coerce = old_str_coerce # type: ignore[method-assign] - @report_wdl_errors("run workflow", exit=True) def main() -> None: """ @@ -3536,8 +3689,6 @@ def main() -> None: # If we don't have a directory assigned, make one in the current directory. output_directory: str = options.output_directory if options.output_directory else mkdtemp(prefix='wdl-out-', dir=os.getcwd()) - # Get the execution directory - execution_dir = os.getcwd() try: with Toil(options) as toil: if options.restart: @@ -3623,23 +3774,20 @@ def main() -> None: logger.info("Inputs appear to come from a Github repository; adding repository root to file search path") inputs_search_path.append(match.group(0)) - # Import any files in the bindings - input_bindings = import_files(input_bindings, target.name, toil, inputs_search_path, skip_remote=options.reference_inputs) - # TODO: Automatically set a good MINIWDL__SINGULARITY__IMAGE_CACHE ? # Get the execution directory execution_dir = os.getcwd() + convert_remote_files(input_bindings, toil, task_path=target.name, search_paths=inputs_search_path, import_remote_files=options.reference_inputs) + # Configure workflow interpreter options - wdl_options: Dict[str, str] = {} - wdl_options["execution_dir"] = execution_dir - wdl_options["container"] = options.container + wdl_options: WDLContext = {"execution_dir": execution_dir, "container": options.container, "task_path": target.name, + "namespace": target.name, "all_call_outputs": options.all_call_outputs} assert wdl_options.get("container") is not None - wdl_options["all_call_outputs"] = options.all_call_outputs # Run the workflow and get its outputs namespaced with the workflow name. - root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options) + root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options, local=True) output_bindings = toil.start(root_job) if not isinstance(output_bindings, WDL.Env.Bindings): raise RuntimeError("The output of the WDL job is not a binding.") @@ -3649,23 +3797,17 @@ def main() -> None: virtualized_to_devirtualized: Dict[str, str] = dict() # Fetch all the output files - def devirtualize_output(filename: str) -> str: + def devirtualize_output(file: WDL.Value.File) -> WDL.Value.File: """ 'devirtualize' a file using the "toil" object instead of a filestore. Returns its local path. """ # Make sure the output directory exists if we have output files # that might need to use it. + filename = getattr(file, "virtualized_value", None) or file.value os.makedirs(output_directory, exist_ok=True) - return ToilWDLStdLibBase.devirtualize_to( - filename, - output_directory, - toil, - execution_dir, - devirtualization_state, - devirtualized_to_virtualized, - virtualized_to_devirtualized - ) + file.value = ToilWDLStdLibBase.devirtualize_to(filename, output_directory, toil, devirtualization_state, wdl_options, devirtualized_to_virtualized, virtualized_to_devirtualized) + return file # Make all the files local files output_bindings = map_over_files_in_bindings(output_bindings, devirtualize_output)