diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b0a09d7 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,59 @@ +# syntax=docker/dockerfile:1 + +# `python-base` sets up all our shared environment variables +FROM python:3.10-slim as base + +# Setup Environment Variables +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PIP_NO_CACHE_DIR=true \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_DEFAULT_TIMEOUT=100 \ + POETRY_VERSION=1.2.2 \ + POETRY_HOME="/opt/poetry" \ + POETRY_VIRTUALENVS_IN_PROJECT=true \ + POETRY_NO_INTERACTION=1 \ + PYSETUP_PATH="/opt/pysetup" \ + VENV_PATH="/opt/pysetup/.venv" \ + DEBIAN_FRONTEND=noninteractive \ + OPENMP_ENABLED=1 + +# prepend poetry and venv to path +ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH" + +# Setup Shell for the Docker Image +SHELL ["/bin/bash", "-o", "pipefail", "-c"] + +# Install system dependencies +RUN set -ex \ + && apt-get update -yqq \ + && apt-get install --no-install-recommends -yqq \ + curl \ + ssh \ + gnupg \ + lsb-release \ + # Cleanup + && apt-get autoremove -yqq --purge \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* \ + && rm -rf /tmp/* \ + && rm -rf /var/tmp/* \ + && rm -rf /usr/share/man \ + && rm -rf /usr/share/doc \ + && rm -rf /usr/share/doc-base \ + # Setup SSH + && mkdir -p ~/.ssh \ + && touch ~/.ssh/known_hosts \ + && chmod 0600 ~/.ssh/known_hosts ~/.ssh \ + && ssh-keyscan github.com >> ~/.ssh/known_hosts \ + # Install Poetry + && pip install "poetry==$POETRY_VERSION" + +CMD ["/bin/bash"] + +FROM base as production_pipelines +COPY . $PYSETUP_PATH +WORKDIR $PYSETUP_PATH +RUN poetry install --without=dev +# Run the project +CMD ["workflow", "workspace", "set", "development"] diff --git a/docs/cli.md b/docs/cli.md index ab0793a..4fe0d33 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -9,3 +9,185 @@ workflow run [PIPELINE] !!! 
Note If the `work.function` is a Click CLI command, then `workflow run` will inherit the CLI default arguments and then merge them with the `work.parameters` specified in the work object. This allows for a single CLI to be used for both interactive and non-interactive workflows. + +# Pipelines CLI + +## Overview + +This CLI tool provides a command-line interface for interacting with the Pipelines server, enabling users to manage workflow pipelines efficiently. It supports various operations, such as deploying, listing, counting, and managing the lifecycle of pipeline configurations and schedules. + +## Usage + +The CLI tool offers the following commands for interacting with the workflow pipelines: + +### Get server info + +Get the current version of the pipelines server and info about configuration. + +#### Command: `workflow pipelines version` + +Example output: + +```json +{ + 'client': { + 'baseurl': 'http://localhost:8001', + 'timeout': 15.0, + 'token': None + }, + 'server': { + 'version': '2.6.1' + } +} +``` + +### List pipelines + +List all pipelines or scheduled pipelines. + +#### Command: `workflow pipelines ls [OPTIONS]` +#### Options: +* `--schedule: For interacting with the Schedule API.` + +Example output: + +```shell + Workflow Pipelines +┏━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━┓ +┃ ID ┃ Name ┃ Status ┃ Stage ┃ +┡━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━┩ +│ 65fc4dae5ffde0e8dbeebc61 │ example │ created │ 1 │ +│ 65fc4eda5ffde0e8dbeebc65 │ example │ created │ 1 │ +│ 65fc50065ffde0e8dbeebc69 │ example │ created │ 1 │ +└──────────────────────────┴─────────┴─────────┴───────┘ +``` + +### Count pipelines + +Count pipelines configurations per collection. 
+ +#### Command: `workflow pipelines count [OPTIONS]` +#### Options: +* `--schedule: For interacting with the Schedule API.` + +Example output: + +```shell + Workflow Pipelines +┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━┓ +┃ Name ┃ Count ┃ +┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━┩ +│ example │ 3 │ +├───────────────────────────┼────────────────────┤ +│ Total │ 3 │ +└───────────────────────────┴────────────────────┘ +``` + +### Deploy pipeline configurations + +Deploy a workflow pipeline or schedule from a YAML file. + +#### Command: `workflow pipelines deploy [OPTIONS]` +#### Options: +* `--schedule: For interacting with the Schedule API.` + +Example output: + +```shell + Workflow Pipelines +┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +┃ IDs ┃ +┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ +│ 65fc7450157ed72595b91a2c │ +└────────────────────────────────────────────────┘ +``` + +### Pipeline details + +Get the whole payload for a pipeline configuration or schedule. 
+ +#### Command: `workflow pipelines ps [OPTIONS]` +#### Options: +* `--schedule: For interacting with the Schedule API.` + +Example output: + +```shell + Workflow Pipelines +┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +┃ Pipeline: example ┃ +┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ +│ { │ +│ "id": "65fc69cf5ffde0e8dbeebcc1", │ +│ "name": "example", │ +│ "current_stage": 1, │ +│ "status": "created", │ +│ "version": "1", │ +│ "creation": 1711040975.1336632, │ +│ "start": null, │ +│ "stop": null, │ +│ "pipeline": { │ +│ "runs_on": null, │ +│ "services": [], │ +│ "steps": [ │ +│ { │ +│ "name": "daily_monitoring_task", │ +│ "work_id": null, │ +│ "runs_on": null, │ +│ "services": [], │ +│ "replicate_deployments": false, │ +│ "work": { │ +│ "user": "test", │ +│ "site": "local", │ +│ "function": "guidelines.example.alpha", │ +│ "parameters": { │ +│ "mu0": "${{ matrix.mu0 }}", │ +│ "alpha": "${{ matrix.alpha }}", │ +│ "sigma0": 22.0 │ +│ }, │ +│ "pipeline": "daily-monitoring-task" │ +│ }, │ +│ "status": "created", │ +│ "stage": 1, │ +│ "if_condition": "", │ +│ "reason": null, │ +│ "matrix": null, │ +│ "evaluate_on_runtime": false, │ +│ "success_threshold": 1.0 │ +│ } │ +│ ] │ +│ }, │ +│ "deployments": null, │ +│ "user": "test" │ +│ } │ +└───────────────────────────────────────────────────┘ +``` + +### Stop a pipeline + +Kill a running pipeline. 
+ +#### Command: `workflow pipelines stop [OPTIONS]` +#### Options: +* `--schedule: For interacting with the Schedule API.` + +Example output: + +```shell + Workflow Pipelines +┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +┃ Stopped IDs ┃ +┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ +│ 65fc69cf5ffde0e8dbeebcc1 │ +└────────────────────────────────────────────────┘ +``` + +### Remove pipeline + +Removes a pipeline. You can only use this command on pipelines with `status="cancelled"`. + +#### Command: `workflow pipelines rm [OPTIONS]` +#### Options: +* `--schedule: For interacting with the Schedule API.` + +Example output: diff --git a/workflow/cli/pipelines.py b/workflow/cli/pipelines.py index ef8dfc3..f40c66b 100644 --- a/workflow/cli/pipelines.py +++ b/workflow/cli/pipelines.py @@ -23,17 +23,20 @@ show_header=True, header_style="magenta", title_style="bold magenta", + min_width=50, ) BASE_URL = "https://frb.chimenet.ca/pipelines" STATUS = ["created", "queued", "running", "success", "failure", "cancelled"] status_colors = { + "active": "bright_blue", "running": "blue", "created": "lightblue", "queued": "yellow", "success": "green", "failure": "red", - "cancelled": "orange", + "cancelled": "dark_goldenrod", + "expired": "dark_goldenrod", } @@ -51,28 +54,72 @@ def version(): @pipelines.command("ls", help="List pipelines.") -@click.option("name", "--name", "-n", type=str, required=False) -def ls(name: Optional[str] = None): +@click.option( + "name", + "--name", + "-n", + type=str, + required=False, + help="List only Pipelines with provided name.", +) +@click.option( + "--schedule", "-sch", is_flag=True, help="For interacting with the Schedule API."
+) +def ls(name: Optional[str] = None, schedule: bool = False): """List all pipelines.""" http = HTTPContext() - pipeline_configs = http.pipelines.list_pipeline_configs(name) - table.add_column("ID", max_width=50, justify="left", style="blue") - table.add_column("Name", max_width=50, justify="left", style="bright_green") - table.add_column("Status", max_width=50, justify="left") - table.add_column("Stage", max_width=50, justify="left") - for config in pipeline_configs: - status = Text(config["status"], style=status_colors[config["status"]]) - table.add_row( - config["id"], config["name"], status, str(config["current_stage"]) - ) + objects = ( + http.pipelines.list_pipeline_configs(name) + if not schedule + else http.pipelines.list_schedules(name) + ) + if schedule: + table.title = "Workflow Scheduled Pipelines" + table.add_column("ID", max_width=50, justify="left", style="blue") + table.add_column("Name", max_width=50, justify="left", style="bright_green") + table.add_column("Status", max_width=50, justify="left") + table.add_column("Lives", max_width=50, justify="left") + table.add_column("Has Spawned", max_width=50, justify="left") + table.add_column("Next Time", max_width=50, justify="left") + for schedule_obj in objects: + status = Text( + schedule_obj["status"], style=status_colors[schedule_obj["status"]] + ) + lives = schedule_obj["lives"] + lives_text = Text(str(lives) if lives > -1 else "\u221e") + table.add_row( + schedule_obj["id"], + schedule_obj["pipeline_config"]["name"], + status, + lives_text, + str(schedule_obj["has_spawned"]), + str(schedule_obj["next_time"]), + ) + else: + table.add_column("ID", max_width=50, justify="left", style="blue") + table.add_column("Name", max_width=50, justify="left", style="bright_green") + table.add_column("Status", max_width=50, justify="left") + table.add_column("Stage", max_width=50, justify="left") + for config in objects: + status = Text(config["status"], style=status_colors[config["status"]]) + table.add_row( + 
config["id"], config["name"], status, str(config["current_stage"]) + ) console.print(table) @pipelines.command("count", help="Count pipeline configurations per collection.") -def count(): +@click.option( + "--schedule", "-sch", is_flag=True, help="For interacting with the Schedule API." +) +def count(schedule: bool): """Count pipeline configurations.""" http = HTTPContext() - counts = http.pipelines.count() + counts = ( + http.pipelines.count() if not schedule else http.pipelines.count_schedules() + ) + if schedule: + table.title = "Workflow Schedule Pipelines" table.add_column("Name", max_width=50, justify="left", style="blue") table.add_column("Count", max_width=50, justify="left") total = int() @@ -85,38 +132,74 @@ def count(): @pipelines.command("deploy", help="Deploy a workflow pipeline.") +@click.option( + "--schedule", "-sch", is_flag=True, help="For interacting with the Schedule API." +) @click.argument( "filename", type=click.Path(exists=True, dir_okay=False, readable=True), required=True, ) -def deploy(filename: click.Path): +def deploy(filename: click.Path, schedule: bool): """Deploy a workflow pipeline.""" http = HTTPContext() filepath: str = str(filename) data: Dict[str, Any] = {} with open(filepath) as reader: data = yaml.load(reader, Loader=SafeLoader) # type: ignore - deploy_result = http.pipelines.deploy(data) - table.add_column("IDs") - for _id in deploy_result: - table.add_row(_id) + if schedule and "schedule" not in data.keys(): + error_text = Text( + "Your configuration file needs a schedule when using the --schedule option", + style="red", + ) + console.print(error_text) + return + try: + deploy_result = http.pipelines.deploy(data, schedule) + print(deploy_result) + except requests.HTTPError as deploy_error: + console.print(deploy_error.response.json()["message"]) + return + table.add_column("IDs", max_width=50, justify="left", style="bright_green") + if isinstance(deploy_result, list): + for _id in deploy_result: + table.add_row(_id) + if 
isinstance(deploy_result, dict): + for v in deploy_result.values(): + table.add_row(v) console.print(table) @pipelines.command("ps", help="Get pipeline details.") @click.argument("pipeline", type=str, required=True) @click.argument("id", type=str, required=True) -@click.option("--quiet", "-q", is_flag=True, default=False, help="Only display IDs.") -def ps(pipeline: str, id: str): +@click.option( + "--schedule", "-sch", is_flag=True, help="For interacting with the Schedule API." +) +def ps(pipeline: str, id: str, schedule: bool): """List a pipeline configuration in detail.""" http = HTTPContext() query: Dict[str, Any] = {"id": id} - payload = http.pipelines.get_pipeline_config(collection=pipeline, query=query) - table.add_column(f"Pipeline: {pipeline}", max_width=120, justify="left") - text = JSON(json.dumps(payload), indent=2) - table.add_row(text) - console.print(table) + console_content = None + try: + payload = http.pipelines.get_pipeline_config(pipeline, query, schedule) + except IndexError: + error_text = Text( + f"No {'Schedule' if schedule else 'PipelineConfig'} were found", style="red" + ) + console_content = error_text + else: + if not schedule: + table.add_column(f"Pipeline: {pipeline}", max_width=120, justify="left") + else: + table.add_column( + f"Scheduled Pipeline: {pipeline}", max_width=120, justify="left" + ) + text = JSON(json.dumps(payload), indent=2) + table.add_row(text) + console_content = table + finally: + console.print(console_content) @pipelines.command("stop", help="Kill a running pipeline.") @@ -130,7 +213,7 @@ def stop(pipeline: str, id: Tuple[str]): text = Text("No pipeline configurations were stopped.", style="red") console.print(text) return - table.add_column("Deleted IDs", max_width=50, justify="left") + table.add_column("Stopped IDs", max_width=50, justify="left") for config in stop_result: table.add_row(config["id"]) console.print(table) @@ -139,18 +222,28 @@ def stop(pipeline: str, id: Tuple[str]): @pipelines.command("rm", 
help="Remove a pipeline.") @click.argument("pipeline", type=str, required=True) @click.argument("id", type=str, required=True) -def rm(pipeline: str, id: Tuple[str]): +@click.option( + "--schedule", "-sch", is_flag=True, help="For interacting with the Schedule API." +) +def rm(pipeline: str, id: Tuple[str], schedule: bool): """Remove a pipeline.""" http = HTTPContext() - delete_result = http.pipelines.remove(pipeline, id) - if not any(delete_result): - text = Text("No pipeline configurations were deleted.", style="red") - console.print(text) - return - table.add_column("Deleted IDs", max_width=50, justify="left") - for config in delete_result: - table.add_row(config["id"]) - console.print(table) + content = None + try: + delete_result = http.pipelines.remove(pipeline, id, schedule) + if delete_result.status_code == 204: + text = Text("No pipeline configurations were deleted.", style="red") + content = text + except Exception as e: + text = Text( + f"No pipeline configurations were deleted.\nError: {e}", style="red" + ) + content = text + else: + table.add_column("Deleted IDs", max_width=50, justify="left", style="red") + table.add_row(id) + content = table + console.print(content) def status( diff --git a/workflow/http/pipelines.py b/workflow/http/pipelines.py index 31e9293..39508c0 100644 --- a/workflow/http/pipelines.py +++ b/workflow/http/pipelines.py @@ -6,7 +6,7 @@ from requests.models import Response from tenacity import retry -from tenacity.stop import stop_after_delay +from tenacity.stop import stop_after_attempt, stop_after_delay from tenacity.wait import wait_random from workflow.http.client import Client @@ -23,15 +23,21 @@ class Pipelines(Client): Pipelines: A client for interacting with the Pipelines backend. 
""" - @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30))) + @retry( + reraise=True, + wait=wait_random(min=1.5, max=3.5), + stop=(stop_after_delay(5) | stop_after_attempt(1)), + ) @try_request - def deploy(self, data: Dict[str, Any]): + def deploy(self, data: Dict[str, Any], schedule: bool = False): """Deploys a PipelineConfig from payload data. Parameters ---------- data : Dict[str, Any] YAML data. + schedule : bool + If this function should interact with the Schedule endpoint. Returns ------- @@ -39,7 +45,11 @@ def deploy(self, data: Dict[str, Any]): IDs of PipelineConfig objects generated. """ with self.session as session: - url = f"{self.baseurl}/v1/pipelines" + url = ( + f"{self.baseurl}/v1/pipelines" + if not schedule + else f"{self.baseurl}/v1/schedule" + ) response: Response = session.post(url, json=data) response.raise_for_status() return response.json() @@ -86,7 +96,7 @@ def list_pipeline_configs( @try_request def get_pipeline_config( - self, collection: str, query: Dict[str, Any] + self, collection: str, query: Dict[str, Any], schedule: bool = False ) -> Dict[str, Any]: """Gets details for one pipeline configuration. @@ -96,6 +106,8 @@ def get_pipeline_config( PipelineConfig name. query : Dict[str, Any] Dictionary with search parameters. + schedule : bool + If this function should interact with the Schedule endpoint. 
Returns ------- @@ -104,14 +116,22 @@ def get_pipeline_config( """ with self.session as session: params = {"query": dumps(query), "name": collection} - url = f"{self.baseurl}/v1/pipelines?{urlencode(params)}" + url = ( + f"{self.baseurl}/v1/pipelines?{urlencode(params)}" + if not schedule + else f"{self.baseurl}/v1/schedule?{urlencode(params)}" + ) response: Response = session.get(url=url) response.raise_for_status() return response.json()[0] - @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30))) + @retry( + reraise=True, + wait=wait_random(min=1.5, max=3.5), + stop=(stop_after_delay(5) | stop_after_attempt(1)), + ) @try_request - def remove(self, pipeline: str, id: str) -> List[Dict[str, Any]]: + def remove(self, pipeline: str, id: str, schedule: bool) -> Response: """Removes a cancelled pipeline configuration. Parameters @@ -120,6 +140,8 @@ def remove(self, pipeline: str, id: str) -> List[Dict[str, Any]]: PipelineConfig name. id : str PipelineConfig ID. + schedule : bool + If this function should interact with the Schedule endpoint. Returns ------- @@ -129,10 +151,14 @@ def remove(self, pipeline: str, id: str) -> List[Dict[str, Any]]: with self.session as session: query = {"id": id} params = {"query": dumps(query), "name": pipeline} - url = f"{self.baseurl}/v1/pipelines?{urlencode(params)}" + url = ( + f"{self.baseurl}/v1/pipelines?{urlencode(params)}" + if not schedule + else f"{self.baseurl}/v1/schedule?{urlencode(params)}" + ) response: Response = session.delete(url=url) response.raise_for_status() - return response.json() + return response @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30))) @try_request @@ -176,3 +202,64 @@ def info(self) -> Dict[str, Any]: response.raise_for_status() server_info = response.json() return {"client": client_info, "server": server_info} + + @try_request + def list_schedules(self, schedule_name: str) -> List[Dict[str, Any]]: + """Gets the list of all schedules. 
+ + Parameters + ---------- + schedule_name : str + Schedule name. + + Returns + ------- + List[Dict[str, Any]] + List of schedule payloads. + """ + with self.session as session: + query = dumps({"pipeline_config.name": schedule_name}) + projection = dumps( + { + "id": True, + "status": True, + "lives": True, + "has_spawned": True, + "next_time": True, + "crontab": True, + "pipeline_config.name": True, + } + ) + url = ( + f"{self.baseurl}/v1/schedule?projection={projection}" + if schedule_name is None + else f"{self.baseurl}/v1/schedule?query={query}&projection={projection}" + ) + response: Response = session.get(url=url) + response.raise_for_status() + return response.json() + + @try_request + def count_schedules(self, schedule_name: Optional[str] = None) -> Dict[str, Any]: + """Count schedules per pipeline name. + + Parameters + ---------- + schedule_name : Optional[str], optional + Schedule name, by default None + + Returns + ------- + Dict[str, Any] + Count payload. + """ + with self.session as session: + query = dumps({"name": schedule_name}) + url = ( + f"{self.baseurl}/v1/schedule/count" + if not schedule_name + else f"{self.baseurl}/v1/schedule/count?query={query}" + ) + response: Response = session.get(url=url) + response.raise_for_status() + return response.json() diff --git a/workflow/workspaces/development.yml b/workflow/workspaces/development.yml index 5e9585c..d95aec3 100644 --- a/workflow/workspaces/development.yml +++ b/workflow/workspaces/development.yml @@ -17,12 +17,12 @@ archive: http: baseurls: - buckets: http://localhost:8001 - results: http://localhost:8002 - pipelines: http://localhost:8003 - products: http://localhost:8004 - minio: http://localhost:8005 - loki: http://localhost:8005/loki/api/v1/push + pipelines: http://localhost:8001 + buckets: http://localhost:8004 + results: http://localhost:8005 + # products: http://localhost:8004 + # minio: http://localhost:8005 + # loki: http://localhost:8005/loki/api/v1/push config: archive: