diff --git a/.github/workflows/connector_integration_test_single_dagger.yml b/.github/workflows/connector_integration_test_single_dagger.yml index 482578d53da5..2bbe0d6d3f64 100644 --- a/.github/workflows/connector_integration_test_single_dagger.yml +++ b/.github/workflows/connector_integration_test_single_dagger.yml @@ -51,7 +51,7 @@ jobs: mkdir -p "$DAGGER_TMP_BINDIR" curl "https://dl.dagger.io/dagger/main/${DAGGER_CLI_COMMIT}/dagger_${DAGGER_CLI_COMMIT}_$(uname -s | tr A-Z a-z)_$(uname -m | sed s/x86_64/amd64/).tar.gz" | tar xvz -C "$DAGGER_TMP_BINDIR" fi - airbyte-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} connectors test ${{ github.event.inputs.test-connectors-options }} + airbyte-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} connectors ${{ github.event.inputs.test-connectors-options }} test env: _EXPERIMENTAL_DAGGER_CLOUD_TOKEN: "p.eyJ1IjogIjFiZjEwMmRjLWYyZmQtNDVhNi1iNzM1LTgxNzI1NGFkZDU2ZiIsICJpZCI6ICJlNjk3YzZiYy0yMDhiLTRlMTktODBjZC0yNjIyNGI3ZDBjMDEifQ.hT6eMOYt3KZgNoVGNYI3_v4CC-s19z8uQsBkGrBhU3k" GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }} @@ -75,7 +75,7 @@ jobs: mkdir -p "$DAGGER_TMP_BINDIR" curl "https://dl.dagger.io/dagger/main/${DAGGER_CLI_COMMIT}/dagger_${DAGGER_CLI_COMMIT}_$(uname -s | tr A-Z a-z)_$(uname -m | sed s/x86_64/amd64/).tar.gz" | tar xvz -C "$DAGGER_TMP_BINDIR" fi - airbyte-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} connectors test --modified + airbyte-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} connectors --modified test env: _EXPERIMENTAL_DAGGER_CLOUD_TOKEN: "p.eyJ1IjogIjFiZjEwMmRjLWYyZmQtNDVhNi1iNzM1LTgxNzI1NGFkZDU2ZiIsICJpZCI6ICJlNjk3YzZiYy0yMDhiLTRlMTktODBjZC0yNjIyNGI3ZDBjMDEifQ.hT6eMOYt3KZgNoVGNYI3_v4CC-s19z8uQsBkGrBhU3k" GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }} diff --git a/.github/workflows/connector_nightly_builds_dagger.yml b/.github/workflows/connector_nightly_builds_dagger.yml index c49ff4604a9d..df2914c67653 100644 --- a/.github/workflows/connector_nightly_builds_dagger.yml +++ b/.github/workflows/connector_nightly_builds_dagger.yml @@ -56,8 +56,7 @@ jobs: mkdir -p "$DAGGER_TMP_BINDIR" curl "https://dl.dagger.io/dagger/main/${DAGGER_CLI_COMMIT}/dagger_${DAGGER_CLI_COMMIT}_$(uname -s | tr A-Z a-z)_$(uname -m | sed s/x86_64/amd64/).tar.gz" | tar xvz -C "$DAGGER_TMP_BINDIR" fi - airbyte-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} connectors test ${{ inputs.test-connectors-options || '--concurrency=5 --release-stage=generally_available --release-stage=beta' }} - env: + airbyte-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} connectors ${{ inputs.test-connectors-options || '--concurrency=20 --release-stage=generally_available --release-stage=beta' }} test env: _EXPERIMENTAL_DAGGER_CLOUD_TOKEN: "p.eyJ1IjogIjFiZjEwMmRjLWYyZmQtNDVhNi1iNzM1LTgxNzI1NGFkZDU2ZiIsICJpZCI6ICJlNjk3YzZiYy0yMDhiLTRlMTktODBjZC0yNjIyNGI3ZDBjMDEifQ.hT6eMOYt3KZgNoVGNYI3_v4CC-s19z8uQsBkGrBhU3k" GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }} AWS_ACCESS_KEY_ID: ${{ secrets.STATUS_API_AWS_ACCESS_KEY_ID }} diff --git a/tools/ci_connector_ops/.gitignore b/tools/ci_connector_ops/.gitignore index be583283ae5a..a93f5bbc51ac 100644 --- a/tools/ci_connector_ops/.gitignore +++ b/tools/ci_connector_ops/.gitignore @@ -1 +1 @@ -test_reports +pipeline_reports \ No newline at end of file diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/README.md b/tools/ci_connector_ops/ci_connector_ops/pipelines/README.md index eca36d65d39f..5bc23bb63934 100644 --- 
a/tools/ci_connector_ops/ci_connector_ops/pipelines/README.md
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/README.md
@@ -39,6 +39,8 @@ If you don't want to use the remote secrets please call airbyte-ci connectors-ci
 airbyte-ci connectors-ci --use-remote-secrets=False
 ```
 
+
+
 ### Environment variables required for CI run:
 
 - `GCP_GSM_CREDENTIALS`: the credentials to connect to GSM
 
@@ -52,26 +54,26 @@
 (source-pokeapi does not require GSM access)
 
 ```bash
-airbyte-ci connectors test --name=source-pokeapi
+airbyte-ci connectors --name=source-pokeapi test
 ```
 
 ### **Run the pipeline for multiple connectors**
 
 ```bash
-airbyte-ci connectors test --name=source-pokeapi --name=source-openweather
+airbyte-ci connectors --name=source-pokeapi --name=source-openweather test
 ```
 
 ### **Run the pipeline for generally available connectors**
 
 ```bash
-airbyte-ci connectors test --release-stage=generally_available
+airbyte-ci connectors --release-stage=generally_available test
 ```
 
 ### **Run the pipeline for the connectors you changed on the branch**
 
 ```bash
 touch airbyte-integrations/connectors/source-pokeapi/random_file_addition.txt
-airbyte-ci connectors test --modified #the source-pokeapi pipeline should run
+airbyte-ci connectors --modified test #the source-pokeapi pipeline should run
 ```
 
 ### Local VS. CI
 
@@ -88,7 +90,7 @@ The main differences are that:
 - The pipeline will pull the branch under test from Airbyte's GitHub repo
 - The pipeline will upload per connector test reports to S3
 
-## What does a connector pipeline run
+### What does a connector test pipeline run
 
 ```mermaid
 flowchart TB
@@ -113,12 +115,42 @@
 This is the DAG we expect for every connector for which the pipeline is triggered.
 The Airbyte git repo will be the local one if you use the `--is-local=True` command line option.
 The connector secrets won't be downloaded or uploaded if you use the `--use-remote-secrets=False` command line option.
 
+## Running the Connector build pipeline
+You can build connector images that will be available on your Docker host.
+Both linux/arm64 and linux/amd64 will be built, but only the image corresponding to your architecture will be loaded to your host.
+
+```bash
+airbyte-ci connectors --name=source-postgres build
+```
+
+**You can build multiple connectors in a single command:**
-### Performance benchmarks
+Build all the modified connectors on your branch (given you committed the changes):
+```bash
+airbyte-ci connectors --modified build
+```
-
-| Connector | Run integration test GHA duration | Dagger POC duration (CI no cache) |
-| -------------- | ---------------------------------------------------------------------- | ---------------------------------------------------------------------- |
-| source-pokeapi | [7mn22s](https://github.com/airbytehq/airbyte/actions/runs/4395453220) | [5mn26s](https://github.com/airbytehq/airbyte/actions/runs/4403595746) |
+Build only source-postgres and source-pokeapi:
+```bash
+airbyte-ci connectors --name=source-postgres --name=source-pokeapi build
+```
+
+Build all GA connectors:
+```bash
+airbyte-ci connectors --release-stage=generally_available build
+```
+
+Build all GA Java connectors:
+```bash
+airbyte-ci connectors --release-stage=generally_available --language=java build
+```
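+
+The build loads the image into your local Docker host, tagged as `airbyte/<connector-technical-name>:dev`, for the platform matching your machine's architecture. Assuming you built source-postgres, you can sanity-check the result with plain Docker commands:
+
+```bash
+docker images airbyte/source-postgres
+docker run --rm airbyte/source-postgres:dev spec
+```
 
 ## Running the Metadata pipelines
 
 The new metadata service also uses dagger for its reproducible CI pipeline.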
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py
index 20d97acb4f1c..8e0077555996 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py
@@ -13,7 +13,7 @@
 from dagger import CacheSharingMode, CacheVolume, Container, Directory, File, Secret
 
 if TYPE_CHECKING:
-    from ci_connector_ops.pipelines.contexts import ConnectorTestContext, PipelineContext
+    from ci_connector_ops.pipelines.contexts import ConnectorContext, PipelineContext
 
 PYPROJECT_TOML_FILE_PATH = "pyproject.toml"
 
@@ -150,11 +150,11 @@ async def with_installed_python_package(
     return container
 
 
-def with_airbyte_connector(context: ConnectorTestContext) -> Container:
+def with_airbyte_connector(context: ConnectorContext) -> Container:
     """Load an airbyte connector source code in a testing environment.
 
     Args:
-        context (ConnectorTestContext): The current test context, providing the repository directory from which the connector sources will be pulled.
+        context (ConnectorContext): The current test context, providing the repository directory from which the connector sources will be pulled.
     Returns:
         Container: A python environment container (with the connector source code).
     """
@@ -163,11 +163,11 @@ def with_airbyte_connector(context: ConnectorTestContext) -> Container:
     return with_python_package(context, testing_environment, connector_source_path, exclude=["secrets"])
 
 
-async def with_installed_airbyte_connector(context: ConnectorTestContext) -> Container:
+async def with_installed_airbyte_connector(context: ConnectorContext) -> Container:
     """Install an airbyte connector python package in a testing environment.
 
     Args:
-        context (ConnectorTestContext): The current test context, providing the repository directory from which the connector sources will be pulled.
+        context (ConnectorContext): The current test context, providing the repository directory from which the connector sources will be pulled.
     Returns:
         Container: A python environment container (with the connector installed).
     """
@@ -249,12 +249,12 @@ async def with_ci_connector_ops(context: PipelineContext) -> Container:
 
 
 def with_dockerd_service(
-    context: ConnectorTestContext, shared_volume: Optional(Tuple[str, CacheVolume]) = None, docker_service_name: Optional[str] = None
+    context: ConnectorContext, shared_volume: Optional[Tuple[str, CacheVolume]] = None, docker_service_name: Optional[str] = None
 ) -> Container:
     """Create a container running dockerd, exposing its 2375 port, which can be used as the docker host for docker-in-docker use cases.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
         shared_volume (Optional, optional): A tuple in the form of (mounted path, cache volume) that will be mounted to the dockerd container. Defaults to None.
         docker_service_name (Optional[str], optional): The name of the docker service, appended to the volume name, useful for context isolation. Defaults to None.
 
@@ -281,7 +281,7 @@ def with_dockerd_service(
 
 
 def with_bound_docker_host(
-    context: ConnectorTestContext,
+    context: ConnectorContext,
     container: Container,
     shared_volume: Optional[Tuple[str, CacheVolume]] = None,
     docker_service_name: Optional[str] = None,
 ) -> Container:
     """Bind a container to a docker host. It will use the dockerd service as a docker host.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
         container (Container): The container to bind to the docker host.
         shared_volume (Optional, optional): A tuple in the form of (mounted path, cache volume) that will be mounted to both the container and the dockerd container. Defaults to None.
         docker_service_name (Optional[str], optional): The name of the docker service, useful for context isolation. Defaults to None.
@@ -308,12 +308,12 @@ def with_bound_docker_host(
 
 
 def with_docker_cli(
-    context: ConnectorTestContext, shared_volume: Optional(Tuple[str, CacheVolume]) = None, docker_service_name: Optional[str] = None
+    context: ConnectorContext, shared_volume: Optional[Tuple[str, CacheVolume]] = None, docker_service_name: Optional[str] = None
 ) -> Container:
     """Create a container with the docker CLI installed and bound to a persistent docker host.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
         shared_volume (Optional, optional): A tuple in the form of (mounted path, cache volume) that will be mounted to both the container and the dockerd container. Defaults to None.
         docker_service_name (Optional[str], optional): The name of the docker service, useful for context isolation. Defaults to None.
 
@@ -324,11 +324,11 @@ def with_docker_cli(
     return with_bound_docker_host(context, docker_cli, shared_volume, docker_service_name)
 
 
-async def with_connector_acceptance_test(context: ConnectorTestContext, connector_under_test_image_tar: File) -> Container:
+async def with_connector_acceptance_test(context: ConnectorContext, connector_under_test_image_tar: File) -> Container:
     """Create a container to run connector acceptance tests, bound to a persistent docker host.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
         connector_under_test_image_tar (File): The file containing the tar archive of the image of the connector under test.
     Returns:
         Container: A container with connector acceptance tests installed.
@@ -355,7 +355,7 @@ async def with_connector_acceptance_test(context: ConnectorTestContext, connector_under_test_image_tar: File) -> Container:
 
 
 def with_gradle(
-    context: ConnectorTestContext,
+    context: ConnectorContext,
     sources_to_include: List[str] = None,
     bind_to_docker_host: bool = True,
     docker_service_name: Optional[str] = "gradle",
 ) -> Container:
     """Create a container with Gradle installed and bound to a persistent docker host.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
         sources_to_include (List[str], optional): List of additional source paths to mount to the container. Defaults to None.
         bind_to_docker_host (bool): Whether to bind the gradle container to a docker host.
         docker_service_name (Optional[str], optional): The name of the docker service, useful for context isolation. Defaults to "gradle".
@@ -419,13 +419,11 @@ def with_gradle(
     return openjdk_with_docker
 
 
-async def load_image_to_docker_host(
-    context: ConnectorTestContext, tar_file: File, image_tag: str, docker_service_name: Optional[str] = None
-):
+async def load_image_to_docker_host(context: ConnectorContext, tar_file: File, image_tag: str, docker_service_name: Optional[str] = None):
     """Load a docker image tar archive to the docker host.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+ context (ConnectorContext): The current connector context. tar_file (File): The file object holding the docker image tar archive. image_tag (str): The tag to create on the image if it has no tag. docker_service_name (str): Name of the docker service, useful for context isolation. @@ -476,7 +474,7 @@ def with_poetry_module(context: PipelineContext, parent_dir: Directory, module_p ) -def with_integration_base(context: PipelineContext, jdk_version: str = "17.0.4") -> Container: +def with_integration_base(context: PipelineContext, build_platform: str, jdk_version: str = "17.0.4") -> Container: """Create an integration base container. Reproduce with Dagger the Dockerfile defined here: airbyte-integrations/bases/base/Dockerfile @@ -484,7 +482,7 @@ def with_integration_base(context: PipelineContext, jdk_version: str = "17.0.4") base_sh = context.get_repo_dir("airbyte-integrations/bases/base", include=["base.sh"]).file("base.sh") return ( - context.dagger_client.container() + context.dagger_client.container(platform=build_platform) .from_(f"amazoncorretto:{jdk_version}") .with_workdir("/airbyte") .with_file("base.sh", base_sh) @@ -494,7 +492,7 @@ def with_integration_base(context: PipelineContext, jdk_version: str = "17.0.4") ) -async def with_java_base(context: PipelineContext, jdk_version: str = "17.0.4") -> Container: +async def with_java_base(context: PipelineContext, build_platform: str, jdk_version: str = "17.0.4") -> Container: """Create a java base container. Reproduce with Dagger the Dockerfile defined here: airbyte-integrations/bases/base-java/Dockerfile_ @@ -505,7 +503,7 @@ async def with_java_base(context: PipelineContext, jdk_version: str = "17.0.4") java_base_version = await get_version_from_dockerfile(dockerfile) return ( - with_integration_base(context, jdk_version) + with_integration_base(context, build_platform, jdk_version) .with_exec(["yum", "install", "-y", "tar", "openssl"]) .with_exec(["yum", "clean", "all"]) .with_workdir("/airbyte") @@ -522,11 +520,11 @@ async def with_java_base(context: PipelineContext, jdk_version: str = "17.0.4") ) -async def with_airbyte_java_connector(context: ConnectorTestContext, connector_java_tar_file: File): +async def with_airbyte_java_connector(context: ConnectorContext, connector_java_tar_file: File, build_platform: str): dockerfile = context.get_connector_dir(include=["Dockerfile"]).file("Dockerfile") connector_version = await get_version_from_dockerfile(dockerfile) application = context.connector.technical_name - java_base = await with_java_base(context) + java_base = await with_java_base(context, build_platform) enable_sentry = await should_enable_sentry(dockerfile) return ( diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/secrets.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/secrets.py index 50d6b6fdf8e7..99196a7b1891 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/secrets.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/secrets.py @@ -13,14 +13,14 @@ from dagger import Directory if TYPE_CHECKING: - from ci_connector_ops.pipelines.contexts import ConnectorTestContext + from ci_connector_ops.pipelines.contexts import ConnectorContext -async def download(context: ConnectorTestContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> Directory: +async def download(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> Directory: """Use the ci-credentials tool to download the secrets stored for a specific connector to a 
Directory.
 
     Args:
-        context (ConnectorTestContext): The context providing a connector object.
+        context (ConnectorContext): The context providing a connector object.
         gcp_gsm_env_variable_name (str, optional): The name of the environment variable holding credentials to connect to Google Secret Manager. Defaults to "GCP_GSM_CREDENTIALS".
 
     Returns:
@@ -40,11 +40,11 @@
     )
 
 
-async def upload(context: ConnectorTestContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> int:
+async def upload(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> int:
     """Use the ci-credentials tool to upload the secrets stored in the context's updated_secrets_dir.
 
     Args:
-        context (ConnectorTestContext): The context providing a connector object and the update secrets dir.
+        context (ConnectorContext): The context providing a connector object and the updated secrets dir.
         gcp_gsm_env_variable_name (str, optional): The name of the environment variable holding credentials to connect to Google Secret Manager. Defaults to "GCP_GSM_CREDENTIALS".
 
     Returns:
@@ -62,11 +62,11 @@
     )
 
 
-async def get_connector_secret_dir(context: ConnectorTestContext) -> Directory:
+async def get_connector_secret_dir(context: ConnectorContext) -> Directory:
     """Download the secrets from GSM or use the local secrets directory for a connector.
 
     Args:
-        context (ConnectorTestContext): The context providing the connector directory and the use_remote_secrets flag.
+        context (ConnectorContext): The context providing the connector directory and the use_remote_secrets flag.
 
     Returns:
         Directory: A directory with the downloaded connector secrets.
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py
index dd24b6730efb..5c43407d6f81 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py
@@ -11,13 +11,13 @@
 from dataclasses import dataclass, field
 from datetime import datetime
 from enum import Enum
-from typing import TYPE_CHECKING, ClassVar, List, Optional
+from typing import TYPE_CHECKING, ClassVar, List, Optional, Tuple
 
 import asyncer
 from ci_connector_ops.pipelines.actions import environments
-from ci_connector_ops.pipelines.utils import check_path_in_workdir, with_exit_code, with_stderr, with_stdout
+from ci_connector_ops.pipelines.utils import check_path_in_workdir, slugify, with_exit_code, with_stderr, with_stdout
 from ci_connector_ops.utils import console
-from dagger import Container, QueryError
+from dagger import CacheVolume, Container, Directory, QueryError
 from rich.console import Group
 from rich.panel import Panel
 from rich.style import Style
@@ -25,7 +25,7 @@
 from rich.text import Text
 
 if TYPE_CHECKING:
-    from ci_connector_ops.pipelines.contexts import ConnectorTestContext, PipelineContext
+    from ci_connector_ops.pipelines.contexts import ConnectorContext, PipelineContext
 
 
 class StepStatus(Enum):
@@ -76,9 +76,8 @@ class Step(ABC):
     title: ClassVar[str]
     started_at: ClassVar[datetime]
 
-    def __init__(self, context: ConnectorTestContext) -> None:  # noqa D107
+    def __init__(self, context: ConnectorContext) -> None:  # noqa D107
         self.context = context
-        self.host_image_export_dir_path = "." if self.context.is_ci else "/tmp"
 
     async def run(self, *args, **kwargs) -> StepResult:
         """Public method to run the step. It outputs a step result.
@@ -203,12 +202,13 @@ def __repr__(self) -> str:  # noqa D105
 
 
 @dataclass(frozen=True)
-class TestReport:
-    """A dataclass to build test reports to share pipelines executions results with the user."""
+class Report:
+    """A dataclass to build reports to share pipeline execution results with the user."""
 
     pipeline_context: PipelineContext
     steps_results: List[StepResult]
     created_at: datetime = field(default_factory=datetime.utcnow)
+    name: str = "REPORT"
 
     @property
     def failed_steps(self) -> List[StepResult]:  # noqa D102
@@ -258,7 +258,7 @@ def to_json(self) -> str:
     def print(self):
         """Print the test report to the console in a nice way."""
         pipeline_name = self.pipeline_context.pipeline_name
-        main_panel_title = Text(f"{pipeline_name.upper()} - TEST RESULTS")
+        main_panel_title = Text(f"{pipeline_name.upper()} - {self.name}")
         main_panel_title.stylize(Style(color="blue", bold=True))
         duration_subtitle = Text(f"⏲️ Total pipeline duration for {pipeline_name}: {round(self.run_duration)} seconds")
         step_results_table = Table(title="Steps results")
@@ -295,7 +295,7 @@ def print(self):
 
 
 @dataclass(frozen=True)
-class ConnectorTestReport(TestReport):
+class ConnectorReport(Report):
     """A dataclass to build connector test reports to share pipeline execution results with the user."""
 
     @property
@@ -331,7 +331,7 @@ def to_json(self) -> str:
     def print(self):
         """Print the test report to the console in a nice way."""
         connector_name = self.pipeline_context.connector.technical_name
-        main_panel_title = Text(f"{connector_name.upper()} - TEST RESULTS")
+        main_panel_title = Text(f"{connector_name.upper()} - {self.name}")
         main_panel_title.stylize(Style(color="blue", bold=True))
         duration_subtitle = Text(f"⏲️ Total pipeline duration for {connector_name}: {round(self.run_duration)} seconds")
         step_results_table = Table(title="Steps results")
@@ -360,3 +360,112 @@ def print(self):
 
         main_panel = Panel(Group(*to_render), title=main_panel_title, subtitle=duration_subtitle)
         console.print(main_panel)
+
+
+class GradleTask(Step, ABC):
+    """
+    A step to run a Gradle task.
+
+    Attributes:
+        gradle_task_name (str): The Gradle task name to run.
+        title (str): The step title.
+    """
+
+    DEFAULT_TASKS_TO_EXCLUDE = ["airbyteDocker"]
+    BIND_TO_DOCKER_HOST = True
+    gradle_task_name: ClassVar[str]
+
+    # TODO more robust way to find all projects on which the task depends?
+    JAVA_BUILD_INCLUDE = [
+        "airbyte-api",
+        "airbyte-commons-cli",
+        "airbyte-commons-protocol",
+        "airbyte-commons",
+        "airbyte-config",
+        "airbyte-connector-test-harnesses",
+        "airbyte-db",
+        "airbyte-integrations/bases",
+        "airbyte-json-validation",
+        "airbyte-protocol",
+        "airbyte-test-utils",
+        "airbyte-config-oss",
+    ]
+
+    SOURCE_BUILD_INCLUDE = [
+        "airbyte-integrations/connectors/source-jdbc",
+        "airbyte-integrations/connectors/source-relational-db",
+    ]
+
+    DESTINATION_BUILD_INCLUDE = [
+        "airbyte-integrations/bases/bases-destination-jdbc",
+        "airbyte-integrations/connectors/destination-gcs",
+        "airbyte-integrations/connectors/destination-azure-blob-storage",
+    ]
+
+    # These are the lines we remove from the connector gradle file to ignore specific tasks / plugins.
+    LINES_TO_REMOVE_FROM_GRADLE_FILE = [
+        # Do not build normalization with Gradle - we build normalization with Dagger in the BuildOrPullNormalization step.
+ "project(':airbyte-integrations:bases:base-normalization').airbyteDocker.output", + ] + + @property + def docker_service_name(self) -> str: + return slugify(f"gradle-{self.title}") + + @property + def connector_java_build_cache(self) -> CacheVolume: + return self.context.dagger_client.cache_volume("connector_java_build_cache") + + @property + def build_include(self) -> List[str]: + """Retrieve the list of source code directory required to run a Java connector Gradle task. + + The list is different according to the connector type. + + Returns: + List[str]: List of directories or files to be mounted to the container to run a Java connector Gradle task. + """ + if self.context.connector.connector_type == "source": + return self.JAVA_BUILD_INCLUDE + self.SOURCE_BUILD_INCLUDE + elif self.context.connector.connector_type == "destination": + return self.JAVA_BUILD_INCLUDE + self.DESTINATION_BUILD_INCLUDE + else: + raise ValueError(f"{self.context.connector.connector_type} is not supported") + + async def _get_patched_connector_dir(self) -> Directory: + """Patch the build.gradle file of the connector under test by removing the lines declared in LINES_TO_REMOVE_FROM_GRADLE_FILE. + + Returns: + Directory: The patched connector directory + """ + + gradle_file_content = await self.context.get_connector_dir(include=["build.gradle"]).file("build.gradle").contents() + patched_file_content = "" + for line in gradle_file_content.split("\n"): + if not any(line_to_remove in line for line_to_remove in self.LINES_TO_REMOVE_FROM_GRADLE_FILE): + patched_file_content += line + "\n" + return self.context.get_connector_dir().with_new_file("build.gradle", patched_file_content) + + def _get_gradle_command(self, extra_options: Tuple[str] = ("--no-daemon", "--scan")) -> List: + command = ( + ["./gradlew"] + + list(extra_options) + + [f":airbyte-integrations:connectors:{self.context.connector.technical_name}:{self.gradle_task_name}"] + ) + for task in self.DEFAULT_TASKS_TO_EXCLUDE: + command += ["-x", task] + return command + + async def _run(self) -> StepResult: + connector_under_test = ( + environments.with_gradle( + self.context, self.build_include, docker_service_name=self.docker_service_name, bind_to_docker_host=self.BIND_TO_DOCKER_HOST + ) + .with_mounted_directory(str(self.context.connector.code_directory), await self._get_patched_connector_dir()) + # Disable the Ryuk container because it needs privileged docker access that does not work: + .with_env_variable("TESTCONTAINERS_RYUK_DISABLED", "true") + .with_directory(f"{self.context.connector.code_directory}/secrets", self.context.secrets_dir) + .with_exec(self._get_gradle_command()) + ) + + return await self.get_step_result(connector_under_test) diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/__init__.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/__init__.py new file mode 100644 index 000000000000..34c396bcc4c3 --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/__init__.py @@ -0,0 +1,48 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+#
+"""This module groups factory-like functions to dispatch build steps according to the connector language."""
+
+import platform
+from typing import Optional, Tuple
+
+from ci_connector_ops.pipelines.bases import StepResult
+from ci_connector_ops.pipelines.builds import java_connectors, python_connectors
+from ci_connector_ops.pipelines.contexts import ConnectorContext
+from ci_connector_ops.utils import ConnectorLanguage
+from dagger import Container, Platform
+
+BUILD_PLATFORMS = [Platform("linux/amd64"), Platform("linux/arm64")]
+LOCAL_BUILD_PLATFORM = Platform(f"linux/{platform.machine()}")
+
+
+class NoBuildStepForLanguageError(Exception):
+    pass
+
+
+LANGUAGE_BUILD_CONNECTOR_MAPPING = {
+    ConnectorLanguage.PYTHON: python_connectors.BuildConnectorImage,
+    ConnectorLanguage.LOW_CODE: python_connectors.BuildConnectorImage,
+    ConnectorLanguage.JAVA: java_connectors.BuildConnectorImage,
+}
+
+
+async def run_connector_build(context: ConnectorContext) -> dict[str, Tuple[StepResult, Optional[Container]]]:
+    """Build a connector according to its language and return the build result and the built container.
+
+    Args:
+        context (ConnectorContext): The current connector context.
+
+    Returns:
+        dict[str, Tuple[StepResult, Optional[Container]]]: A dictionary with platform as key and a tuple of step result and built container as value.
+    """
+    try:
+        BuildConnectorImage = LANGUAGE_BUILD_CONNECTOR_MAPPING[context.connector.language]
+    except KeyError:
+        raise NoBuildStepForLanguageError(f"No step to build a {context.connector.language} connector was found.")
+
+    per_platform_containers = {}
+    for build_platform in BUILD_PLATFORMS:
+        per_platform_containers[build_platform] = await BuildConnectorImage(context, build_platform).run()
+
+    return per_platform_containers
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/common.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/common.py
new file mode 100644
index 000000000000..a6c070419d76
--- /dev/null
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/common.py
@@ -0,0 +1,45 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+from abc import ABC
+from typing import Tuple
+
+import docker
+from ci_connector_ops.pipelines.bases import Step, StepResult, StepStatus
+from ci_connector_ops.pipelines.contexts import ConnectorContext
+from ci_connector_ops.pipelines.utils import export_container_to_tarball
+from dagger import Container, Platform
+
+
+class BuildConnectorImageBase(Step, ABC):
+    @property
+    def title(self):
+        return f"Build {self.context.connector.technical_name} docker image for platform {self.build_platform}"
+
+    def __init__(self, context: ConnectorContext, build_platform: Platform) -> None:
+        self.build_platform = build_platform
+        super().__init__(context)
+
+
+class LoadContainerToLocalDockerHost(Step):
+    IMAGE_TAG = "dev"
+
+    def __init__(self, context: ConnectorContext, container: Container) -> None:
+        super().__init__(context)
+        self.container = container
+
+    @property
+    def title(self):
+        return f"Load {self.image_name}:{self.IMAGE_TAG} to the local docker host."
+
+    @property
+    def image_name(self) -> str:
+        return f"airbyte/{self.context.connector.technical_name}"
+
+    async def _run(self) -> StepResult:
+        _, exported_tarball_path = await export_container_to_tarball(self.context, self.container)
+        client = docker.from_env()
+        with open(exported_tarball_path, "rb") as tarball_content:
+            new_image = client.images.load(tarball_content.read())[0]
+        new_image.tag(self.image_name, tag=self.IMAGE_TAG)
+        return StepResult(self, StepStatus.SUCCESS)
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/java_connectors.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/java_connectors.py
new file mode 100644
index 000000000000..24e123ed89e5
--- /dev/null
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/java_connectors.py
@@ -0,0 +1,49 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+
+from typing import Optional, Tuple
+
+from ci_connector_ops.pipelines.actions import environments
+from ci_connector_ops.pipelines.bases import GradleTask, StepResult, StepStatus
+from ci_connector_ops.pipelines.builds.common import BuildConnectorImageBase
+from dagger import Container, File, QueryError
+
+
+class BuildConnectorImage(BuildConnectorImageBase, GradleTask):
+    """
+    A step to build a Java connector image using the distTar Gradle task.
+    """
+
+    gradle_task_name = "distTar"
+
+    async def build_tar(self) -> File:
+        distTar = (
+            environments.with_gradle(
+                self.context,
+                self.build_include,
+                docker_service_name=self.docker_service_name,
+                bind_to_docker_host=self.BIND_TO_DOCKER_HOST,
+            )
+            .with_mounted_directory(str(self.context.connector.code_directory), await self._get_patched_connector_dir())
+            .with_exec(self._get_gradle_command())
+            .with_workdir(f"{self.context.connector.code_directory}/build/distributions")
+        )
+
+        distributions = await distTar.directory(".").entries()
+        tar_files = [f for f in distributions if f.endswith(".tar")]
+        if len(tar_files) > 1:
+            raise Exception(
+                "The distributions directory contains multiple connector tar files. We can't infer which one should be used. Please review and delete any unnecessary tar files."
+            )
+        return distTar.file(tar_files[0])
+
+    async def _run(self) -> Tuple[StepResult, Optional[Container]]:
+        try:
+            tar_file = await self.build_tar()
+            java_connector = await environments.with_airbyte_java_connector(self.context, tar_file, self.build_platform)
+            step_result = await self.get_step_result(java_connector.with_exec(["spec"]))
+            return step_result, java_connector
+        except QueryError as e:
+            return StepResult(self, StepStatus.FAILURE, stderr=str(e)), None
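If `build_tar` fails with the "multiple connector tar files" error above, stale archives from earlier local builds are usually the culprit. One way to clear them (illustrative, for a hypothetical connector named destination-foo):

```bash
rm airbyte-integrations/connectors/destination-foo/build/distributions/*.tar
```

diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/normalization.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/normalization.py
new file mode 100644
index 000000000000..767fb028bb75
--- /dev/null
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/normalization.py
@@ -0,0 +1,45 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.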
+# +from typing import Tuple + +from ci_connector_ops.pipelines.actions import environments +from ci_connector_ops.pipelines.bases import Step, StepResult, StepStatus +from ci_connector_ops.pipelines.contexts import ConnectorContext +from ci_connector_ops.utils import Connector +from dagger import Container + + +class BuildOrPullNormalization(Step): + """A step to build or pull the normalization image for a connector according to the image name.""" + + DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING = { + Connector("destination-clickhouse"): "clickhouse.Dockerfile", + Connector("destination-duckdb"): "duckdb.Dockerfile", + Connector("destination-mssql"): "mssql.Dockerfile", + Connector("destination-mysql"): "mysql.Dockerfile", + Connector("destination-oracle"): "oracle.Dockerfile", + Connector("destination-redshift"): "redshift.Dockerfile", + Connector("destination-snowflake"): "snowflake.Dockerfile", + Connector("destination-tidb"): "tidb.Dockerfile", + } + + def __init__(self, context: ConnectorContext, normalization_image: str) -> None: + """Initialize the step to build or pull the normalization image. + + Args: + context (ConnectorContext): The current connector context. + normalization_image (str): The normalization image to build (if :dev) or pull. + """ + super().__init__(context) + self.use_dev_normalization = normalization_image.endswith(":dev") + self.normalization_image = normalization_image + self.normalization_dockerfile = self.DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING.get(context.connector, "Dockerfile") + self.title = f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}" + + async def _run(self) -> Tuple[StepResult, Container]: + if self.use_dev_normalization: + build_normalization_container = environments.with_normalization(self.context, self.normalization_dockerfile) + else: + build_normalization_container = self.context.dagger_client.container().from_(self.normalization_image) + return StepResult(self, StepStatus.SUCCESS), build_normalization_container diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/python_connectors.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/python_connectors.py new file mode 100644 index 000000000000..f592b8e3c87f --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/python_connectors.py @@ -0,0 +1,25 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Optional, Tuple + +from ci_connector_ops.pipelines.bases import StepResult, StepStatus +from ci_connector_ops.pipelines.builds.common import BuildConnectorImageBase +from dagger import Container, QueryError + + +class BuildConnectorImage(BuildConnectorImageBase): + """ + A step to build a Python connector image using its Dockerfile. + A spec command is run on the container to validate it was built successfully. 
+ """ + + async def _run(self) -> Tuple[StepResult, Optional[Container]]: + connector_dir = self.context.get_connector_dir() + connector = connector_dir.docker_build(platform=self.build_platform) + try: + build_result = await self.get_step_result(connector.with_exec(["spec"])) + return build_result, connector + except QueryError as e: + return StepResult(self, StepStatus.FAILURE, stderr=str(e)), None diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/commands/groups/connectors.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/commands/groups/connectors.py index bc9b5c09d16a..5a10b0c93f17 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/commands/groups/connectors.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/commands/groups/connectors.py @@ -13,10 +13,10 @@ import anyio import click import dagger -from ci_connector_ops.pipelines.contexts import CIContext, ConnectorTestContext +from ci_connector_ops.pipelines.contexts import CIContext, ConnectorContext from ci_connector_ops.pipelines.github import update_commit_status_check -from ci_connector_ops.pipelines.pipelines.connectors import run_connectors_test_pipelines -from ci_connector_ops.pipelines.utils import get_modified_connectors +from ci_connector_ops.pipelines.pipelines.connectors import run_connectors_build_pipelines, run_connectors_test_pipelines +from ci_connector_ops.pipelines.utils import DaggerPipelineCommand, get_modified_connectors from ci_connector_ops.utils import ConnectorLanguage, get_all_released_connectors from rich.logging import RichHandler @@ -61,17 +61,39 @@ def validate_environment(is_local: bool, use_remote_secrets: bool): @click.group(help="Commands related to connectors and connector acceptance tests.") @click.option("--use-remote-secrets", default=True) # specific to connectors +@click.option( + "--name", "names", multiple=True, help="Only test a specific connector. Use its technical name. e.g source-pokeapi.", type=str +) +@click.option("--language", "languages", multiple=True, help="Filter connectors to test by language.", type=click.Choice(ConnectorLanguage)) +@click.option( + "--release-stage", + "release_stages", + multiple=True, + help="Filter connectors to test by release stage.", + type=click.Choice(["alpha", "beta", "generally_available"]), +) +@click.option("--modified/--not-modified", help="Only test modified connectors in the current branch.", default=False, type=bool) +@click.option("--concurrency", help="Number of connector tests pipeline to run in parallel.", default=5, type=int) @click.pass_context def connectors( ctx: click.Context, use_remote_secrets: str, + names: Tuple[str], + languages: Tuple[ConnectorLanguage], + release_stages: Tuple[str], + modified: bool, + concurrency: int, ): """Group all the connectors-ci command.""" validate_environment(ctx.obj["is_local"], use_remote_secrets) ctx.ensure_object(dict) ctx.obj["use_remote_secrets"] = use_remote_secrets - + ctx.obj["connector_names"] = names + ctx.obj["connector_languages"] = languages + ctx.obj["release_states"] = release_stages + ctx.obj["modified"] = modified + ctx.obj["concurrency"] = concurrency update_commit_status_check( ctx.obj["git_revision"], "pending", @@ -82,55 +104,41 @@ def connectors( logger=logger, ) - -@connectors.command() -@click.option( - "--name", "names", multiple=True, help="Only test a specific connector. Use its technical name. 
e.g source-pokeapi.", type=str -) -@click.option("--language", "languages", multiple=True, help="Filter connectors to test by language.", type=click.Choice(ConnectorLanguage)) -@click.option( - "--release-stage", - "release_stages", - multiple=True, - help="Filter connectors to test by release stage.", - type=click.Choice(["alpha", "beta", "generally_available"]), -) -@click.option("--modified/--not-modified", help="Only test modified connectors in the current branch.", default=False, type=bool) -@click.option("--concurrency", help="Number of connector tests pipeline to run in parallel.", default=5, type=int) -@click.pass_context -def test( - ctx: click.Context, names: Tuple[str], languages: Tuple[ConnectorLanguage], release_stages: Tuple[str], modified: bool, concurrency: int -): - """Runs a CI pipeline the connector passed as CLI argument. - - Args: - ctx (click.Context): The click context. - connector_name (str): The connector technical name. E.G. source-pokeapi - """ - connectors_under_test = get_all_released_connectors() + selected_connectors = get_all_released_connectors() modified_connectors = get_modified_connectors(ctx.obj["modified_files_in_branch"]) if modified: - connectors_under_test = modified_connectors + selected_connectors = modified_connectors else: - connectors_under_test.update(modified_connectors) + selected_connectors.update(modified_connectors) if names: - connectors_under_test = {connector for connector in connectors_under_test if connector.technical_name in names} + selected_connectors = {connector for connector in selected_connectors if connector.technical_name in names} if languages: - connectors_under_test = {connector for connector in connectors_under_test if connector.language in languages} + selected_connectors = {connector for connector in selected_connectors if connector.language in languages} if release_stages: - connectors_under_test = {connector for connector in connectors_under_test if connector.release_stage in release_stages} - connectors_under_test_names = [c.technical_name for c in connectors_under_test] - if connectors_under_test_names: - click.secho(f"Will run the test pipeline for the following connectors: {', '.join(connectors_under_test_names)}.", fg="green") - click.secho( - "If you're running this command for the first time the Dagger engine image will be pulled, it can take a short minute..." - ) - else: - click.secho("No connector test will run according to your inputs.", fg="yellow") + selected_connectors = {connector for connector in selected_connectors if connector.release_stage in release_stages} + + if not selected_connectors: + click.secho("No connector were selected according to your inputs. Please double check your filters.", fg="yellow") sys.exit(0) + ctx.obj["selected_connectors"] = selected_connectors + ctx.obj["selected_connectors_names"] = [c.technical_name for c in selected_connectors] + + +@connectors.command(cls=DaggerPipelineCommand, help="Test all the selected connectors.") +@click.pass_context +def test( + ctx: click.Context, +) -> bool: + """Runs a test pipeline for the selected connectors. + + Args: + ctx (click.Context): The click context. 
+ """ + click.secho(f"Will run the test pipeline for the following connectors: {', '.join(ctx.obj['selected_connectors_names'])}.", fg="green") + connectors_tests_contexts = [ - ConnectorTestContext( + ConnectorContext( connector, ctx.obj["is_local"], ctx.obj["git_branch"], @@ -140,10 +148,10 @@ def test( pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), ) - for connector in connectors_under_test + for connector in ctx.obj["selected_connectors"] ] try: - anyio.run(run_connectors_test_pipelines, connectors_tests_contexts, concurrency) + anyio.run(run_connectors_test_pipelines, connectors_tests_contexts, ctx.obj["concurrency"]) update_commit_status_check( ctx.obj["git_revision"], "success", @@ -164,3 +172,26 @@ def test( should_send=ctx.obj.get("ci_context") == CIContext.PULL_REQUEST, logger=logger, ) + return False + return True + + +@connectors.command(cls=DaggerPipelineCommand, help="Build all images for the selected connectors.") +@click.pass_context +def build(ctx: click.Context) -> bool: + click.secho(f"Will build the following connectors: {', '.join(ctx.obj['selected_connectors_names'])}.", fg="green") + connectors_contexts = [ + ConnectorContext( + connector, + ctx.obj["is_local"], + ctx.obj["git_branch"], + ctx.obj["git_revision"], + ctx.obj["use_remote_secrets"], + gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"), + pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), + ci_context=ctx.obj.get("ci_context"), + ) + for connector in ctx.obj["selected_connectors"] + ] + anyio.run(run_connectors_build_pipelines, connectors_contexts, ctx.obj["concurrency"]) + return True diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py index 089a8ff31300..a0cc125f74df 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py @@ -15,7 +15,7 @@ from anyio import Path from asyncer import asyncify from ci_connector_ops.pipelines.actions import remote_storage, secrets -from ci_connector_ops.pipelines.bases import ConnectorTestReport, TestReport +from ci_connector_ops.pipelines.bases import ConnectorReport, Report from ci_connector_ops.pipelines.github import update_commit_status_check from ci_connector_ops.pipelines.utils import AIRBYTE_REPO_URL from ci_connector_ops.utils import Connector @@ -93,7 +93,7 @@ def __init__( self.logger = logging.getLogger(self.pipeline_name) self.dagger_client = None - self._test_report = None + self._report = None update_commit_status_check(**self.github_commit_status) @property @@ -117,13 +117,13 @@ def repo(self): # noqa D102 return self.dagger_client.git(AIRBYTE_REPO_URL, keep_git_dir=True) @property - def test_report(self) -> TestReport: # noqa D102 - return self._test_report + def report(self) -> Report: # noqa D102 + return self._report - @test_report.setter - def test_report(self, test_report: TestReport): # noqa D102 - self._test_report = test_report - self.state = ContextState.SUCCESSFUL if test_report.success else ContextState.FAILURE + @report.setter + def report(self, report: Report): # noqa D102 + self._report = report + self.state = ContextState.SUCCESSFUL if report.success else ContextState.FAILURE @property def github_commit_status(self) -> dict: @@ -209,13 +209,13 @@ async def __aexit__( self.logger.error("An error was handled by the Pipeline", exc_info=True) self.state = ContextState.ERROR - if self.test_report is 
None: + if self.report is None: self.logger.error("No test report was provided. This is probably due to an upstream error") self.state = ContextState.ERROR - self.test_report = TestReport(self, steps_results=[]) + self.report = Report(self, steps_results=[]) - self.test_report.print() - self.logger.info(self.test_report.to_json()) + self.report.print() + self.logger.info(self.report.to_json()) await asyncify(update_commit_status_check)(**self.github_commit_status) @@ -223,8 +223,8 @@ async def __aexit__( return True -class ConnectorTestContext(PipelineContext): - """The connector test context is used to store configuration for a specific connector pipeline run.""" +class ConnectorContext(PipelineContext): + """The connector context is used to store configuration for a specific connector pipeline run.""" DEFAULT_CONNECTOR_ACCEPTANCE_TEST_IMAGE = "airbyte/connector-acceptance-test:latest" @@ -240,7 +240,7 @@ def __init__( pipeline_start_timestamp: Optional[int] = None, ci_context: Optional[str] = None, ): - """Initialize a connector test context. + """Initialize a connector context. Args: connector (Connector): The connector under test. @@ -295,9 +295,13 @@ def connector_acceptance_test_source_dir(self) -> Directory: # noqa D102 return self.get_repo_dir("airbyte-integrations/bases/connector-acceptance-test") @property - def should_save_updated_secrets(self): # noqa D102 + def should_save_updated_secrets(self) -> bool: # noqa D102 return self.use_remote_secrets and self.updated_secrets_dir is not None + @property + def host_image_export_dir_path(self) -> str: + return "." if self.is_ci else "/tmp" + def get_connector_dir(self, exclude=None, include=None) -> Directory: """Get the connector under test source code directory. @@ -313,7 +317,7 @@ def get_connector_dir(self, exclude=None, include=None) -> Directory: async def __aexit__( self, exception_type: Optional[type[BaseException]], exception_value: Optional[BaseException], traceback: Optional[TracebackType] ) -> bool: - """Perform teardown operation for the ConnectorTestContext. + """Perform teardown operation for the ConnectorContext. On the context exit the following operations will happen: - Upload updated connector secrets back to Google Secret Manager @@ -328,29 +332,29 @@ async def __aexit__( bool: Whether the teardown operation ran successfully. """ if exception_value: - self.logger.error("An error got handled by the ConnectorTestContext", exc_info=True) + self.logger.error("An error got handled by the ConnectorContext", exc_info=True) self.state = ContextState.ERROR - if self.test_report is None: + if self.report is None: self.logger.error("No test report was provided. 
This is probably due to an upstream error")
             self.state = ContextState.ERROR
-            self.test_report = ConnectorTestReport(self, [])
+            self.report = ConnectorReport(self, [])
 
         if self.should_save_updated_secrets:
             await secrets.upload(self)
 
-        self.test_report.print()
-        self.logger.info(self.test_report.to_json())
+        self.report.print()
+        self.logger.info(self.report.to_json())
 
-        local_test_reports_path_root = "tools/ci_connector_ops/test_reports/"
-        connector_name = self.test_report.pipeline_context.connector.technical_name
-        connector_version = self.test_report.pipeline_context.connector.version
-        git_revision = self.test_report.pipeline_context.git_revision
-        git_branch = self.test_report.pipeline_context.git_branch.replace("/", "_")
+        local_reports_path_root = "tools/ci_connector_ops/pipeline_reports/"
+        connector_name = self.report.pipeline_context.connector.technical_name
+        connector_version = self.report.pipeline_context.connector.version
+        git_revision = self.report.pipeline_context.git_revision
+        git_branch = self.report.pipeline_context.git_branch.replace("/", "_")
         suffix = f"{connector_name}/{git_branch}/{connector_version}/{git_revision}.json"
-        local_report_path = Path(local_test_reports_path_root + suffix)
+        local_report_path = Path(local_reports_path_root + suffix)
         await local_report_path.parents[0].mkdir(parents=True, exist_ok=True)
-        await local_report_path.write_text(self.test_report.to_json())
-        if self.test_report.should_be_saved:
+        await local_report_path.write_text(self.report.to_json())
+        if self.report.should_be_saved:
             s3_reports_path_root = "python-poc/tests/history/"
             s3_key = s3_reports_path_root + suffix
             report_upload_exit_code = await remote_storage.upload_to_s3(
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/pipelines/connectors.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/pipelines/connectors.py
index 1647b5b87f64..80043bbe2b6a 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/pipelines/connectors.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/pipelines/connectors.py
@@ -9,9 +9,9 @@
 import anyio
 import asyncer
 import dagger
-from ci_connector_ops.pipelines import tests
-from ci_connector_ops.pipelines.bases import ConnectorTestReport
-from ci_connector_ops.pipelines.contexts import ConnectorTestContext
+from ci_connector_ops.pipelines import builds, tests
+from ci_connector_ops.pipelines.bases import ConnectorReport
+from ci_connector_ops.pipelines.contexts import ConnectorContext
 from ci_connector_ops.pipelines.utils import DAGGER_CONFIG
 
 # CONSTANTS
@@ -23,16 +23,16 @@
 # DAGGER PIPELINES
 
 
-async def run(context: ConnectorTestContext, semaphore: anyio.Semaphore) -> ConnectorTestReport:
-    """Run a CI pipeline for a single connector.
+async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport:
+    """Run a test pipeline for a single connector.
     A visual DAG can be found on the README.md file of the pipelines modules.
 
     Args:
-        context (ConnectorTestContext): The initialized connector test context.
+        context (ConnectorContext): The initialized connector context.
 
     Returns:
-        ConnectorTestReport: The test reports holding tests results.
+        ConnectorReport: The report holding the test results.
""" async with semaphore: async with context: @@ -43,16 +43,16 @@ async def run(context: ConnectorTestContext, semaphore: anyio.Semaphore) -> Conn task_group.soonify(tests.run_all_tests)(context), ] results = list(itertools.chain(*(task.value for task in tasks))) - context.test_report = ConnectorTestReport(context, steps_results=results) + context.report = ConnectorReport(context, steps_results=results, name="TEST RESULTS") - return context.test_report + return context.report -async def run_connectors_test_pipelines(contexts: List[ConnectorTestContext], concurrency: int = 5): +async def run_connectors_test_pipelines(contexts: List[ConnectorContext], concurrency: int = 5): """Run a CI pipeline for all the connectors passed. Args: - contexts (List[ConnectorTestContext]): List of connector test contexts for which a CI pipeline needs to be run. + contexts (List[ConnectorContext]): List of connector contexts for which a CI pipeline needs to be run. concurrency (int): Number of test pipeline that can run in parallel. Defaults to 5 """ semaphore = anyio.Semaphore(concurrency) @@ -60,4 +60,40 @@ async def run_connectors_test_pipelines(contexts: List[ConnectorTestContext], co async with anyio.create_task_group() as tg: for context in contexts: context.dagger_client = dagger_client.pipeline(f"{context.connector.technical_name} - Test Pipeline") - tg.start_soon(run, context, semaphore) + tg.start_soon(run_connector_test_pipeline, context, semaphore) + + +async def run_connector_build_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport: + """Run a build pipeline for a single connector. + + Args: + context (ConnectorContext): The initialized connector context. + + Returns: + ConnectorReport: The reports holding builds results. + """ + async with semaphore: + async with context: + build_results_per_platform = await builds.run_connector_build(context) + step_results = [value[0] for value in build_results_per_platform.values()] + if context.is_local: + _, connector_container = build_results_per_platform[builds.LOCAL_BUILD_PLATFORM] + load_image_result = await builds.common.LoadContainerToLocalDockerHost(context, connector_container).run() + step_results.append(load_image_result) + context.report = ConnectorReport(context, step_results, name="BUILD RESULTS") + return context.report + + +async def run_connectors_build_pipelines(contexts: List[ConnectorContext], concurrency: int = 5): + """Run a build pipeline for all the connector contexts. + + Args: + contexts (List[ConnectorContext]): List of connector contexts for which a build pipeline needs to be run. + concurrency (int): Number of test pipeline that can run in parallel. 
Defaults to 5 + """ + semaphore = anyio.Semaphore(concurrency) + async with dagger.Connection(DAGGER_CONFIG) as dagger_client: + async with anyio.create_task_group() as tg: + for context in contexts: + context.dagger_client = dagger_client.pipeline(f"{context.connector.technical_name} - Build Pipeline") + tg.start_soon(run_connector_build_pipeline, context, semaphore) diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/pipelines/metadata.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/pipelines/metadata.py index a3ce22001d9d..54b046b4b6fc 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/pipelines/metadata.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/pipelines/metadata.py @@ -3,13 +3,12 @@ # import uuid from pathlib import Path -from typing import Optional, Set, List +from typing import List, Optional, Set import asyncer import dagger - -from ci_connector_ops.pipelines.actions.environments import with_poetry_module, with_python_base, with_pip_packages -from ci_connector_ops.pipelines.bases import Step, StepStatus, TestReport, StepResult +from ci_connector_ops.pipelines.actions.environments import with_pip_packages, with_poetry_module, with_python_base +from ci_connector_ops.pipelines.bases import Report, Step, StepResult, StepStatus from ci_connector_ops.pipelines.contexts import PipelineContext from ci_connector_ops.pipelines.utils import DAGGER_CONFIG, METADATA_FILE_NAME, execute_concurrently @@ -188,9 +187,11 @@ async def run_metadata_validation_pipeline( validation_steps = [MetadataValidation(metadata_pipeline_context, metadata_path).run for metadata_path in metadata_to_validate] results = await execute_concurrently(validation_steps, concurrency=10) - metadata_pipeline_context.test_report = TestReport(pipeline_context=metadata_pipeline_context, steps_results=results) + metadata_pipeline_context.report = Report( + pipeline_context=metadata_pipeline_context, steps_results=results, name="METADATA VALIDATION RESULTS" + ) - return metadata_pipeline_context.test_report.success + return metadata_pipeline_context.report.success async def run_metadata_lib_test_pipeline( @@ -221,9 +222,11 @@ async def run_metadata_lib_test_pipeline( module_path=METADATA_LIB_MODULE_PATH, ) result = await test_lib_step.run(["pytest"]) - metadata_pipeline_context.test_report = TestReport(pipeline_context=metadata_pipeline_context, steps_results=[result]) + metadata_pipeline_context.report = Report( + pipeline_context=metadata_pipeline_context, steps_results=[result], name="METADATA LIB TEST RESULTS" + ) - return metadata_pipeline_context.test_report.success + return metadata_pipeline_context.report.success async def run_metadata_orchestrator_test_pipeline( @@ -249,9 +252,11 @@ async def run_metadata_orchestrator_test_pipeline( async with metadata_pipeline_context: test_orch_step = TestOrchestrator(context=metadata_pipeline_context) result = await test_orch_step.run() - metadata_pipeline_context.test_report = TestReport(pipeline_context=metadata_pipeline_context, steps_results=[result]) + metadata_pipeline_context.report = Report( + pipeline_context=metadata_pipeline_context, steps_results=[result], name="METADATA ORCHESTRATOR TEST RESULTS" + ) - return metadata_pipeline_context.test_report.success + return metadata_pipeline_context.report.success async def run_metadata_upload_pipeline( @@ -285,9 +290,9 @@ async def run_metadata_upload_pipeline( for metadata_path in metadata_to_upload ] ) - pipeline_context.test_report = TestReport(pipeline_context, results) + 
+        pipeline_context.report = Report(pipeline_context, results, name="METADATA UPLOAD RESULTS")
 
-    return pipeline_context.test_report.success
+    return pipeline_context.report.success
 
 
 async def run_metadata_orchestrator_deploy_pipeline(
@@ -314,5 +319,7 @@
     async with metadata_pipeline_context:
         steps = [TestOrchestrator(context=metadata_pipeline_context), DeployOrchestrator(context=metadata_pipeline_context)]
         steps_results = await run_steps(steps)
-        metadata_pipeline_context.test_report = TestReport(pipeline_context=metadata_pipeline_context, steps_results=steps_results)
-    return metadata_pipeline_context.test_report.success
+        metadata_pipeline_context.report = Report(
+            pipeline_context=metadata_pipeline_context, steps_results=steps_results, name="METADATA ORCHESTRATOR DEPLOY RESULTS"
+        )
+    return metadata_pipeline_context.report.success
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/__init__.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/__init__.py
index 555776d6645e..70e61c31b1b8 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/__init__.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/__init__.py
@@ -6,7 +6,7 @@
 from typing import List
 
 from ci_connector_ops.pipelines.bases import StepResult
-from ci_connector_ops.pipelines.contexts import ConnectorTestContext
+from ci_connector_ops.pipelines.contexts import ConnectorContext
 from ci_connector_ops.pipelines.tests import java_connectors, python_connectors
 from ci_connector_ops.pipelines.tests.common import AcceptanceTests, QaChecks  # noqa
 from ci_connector_ops.utils import ConnectorLanguage
@@ -25,11 +25,11 @@
 }
 
 
-async def run_qa_checks(context: ConnectorTestContext) -> List[StepResult]:
+async def run_qa_checks(context: ConnectorContext) -> List[StepResult]:
     """Run the QA checks on a connector.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
 
     Returns:
         List[StepResult]: The results of the QA checks steps.
@@ -38,11 +38,11 @@ async def run_qa_checks(context: ConnectorTestContext) -> List[StepResult]:
     return [await QaChecks(context).run()]
 
 
-async def run_code_format_checks(context: ConnectorTestContext) -> List[StepResult]:
+async def run_code_format_checks(context: ConnectorContext) -> List[StepResult]:
     """Run the code format checks according to the connector language.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
 
     Returns:
         List[StepResult]: The results of the code format checks steps.
@@ -55,11 +55,11 @@ async def run_code_format_checks(context: ConnectorTestResu
     return []
 
 
-async def run_all_tests(context: ConnectorTestContext) -> List[StepResult]:
+async def run_all_tests(context: ConnectorContext) -> List[StepResult]:
     """Run all the tests steps according to the connector language.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
 
     Returns:
         List[StepResult]: The results of the tests steps.
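Both the connector pipelines and the metadata pipelines above fan work out the same way: an `anyio.Semaphore` caps how many pipelines run at once while a task group schedules one coroutine per context. Below is a minimal, self-contained sketch of that pattern; the `run_one` and `run_all` names are hypothetical stand-ins for the real pipeline functions, not part of the codebase.

```python
from typing import List

import anyio


async def run_one(name: str, semaphore: anyio.Semaphore) -> None:
    # At most `concurrency` coroutines are inside this block at a time.
    async with semaphore:
        await anyio.sleep(0.1)  # stand-in for the real pipeline steps
        print(f"{name}: done")


async def run_all(names: List[str], concurrency: int = 5) -> None:
    semaphore = anyio.Semaphore(concurrency)
    async with anyio.create_task_group() as tg:
        for name in names:
            # start_soon schedules the coroutine; the task group waits for
            # all of them to finish before the `async with` block exits.
            tg.start_soon(run_one, name, semaphore)


anyio.run(run_all, [f"connector-{i}" for i in range(10)])
```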
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/common.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/common.py
index bd0a7e07d314..e3e101908d2a 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/common.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/common.py
@@ -25,7 +25,7 @@ async def _run(self) -> StepResult:
         https://github.com/airbytehq/airbyte/blob/master/tools/ci_connector_ops/ci_connector_ops/qa_checks.py
 
         Args:
-            context (ConnectorTestContext): The current test context, providing a connector object, a dagger client and a repository directory.
+            context (ConnectorContext): The current test context, providing a connector object, a dagger client and a repository directory.
 
         Returns:
             StepResult: Failure or success of the QA checks with stdout and stderr.
         """
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/java_connectors.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/java_connectors.py
index 29b5227578c5..8b992f7ceba6 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/java_connectors.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/java_connectors.py
@@ -4,175 +4,18 @@
 
 """This module groups steps made to run tests for a specific Java connector given a test context."""
 
-from abc import ABC
-from typing import ClassVar, List, Optional, Tuple
+from typing import List, Optional
 
 import anyio
 from ci_connector_ops.pipelines.actions import environments, secrets
-from ci_connector_ops.pipelines.bases import Step, StepResult, StepStatus
-from ci_connector_ops.pipelines.contexts import ConnectorTestContext
+from ci_connector_ops.pipelines.bases import GradleTask, StepResult, StepStatus
+from ci_connector_ops.pipelines.builds import LOCAL_BUILD_PLATFORM
+from ci_connector_ops.pipelines.builds.java_connectors import BuildConnectorImage
+from ci_connector_ops.pipelines.builds.normalization import BuildOrPullNormalization
+from ci_connector_ops.pipelines.contexts import ConnectorContext
 from ci_connector_ops.pipelines.tests.common import AcceptanceTests
-from ci_connector_ops.pipelines.utils import slugify
-from ci_connector_ops.utils import Connector
-from dagger import CacheVolume, Container, Directory, File, QueryError
-
-
-class BuildOrPullNormalization(Step):
-    """A step to build or pull the normalization image for a connector according to the image name."""
-
-    DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING = {
-        Connector("destination-clickhouse"): "clickhouse.Dockerfile",
-        Connector("destination-duckdb"): "duckdb.Dockerfile",
-        Connector("destination-mssql"): "mssql.Dockerfile",
-        Connector("destination-mysql"): "mysql.Dockerfile",
-        Connector("destination-oracle"): "oracle.Dockerfile",
-        Connector("destination-redshift"): "redshift.Dockerfile",
-        Connector("destination-snowflake"): "snowflake.Dockerfile",
-        Connector("destination-tidb"): "tidb.Dockerfile",
-    }
-
-    def __init__(self, context: ConnectorTestContext, normalization_image: str) -> None:
-        """Initialize the step to build or pull the normalization image.
-
-        Args:
-            context (ConnectorTestContext): The current connector test context.
-            normalization_image (str): The normalization image to build (if :dev) or pull.
- """ - super().__init__(context) - self.use_dev_normalization = normalization_image.endswith(":dev") - self.normalization_image = normalization_image - self.normalization_dockerfile = self.DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING.get(context.connector, "Dockerfile") - self.title = f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}" - - async def _run(self) -> Tuple[StepResult, File]: - normalization_local_tar_path = f"{slugify(self.normalization_image)}.tar" - if self.use_dev_normalization: - build_normalization_container = environments.with_normalization(self.context, self.normalization_dockerfile) - else: - build_normalization_container = self.context.dagger_client.container().from_(self.normalization_image) - - try: - export_success = await build_normalization_container.export(f"{self.host_image_export_dir_path}/{normalization_local_tar_path}") - if export_success: - exported_file = ( - self.context.dagger_client.host() - .directory(self.host_image_export_dir_path, include=[normalization_local_tar_path]) - .file(normalization_local_tar_path) - ) - return StepResult(self, StepStatus.SUCCESS), exported_file - else: - return StepResult(self, StepStatus.FAILURE, stderr="The normalization container could not be exported"), None - except QueryError as e: - return StepResult(self, StepStatus.FAILURE, stderr=str(e)), None - - -class GradleTask(Step, ABC): - """ - A step to run a Gradle task. - - Attributes: - task_name (str): The Gradle task name to run. - title (str): The step title. - """ - - DEFAULT_TASKS_TO_EXCLUDE = ["airbyteDocker"] - BIND_TO_DOCKER_HOST = True - gradle_task_name: ClassVar - - # TODO more robust way to find all projects on which the task depends? - JAVA_BUILD_INCLUDE = [ - "airbyte-api", - "airbyte-commons-cli", - "airbyte-commons-protocol", - "airbyte-commons", - "airbyte-config", - "airbyte-connector-test-harnesses", - "airbyte-db", - "airbyte-integrations/bases", - "airbyte-json-validation", - "airbyte-protocol", - "airbyte-test-utils", - "airbyte-config-oss", - ] - - SOURCE_BUILD_INCLUDE = [ - "airbyte-integrations/connectors/source-jdbc", - "airbyte-integrations/connectors/source-relational-db", - ] - - DESTINATION_BUILD_INCLUDE = [ - "airbyte-integrations/bases/bases-destination-jdbc", - "airbyte-integrations/connectors/destination-gcs", - "airbyte-integrations/connectors/destination-azure-blob-storage", - ] - - # These are the lines we remove from the connector gradle file to ignore specific tasks / plugins. - LINES_TO_REMOVE_FROM_GRADLE_FILE = [ - # Do not build normalization with Gradle - we build normalization with Dagger in the BuildOrPullNormalization step. - "project(':airbyte-integrations:bases:base-normalization').airbyteDocker.output", - ] - - @property - def docker_service_name(self) -> str: - return slugify(f"gradle-{self.title}") - - @property - def connector_java_build_cache(self) -> CacheVolume: - return self.context.dagger_client.cache_volume("connector_java_build_cache") - - @property - def build_include(self) -> List[str]: - """Retrieve the list of source code directory required to run a Java connector Gradle task. - - The list is different according to the connector type. - - Returns: - List[str]: List of directories or files to be mounted to the container to run a Java connector Gradle task. 
- """ - if self.context.connector.connector_type == "source": - return self.JAVA_BUILD_INCLUDE + self.SOURCE_BUILD_INCLUDE - elif self.context.connector.connector_type == "destination": - return self.JAVA_BUILD_INCLUDE + self.DESTINATION_BUILD_INCLUDE - else: - raise ValueError(f"{self.context.connector.connector_type} is not supported") - - async def _get_patched_connector_dir(self) -> Directory: - """Patch the build.gradle file of the connector under test by removing the lines declared in LINES_TO_REMOVE_FROM_GRADLE_FILE. - - Returns: - Directory: The patched connector directory - """ - - gradle_file_content = await self.context.get_connector_dir(include=["build.gradle"]).file("build.gradle").contents() - patched_file_content = "" - for line in gradle_file_content.split("\n"): - if not any(line_to_remove in line for line_to_remove in self.LINES_TO_REMOVE_FROM_GRADLE_FILE): - patched_file_content += line + "\n" - return self.context.get_connector_dir().with_new_file("build.gradle", patched_file_content) - - def _get_gradle_command(self, extra_options: Tuple[str] = ("--no-daemon", "--scan")) -> List: - command = ( - ["./gradlew"] - + list(extra_options) - + [f":airbyte-integrations:connectors:{self.context.connector.technical_name}:{self.gradle_task_name}"] - ) - for task in self.DEFAULT_TASKS_TO_EXCLUDE: - command += ["-x", task] - return command - - async def _run(self) -> StepResult: - connector_under_test = ( - environments.with_gradle( - self.context, self.build_include, docker_service_name=self.docker_service_name, bind_to_docker_host=self.BIND_TO_DOCKER_HOST - ) - .with_mounted_directory(str(self.context.connector.code_directory), await self._get_patched_connector_dir()) - # Disable the Ryuk container because it needs privileged docker access that does not work: - .with_env_variable("TESTCONTAINERS_RYUK_DISABLED", "true") - .with_directory(f"{self.context.connector.code_directory}/secrets", self.context.secrets_dir) - .with_exec(self._get_gradle_command()) - ) - - return await self.get_step_result(connector_under_test) +from ci_connector_ops.pipelines.utils import export_container_to_tarball +from dagger import File, QueryError class UnitTests(GradleTask): @@ -180,69 +23,6 @@ class UnitTests(GradleTask): gradle_task_name = "test" -class BuildConnectorImage(GradleTask): - """ - A step to build a Java connector image using the build Gradle task. - - Export the image as a tar archive to host. - """ - - title = "Build Connector Image" - gradle_task_name = "distTar" - - async def _export_connector_image(self, connector: Container) -> Optional[File]: - """Save the connector image to the host filesystem as a tar archive. - - Returns: - Optional[File]: The file object holding the tar archive on the host. 
- """ - connector_image_tar = f"{self.context.connector.technical_name}.tar" - export_success = await connector.export(f"{self.host_image_export_dir_path}/{connector_image_tar}") - if export_success: - exported_file = ( - self.context.dagger_client.host() - .directory(self.host_image_export_dir_path, include=[connector_image_tar]) - .file(connector_image_tar) - ) - return exported_file - else: - return None - - async def build_tar(self) -> File: - distTar = ( - environments.with_gradle( - self.context, - self.build_include, - docker_service_name=self.docker_service_name, - bind_to_docker_host=self.BIND_TO_DOCKER_HOST, - ) - .with_mounted_directory(str(self.context.connector.code_directory), await self._get_patched_connector_dir()) - .with_exec(self._get_gradle_command()) - .with_workdir(f"{self.context.connector.code_directory}/build/distributions") - ) - - distributions = await distTar.directory(".").entries() - tar_files = [f for f in distributions if f.endswith(".tar")] - if len(tar_files) > 1: - raise Exception( - "The distributions directory contains multiple connector tar files. We can't infer which one should be used for the text. Please review and delete any unnecessary tar files." - ) - return distTar.file(tar_files[0]) - - async def _run(self) -> Tuple[StepResult, Optional[File]]: - try: - tar_file = await self.build_tar() - java_connector = await environments.with_airbyte_java_connector(self.context, tar_file) - step_result = await self.get_step_result(java_connector) - connector_image_tar_file = await self._export_connector_image(java_connector) - - if connector_image_tar_file is None: - step_result = StepResult(self, StepStatus.FAILURE, stderr="The java connector could not be exported to the host FS.") - return step_result, connector_image_tar_file - except QueryError as e: - return StepResult(self, StepStatus.FAILURE, stderr=str(e)), None - - class IntegrationTestJava(GradleTask): """A step to run integrations tests for Java connectors using the integrationTestJava Gradle task.""" @@ -276,7 +56,7 @@ async def _run(self, connector_tar_file: File, normalization_tar_file: Optional[ return StepResult(self, StepStatus.FAILURE, stderr=str(e)) -async def run_all_tests(context: ConnectorTestContext) -> List[StepResult]: +async def run_all_tests(context: ConnectorContext) -> List[StepResult]: """Run all tests for a Java connectors. - Build the normalization image if the connector supports it. @@ -285,13 +65,13 @@ async def run_all_tests(context: ConnectorTestContext) -> List[StepResult]: - Run integration and acceptance test in parallel using the built connector and normalization images. Args: - context (ConnectorTestContext): The current connector test context. + context (ConnectorContext): The current connector context. Returns: List[StepResult]: The results of all the tests steps. 
""" step_results = [] - build_connector_step = BuildConnectorImage(context) + build_connector_step = BuildConnectorImage(context, LOCAL_BUILD_PLATFORM) unit_tests_step = UnitTests(context) build_normalization_step = None if context.connector.supports_normalization: @@ -304,7 +84,7 @@ async def run_all_tests(context: ConnectorTestContext) -> List[StepResult]: normalization_tar_file = None if build_normalization_step: context.logger.info("Run build normalization step.") - build_normalization_results, normalization_tar_file = await build_normalization_step.run() + build_normalization_results, normalization_container = await build_normalization_step.run() if build_normalization_results.status is StepStatus.FAILURE: return step_results + [ build_normalization_results, @@ -313,11 +93,12 @@ async def run_all_tests(context: ConnectorTestContext) -> List[StepResult]: integration_tests_java_step.skip(), acceptance_tests_step.skip(), ] + normalization_tar_file, _ = await export_container_to_tarball(context, normalization_container) context.logger.info(f"{build_normalization_step.normalization_image} was successfully built.") step_results.append(build_normalization_results) context.logger.info("Run build connector step") - build_connector_results, connector_image_tar_file = await build_connector_step.run() + build_connector_results, connector_container = await build_connector_step.run() if build_connector_results.status is StepStatus.FAILURE: return step_results + [ build_connector_results, @@ -325,6 +106,7 @@ async def run_all_tests(context: ConnectorTestContext) -> List[StepResult]: integration_tests_java_step.skip(), acceptance_tests_step.skip(), ] + connector_image_tar_file, _ = await export_container_to_tarball(context, connector_container) context.logger.info("The connector was successfully built.") step_results.append(build_connector_results) diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/python_connectors.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/python_connectors.py index 5cde808fc8f2..b8d186f621f9 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/python_connectors.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests/python_connectors.py @@ -9,9 +9,12 @@ import asyncer from ci_connector_ops.pipelines.actions import environments, secrets from ci_connector_ops.pipelines.bases import Step, StepResult, StepStatus -from ci_connector_ops.pipelines.contexts import ConnectorTestContext +from ci_connector_ops.pipelines.builds import LOCAL_BUILD_PLATFORM +from ci_connector_ops.pipelines.builds.python_connectors import BuildConnectorImage +from ci_connector_ops.pipelines.contexts import ConnectorContext from ci_connector_ops.pipelines.tests.common import AcceptanceTests, PytestStep -from dagger import Container, File +from ci_connector_ops.pipelines.utils import export_container_to_tarball +from dagger import Container class CodeFormatChecks(Step): @@ -32,7 +35,7 @@ async def _run(self) -> StepResult: - Flake enforces style-guides: fails if the style-guide is not followed. Args: - context (ConnectorTestContext): The current test context, providing a connector object, a dagger client and a repository directory. + context (ConnectorContext): The current test context, providing a connector object, a dagger client and a repository directory. step (Step): The step in which the code format checks are run. Defaults to Step.CODE_FORMAT_CHECKS Returns: StepResult: Failure or success of the code format checks with stdout and stderr. 
@@ -102,40 +105,18 @@ async def _run(self, connector_under_test: Container) -> StepResult:
         return await self._run_tests_in_directory(connector_under_test_with_secrets, "integration_tests")
 
 
-class BuildConnectorImage(Step):
-    """
-    A step to build a Python connector image using its Dockerfile.
-
-    Export the image as a tar archive on the host /tmp folder.
-    """
-
-    title = "Build connector image"
-
-    async def _run(self) -> Tuple[StepResult, File]:
-        connector_dir = self.context.get_connector_dir()
-        connector_local_tar_name = f"{self.context.connector.technical_name}.tar"
-        export_success = await connector_dir.docker_build().export(f"/tmp/{connector_local_tar_name}")
-        if export_success:
-            connector_image_tar_path = (
-                self.context.dagger_client.host().directory("/tmp", include=[connector_local_tar_name]).file(connector_local_tar_name)
-            )
-            return StepResult(self, StepStatus.SUCCESS), connector_image_tar_path
-        else:
-            return StepResult(self, StepStatus.FAILURE, stderr="The connector image could not be exported."), None
-
-
-async def run_all_tests(context: ConnectorTestContext) -> List[StepResult]:
-    """Run all tests for a Python connnector.
+async def run_all_tests(context: ConnectorContext) -> List[StepResult]:
+    """Run all tests for a Python connector.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
 
     Returns:
-        List[StepResult]: _description_
+        List[StepResult]: The results of all the steps that ran or were skipped.
     """
     connector_package_install_step = ConnectorPackageInstall(context)
     unit_tests_step = UnitTests(context)
-    build_connector_image_step = BuildConnectorImage(context)
+    build_connector_image_step = BuildConnectorImage(context, LOCAL_BUILD_PLATFORM)
     integration_tests_step = IntegrationTests(context)
     acceptance_test_step = AcceptanceTests(context)
@@ -160,10 +141,11 @@ async def run_all_tests(context: ConnectorTestContext) -> List[StepResult]:
         connector_image_tar_file = None
     else:
         context.logger.info("Run the build connector image step.")
-        build_connector_image_results, connector_image_tar_file = await build_connector_image_step.run()
+        build_connector_image_results, connector_container = await build_connector_image_step.run()
         results.append(build_connector_image_results)
         if build_connector_image_results.status is StepStatus.FAILURE:
             return results + [integration_tests_step.skip(), acceptance_test_step.skip()]
+        connector_image_tar_file, _ = await export_container_to_tarball(context, connector_container)
         context.logger.info("Successfully ran the build connector image step.")
 
     context.logger.info("Retrieve the connector secrets.")
@@ -178,11 +160,11 @@ async def run_all_tests(context: ConnectorTestContext) -> List[StepResult]:
     return results + [task.value for task in tasks]
 
 
-async def run_code_format_checks(context: ConnectorTestContext) -> List[StepResult]:
+async def run_code_format_checks(context: ConnectorContext) -> List[StepResult]:
     """Run the code format check steps for Python connectors.
 
     Args:
-        context (ConnectorTestContext): The current connector test context.
+        context (ConnectorContext): The current connector context.
 
     Returns:
         List[StepResult]: Results of the code format checks.
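One detail worth noting in both `run_all_tests` implementations above: when an upstream step fails, the downstream steps are appended to the results as skipped instead of being dropped, so the final report still lists every expected step. Below is a small sketch of that control flow; the `StepStatus`, `StepResult`, and `run_pipeline` definitions are hypothetical stand-ins for the pipeline's types, kept just detailed enough to show the pattern.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class StepStatus(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    SKIPPED = "skipped"


@dataclass
class StepResult:
    step_name: str
    status: StepStatus


def run_pipeline(build_succeeds: bool) -> List[StepResult]:
    results = [StepResult("build", StepStatus.SUCCESS if build_succeeds else StepStatus.FAILURE)]
    if results[-1].status is StepStatus.FAILURE:
        # Mirror the `integration_tests_step.skip()` calls in the diff:
        # downstream steps are reported as skipped, not silently omitted.
        return results + [
            StepResult("integration_tests", StepStatus.SKIPPED),
            StepResult("acceptance_tests", StepStatus.SKIPPED),
        ]
    return results + [
        StepResult("integration_tests", StepStatus.SUCCESS),
        StepResult("acceptance_tests", StepStatus.SUCCESS),
    ]


for result in run_pipeline(build_succeeds=False):
    print(result.step_name, result.status.value)
```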
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/utils.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/utils.py
index 76ff2347c12a..62a683ae913f 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/utils.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/utils.py
@@ -3,6 +3,7 @@
 #
 
 """This module groups util function used in pipelines."""
+from __future__ import annotations
 
 import datetime
 import re
@@ -10,7 +11,7 @@
 import unicodedata
 from glob import glob
 from pathlib import Path
-from typing import Any, Callable, List, Optional, Set
+from typing import TYPE_CHECKING, Any, Callable, List, Optional, Set, Tuple
 
 import anyio
 import asyncer
@@ -20,6 +21,9 @@
 from dagger import Config, Connection, Container, DaggerError, File, QueryError
 from more_itertools import chunked
 
+if TYPE_CHECKING:
+    from ci_connector_ops.pipelines.contexts import ConnectorContext
+
 DAGGER_CONFIG = Config(log_output=sys.stderr)
 AIRBYTE_REPO_URL = "https://github.com/airbytehq/airbyte.git"
 METADATA_FILE_NAME = "metadata.yaml"
@@ -292,6 +296,9 @@ def invoke(self, ctx: click.Context) -> Any:
         """
         command_name = self.name
         click.secho(f"Running Dagger Command {command_name}...")
+        click.secho(
+            "If you're running this command for the first time, the Dagger engine image will be pulled; this can take a minute..."
+        )
         try:
             pipeline_success = super().invoke(ctx)
             if not pipeline_success:
@@ -309,3 +316,27 @@ async def execute_concurrently(steps: List[Callable], concurrency=5):
         async with asyncer.create_task_group() as task_group:
             tasks += [task_group.soonify(step)() for step in chunk]
     return [task.value for task in tasks]
+
+
+async def export_container_to_tarball(context: ConnectorContext, container: Container) -> Tuple[Optional[File], Optional[Path]]:
+    """Save the container image to the host filesystem as a tar archive.
+
+    Exporting a container image as a tar archive allows users to have a Dagger-built container image available on their host filesystem.
+    They can load this tar file to their main docker host with 'docker load'.
+    This mechanism is also used to share Dagger-built containers with other steps like AcceptanceTest that have their own dockerd service.
+    We 'docker load' this tar file to AcceptanceTest's docker host to make sure the image of the container under test is available for testing.
+
+    Returns:
+        Tuple[Optional[File], Optional[Path]]: A tuple with the file object holding the tar archive on the host and its path.
+    """
+    container_id = await container.id()
+    tar_file_name = f"{container_id[:100]}.tar"
+    local_path = Path(f"{context.host_image_export_dir_path}/{tar_file_name}")
+    export_success = await container.export(str(local_path))
+    if export_success:
+        exported_file = (
+            context.dagger_client.host().directory(context.host_image_export_dir_path, include=[tar_file_name]).file(tar_file_name)
+        )
+        return exported_file, local_path
+    else:
+        return None, None
diff --git a/tools/ci_connector_ops/setup.py b/tools/ci_connector_ops/setup.py
index aaf7bb35a499..43b7308ae667 100644
--- a/tools/ci_connector_ops/setup.py
+++ b/tools/ci_connector_ops/setup.py
@@ -42,12 +42,7 @@ def local_pkg(name: str) -> str:
     "pytablewriter~=0.64.2",
 ]
 
-PIPELINES_REQUIREMENTS = [
-    "dagger-io==0.5.0",
-    "asyncer",
-    "anyio",
-    "more-itertools",
-]
+PIPELINES_REQUIREMENTS = ["dagger-io==0.5.0", "asyncer", "anyio", "more-itertools", "docker"]
 
 setup(
     version="0.2.1",
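The `export_container_to_tarball` helper added to utils.py above builds on two dagger-io calls that already appear in this diff: `Container.export`, which writes a docker-loadable tarball to the host and returns a boolean, and `Host.directory(...).file(...)`, which turns that tarball back into a dagger `File`. Below is a standalone sketch of the same export-then-load flow; the alpine image and the /tmp path are illustrative choices, not values taken from the pipeline.

```python
import sys

import anyio
import dagger


async def main() -> None:
    async with dagger.Connection(dagger.Config(log_output=sys.stderr)) as client:
        container = client.container().from_("alpine:3.17")  # illustrative image
        # Export the container image to the host as a tar archive; export()
        # returns True on success, just like in the helper above.
        if await container.export("/tmp/alpine-example.tar"):
            # The archive can now be loaded onto any docker host with:
            #   docker load -i /tmp/alpine-example.tar
            print("exported /tmp/alpine-example.tar")


anyio.run(main)
```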