From f2530b848cea06593030fcd81f022ef13499395a Mon Sep 17 00:00:00 2001 From: Augustin Date: Fri, 10 Mar 2023 19:45:08 +0100 Subject: [PATCH] ci_connector_ops: POC of CI connector pipelines in python (#23487) --- ...nnector_integration_test_single_dagger.yml | 71 ++++- .github/workflows/run-qa-engine.yml | 2 +- pyproject.toml | 2 +- tools/ci_common_utils/setup.py | 4 +- tools/ci_connector_ops/.gitignore | 1 + tools/ci_connector_ops/.python-version | 1 + .../ci_connector_ops/pipelines/README.md | 151 +++++++++++ .../ci_connector_ops/pipelines/__init__.py | 3 + .../pipelines/actions/__init__.py | 3 + .../pipelines/actions/environments.py | 173 ++++++++++++ .../pipelines/actions/remote_storage.py | 34 +++ .../pipelines/actions/secrets.py | 80 ++++++ .../pipelines/actions/tests.py | 230 ++++++++++++++++ .../pipelines/connectors_ci.py | 249 ++++++++++++++++++ .../ci_connector_ops/pipelines/contexts.py | 168 ++++++++++++ .../ci_connector_ops/pipelines/github.py | 41 +++ .../ci_connector_ops/pipelines/models.py | 142 ++++++++++ .../ci_connector_ops/pipelines/utils.py | 152 +++++++++++ .../ci_connector_ops/utils.py | 90 +++++-- tools/ci_connector_ops/setup.py | 44 +++- .../test_acceptance_test_config_checks.py | 45 +++- .../tests/test_qa_engine/test_main.py | 4 +- tools/ci_credentials/setup.py | 15 +- 23 files changed, 1651 insertions(+), 54 deletions(-) create mode 100644 tools/ci_connector_ops/.gitignore create mode 100644 tools/ci_connector_ops/.python-version create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/README.md create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/__init__.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/actions/__init__.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/actions/remote_storage.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/actions/secrets.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/actions/tests.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/github.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/models.py create mode 100644 tools/ci_connector_ops/ci_connector_ops/pipelines/utils.py diff --git a/.github/workflows/connector_integration_test_single_dagger.yml b/.github/workflows/connector_integration_test_single_dagger.yml index ce01c2d76c51..75603ceeadad 100644 --- a/.github/workflows/connector_integration_test_single_dagger.yml +++ b/.github/workflows/connector_integration_test_single_dagger.yml @@ -2,9 +2,20 @@ name: POC Connectors CI - test pipeline on: workflow_dispatch: - + inputs: + test-connectors-options: + description: "Options to pass to the 'connectors-ci test-connectors' command" + default: "--modified" + pull_request: + paths: + - "airbyte-integrations/connectors/**" + push: + paths: + - "airbyte-integrations/connectors/**" + branches: + - "master" jobs: - start-connectors-integration-test-runner: + start-runner: name: "Connectors Integration Test: Start Build EC2 Runner" timeout-minutes: 10 runs-on: ubuntu-latest @@ -27,30 +38,62 @@ jobs: aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} github-token: ${{ env.PAT }} ec2-instance-type: "c5.2xlarge" # https://aws.amazon.com/ec2/instance-types/ - launch_integration_tests: - timeout-minutes: 120 - runs-on: ${{ needs.start-connectors-integration-test-runner.outputs.label }} # run the job on the newly created runner + connectors_ci: + name: Connectors CI + timeout-minutes: 240 # 4 hours + needs: start-runner + runs-on: ${{ needs.start-runner.outputs.label }} # run the job on the newly created runner steps: - name: Checkout Airbyte uses: actions/checkout@v3 + with: + repository: ${{ github.event.inputs.repo }} + ref: ${{ github.event.inputs.gitref }} + - name: Extract branch name + shell: bash + if: github.event_name == 'workflow_dispatch' + run: echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/})" + id: extract_branch - name: Install Python 3.10 uses: actions/setup-python@v4 with: python-version: "3.10" - - name: Upgrade pip - run: pip install --upgrade pip - name: Install ci-connector-ops package - run: pip install --quiet -e ./tools/ci_connector_ops - stop-connectors-integration-test-runner: + run: pip install ./tools/ci_connector_ops\[pipelines]\ + - name: Run connectors-ci test-connectors [WORKFLOW DISPATCH] + if: github.event_name == 'workflow_dispatch' + run: connectors-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} test-connectors ${{ github.event.inputs.test-connectors-options }} + env: + GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }} + AWS_ACCESS_KEY_ID: ${{ secrets.STATUS_API_AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.STATUS_API_AWS_SECRET_ACCESS_KEY }} + AWS_DEFAULT_REGION: "us-east-2" + TEST_REPORTS_BUCKET_NAME: "airbyte-connector-build-status" + CI_GITHUB_ACCESS_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} + CI_GIT_BRANCH: ${{ steps.extract_branch.outputs.branch }} + CI_GIT_REVISION: ${{ github.sha }} + - name: Run connectors-ci test-connectors [PULL REQUESTS] + if: github.event_name == 'pull_request' + run: connectors-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} test-connectors --modified + env: + GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }} + AWS_ACCESS_KEY_ID: ${{ secrets.STATUS_API_AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.STATUS_API_AWS_SECRET_ACCESS_KEY }} + AWS_DEFAULT_REGION: "us-east-2" + TEST_REPORTS_BUCKET_NAME: "airbyte-connector-build-status" + CI_GITHUB_ACCESS_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} + CI_GIT_BRANCH: ${{ github.head_ref }} + CI_GIT_REVISION: ${{ github.event.pull_request.head.sha }} + stop-runner: name: "Connectors Integration Test: Stop Build EC2 Runner" timeout-minutes: 10 needs: - - start-connectors-integration-test-runner # required to get output from the start-runner job - - launch_integration_tests # required to wait when the main job is done + - start-runner # required to get output from the start-runner job + - connectors_ci # required to wait when the main job is done runs-on: ubuntu-latest # Always is required to stop the runner even if the previous job has errors. However always() runs even if the previous step is skipped. # Thus, we check for skipped here. - if: ${{ always() && needs.start-connectors-integration-test-runner.result != 'skipped'}} + if: ${{ always() && needs.start-runner.result != 'skipped'}} steps: - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v1 @@ -70,5 +113,5 @@ jobs: with: mode: stop github-token: ${{ env.PAT }} - label: ${{ needs.start-connectors-integration-test-runner.outputs.label }} - ec2-instance-id: ${{ needs.start-connectors-integration-test-runner.outputs.ec2-instance-id }} + label: ${{ needs.start-runner.outputs.label }} + ec2-instance-id: ${{ needs.start-runner.outputs.ec2-instance-id }} diff --git a/.github/workflows/run-qa-engine.yml b/.github/workflows/run-qa-engine.yml index e5935d21c7f2..2c70bb63a5ca 100644 --- a/.github/workflows/run-qa-engine.yml +++ b/.github/workflows/run-qa-engine.yml @@ -22,7 +22,7 @@ jobs: - name: Preparing Runner to the Build process uses: ./.github/actions/runner-prepare-for-build - name: Install ci-connector-ops package - run: pip install --quiet -e ./tools/ci_connector_ops + run: pip install --quiet -e ./tools/ci_connector_ops\[qa_engine\] - name: Run QA Engine env: LOGLEVEL: INFO diff --git a/pyproject.toml b/pyproject.toml index e584fd9ba9c9..93bc56a58624 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ extend-ignore = [ [tool.isort] profile = "black" color_output = false -skip_gitignore = true +# skip_gitignore = true line_length = 140 multi_line_output = 3 include_trailing_comma = true diff --git a/tools/ci_common_utils/setup.py b/tools/ci_common_utils/setup.py index 2c5aa9692ece..d0e178981fbc 100644 --- a/tools/ci_common_utils/setup.py +++ b/tools/ci_common_utils/setup.py @@ -1,11 +1,11 @@ # -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. # from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["cryptography", "requests", "pyjwt~=2.3.0"] +MAIN_REQUIREMENTS = ["cryptography", "requests", "pyjwt~=2.6.0"] TEST_REQUIREMENTS = ["requests-mock"] diff --git a/tools/ci_connector_ops/.gitignore b/tools/ci_connector_ops/.gitignore new file mode 100644 index 000000000000..be583283ae5a --- /dev/null +++ b/tools/ci_connector_ops/.gitignore @@ -0,0 +1 @@ +test_reports diff --git a/tools/ci_connector_ops/.python-version b/tools/ci_connector_ops/.python-version new file mode 100644 index 000000000000..30291cba2230 --- /dev/null +++ b/tools/ci_connector_ops/.python-version @@ -0,0 +1 @@ +3.10.0 diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/README.md b/tools/ci_connector_ops/ci_connector_ops/pipelines/README.md new file mode 100644 index 000000000000..763998d152ce --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/README.md @@ -0,0 +1,151 @@ +# POC of CI connector pipelines in python + +This Python subpackage of `ci-connector-ops` gathers the POC code we're working on to: +- Rewrite [airbyte-python.gradle](https://github.com/airbytehq/airbyte/blob/7d7e48b2a342a328fa74c6fd11a9268e1dcdcd64/buildSrc/src/main/groovy/airbyte-python.gradle) and [airbyte-connector-acceptance-test.gradle](https://github.com/airbytehq/airbyte/blob/master/buildSrc/src/main/groovy/airbyte-connector-acceptance-test.gradle) in Python. +- Centralize the CI logic for connector testing +- Try out Dagger.io as a promising tool that can provide parallelism and caching out of the box for CI + +## Install and use +From `airbyte` repo root: + +### Install +```bash +cd tools/ci_connector_ops +python -m venv .venv (please use at least Python 3.10) +source .venv/bin/activate +pip install --upgrade pip +pip install -e . +cd ../.. +``` + +### Use + +### Use remote secrets +If you want the pipeline to pull connector secrets from Google Secrets manager you have to set the `GCP_GSM_CREDENTIALS` env variable. +If you don't set this variable the local secrets files, under the `secrets` directory of a connector, will be used for acceptance test run. +More details [here](https://github.com/airbytehq/airbyte/blob/master/tools/ci_credentials/README.md#L20). + +```bash +export GCP_GSM_CREDENTIALS=`cat ` +``` + +If you don't want to use the remote secrets please call connectors-ci with the following flag: +```bash +connectors-ci --use-remote-secrets=False +``` + +### Environment variables required for CI run: +* `GCP_GSM_CREDENTIALS`: the credentials to connect to GSM +* `TEST_REPORTS_BUCKET_NAME`: the name of the bucket where the test report will be uploaded. +* `AWS_ACCESS_KEY_ID`: the access key id of a service account allowed to write to `TEST_REPORTS_BUCKET_NAME` +* `AWS_SECRET_ACCESS_KEY`: the secret access key of a service account allowed to write to`TEST_REPORTS_BUCKET_NAME` +* `AWS_REGION`: The AWS region of the `TEST_REPORTS_BUCKET_NAME` + + +### **Run the pipelines for a specific connectors** +(source-pokeapi does not require GSM access) +```bash +connectors-ci test-connectors --name=source-pokeapi +``` + +### **Run the pipeline for multiple connectors** + +```bash +connectors-ci test-connectors --name=source-pokeapi --name=source-openweather +``` +### **Run the pipeline for generally available connectors** + +```bash +connectors-ci test-connectors --release-stage=generally_available +``` + + +### **Run the pipeline for the connectors you changed on the branch** + +```bash +touch airbyte-integrations/connectors/source-pokeapi/random_file_addition.txt +connectors-ci test-connectors --modified #the source-pokeapi pipeline should run +``` + +### Local VS. CI +The default behavior of the CLI is to run in a local context. +You can tell the CLI that it is running in a CI context with the following flag: +```bash +connectors-ci --is-ci +``` + +The main differences are that: +- The pipeline will pull the branch under test from Airbyte's GitHub repo +- The pipeline will upload per connector test reports to S3 + + +## What does a connector pipeline run + +```mermaid +flowchart LR; + AB_GIT_REPO[Airbyte Git Repo] --> MOUNT_AB[Mount Airbyte repo to container]; + AB_GIT_REPO[Airbyte Git Repo] --> MOUNT_CONN[Mount connector source code to container]; + DOWN_SECRETS[Download secrets from GSM]-->CAT + MOUNT_AB-->QA[Run QA checks]; + MOUNT_CONN-->FORMAT[Code format checks]; + MOUNT_CONN-->INSTALL_CONN[Install connector package in container]; + INSTALL_CONN-->UNIT_TESTS[Run unit tests]; + UNIT_TESTS-->INTEGRATION_TESTS[Run integration tests]; + UNIT_TESTS-->DOCKER_BUILD[Docker build connector dev image]; + DOCKER_BUILD-->CAT[Run acceptance tests]; + CAT-->UPLOAD_SECRETS[Upload updated secrets to GSM]; + CAT-->REPORT[Build test report]; + UNIT_TESTS-->REPORT; + INTEGRATION_TESTS-->REPORT; + QA-->REPORT; + FORMAT-->REPORT; + REPORT--if in CI-->UPLOAD[Upload to S3]; +``` + +This is the DAG we expect for every connector for which the pipeline is triggered. +The Airbyte git repo will be the local one if you use `--is-local=True` command line option. +The connector secrets won't be downloaded nor uploaded if you use the `--use-remote-secrets=False` command line option. + +## Questions for the Dagger team (in priority order) + +Dear Dagger team. You should be able to execute the code we pushed with the instructions above. +Please ignore the `Set your environment variables` step and focus on running `connectors-ci test-connectors source-pokeapi` to reproduce the problems I mention below. + +**First batch of questions**: + +1. ~~How to handle exit codes: if exit_code != 0 an exception is thrown and stops the other pipeline execution. Code context [here](https://github.com/airbytehq/airbyte/blob/7d7e48b2a342a328fa74c6fd11a9268e1dcdcd64/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/tests.py#L25)~~ A stop-gap solution was implemented waiting for this [issue](https://github.com/dagger/dagger/issues/3192) to be fixed. +2. Can we make with_mounted_directory writable so that the container can write to the host FS? Code context [here](https://github.com/airbytehq/airbyte/blob/7d7e48b2a342a328fa74c6fd11a9268e1dcdcd64/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/tests.py#L119) +Dagger team answer: We'll implement a flag to run privileged `with_exec` that will allow containers to write on the host FS. +3. How to get access to visualizations: We'd love to have dynamic status checks on our GitHub PRs, with links to pipeline visualization [like](https://propeller.fly.dev/runs/da68273e-48d8-4354-8d8b-efaccf2792b9). +Dagger team answer: coming soon. +4. Can we build and tag an image in Dagger? +Dagger team answer: Run a local docker registry and publish images to this directory during the pipeline execution. +5. What are the best practices to report success failure details? +I built a custom models (`ConnectorTestReport`) to store test results. I archive tests results to S3. Do you have other suggestions? +6. What are the best practices to write tests for pipelines? +Dagger team answer: Get inspirations from their own repo [here](https://github.com/dagger/dagger/tree/main/sdk/python/tests). +I'll write test once the code architecture is ratified. + +7. What would be your suggestion to run `docker scan` from dagger to spot vulnerabilities on our connectors? +Dagger team answer: A scanning tool should be wrapped in a container to scan images from Dagger. +8. Do you have a tool to re-order logs line by order of pipeline after execution? +A log grouping tool is under construction: https://www.youtube.com/watch&ab_channel=Dagger + +**Second batch of questions**: + +1. I used the stopgap to handle exit-code != 0 but I think as dagger considers the execution as failed this step is always re-run. (cf `run_integration_tests`). Am I understanding this problem correctly, is there a workaround? +2. `run_acceptance_tests` runs at every execution even if the artifacts passed to it did not change: the connector image id is not changing on rebuild of the same code and the secrets I download from GSM are not changing (but re-downloaded everytime because they might change). Is it because the directory instance I mount to it is different on each run? cf (`run_acceptance-tests`) +3. I tried to leverage concurrency as much as possible with `asyncio.create_task` and `asyncio.gather`. Is it something you expect developer to implement or is it not something I should bother with and consider Dagger will take care of concurrency at a lower level. (I noted performance improvement while using `asyncio.create_task` etc.) + +### Airbyte specific context that could help you understand our workflow. +- We always use a :dev tag to tag images of connector we build locally. We try to never publish these images to a public repository. +- We run a container called connector-acceptance-test which is a global test suite for all our connectors. This test suite is ran against a connector under test container, (usually using its :dev image). +- Connector-acceptance-test is a customizable test suite (built with pytest) configured with per-connector `acceptance-test-config.yml` files ([e.g.](https://github.com/airbytehq/airbyte/blob/b0c5f14db6a905899d0f9c043954abcc5ec296f0/airbyte-integrations/connectors/source-pokeapi/acceptance-test-config.yml#L1)) +- As connector-acceptance-test is running connector containers, it triggers actual HTTP requests the public API of our source/destination connector. This is why we need to load secrets configuration with our test account credentials to these connector containers. connector-acceptance-test is also generating dynamic connector configurations to check the behavior of a connector under test when it is it different structure of configuration files. + + +### Performance benchmarks + +| Connector | Run integration test GHA duration | Dagger POC duration (local) | Dagger POC duration (CI) | +|----------------|-----------------------------------|-----------------------------|--------------------------| +| source-pokeapi | 3mn45s | 1mn48 | TBD | \ No newline at end of file diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/__init__.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/__init__.py new file mode 100644 index 000000000000..c941b3045795 --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/__init__.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/__init__.py new file mode 100644 index 000000000000..c941b3045795 --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py new file mode 100644 index 000000000000..0342b3da4d9d --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py @@ -0,0 +1,173 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +"""This modules groups functions made to create reusable environments packaged in dagger containers.""" + +from typing import List, Optional + +from ci_connector_ops.pipelines.contexts import ConnectorTestContext +from ci_connector_ops.pipelines.utils import get_file_contents +from dagger import CacheSharingMode, CacheVolume, Container, Directory, Secret + +PYPROJECT_TOML_FILE_PATH = "pyproject.toml" + +CONNECTOR_TESTING_REQUIREMENTS = [ + "pip==21.3.1", + "mccabe==0.6.1", + "flake8==4.0.1", + "pyproject-flake8==0.0.1a2", + "black==22.3.0", + "isort==5.6.4", + "pytest==6.2.5", + "coverage[toml]==6.3.1", + "pytest-custom_exit_code", +] + +INSTALL_LOCAL_REQUIREMENTS_CMD = ["python", "-m", "pip", "install", "-r", "requirements.txt"] +INSTALL_CONNECTOR_PACKAGE_CMD = ["python", "-m", "pip", "install", "."] +DEFAULT_PYTHON_EXCLUDE = [".venv"] +CI_CREDENTIALS_SOURCE_PATH = "tools/ci_credentials" +CI_CONNECTOR_OPS_SOURCE_PATH = "tools/ci_connector_ops" + + +async def with_python_base(context: ConnectorTestContext, python_image_name: str = "python:3.9-slim") -> Container: + """Builds a Python container with a cache volume for pip cache. + + Args: + context (ConnectorTestContext): The current test context, providing a dagger client and a repository directory. + python_image_name (str, optional): The python image to use to build the python base environment. Defaults to "python:3.9-slim". + + Raises: + ValueError: Raised if the python_image_name is not a python image. + + Returns: + Container: The python base environment container. + """ + if not python_image_name.startswith("python:3"): + raise ValueError("You have to use a python image to build the python base environment") + pip_cache: CacheVolume = context.dagger_client.cache_volume("pip_cache") + return ( + context.dagger_client.container() + .from_(python_image_name) + .with_mounted_cache("/root/.cache/pip", pip_cache, sharing=CacheSharingMode.LOCKED) + .with_mounted_directory("/tools", context.get_repo_dir("tools", include=["ci_credentials", "ci_common_utils"], exclude=[".venv"])) + .with_exec(["pip", "install", "--upgrade", "pip"]) + ) + + +async def with_testing_dependencies(context: ConnectorTestContext) -> Container: + """Builds a testing environment by installing testing dependencies on top of a python base environment. + + Args: + context (ConnectorTestContext): The current test context, providing a dagger client and a repository directory. + + Returns: + Container: The testing environment container. + """ + python_environment: Container = await with_python_base(context) + pyproject_toml_file = context.get_repo_dir(".", include=[PYPROJECT_TOML_FILE_PATH]).file(PYPROJECT_TOML_FILE_PATH) + return python_environment.with_exec(["pip", "install"] + CONNECTOR_TESTING_REQUIREMENTS).with_file( + f"/{PYPROJECT_TOML_FILE_PATH}", pyproject_toml_file + ) + + +async def with_python_package( + context: ConnectorTestContext, + python_environment: Container, + package_source_code_path: str, + additional_dependency_groups: Optional[List] = None, + exclude: Optional[List] = None, + install: bool = True, +) -> Container: + """Installs a python package in a python environment container. + + Args: + context (ConnectorTestContext): The current test context, providing the repository directory from which the python sources will be pulled. + python_environment (Container): An existing python environment in which the package will be installed. + package_source_code_path (str): The local path to the package source code. + additional_dependency_groups (Optional[List]): extra_requires dependency of setup.py to install. Defaults to None. + exclude (Optional[List]): A list of file or directory to exclude from the python package source code. + install (bool): Whether to install the python package or not. Defaults to True. + + Returns: + Container: A python environment container with the python package installed. + """ + if exclude: + exclude = DEFAULT_PYTHON_EXCLUDE + exclude + else: + exclude = DEFAULT_PYTHON_EXCLUDE + package_source_code_directory: Directory = context.get_repo_dir(package_source_code_path, exclude=exclude) + container = python_environment.with_mounted_directory("/" + package_source_code_path, package_source_code_directory).with_workdir( + "/" + package_source_code_path + ) + + if install: + if requirements_txt := await get_file_contents(container, "requirements.txt"): + for line in requirements_txt.split("\n"): + if line.startswith("-e ."): + local_dependency_path = package_source_code_path + "/" + line[3:] + container = container.with_mounted_directory( + "/" + local_dependency_path, context.get_repo_dir(local_dependency_path, exclude=DEFAULT_PYTHON_EXCLUDE) + ) + container = container.with_exec(INSTALL_LOCAL_REQUIREMENTS_CMD) + + container = container.with_exec(INSTALL_CONNECTOR_PACKAGE_CMD) + + if additional_dependency_groups: + container = container.with_exec( + INSTALL_CONNECTOR_PACKAGE_CMD[:-1] + [INSTALL_CONNECTOR_PACKAGE_CMD[-1] + f"[{','.join(additional_dependency_groups)}]"] + ) + + return container + + +async def with_airbyte_connector(context: ConnectorTestContext, install: bool = True) -> Container: + """Installs an airbyte connector python package in a testing environment. + + Args: + context (ConnectorTestContext): The current test context, providing the repository directory from which the connector sources will be pulled. + install (bool): Whether to install the connector package or not. Defaults to True. + Returns: + Container: A python environment container (with the connector installed if install == True). + """ + connector_source_path = str(context.connector.code_directory) + testing_environment: Container = await with_testing_dependencies(context) + return await with_python_package( + context, + testing_environment, + connector_source_path, + additional_dependency_groups=["dev", "tests", "main"], + exclude=["secrets"], + install=install, + ) + + +async def with_ci_credentials(context: ConnectorTestContext, gsm_secret: Secret) -> Container: + """Installs the ci_credentials package in a python environment. + + Args: + context (ConnectorTestContext): The current test context, providing the repository directory from which the ci_credentials sources will be pulled. + gsm_secret (Secret): The secret holding GCP_GSM_CREDENTIALS env variable value. + + Returns: + Container: A python environment with the ci_credentials package installed. + """ + python_base_environment: Container = await with_python_base(context) + ci_credentials = await with_python_package(context, python_base_environment, CI_CREDENTIALS_SOURCE_PATH) + + return ci_credentials.with_env_variable("VERSION", "dev").with_secret_variable("GCP_GSM_CREDENTIALS", gsm_secret).with_workdir("/") + + +async def with_ci_connector_ops(context: ConnectorTestContext) -> Container: + """Installs the ci_connector_ops package in a Container running Python > 3.10 with git.. + + Args: + context (ConnectorTestContext): The current test context, providing the repository directory from which the ci_connector_sources sources will be pulled. + + Returns: + Container: A python environment container with ci_connector_ops installed. + """ + python_base_environment: Container = await with_python_base(context, "python:3-alpine") + python_with_git = python_base_environment.with_exec(["apk", "add", "gcc", "libffi-dev", "musl-dev", "git"]) + return await with_python_package(context, python_with_git, CI_CONNECTOR_OPS_SOURCE_PATH, exclude=["pipelines"]) diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/remote_storage.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/remote_storage.py new file mode 100644 index 000000000000..e521f88886ec --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/remote_storage.py @@ -0,0 +1,34 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +from pathlib import Path + +from dagger import Client, File, Secret + + +async def upload_to_s3(dagger_client: Client, file_to_upload_path: Path, key: str, bucket: str) -> int: + """Upload a local file to S3 using the AWS CLI docker image and running aws s3 cp command. + + Args: + dagger_client (Client): The dagger client. + file_to_upload_path (Path): The local path to the file to upload. + key (str): The key that will be written on the S3 bucket. + bucket (str): The S3 bucket name. + + Returns: + int: Exit code of the upload process. + """ + file_to_upload: File = dagger_client.host().directory(".", include=[str(file_to_upload_path)]).file(str(file_to_upload_path)) + aws_access_key_id: Secret = dagger_client.host().env_variable("AWS_ACCESS_KEY_ID").secret() + aws_secret_access_key: Secret = dagger_client.host().env_variable("AWS_SECRET_ACCESS_KEY").secret() + aws_region: Secret = dagger_client.host().env_variable("AWS_DEFAULT_REGION").secret() + return await ( + dagger_client.container() + .from_("amazon/aws-cli:latest") + .with_file(str(file_to_upload_path), file_to_upload) + .with_secret_variable("AWS_ACCESS_KEY_ID", aws_access_key_id) + .with_secret_variable("AWS_SECRET_ACCESS_KEY", aws_secret_access_key) + .with_secret_variable("AWS_DEFAULT_REGION", aws_region) + .with_exec(["s3", "cp", str(file_to_upload_path), f"s3://{bucket}/{key}"]) + .exit_code() + ) diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/secrets.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/secrets.py new file mode 100644 index 000000000000..5c152ab316be --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/secrets.py @@ -0,0 +1,80 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +"""This modules groups functions made to download/upload secrets from/to a remote secret service and provide these secret in a dagger Directory.""" +from __future__ import annotations + +import datetime +from typing import TYPE_CHECKING + +import anyio +from ci_connector_ops.pipelines.actions import environments +from dagger import Directory + +if TYPE_CHECKING: + from ci_connector_ops.pipelines.contexts import ConnectorTestContext + + +async def download(context: ConnectorTestContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> Directory: + """Use the ci-credentials tool to download the secrets stored for a specific connector to a Directory. + + Args: + context (ConnectorTestContext): The context providing a connector object. + gcp_gsm_env_variable_name (str, optional): The name of the environment variable holding credentials to connect to Google Secret Manager. Defaults to "GCP_GSM_CREDENTIALS". + + Returns: + Directory: A directory with the downloaded secrets. + """ + gsm_secret = context.dagger_client.host().env_variable(gcp_gsm_env_variable_name).secret() + secrets_path = "/" + str(context.connector.code_directory) + "/secrets" + + ci_credentials = await environments.with_ci_credentials(context, gsm_secret) + return ( + ci_credentials.with_exec(["mkdir", "-p", secrets_path]) + .with_env_variable( + "CACHEBUSTER", datetime.datetime.now().isoformat() + ) # Secrets can be updated on GSM anytime, we can't cache this step... + .with_exec(["ci_credentials", context.connector.technical_name, "write-to-storage"]) + .directory(secrets_path) + ) + + +async def upload(context: ConnectorTestContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> int: + """Use the ci-credentials tool to upload the secrets stored in the context's updated_secrets-dir. + + Args: + context (ConnectorTestContext): The context providing a connector object and the update secrets dir. + gcp_gsm_env_variable_name (str, optional): The name of the environment variable holding credentials to connect to Google Secret Manager. Defaults to "GCP_GSM_CREDENTIALS". + + Returns: + int: The exit code of the ci-credentials update-secrets command. + """ + gsm_secret = context.dagger_client.host().env_variable(gcp_gsm_env_variable_name).secret() + secrets_path = "/" + str(context.connector.code_directory) + "/secrets" + + ci_credentials = await environments.with_ci_credentials(context, gsm_secret) + + return await ( + ci_credentials.with_directory(secrets_path, context.updated_secrets_dir) + .with_exec(["ci_credentials", context.connector.technical_name, "update-secrets"]) + .exit_code() + ) + + +async def get_connector_secret_dir(context: ConnectorTestContext) -> Directory: + """Download the secrets from GSM or use the local secrets directory for a connector. + + Args: + context (ConnectorTestContext): The context providing the connector directory and the use_remote_secrets flag. + + Returns: + Directory: A directory with the downloaded connector secrets. + """ + if context.use_remote_secrets: + secrets_dir = await download(context) + else: + local_secrets_dir = anyio.Path(context.connector.code_directory) / "secrets" + await local_secrets_dir.mkdir(exist_ok=True) + secrets_dir = context.get_connector_dir(include=["secrets"]).directory("secrets") + return secrets_dir diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/tests.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/tests.py new file mode 100644 index 000000000000..88136f2e7943 --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/tests.py @@ -0,0 +1,230 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +"""This modules groups functions made to run tests for a specific connector given a test context.""" + +import json +import uuid +from typing import Tuple + +from ci_connector_ops.pipelines.actions import environments +from ci_connector_ops.pipelines.contexts import ConnectorTestContext +from ci_connector_ops.pipelines.models import Step, StepResult, StepStatus +from ci_connector_ops.pipelines.utils import check_path_in_workdir, with_exit_code, with_stderr, with_stdout +from dagger import CacheSharingMode, Container, Directory + +RUN_BLACK_CMD = ["python", "-m", "black", f"--config=/{environments.PYPROJECT_TOML_FILE_PATH}", "--check", "."] +RUN_ISORT_CMD = ["python", "-m", "isort", f"--settings-file=/{environments.PYPROJECT_TOML_FILE_PATH}", "--check-only", "--diff", "."] +RUN_FLAKE_CMD = ["python", "-m", "pflake8", f"--config=/{environments.PYPROJECT_TOML_FILE_PATH}", "."] + + +def pytest_logs_to_step_result(logs: str, step: Step) -> StepResult: + last_log_line = logs.split("\n")[-2] + if "failed" in last_log_line: + return StepResult(step, StepStatus.FAILURE, stderr=logs) + elif "no tests ran" in last_log_line: + return StepResult(step, StepStatus.SKIPPED, stdout=logs) + else: + return StepResult(step, StepStatus.SUCCESS, stdout=logs) + + +async def _run_tests_in_directory(connector_under_test: Container, test_directory: str, step: Step) -> StepResult: + """Runs the pytest tests in the test_directory that was passed. + A StepStatus.SKIPPED is returned if no tests were discovered. + Args: + connector_under_test (Container): The connector under test container. + test_directory (str): The directory in which the python test modules are declared + + Returns: + Tuple[StepStatus, Optional[str], Optional[str]]: Tuple of StepStatus, stderr and stdout. + """ + test_config = ( + "pytest.ini" if await check_path_in_workdir(connector_under_test, "pytest.ini") else "/" + environments.PYPROJECT_TOML_FILE_PATH + ) + if await check_path_in_workdir(connector_under_test, test_directory): + tester = connector_under_test.with_exec( + [ + "python", + "-m", + "pytest", + "--suppress-tests-failed-exit-code", + "--suppress-no-test-exit-code", + "-s", + test_directory, + "-c", + test_config, + ] + ) + return pytest_logs_to_step_result(await tester.stdout(), step) + + else: + return StepResult(step, StepStatus.SKIPPED) + + +async def code_format_checks(connector_under_test: Container, step=Step.CODE_FORMAT_CHECKS) -> StepResult: + """Run a code format check on the container source code. + We call black, isort and flake commands: + - Black formats the code: fails if the code is not formatted. + - Isort checks the import orders: fails if the import are not properly ordered. + - Flake enforces style-guides: fails if the style-guide is not followed. + Args: + connector_under_test (Container): The connector under test container. + step (Step): The step in which the code format checks are run. Defaults to Step.CODE_FORMAT_CHECKS + Returns: + StepResult: Failure or success of the code format checks with stdout and stdout. + """ + connector_under_test = step.get_dagger_pipeline(connector_under_test) + + formatter = ( + connector_under_test.with_exec(["echo", "Running black"]) + .with_exec(RUN_BLACK_CMD) + .with_exec(["echo", "Running Isort"]) + .with_exec(RUN_ISORT_CMD) + .with_exec(["echo", "Running Flake"]) + .with_exec(RUN_FLAKE_CMD) + ) + return StepResult( + step, + StepStatus.from_exit_code(await with_exit_code(formatter)), + stderr=await with_stderr(formatter), + stdout=await with_stdout(formatter), + ) + + +async def run_unit_tests(connector_under_test: Container, step=Step.UNIT_TESTS) -> StepStatus: + """Run all pytest tests declared in the unit_tests directory of the connector code. + + Args: + connector_under_test (Container): The connector under test container. + step (Step): The step in which the unit tests are run. Defaults to Step.UNIT_TESTS + + Returns: + StepResult: Failure or success of the unit tests with stdout and stdout. + """ + connector_under_test = step.get_dagger_pipeline(connector_under_test) + return await _run_tests_in_directory(connector_under_test, "unit_tests", step) + + +async def run_integration_tests(connector_under_test: Container, step=Step.INTEGRATION_TESTS) -> StepStatus: + """Run all pytest tests declared in the unit_tests directory of the connector code. + + Args: + connector_under_test (Container): The connector under test container. + step (Step): The step in which the integration tests are run. Defaults to Step.UNIT_TESTS + + Returns: + StepResult: Failure or success of the integration tests with stdout and stdout. + """ + connector_under_test = step.get_dagger_pipeline(connector_under_test) + return await _run_tests_in_directory(connector_under_test, "integration_tests", step) + + +async def run_acceptance_tests( + context: ConnectorTestContext, + step=Step.ACCEPTANCE_TESTS, +) -> Tuple[StepResult, Directory]: + """Runs the acceptance test suite on a connector dev image. + It's rebuilding the connector acceptance test image if the tag is :dev. + It's building the connector under test dev image if the connector image is :dev in the acceptance test config. + Args: + context (ConnectorTestContext): The current test context, providing a connector object, a dagger client, a repository directory and the secrets directory. + step (Step): The step in which the acceptance tests are run. Defaults to Step.ACCEPTANCE_TESTS + + Returns: + Tuple[StepResult, Directory]: Failure or success of the acceptances tests with stdout and stdout AND an updated secrets directory. + + """ + if not context.connector.acceptance_test_config: + return StepResult(Step.ACCEPTANCE_TESTS, StepStatus.SKIPPED), None + + dagger_client = step.get_dagger_pipeline(context.dagger_client) + + if context.connector_acceptance_test_image.endswith(":dev"): + cat_container = context.connector_acceptance_test_source_dir.docker_build() + else: + cat_container = dagger_client.container().from_(context.connector_acceptance_test_image) + + dockerd = ( + dagger_client.container() + .from_("docker:23.0.1-dind") + .with_mounted_cache("/var/lib/docker", dagger_client.cache_volume("docker-lib"), sharing=CacheSharingMode.PRIVATE) + .with_mounted_cache("/tmp", dagger_client.cache_volume("share-tmp")) + .with_exposed_port(2375) + .with_exec(["dockerd", "--log-level=error", "--host=tcp://0.0.0.0:2375", "--tls=false"], insecure_root_capabilities=True) + ) + docker_host = await dockerd.endpoint(scheme="tcp") + + acceptance_test_cache_buster = str(uuid.uuid4()) + if context.connector.acceptance_test_config["connector_image"].endswith(":dev"): + inspect_output = await ( + dagger_client.pipeline(f"Building {context.connector.acceptance_test_config['connector_image']}") + .container() + .from_("docker:23.0.1-cli") + .with_env_variable("DOCKER_HOST", docker_host) + .with_service_binding("docker", dockerd) + .with_mounted_directory("/connector_to_build", context.get_connector_dir(exclude=[".venv"])) + .with_workdir("/connector_to_build") + .with_exec(["docker", "build", ".", "-t", f"airbyte/{context.connector.technical_name}:dev"]) + .with_exec(["docker", "image", "inspect", f"airbyte/{context.connector.technical_name}:dev"]) + .stdout() + ) + acceptance_test_cache_buster = json.loads(inspect_output)[0]["Id"] + + cat_container = ( + cat_container.with_env_variable("DOCKER_HOST", docker_host) + .with_entrypoint(["pip"]) + .with_exec(["install", "pytest-custom_exit_code"]) + .with_service_binding("docker", dockerd) + .with_mounted_cache("/tmp", dagger_client.cache_volume("share-tmp")) + .with_mounted_directory("/test_input", context.get_connector_dir(exclude=["secrets", ".venv"])) + .with_directory("/test_input/secrets", context.secrets_dir) + .with_workdir("/test_input") + .with_entrypoint(["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "--suppress-tests-failed-exit-code"]) + .with_env_variable("CACHEBUSTER", acceptance_test_cache_buster) + .with_exec(["--acceptance-test-config", "/test_input"]) + ) + + secret_dir = cat_container.directory("/test_input/secrets") + updated_secrets_dir = None + if secret_files := await secret_dir.entries(): + for file_path in secret_files: + if file_path.startswith("updated_configurations"): + updated_secrets_dir = secret_dir + break + + return (pytest_logs_to_step_result(await cat_container.stdout(), step), updated_secrets_dir) + + +async def run_qa_checks(context: ConnectorTestContext, step=Step.QA_CHECKS) -> StepResult: + """Runs our QA checks on a connector. + + Args: + context (ConnectorTestContext): The current test context, providing a connector object, a dagger client and a repository directory. + + Returns: + StepResult: Failure or success of the QA checks with stdout and stdout. + """ + ci_connector_ops = await environments.with_ci_connector_ops(context) + ci_connector_ops = step.get_dagger_pipeline(ci_connector_ops) + filtered_repo = context.get_repo_dir( + include=[ + str(context.connector.code_directory), + str(context.connector.documentation_file_path), + str(context.connector.icon_path), + "airbyte-config/init/src/main/resources/seed/source_definitions.yaml", + "airbyte-config/init/src/main/resources/seed/destination_definitions.yaml", + ], + ) + qa_checks = ( + ci_connector_ops.with_mounted_directory("/airbyte", filtered_repo) + .with_workdir("/airbyte") + .with_exec(["run-qa-checks", f"connectors/{context.connector.technical_name}"]) + ) + + return StepResult( + step, + StepStatus.from_exit_code(await with_exit_code(qa_checks)), + stderr=await with_stderr(qa_checks), + stdout=await with_stdout(qa_checks), + ) diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py new file mode 100644 index 000000000000..7963c59c04bd --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py @@ -0,0 +1,249 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +import asyncio +import logging +import os +import sys +from pathlib import Path +from typing import List, Tuple + +import anyio +import click +import dagger +from ci_connector_ops.pipelines.actions import environments, tests +from ci_connector_ops.pipelines.contexts import ConnectorTestContext +from ci_connector_ops.pipelines.github import update_commit_status_check +from ci_connector_ops.pipelines.models import ConnectorTestReport, Step, StepResult, StepStatus +from ci_connector_ops.pipelines.utils import ( + DAGGER_CONFIG, + get_current_git_branch, + get_current_git_revision, + get_modified_connectors, + get_modified_files, +) +from ci_connector_ops.utils import ConnectorLanguage, get_all_released_connectors +from rich.logging import RichHandler + +GITHUB_GLOBAL_CONTEXT = "[POC please ignore] Connectors CI" +GITHUB_GLOBAL_DESCRIPTION = "Running connectors tests" +REQUIRED_ENV_VARS_FOR_CI = [ + "GCP_GSM_CREDENTIALS", + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "AWS_DEFAULT_REGION", + "TEST_REPORTS_BUCKET_NAME", + "CI_GITHUB_ACCESS_TOKEN", +] + +logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)]) + +logger = logging.getLogger(__name__) + + +async def run(test_context: ConnectorTestContext) -> ConnectorTestReport: + """Runs a CI pipeline for a single connector. + A visual DAG can be found on the README.md file of the pipelines modules. + + Args: + test_context (ConnectorTestContext): The initialized test context. + + Returns: + ConnectorTestReport: The test reports holding tests results. + """ + async with test_context: + qa_checks_results = await tests.run_qa_checks(test_context) + connector_source_code = await environments.with_airbyte_connector(test_context, install=False) + connector_under_test = await environments.with_airbyte_connector(test_context) + + code_format_checks_results_future = asyncio.create_task(tests.code_format_checks(connector_source_code)) + unit_tests_results, connector_under_test_exit_code = await asyncio.gather( + tests.run_unit_tests(connector_under_test), connector_under_test.exit_code() + ) + + package_install_result = StepResult(Step.PACKAGE_INSTALL, StepStatus.from_exit_code(connector_under_test_exit_code)) + + if unit_tests_results.status is StepStatus.SUCCESS: + integration_test_future = asyncio.create_task(tests.run_integration_tests(connector_under_test)) + acceptance_tests_results, test_context.updated_secrets_dir = await tests.run_acceptance_tests( + test_context, + ) + integration_tests_result = await integration_test_future + + else: + integration_tests_result = StepResult(Step.INTEGRATION_TESTS, StepStatus.SKIPPED, stdout="Skipped because unit tests failed") + acceptance_tests_results = StepResult(Step.ACCEPTANCE_TESTS, StepStatus.SKIPPED, stdout="Skipped because unit tests failed") + + test_context.test_report = ConnectorTestReport( + test_context, + steps_results=[ + package_install_result, + await code_format_checks_results_future, + unit_tests_results, + integration_tests_result, + acceptance_tests_results, + qa_checks_results, + ], + ) + + return test_context.test_report + + +async def run_connectors_test_pipelines(test_contexts: List[ConnectorTestContext]): + """Runs a CI pipeline for all the connectors passed. + + Args: + test_contexts (List[ConnectorTestContext]): List of connector test contexts for which a CI pipeline needs to be run. + """ + async with dagger.Connection(DAGGER_CONFIG) as dagger_client: + async with anyio.create_task_group() as tg: + for test_context in test_contexts: + test_context.dagger_client = dagger_client.pipeline(f"{test_context.connector.technical_name} - Test Pipeline") + tg.start_soon(run, test_context) + + +@click.group() +@click.option("--use-remote-secrets", default=True) +@click.option("--is-local/--is-ci", default=True) +@click.option("--git-branch", default=get_current_git_branch, envvar="CI_GIT_BRANCH") +@click.option("--git-revision", default=get_current_git_revision, envvar="CI_GIT_REVISION") +@click.option( + "--diffed-branch", + help="Branch to which the git diff will happen to detect new or modified connectors", + default="origin/master", + type=str, +) +@click.option("--gha-workflow-run-id", help="[CI Only] The run id of the GitHub action workflow", default=None, type=str) +@click.pass_context +def connectors_ci( + ctx: click.Context, + use_remote_secrets: str, + is_local: bool, + git_branch: str, + git_revision: str, + diffed_branch: str, + gha_workflow_run_id: str, +): + """A command group to gather all the connectors-ci command""" + + validate_environment(is_local, use_remote_secrets) + + ctx.ensure_object(dict) + ctx.obj["use_remote_secrets"] = use_remote_secrets + ctx.obj["is_local"] = is_local + ctx.obj["git_branch"] = git_branch + ctx.obj["git_revision"] = git_revision + ctx.obj["gha_workflow_run_id"] = gha_workflow_run_id + ctx.obj["gha_workflow_run_url"] = ( + f"https://github.com/airbytehq/airbyte/actions/runs/{gha_workflow_run_id}" if gha_workflow_run_id else None + ) + + update_commit_status_check( + ctx.obj["git_revision"], + "pending", + ctx.obj["gha_workflow_run_url"], + GITHUB_GLOBAL_DESCRIPTION, + GITHUB_GLOBAL_CONTEXT, + should_send=not ctx.obj["is_local"], + logger=logger, + ) + + ctx.obj["modified_files"] = get_modified_files(git_branch, git_revision, diffed_branch, is_local) + + +@connectors_ci.command() +@click.option( + "--name", "names", multiple=True, help="Only test a specific connector. Use its technical name. e.g source-pokeapi.", type=str +) +@click.option("--language", "languages", multiple=True, help="Filter connectors to test by language.", type=click.Choice(ConnectorLanguage)) +@click.option( + "--release-stage", + "release_stages", + multiple=True, + help="Filter connectors to test by release stage.", + type=click.Choice(["alpha", "beta", "generally_available"]), +) +@click.option("--modified/--not-modified", help="Only test modified connectors in the current branch.", default=False, type=bool) +@click.pass_context +def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[ConnectorLanguage], release_stages: Tuple[str], modified: bool): + """Runs a CI pipeline the connector passed as CLI argument. + + Args: + ctx (click.Context): The click context. + connector_name (str): The connector technical name. E.G. source-pokeapi + """ + connectors_under_test = get_all_released_connectors() + modified_connectors = get_modified_connectors(ctx.obj["modified_files"]) + if modified: + connectors_under_test = modified_connectors + else: + connectors_under_test.update(modified_connectors) + if names: + connectors_under_test = {connector for connector in connectors_under_test if connector.technical_name in names} + if languages: + connectors_under_test = {connector for connector in connectors_under_test if connector.language in languages} + if release_stages: + connectors_under_test = {connector for connector in connectors_under_test if connector.release_stage in release_stages} + connectors_under_test_names = [c.technical_name for c in connectors_under_test] + if connectors_under_test_names: + click.secho(f"Will run the test pipeline for the following connectors: {', '.join(connectors_under_test_names)}.", fg="green") + click.secho( + "If you're running this command for the first time the Dagger engine image will be pulled, it can take a short minute..." + ) + else: + click.secho("No connector test will run according to your inputs.", fg="yellow") + sys.exit(0) + + connectors_tests_contexts = [ + ConnectorTestContext( + connector, + ctx.obj["is_local"], + ctx.obj["git_branch"], + ctx.obj["git_revision"], + ctx.obj["use_remote_secrets"], + gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"), + ) + for connector in connectors_under_test + if connector.language + in [ConnectorLanguage.PYTHON, ConnectorLanguage.LOW_CODE] # TODO: remove this once we implement pipelines for Java connector + ] + try: + anyio.run(run_connectors_test_pipelines, connectors_tests_contexts) + update_commit_status_check( + ctx.obj["git_revision"], + "success", + ctx.obj["gha_workflow_run_url"], + GITHUB_GLOBAL_DESCRIPTION, + GITHUB_GLOBAL_CONTEXT, + should_send=not ctx.obj["is_local"], + logger=logger, + ) + except dagger.DaggerError as e: + click.secho(str(e), err=True, fg="red") + update_commit_status_check( + ctx.obj["git_revision"], + "error", + ctx.obj["gha_workflow_run_url"], + GITHUB_GLOBAL_DESCRIPTION, + GITHUB_GLOBAL_CONTEXT, + should_send=not ctx.obj["is_local"], + logger=logger, + ) + + +def validate_environment(is_local: bool, use_remote_secrets: bool): + if is_local: + if not (os.getcwd().endswith("/airbyte") and Path(".git").is_dir()): + raise click.UsageError("You need to run this command from the airbyte repository root.") + else: + for required_env_var in REQUIRED_ENV_VARS_FOR_CI: + if os.getenv(required_env_var) is None: + raise click.UsageError(f"When running in a CI context a {required_env_var} environment variable must be set.") + if use_remote_secrets and os.getenv("GCP_GSM_CREDENTIALS") is None: + raise click.UsageError( + "You have to set the GCP_GSM_CREDENTIALS if you want to download secrets from GSM. Set the --use-remote-secrets option to false otherwise." + ) + + +if __name__ == "__main__": + connectors_ci() diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py new file mode 100644 index 000000000000..fe398282c61b --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py @@ -0,0 +1,168 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +import logging +import os +from datetime import datetime +from enum import Enum +from typing import Optional + +from anyio import Path +from asyncer import asyncify +from ci_connector_ops.pipelines.actions import remote_storage, secrets +from ci_connector_ops.pipelines.github import update_commit_status_check +from ci_connector_ops.pipelines.models import ConnectorTestReport +from ci_connector_ops.pipelines.utils import AIRBYTE_REPO_URL +from ci_connector_ops.utils import Connector +from dagger import Client, Directory + + +class ContextState(Enum): + INITIALIZED = {"github_state": "pending", "description": "Tests are being initialized..."} + RUNNING = {"github_state": "pending", "description": "Tests are running..."} + ERROR = {"github_state": "error", "description": "Something went wrong while running the tests."} + SUCCESSFUL = {"github_state": "success", "description": "All tests ran successfully."} + FAILURE = {"github_state": "failure", "description": "Test failed."} + + +class ConnectorTestContext: + """The connector test context is used to store configuration for a specific connector pipeline run.""" + + DEFAULT_CONNECTOR_ACCEPTANCE_TEST_IMAGE = "airbyte/connector-acceptance-test:latest" + + def __init__( + self, + connector: Connector, + is_local: bool, + git_branch: bool, + git_revision: bool, + use_remote_secrets: bool = True, + connector_acceptance_test_image: Optional[str] = DEFAULT_CONNECTOR_ACCEPTANCE_TEST_IMAGE, + gha_workflow_run_url: Optional[str] = None, + ): + self.connector = connector + self.is_local = is_local + self.git_branch = git_branch + self.git_revision = git_revision + self.use_remote_secrets = use_remote_secrets + self.connector_acceptance_test_image = connector_acceptance_test_image + self.gha_workflow_run_url = gha_workflow_run_url + + self.created_at = datetime.utcnow() + + self.state = ContextState.INITIALIZED + self.logger = logging.getLogger(self.main_pipeline_name) + self.dagger_client = None + self._secrets_dir = None + self._updated_secrets_dir = None + self._test_report = None + update_commit_status_check(**self.github_commit_status) + + @property + def updated_secrets_dir(self) -> Directory: + return self._updated_secrets_dir + + @updated_secrets_dir.setter + def updated_secrets_dir(self, updated_secrets_dir: Directory): + self._updated_secrets_dir = updated_secrets_dir + + @property + def dagger_client(self) -> Client: + return self._dagger_client + + @dagger_client.setter + def dagger_client(self, dagger_client: Client): + self._dagger_client = dagger_client + + @property + def is_ci(self): + return self.is_local is False + + @property + def repo(self): + return self.dagger_client.git(AIRBYTE_REPO_URL, keep_git_dir=True) + + @property + def connector_acceptance_test_source_dir(self) -> Directory: + return self.get_repo_dir("airbyte-integrations/bases/connector-acceptance-test") + + @property + def should_save_updated_secrets(self): + return self.use_remote_secrets and self.updated_secrets_dir is not None + + @property + def main_pipeline_name(self): + return f"CI test for {self.connector.technical_name}" + + @property + def test_report(self) -> ConnectorTestReport: + return self._test_report + + @test_report.setter + def test_report(self, test_report: ConnectorTestReport): + self._test_report = test_report + self.state = ContextState.SUCCESSFUL if test_report.success else ContextState.FAILURE + + @property + def github_commit_status(self) -> dict: + return { + "sha": self.git_revision, + "state": self.state.value["github_state"], + "target_url": self.gha_workflow_run_url, + "description": self.state.value["description"], + "context": f"[POC please ignore] Connector tests: {self.connector.technical_name}", + "should_send": self.is_ci, + "logger": self.logger, + } + + def get_repo_dir(self, subdir=".", exclude=None, include=None) -> Directory: + if self.is_local: + return self.dagger_client.host().directory(subdir, exclude=exclude, include=include) + else: + return self.repo.branch(self.git_branch).tree().directory(subdir) + + def get_connector_dir(self, exclude=None, include=None) -> Directory: + return self.get_repo_dir(str(self.connector.code_directory), exclude=exclude, include=include) + + async def __aenter__(self): + if self.dagger_client is None: + raise Exception("A ConnectorTestContext can't be entered with an undefined dagger_client") + self.secrets_dir = await secrets.get_connector_secret_dir(self) + self.updated_secrets_dir = None + self.state = ContextState.RUNNING + await asyncify(update_commit_status_check)(**self.github_commit_status) + return self + + async def __aexit__(self, exception_type, exception_value, traceback) -> bool: + if exception_value: + self.logger.error("An error got handled by the ConnectorTestContext", exc_info=True) + self.state = ContextState.ERROR + elif self.test_report is None: + self.logger.error("No test report was provided. This is probably due to an upstream error") + self.state = ContextState.ERROR + return True + else: + teardown_pipeline = self.dagger_client.pipeline(f"Teardown {self.connector.technical_name}") + if self.should_save_updated_secrets: + await secrets.upload( + teardown_pipeline, + self.connector, + ) + self.test_report.print() + self.logger.info(self.test_report.to_json()) + local_test_reports_path_root = "tools/ci_connector_ops/test_reports/" + connector_name = self.test_report.connector_test_context.connector.technical_name + connector_version = self.test_report.connector_test_context.connector.version + git_revision = self.test_report.connector_test_context.git_revision + git_branch = self.test_report.connector_test_context.git_branch.replace("/", "_") + suffix = f"{connector_name}/{git_branch}/{connector_version}/{git_revision}.json" + local_report_path = Path(local_test_reports_path_root + suffix) + await local_report_path.parents[0].mkdir(parents=True, exist_ok=True) + await local_report_path.write_text(self.test_report.to_json()) + if self.test_report.should_be_saved: + s3_reports_path_root = "python-poc/tests/history/" + s3_key = s3_reports_path_root + suffix + await remote_storage.upload_to_s3(teardown_pipeline, str(local_report_path), s3_key, os.environ["TEST_REPORTS_BUCKET_NAME"]) + + await asyncify(update_commit_status_check)(**self.github_commit_status) + return True diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/github.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/github.py new file mode 100644 index 000000000000..b1bf0040cf7e --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/github.py @@ -0,0 +1,41 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +from __future__ import annotations + +import os +from typing import TYPE_CHECKING + +from ci_connector_ops.utils import console + +if TYPE_CHECKING: + from logging import Logger + +from github import Github + +AIRBYTE_GITHUB_REPO = "airbytehq/airbyte" + + +def update_commit_status_check( + sha: str, state: str, target_url: str, description: str, context: str, should_send=True, logger: Logger = None +): + if not should_send: + return + + try: + github_client = Github(os.environ["CI_GITHUB_ACCESS_TOKEN"]) + airbyte_repo = github_client.get_repo(AIRBYTE_GITHUB_REPO) + except Exception as e: + if logger: + logger.error("No commit status check sent, the connection to Github API failed", exc_info=True) + else: + console.print(e) + return + + airbyte_repo.get_commit(sha=sha).create_status( + state=state, + target_url=target_url, + description=description, + context=context, + ) + logger.info(f"Created {state} status for commit {sha} on Github in {context} context.") diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/models.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/models.py new file mode 100644 index 000000000000..5e7b56b680a8 --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/models.py @@ -0,0 +1,142 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import TYPE_CHECKING, List, Optional, Union + +from ci_connector_ops.utils import console +from dagger import Client, Container +from rich.console import Group +from rich.panel import Panel +from rich.style import Style +from rich.table import Table +from rich.text import Text + +if TYPE_CHECKING: + from ci_connector_ops.pipelines.contexts import ConnectorTestContext + + +class Step(Enum): + CODE_FORMAT_CHECKS = "Code format checks" + PACKAGE_INSTALL = "Package install" + UNIT_TESTS = "Unit tests" + INTEGRATION_TESTS = "Integration tests" + DOCKER_BUILD = "Docker Build" + ACCEPTANCE_TESTS = "Acceptance tests" + QA_CHECKS = "QA Checks" + + def get_dagger_pipeline(self, dagger_client_or_container: Union[Client, Container]) -> Union[Client, Container]: + return dagger_client_or_container.pipeline(self.value) + + +class StepStatus(Enum): + SUCCESS = "Successful" + FAILURE = "Failed" + SKIPPED = "Skipped" + + def from_exit_code(exit_code: int): + if exit_code == 0: + return StepStatus.SUCCESS + if exit_code == 1: + return StepStatus.FAILURE + # pytest returns a 5 exit code when no test is found. + if exit_code == 5: + return StepStatus.SKIPPED + else: + raise ValueError(f"No step status is mapped to exit code {exit_code}") + + def get_rich_style(self) -> Style: + if self is StepStatus.SUCCESS: + return Style(color="green") + if self is StepStatus.FAILURE: + return Style(color="red", bold=True) + if self is StepStatus.SKIPPED: + return Style(color="yellow") + + def __str__(self) -> str: + return self.value + + +@dataclass(frozen=True) +class StepResult: + step: Step + status: StepStatus + created_at: datetime = field(default_factory=datetime.utcnow) + stderr: Optional[str] = None + stdout: Optional[str] = None + + def __repr__(self) -> str: + return f"{self.step.value}: {self.status.value}" + + +@dataclass(frozen=True) +class ConnectorTestReport: + connector_test_context: ConnectorTestContext + steps_results: List[StepResult] + created_at: datetime = field(default_factory=datetime.utcnow) + + @property + def failed_steps(self) -> StepResult: + return [step_result for step_result in self.steps_results if step_result.status is StepStatus.FAILURE] + + @property + def success(self) -> StepResult: + return len(self.failed_steps) == 0 + + @property + def should_be_saved(self) -> bool: + return self.connector_test_context.is_ci + + @property + def run_duration(self) -> int: + return (self.created_at - self.connector_test_context.created_at).total_seconds() + + def to_json(self) -> str: + return json.dumps( + { + "connector_technical_name": self.connector_test_context.connector.technical_name, + "connector_version": self.connector_test_context.connector.version, + "run_timestamp": self.created_at.isoformat(), + "run_duration": self.run_duration, + "success": self.success, + "failed_step": [failed_step_result.step.name for failed_step_result in self.failed_steps], + "gha_workflow_run_url": self.connector_test_context.gha_workflow_run_url, + } + ) + + def print(self): + connector_name = self.connector_test_context.connector.technical_name + main_panel_title = Text(f"{connector_name.upper()} - TEST RESULTS") + main_panel_title.stylize(Style(color="blue", bold=True)) + duration_subtitle = Text(f"⏲️ Total pipeline duration for {connector_name}: {round(self.run_duration)} seconds") + step_results_table = Table(title="Steps results") + step_results_table.add_column("Step") + step_results_table.add_column("Result") + step_results_table.add_column("Finished after") + + for step_result in self.steps_results: + step = Text(step_result.step.value) + step.stylize(step_result.status.get_rich_style()) + result = Text(step_result.status.value) + result.stylize(step_result.status.get_rich_style()) + step_results_table.add_row(step, result, str(round((self.created_at - step_result.created_at).total_seconds())) + "s") + + to_render = [step_results_table] + if self.failed_steps: + sub_panels = [] + for failed_step in self.failed_steps: + errors = Text(failed_step.stderr) + panel_title = Text(f"{connector_name} {failed_step.step.value.lower()} failures") + panel_title.stylize(Style(color="red", bold=True)) + sub_panel = Panel(errors, title=panel_title) + sub_panels.append(sub_panel) + failures_group = Group(*sub_panels) + to_render.append(failures_group) + + main_panel = Panel(Group(*to_render), title=main_panel_title, subtitle=duration_subtitle) + console.print(main_panel) diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/utils.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/utils.py new file mode 100644 index 000000000000..72de8838b9bc --- /dev/null +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/utils.py @@ -0,0 +1,152 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +import re +import sys +from pathlib import Path +from typing import Optional, Set + +import anyio +import git +from ci_connector_ops.utils import DESTINATION_CONNECTOR_PATH_PREFIX, SOURCE_CONNECTOR_PATH_PREFIX, Connector, get_connector_name_from_path +from dagger import Config, Connection, Container, QueryError + +DAGGER_CONFIG = Config(log_output=sys.stderr) +AIRBYTE_REPO_URL = "https://github.com/airbytehq/airbyte.git" + + +# This utils will probably be redundant once https://github.com/dagger/dagger/issues/3764 is implemented +async def check_path_in_workdir(container: Container, path: str) -> bool: + """Check if a local path is mounted to the working directory of a container + + Args: + container (Container): The container on which we want the check the path existence. + path (str): Directory or file path we want to check the existence in the container working directory. + + Returns: + bool: Whether the path exists in the container working directory. + """ + workdir = (await container.with_exec(["pwd"]).stdout()).strip() + mounts = await container.mounts() + if workdir in mounts: + expected_file_path = Path(workdir[1:]) / path + return expected_file_path.is_file() or expected_file_path.is_dir() + else: + return False + + +# This utils will probably be redundant once https://github.com/dagger/dagger/issues/3764 is implemented +async def get_file_contents(container: Container, path: str) -> Optional[str]: + """Retrieve a container file contents. + + Args: + container (Container): The container hosting the file you want to read. + path (str): Path, in the container, to the file you want to read. + + Returns: + Optional[str]: The file content if the file exists in the container, None otherwise. + """ + try: + return await container.file(path).contents() + except QueryError as e: + if "no such file or directory" not in str(e): + # this is the hicky bit of the stopgap because + # this error could come from a network issue + raise + return None + + +# This is a stop-gap solution to capture non 0 exit code on Containers +# The original issue is tracked here https://github.com/dagger/dagger/issues/3192 +async def with_exit_code(container: Container) -> int: + """Read the container exit code. If the exit code is not 0 a QueryError is raised. We extract the non-zero exit code from the QueryError message. + + Args: + container (Container): The container from which you want to read the exit code. + + Returns: + int: The exit code. + """ + try: + await container.exit_code() + except QueryError as e: + error_message = str(e) + if "exit code: " in error_message: + exit_code = re.search(r"exit code: (\d+)", error_message) + if exit_code: + return int(exit_code.group(1)) + else: + return 1 + raise + return 0 + + +async def with_stderr(container: Container) -> str: + try: + return await container.stderr() + except QueryError as e: + return str(e) + + +async def with_stdout(container: Container) -> str: + try: + return await container.stdout() + except QueryError as e: + return str(e) + + +def get_current_git_branch() -> str: + return git.Repo().active_branch.name + + +def get_current_git_revision() -> str: + return git.Repo().head.object.hexsha + + +async def get_modified_files_remote(current_git_branch: str, current_git_revision: str, diffed_branch: str = "origin/master") -> Set[str]: + async with Connection(DAGGER_CONFIG) as dagger_client: + modified_files = await ( + dagger_client.container() + .from_("alpine/git:latest") + .with_workdir("/repo") + .with_exec(["init"]) + .with_env_variable("CACHEBUSTER", current_git_revision) + .with_exec( + [ + "remote", + "add", + "--fetch", + "--track", + diffed_branch.split("/")[-1], + "--track", + current_git_branch, + "origin", + AIRBYTE_REPO_URL, + ] + ) + .with_exec(["checkout", "-t", f"origin/{current_git_branch}"]) + .with_exec(["diff", "--diff-filter=MA", "--name-only", f"{diffed_branch}...{current_git_revision}"]) + .stdout() + ) + return set(modified_files.split("\n")) + + +def get_modified_files_local(current_git_revision: str, diffed_branch: str = "master") -> Set[str]: + airbyte_repo = git.Repo() + modified_files = airbyte_repo.git.diff("--diff-filter=MA", "--name-only", f"{diffed_branch}...{current_git_revision}").split("\n") + return set(modified_files) + + +def get_modified_files(current_git_branch: str, current_git_revision: str, diffed_branch: str, is_local: bool = True) -> Set[str]: + if is_local: + return get_modified_files_local(current_git_revision, diffed_branch) + else: + return anyio.run(get_modified_files_remote, current_git_branch, current_git_revision, diffed_branch) + + +def get_modified_connectors(modified_files: Set[str]) -> Set[Connector]: + modified_connectors = [] + for file_path in modified_files: + if file_path.startswith(SOURCE_CONNECTOR_PATH_PREFIX) or file_path.startswith(DESTINATION_CONNECTOR_PATH_PREFIX): + modified_connectors.append(Connector(get_connector_name_from_path(file_path))) + return set(modified_connectors) diff --git a/tools/ci_connector_ops/ci_connector_ops/utils.py b/tools/ci_connector_ops/ci_connector_ops/utils.py index 98d6a28f8340..b06e059fa327 100644 --- a/tools/ci_connector_ops/ci_connector_ops/utils.py +++ b/tools/ci_connector_ops/ci_connector_ops/utils.py @@ -2,45 +2,66 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from dataclasses import dataclass import logging +import os +from dataclasses import dataclass +from enum import Enum +from functools import cached_property from pathlib import Path -from typing import Dict, Optional, Set, Tuple, List +from typing import Dict, List, Optional, Set, Tuple + import git import requests import yaml +from ci_credentials import SecretsManager +from rich.console import Console + +try: + from yaml import CLoader as Loader +# Some environments do not have a system C Yaml loader +except ImportError: + from yaml import Loader -AIRBYTE_REPO = git.Repo(search_parent_directories=True) -DIFFED_BRANCH = "origin/master" +console = Console() + +DIFFED_BRANCH = os.environ.get("DIFFED_BRANCH", "origin/master") OSS_CATALOG_URL = "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/oss_catalog.json" CONNECTOR_PATH_PREFIX = "airbyte-integrations/connectors" SOURCE_CONNECTOR_PATH_PREFIX = CONNECTOR_PATH_PREFIX + "/source-" +DESTINATION_CONNECTOR_PATH_PREFIX = CONNECTOR_PATH_PREFIX + "/destination-" ACCEPTANCE_TEST_CONFIG_FILE_NAME = "acceptance-test-config.yml" AIRBYTE_DOCKER_REPO = "airbyte" SOURCE_DEFINITIONS_FILE_PATH = "airbyte-config/init/src/main/resources/seed/source_definitions.yaml" DESTINATION_DEFINITIONS_FILE_PATH = "airbyte-config/init/src/main/resources/seed/destination_definitions.yaml" DEFINITIONS_FILE_PATH = {"source": SOURCE_DEFINITIONS_FILE_PATH, "destination": DESTINATION_DEFINITIONS_FILE_PATH} + def download_catalog(catalog_url): response = requests.get(catalog_url) return response.json() + OSS_CATALOG = download_catalog(OSS_CATALOG_URL) + class ConnectorInvalidNameError(Exception): pass + class ConnectorVersionNotFound(Exception): pass + def read_definitions(definitions_file_path: str) -> Dict: with open(definitions_file_path) as definitions_file: - return yaml.safe_load(definitions_file) + return yaml.load(definitions_file, Loader=Loader) + def get_connector_name_from_path(path): return path.split("/")[2] -def get_changed_acceptance_test_config(diff_regex: Optional[str]=None) -> Set[str]: + +def get_changed_acceptance_test_config(diff_regex: Optional[str] = None) -> Set[str]: """Retrieve the set of connectors for which the acceptance_test_config file was changed in the current branch (compared to master). Args: @@ -49,22 +70,35 @@ def get_changed_acceptance_test_config(diff_regex: Optional[str]=None) -> Set[st Returns: Set[Connector]: Set of connectors that were changed """ + airbyte_repo = git.Repo(search_parent_directories=True) + if diff_regex is None: diff_command_args = ("--name-only", DIFFED_BRANCH) else: - diff_command_args = ("--name-only", f'-G{diff_regex}', DIFFED_BRANCH) + diff_command_args = ("--name-only", f"-G{diff_regex}", DIFFED_BRANCH) changed_acceptance_test_config_paths = { file_path - for file_path in AIRBYTE_REPO.git.diff(*diff_command_args).split("\n") + for file_path in airbyte_repo.git.diff(*diff_command_args).split("\n") if file_path.startswith(SOURCE_CONNECTOR_PATH_PREFIX) and file_path.endswith(ACCEPTANCE_TEST_CONFIG_FILE_NAME) } return {Connector(get_connector_name_from_path(changed_file)) for changed_file in changed_acceptance_test_config_paths} +class ConnectorLanguage(str, Enum): + PYTHON = "python" + JAVA = "java" + LOW_CODE = "low-code" + + +class ConnectorLanguageError(Exception): + pass + + @dataclass(frozen=True) class Connector: """Utility class to gather metadata about a connector.""" + technical_name: str def _get_type_and_name_from_technical_name(self) -> Tuple[str, str]: @@ -96,18 +130,35 @@ def icon_path(self) -> Path: def code_directory(self) -> Path: return Path(f"./airbyte-integrations/connectors/{self.technical_name}") + @property + def language(self) -> ConnectorLanguage: + if Path(self.code_directory / self.technical_name.replace("-", "_") / "manifest.yaml").is_file(): + return ConnectorLanguage.LOW_CODE + if Path(self.code_directory / "setup.py").is_file(): + return ConnectorLanguage.PYTHON + try: + with open(self.code_directory / "Dockerfile") as dockerfile: + if "FROM airbyte/integration-base-java" in dockerfile.read(): + return ConnectorLanguage.JAVA + except FileNotFoundError: + pass + return None + # raise ConnectorLanguageError(f"We could not infer {self.technical_name} connector language") + @property def version(self) -> str: with open(self.code_directory / "Dockerfile") as f: for line in f: if "io.airbyte.version" in line: return line.split("=")[1].strip() - raise ConnectorVersionNotFound(""" + raise ConnectorVersionNotFound( + """ Could not find the connector version from its Dockerfile. The io.airbyte.version tag is missing. - """) + """ + ) - @property + @cached_property def definition(self) -> Optional[dict]: """Find a connector definition from the catalog. Returns: @@ -117,7 +168,7 @@ def definition(self) -> Optional[dict]: definition_type = self.technical_name.split("-")[0] assert definition_type in ["source", "destination"] except AssertionError: - raise Exception(f"Could not determine the definition type for {self.technical_name}.") + return None definitions = read_definitions(DEFINITIONS_FILE_PATH[definition_type]) for definition in definitions: if definition["dockerRepository"].replace(f"{AIRBYTE_DOCKER_REPO}/", "") == self.technical_name: @@ -148,15 +199,24 @@ def acceptance_test_config(self) -> Optional[dict]: logging.warning(f"No {ACCEPTANCE_TEST_CONFIG_FILE_NAME} file found for {self.technical_name}") return None + def get_secret_manager(self, gsm_credentials: str): + return SecretsManager(connector_name=self.technical_name, gsm_credentials=gsm_credentials) + def __repr__(self) -> str: return self.technical_name + def get_changed_connectors() -> Set[Connector]: - """Retrieve a set of Connectors that were changed in the current branch (compared to master). - """ + """Retrieve a set of Connectors that were changed in the current branch (compared to master).""" + airbyte_repo = git.Repo(search_parent_directories=True) changed_source_connector_files = { file_path - for file_path in AIRBYTE_REPO.git.diff("--name-only", DIFFED_BRANCH).split("\n") + for file_path in airbyte_repo.git.diff("--name-only", DIFFED_BRANCH).split("\n") if file_path.startswith(SOURCE_CONNECTOR_PATH_PREFIX) } return {Connector(get_connector_name_from_path(changed_file)) for changed_file in changed_source_connector_files} + + +def get_all_released_connectors() -> Set: + all_definitions = OSS_CATALOG["sources"] + OSS_CATALOG["destinations"] + return {Connector(definition["dockerRepository"].replace("airbyte/", "")) for definition in all_definitions} diff --git a/tools/ci_connector_ops/setup.py b/tools/ci_connector_ops/setup.py index f181d67c2038..e864eb9d5136 100644 --- a/tools/ci_connector_ops/setup.py +++ b/tools/ci_connector_ops/setup.py @@ -2,6 +2,8 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from pathlib import Path + from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ @@ -9,20 +11,42 @@ "requests", "PyYAML~=6.0", "GitPython~=3.1.29", - "pandas~=1.5.3", - "pandas-gbq~=0.19.0", "pydantic~=1.10.4", - "fsspec~=2023.1.0", - "gcsfs~=2023.1.0", - "pytablewriter~=0.64.2", + "PyGithub~=1.58.0", + "rich", ] + +def local_pkg(name: str) -> str: + """Returns a path to a local package.""" + return f"{name} @ file://{Path.cwd().parent / name}" + + +# These internal packages are not yet published to a Pypi repository. +LOCAL_REQUIREMENTS = [local_pkg("ci_credentials")] + TEST_REQUIREMENTS = [ "pytest~=6.2.5", "pytest-mock~=3.10.0", "freezegun", ] +DEV_REQUIREMENTS = ["pyinstrument"] +# It is hard to containerize Pandas, it's only used in the QA engine, so I declared it as an extra requires +# TODO update the GHA that install the QA engine to install this extra +QA_ENGINE_REQUIREMENTS = [ + "pandas~=1.5.3", + "pandas-gbq~=0.19.0", + "fsspec~=2023.1.0", + "gcsfs~=2023.1.0", + "pytablewriter~=0.64.2", +] + +PIPELINES_REQUIREMENTS = [ + "dagger-io==0.4.0", + "asyncer", +] + setup( version="0.1.16", name="ci_connector_ops", @@ -30,11 +54,14 @@ author="Airbyte", author_email="contact@airbyte.io", packages=find_packages(), - install_requires=MAIN_REQUIREMENTS, + install_requires=MAIN_REQUIREMENTS + LOCAL_REQUIREMENTS, extras_require={ - "tests": TEST_REQUIREMENTS, + "tests": QA_ENGINE_REQUIREMENTS + TEST_REQUIREMENTS, + "dev": QA_ENGINE_REQUIREMENTS + TEST_REQUIREMENTS + DEV_REQUIREMENTS, + "pipelines": MAIN_REQUIREMENTS + PIPELINES_REQUIREMENTS, + "qa_engine": MAIN_REQUIREMENTS + QA_ENGINE_REQUIREMENTS, }, - python_requires=">=3.9", + # python_requires=">=3.10", TODO upgrade all our CI packages + GHA env to 3.10 package_data={"ci_connector_ops.qa_engine": ["connector_adoption.sql"]}, entry_points={ "console_scripts": [ @@ -44,6 +71,7 @@ "allowed-hosts-checks = ci_connector_ops.allowed_hosts_checks:check_allowed_hosts", "run-qa-engine = ci_connector_ops.qa_engine.main:main", "run-qa-checks = ci_connector_ops.qa_checks:run_qa_checks", + "connectors-ci = ci_connector_ops.pipelines.connectors_ci:connectors_ci", ], }, ) diff --git a/tools/ci_connector_ops/tests/test_acceptance_test_config_checks.py b/tools/ci_connector_ops/tests/test_acceptance_test_config_checks.py index 4b3fc7e96415..30af67a2b651 100644 --- a/tools/ci_connector_ops/tests/test_acceptance_test_config_checks.py +++ b/tools/ci_connector_ops/tests/test_acceptance_test_config_checks.py @@ -3,27 +3,31 @@ # import shutil -from typing import List, Optional -import yaml +from typing import List +import git import pytest - -from ci_connector_ops import acceptance_test_config_checks, utils +import yaml +from ci_connector_ops import acceptance_test_config_checks @pytest.fixture def mock_diffed_branched(mocker): - mocker.patch.object(acceptance_test_config_checks.utils, "DIFFED_BRANCH", utils.AIRBYTE_REPO.active_branch) - return utils.AIRBYTE_REPO.active_branch + airbyte_repo = git.Repo(search_parent_directories=True) + mocker.patch.object(acceptance_test_config_checks.utils, "DIFFED_BRANCH", airbyte_repo.active_branch) + return airbyte_repo.active_branch + @pytest.fixture def pokeapi_acceptance_test_config_path(): return "airbyte-integrations/connectors/source-pokeapi/acceptance-test-config.yml" + @pytest.fixture def ga_connector_file(): return "airbyte-integrations/connectors/source-amplitude/acceptance-test-config.yml" + @pytest.fixture def not_ga_backward_compatibility_change_expected_team(tmp_path, pokeapi_acceptance_test_config_path) -> List: expected_teams = [{"any-of": list(acceptance_test_config_checks.BACKWARD_COMPATIBILITY_REVIEWERS)}] @@ -34,6 +38,7 @@ def not_ga_backward_compatibility_change_expected_team(tmp_path, pokeapi_accepta yield expected_teams shutil.copyfile(backup_path, pokeapi_acceptance_test_config_path) + @pytest.fixture def not_ga_test_strictness_level_change_expected_team(tmp_path, pokeapi_acceptance_test_config_path) -> List: expected_teams = [{"any-of": list(acceptance_test_config_checks.TEST_STRICTNESS_LEVEL_REVIEWERS)}] @@ -44,6 +49,7 @@ def not_ga_test_strictness_level_change_expected_team(tmp_path, pokeapi_acceptan yield expected_teams shutil.copyfile(backup_path, pokeapi_acceptance_test_config_path) + @pytest.fixture def not_ga_bypass_reason_file_change_expected_team(tmp_path, pokeapi_acceptance_test_config_path): expected_teams = [] @@ -54,6 +60,7 @@ def not_ga_bypass_reason_file_change_expected_team(tmp_path, pokeapi_acceptance_ yield expected_teams shutil.copyfile(backup_path, pokeapi_acceptance_test_config_path) + @pytest.fixture def not_ga_not_tracked_change_expected_team(tmp_path, pokeapi_acceptance_test_config_path): expected_teams = [] @@ -64,6 +71,7 @@ def not_ga_not_tracked_change_expected_team(tmp_path, pokeapi_acceptance_test_co yield expected_teams shutil.copyfile(backup_path, pokeapi_acceptance_test_config_path) + @pytest.fixture def ga_connector_file_change_expected_team(tmp_path, ga_connector_file): expected_teams = list(acceptance_test_config_checks.GA_CONNECTOR_REVIEWERS) @@ -74,6 +82,7 @@ def ga_connector_file_change_expected_team(tmp_path, ga_connector_file): yield expected_teams shutil.copyfile(backup_path, ga_connector_file) + @pytest.fixture def ga_connector_backward_compatibility_file_change_expected_team(tmp_path, ga_connector_file): expected_teams = [{"any-of": list(acceptance_test_config_checks.BACKWARD_COMPATIBILITY_REVIEWERS)}] @@ -84,6 +93,7 @@ def ga_connector_backward_compatibility_file_change_expected_team(tmp_path, ga_c yield expected_teams shutil.copyfile(backup_path, ga_connector_file) + @pytest.fixture def ga_connector_bypass_reason_file_change_expected_team(tmp_path, ga_connector_file): expected_teams = [{"any-of": list(acceptance_test_config_checks.GA_BYPASS_REASON_REVIEWERS)}] @@ -94,6 +104,7 @@ def ga_connector_bypass_reason_file_change_expected_team(tmp_path, ga_connector_ yield expected_teams shutil.copyfile(backup_path, ga_connector_file) + @pytest.fixture def ga_connector_test_strictness_level_file_change_expected_team(tmp_path, ga_connector_file): expected_teams = [{"any-of": list(acceptance_test_config_checks.TEST_STRICTNESS_LEVEL_REVIEWERS)}] @@ -104,17 +115,21 @@ def ga_connector_test_strictness_level_file_change_expected_team(tmp_path, ga_co yield expected_teams shutil.copyfile(backup_path, ga_connector_file) + def verify_no_requirements_file_was_generated(captured: str): assert captured.out.split("\n")[0].split("=")[-1] == "false" + def verify_requirements_file_was_generated(captured: str): assert captured.out.split("\n")[0].split("=")[-1] == "true" + def verify_review_requirements_file_contains_expected_teams(requirements_file_path: str, expected_teams: List): with open(requirements_file_path, "r") as requirements_file: requirements = yaml.safe_load(requirements_file) assert requirements[0]["teams"] == expected_teams + def check_review_requirements_file(capsys, expected_teams: List): acceptance_test_config_checks.write_review_requirements_file() captured = capsys.readouterr() @@ -125,26 +140,38 @@ def check_review_requirements_file(capsys, expected_teams: List): requirements_file_path = acceptance_test_config_checks.REVIEW_REQUIREMENTS_FILE_PATH verify_review_requirements_file_contains_expected_teams(requirements_file_path, expected_teams) + def test_find_mandatory_reviewers_backward_compatibility(mock_diffed_branched, capsys, not_ga_backward_compatibility_change_expected_team): check_review_requirements_file(capsys, not_ga_backward_compatibility_change_expected_team) - + + def test_find_mandatory_reviewers_test_strictness_level(mock_diffed_branched, capsys, not_ga_test_strictness_level_change_expected_team): check_review_requirements_file(capsys, not_ga_test_strictness_level_change_expected_team) + def test_find_mandatory_reviewers_not_ga_bypass_reason(mock_diffed_branched, capsys, not_ga_bypass_reason_file_change_expected_team): check_review_requirements_file(capsys, not_ga_bypass_reason_file_change_expected_team) + def test_find_mandatory_reviewers_ga(mock_diffed_branched, capsys, ga_connector_file_change_expected_team): check_review_requirements_file(capsys, ga_connector_file_change_expected_team) -def test_find_mandatory_reviewers_ga_backward_compatibility(mock_diffed_branched, capsys, ga_connector_backward_compatibility_file_change_expected_team): + +def test_find_mandatory_reviewers_ga_backward_compatibility( + mock_diffed_branched, capsys, ga_connector_backward_compatibility_file_change_expected_team +): check_review_requirements_file(capsys, ga_connector_backward_compatibility_file_change_expected_team) + def test_find_mandatory_reviewers_ga_bypass_reason(mock_diffed_branched, capsys, ga_connector_bypass_reason_file_change_expected_team): check_review_requirements_file(capsys, ga_connector_bypass_reason_file_change_expected_team) -def test_find_mandatory_reviewers_ga_test_strictness_level(mock_diffed_branched, capsys, ga_connector_test_strictness_level_file_change_expected_team): + +def test_find_mandatory_reviewers_ga_test_strictness_level( + mock_diffed_branched, capsys, ga_connector_test_strictness_level_file_change_expected_team +): check_review_requirements_file(capsys, ga_connector_test_strictness_level_file_change_expected_team) + def test_find_mandatory_reviewers_no_tracked_changed(mock_diffed_branched, capsys, not_ga_not_tracked_change_expected_team): check_review_requirements_file(capsys, not_ga_not_tracked_change_expected_team) diff --git a/tools/ci_connector_ops/tests/test_qa_engine/test_main.py b/tools/ci_connector_ops/tests/test_qa_engine/test_main.py index 5e25401dcca3..dd40b54c361a 100644 --- a/tools/ci_connector_ops/tests/test_qa_engine/test_main.py +++ b/tools/ci_connector_ops/tests/test_qa_engine/test_main.py @@ -20,7 +20,7 @@ def test_main(mocker, dummy_qa_report, create_prs): mocker.patch.object(main.inputs, "fetch_adoption_metrics_per_connector_version") mocker.patch.object(main.validations, "get_qa_report", mocker.Mock(return_value=dummy_qa_report)) mocker.patch.object(main.validations, "get_connectors_eligible_for_cloud") - mocker.patch.object(main.cloud_availability_updater, "deploy_eligible_connectors_to_cloud_repo") + mocker.patch.object(main.cloud_availability_updater, "batch_deploy_eligible_connectors_to_cloud_repo") mocker.patch.object(main.outputs, "persist_qa_report") if create_prs: @@ -39,6 +39,6 @@ def test_main(mocker, dummy_qa_report, create_prs): ) if create_prs: main.validations.get_connectors_eligible_for_cloud.assert_called_with(main.validations.get_qa_report.return_value) - main.cloud_availability_updater.deploy_eligible_connectors_to_cloud_repo.assert_called_with( + main.cloud_availability_updater.batch_deploy_eligible_connectors_to_cloud_repo.assert_called_with( main.validations.get_connectors_eligible_for_cloud.return_value ) diff --git a/tools/ci_credentials/setup.py b/tools/ci_credentials/setup.py index 5213bc7a4765..bd34317aa595 100644 --- a/tools/ci_credentials/setup.py +++ b/tools/ci_credentials/setup.py @@ -3,9 +3,20 @@ # +from pathlib import Path + from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["requests", "ci_common_utils", "click~=8.1.3"] +MAIN_REQUIREMENTS = ["requests", "click~=8.1.3"] + + +def local_pkg(name: str) -> str: + """Returns a path to a local package.""" + return f"{name} @ file://{Path.cwd().parent / name}" + + +# These internal packages are not yet published to a Pypi repository. +LOCAL_REQUIREMENTS = [local_pkg("ci_common_utils")] TEST_REQUIREMENTS = ["requests-mock", "pytest"] @@ -16,7 +27,7 @@ author="Airbyte", author_email="contact@airbyte.io", packages=find_packages(), - install_requires=MAIN_REQUIREMENTS, + install_requires=MAIN_REQUIREMENTS + LOCAL_REQUIREMENTS, python_requires=">=3.9", extras_require={ "tests": TEST_REQUIREMENTS,