Skip to content

Commit

Permalink
feat: add compile_step/workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
camilovelezr committed Feb 28, 2024
1 parent f4467ab commit b919687
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/wic/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .pythonapi import Step, Workflow
from .pythonapi import Step, Workflow, configure_step, configure_workflow

__all__ = ["Step", "Workflow"]
__all__ = ["Step", "Workflow", "configure_step", "configure_workflow"]
132 changes: 126 additions & 6 deletions src/wic/api/pythonapi.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
# pylint: disable=W1203
"""CLT utilities."""
import json
import logging
import subprocess
from dataclasses import field
from functools import singledispatch
from pathlib import Path
from typing import Any, ClassVar, Optional, TypeVar, Union
from typing import Any, ClassVar, Generic, Optional, TypeVar, Union

import cwl_utils.parser as cu_parser
import yaml
from cwl_utils.parser import CommandLineTool as CWLCommandLineTool
from cwl_utils.parser import load_document_by_uri
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr # pylint: disable=E0611
from pydantic import ( # pylint: disable=E0611
BaseModel,
ConfigDict,
Field,
PrivateAttr,
ValidationError,
)
from pydantic.dataclasses import dataclass as pydantic_dataclass

from wic.api._compat import PYDANTIC_V2
Expand Down Expand Up @@ -131,6 +138,18 @@ def _set_value(
self.linked = True


# Type variable standing in for the declared type of a single Step input.
InputType = TypeVar("InputType")


class _ConfiguredStepInput(BaseModel, Generic[InputType]):
    """Wrapper for a Step input value that comes from a JSON configuration.

    The model is generic over the type of the wrapped value: it is meant to be
    parametrized with the type of a Step input so that Pydantic validates the
    configured value against that type when the model is instantiated.
    """

    # the configured value of the input
    value: InputType


def _default_dict() -> dict:
    """Return a new, empty dictionary on every call."""
    empty: dict = {}
    return empty

Expand Down Expand Up @@ -285,6 +304,8 @@ def __setattr__(self, __name: str, __value: Any) -> Any: # pylint: disable=R171
except BaseException as exc:
raise exc

elif isinstance(__value, _ConfiguredStepInput): # when configuring from JSON
self.inputs[index]._set_value(__value.value)
else:
if isinstance(__value, str) and _is_link(__value):
self.inputs[index]._set_value(__value, check=False)
Expand Down Expand Up @@ -347,6 +368,54 @@ def _save_cwl(self, path: Path, overwrite: bool) -> None:
file.write(yaml.dump(self.yaml))


def _load_config(config_path: StrPath) -> Any:
    """Read a JSON configuration file and return its parsed content.

    The file is opened as UTF-8 text. The result is expected to be a
    `dict[str, Any]`, but the return annotation is `Any` because that is
    what `json.load` itself is annotated to return.
    """
    with open(config_path, encoding="utf-8") as handle:
        return json.load(handle)


def configure_step(clt_path: StrPath, config_path: Union[StrPath, dict[str, Any]]) -> Step:
    """Configure Step from a JSON configuration.

    A `Step` is created from the CLT (Command Line Tool) file and every entry
    of the configuration whose key matches the name of one of the inputs of
    the Step is assigned to that input. Entries whose key is not an input
    of the Step are ignored.

    String values recognized by `_is_link` (linked `*` or `&` inputs) are
    assigned as they are, without validation. Every other value is first
    validated with Pydantic against the type of the corresponding input.

    Args:
        clt_path (StrPath): The path to the CLT file.
        config_path (Union[StrPath, dict[str, Any]]): The path to the JSON
            configuration file, or an already loaded configuration `dict`.

    Returns:
        Step: The configured `Step` object.

    Raises:
        InvalidInputValueError: If a value fails validation against
            the type of its input.
    """
    clt = Step(clt_path)
    input_names = clt._input_names  # pylint: disable=W0212
    # an already loaded configuration is used directly, anything else is a path
    config = config_path if isinstance(config_path, dict) else _load_config(config_path)
    for inp_name, inp_value in config.items():
        if inp_name not in input_names:
            # only the inputs that are in the clt are configured
            continue
        if isinstance(inp_value, str) and _is_link(inp_value):  # linked * or & inp
            # do not perform validation for linked inputs
            setattr(clt, inp_name, inp_value)
            continue
        # not linked inp: generic_input performs validation with Pydantic
        inp_type = clt.inputs[input_names.index(inp_name)].inp_type
        try:
            # inp_type is a variable, mypy complains
            generic_input = _ConfiguredStepInput[inp_type](value=inp_value)  # type: ignore
        except ValidationError as exc:
            raise InvalidInputValueError(f"invalid value for {inp_name}") from exc
        setattr(clt, inp_name, generic_input)

    return clt


if PYDANTIC_V2:
DATACLASS_CONFIG = ConfigDict(validate_assignment=True)
else:
Expand Down Expand Up @@ -412,7 +481,18 @@ def _save_all_cwl(self, overwrite: bool) -> None:
except BaseException as exc:
raise exc

def compile(self, run_local: bool = False, overwrite: bool = False) -> Path:
def _create_symlinks(self, compiled_cwl_path: Path) -> None:
    """Create symlinks for compiled CWL files.

    Two symlinks are (re)created inside `<self.path>/<self.name>/`:

    * `<name>.cwl` pointing to `compiled_cwl_path`
    * `<name>_inputs.yml` pointing to the file of the same name
      located next to `compiled_cwl_path`

    Anything already present at a link location (including a dangling
    symlink left behind by a previous compilation) is removed first.

    Args:
        compiled_cwl_path (Path): Path to the compiled CWL Workflow.
    """
    workflow_dir = self.path.joinpath(self.name)
    inputs_name = f"{self.name}_inputs.yml"
    links = (
        (workflow_dir.joinpath(f"{self.name}.cwl"), compiled_cwl_path),
        (workflow_dir.joinpath(inputs_name), compiled_cwl_path.with_name(inputs_name)),
    )
    for link_path, target in links:
        # `Path.exists()` follows symlinks and is False for a dangling link,
        # which would then make `symlink_to` fail with FileExistsError;
        # unlinking unconditionally removes the link itself in every case.
        link_path.unlink(missing_ok=True)
        link_path.symlink_to(target)

def compile(self, run_local: bool = False, overwrite: bool = False) -> Union[Path, None]:
"""Compile Workflow using WIC.
Returns path to compiled CWL Workflow.
Expand All @@ -435,15 +515,55 @@ def compile(self, run_local: bool = False, overwrite: bool = False) -> Path:
)
except subprocess.CalledProcessError as exc:
logger.error(exc.stderr)
logger.info(exc.stdout)
raise exc
logger.error(exc.stdout)
return None
logger.info(proc.stdout)
# copy files to wf path
compiled_cwl_path = _WIC_PATH.joinpath("autogenerated", f"{self.name}.cwl")
self.path.joinpath(self.name, f"{self.name}.cwl").symlink_to(compiled_cwl_path)
self._create_symlinks(compiled_cwl_path)
return compiled_cwl_path

def run(self, overwrite: bool = False) -> None:
    """Compile this workflow with `run_local=True`.

    Args:
        overwrite (bool): Forwarded unchanged to `compile`.
    """
    workflow_name = self.name
    logger.info(f"Running {workflow_name}")
    self.compile(overwrite=overwrite, run_local=True)


def configure_workflow(  # pylint: disable=R0913
        clt_list: list[StrPath],
        config_list: list[StrPath],
        name: str,
        path: Optional[Union[str, Path]] = None,
        compile_workflow: bool = True,
        run: bool = False,
        overwrite: bool = False) -> Workflow:
    """Configure Workflow from list of CLT and configuration files.

    This function takes a list of CLT (Command Line Tool) files and a list of
    configuration files and creates a Workflow object. Each CLT file is paired
    with the configuration at the same position in `config_list`, so both
    lists must have the same length. The Workflow object is then returned.

    Args:
        clt_list (list[StrPath]): A list of paths to CLT files.
        config_list (list[StrPath]): A list of paths to configuration files.
        name (str): The name of the Workflow.
        path (Optional[StrPath]): The path where the Workflow will be created.
            Defaults to `pathlib.Path.cwd()`, evaluated when the function is called.
        compile_workflow (bool): Flag indicating whether to compile the Workflow
            using WIC. Defaults to True. Ignored when `run` is True.
        run (bool): Flag indicating whether to run the Workflow after
            configuration using WIC. Defaults to False.
        overwrite (bool): Flag indicating whether to overwrite existing CLT files
            in cwl_adapters. Defaults to False.

    Returns:
        Workflow: The configured Workflow object.

    Raises:
        ValueError: If `clt_list` and `config_list` have different lengths.
    """
    # zip would silently drop the unpaired trailing items of the longer list
    if len(clt_list) != len(config_list):
        raise ValueError(
            "clt_list and config_list must have the same length, "
            f"got {len(clt_list)} and {len(config_list)}"
        )
    steps = [configure_step(clt, config) for clt, config in zip(clt_list, config_list)]
    # resolve the default here: a `Path.cwd()` default in the signature is
    # evaluated only once, when the module is imported
    path_ = Path.cwd() if path is None else Path(path)
    # mypy incorrectly marks this init
    # of Workflow (pydantic dataclass) as invalid
    # having 'too many arguments'
    workflow = Workflow(steps, name, path_)  # type: ignore
    if run:
        workflow.run(overwrite=overwrite)
    elif compile_workflow:
        workflow.compile(overwrite=overwrite)
    return workflow

0 comments on commit b919687

Please sign in to comment.