diff --git a/.github/workflows/run-qa-engine.yml b/.github/workflows/run-qa-engine.yml index eab27e1106cf..c6047e24138d 100644 --- a/.github/workflows/run-qa-engine.yml +++ b/.github/workflows/run-qa-engine.yml @@ -2,10 +2,10 @@ name: Run QA Engine on: workflow_dispatch: - # schedule: - ## 1pm UTC is 6am PDT. - ## same time as Generate Build Report - # - cron: "0 13 * * *" + schedule: + # 1pm UTC is 6am PDT. + # same time as Generate Build Report + - cron: "0 13 * * *" jobs: run-qa-engine: @@ -18,7 +18,7 @@ jobs: - name: Setup Cloud SDK uses: google-github-actions/setup-gcloud@v0 with: - service_account_key: ${{ secrets.PROD_SPEC_CACHE_SA_KEY }} + service_account_key: ${{ secrets.QA_ENGINE_AIRBYTE_DATA_PROD_SA }} export_default_credentials: true - name: Install Python uses: actions/setup-python@v4 @@ -29,6 +29,7 @@ jobs: - name: Run QA Engine env: LOGLEVEL: INFO - QA_ENGINE_AIRBYTE_DATA_PROD_SA: "${{ secrets.QA_ENGINE_AIRBYTE_DATA_PROD_SA }}" GITHUB_API_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} run: run-qa-engine + # TODO: enable PR creation when all the QA checks are implemented: + # run: run-qa-engine --create-prs diff --git a/tools/ci_connector_ops/ci_connector_ops/qa_engine/constants.py b/tools/ci_connector_ops/ci_connector_ops/qa_engine/constants.py index caf0da96a239..b7afafb5d3eb 100644 --- a/tools/ci_connector_ops/ci_connector_ops/qa_engine/constants.py +++ b/tools/ci_connector_ops/ci_connector_ops/qa_engine/constants.py @@ -13,7 +13,7 @@ "b76be0a6-27dc-4560-95f6-2623da0bd7b6", # Local SQL Lite ] -GCS_QA_REPORT_PATH = "gs://prod-airbyte-cloud-connector-metadata-service/qa_report.json" +GCS_QA_REPORT_PATH = "gs://airbyte-data-connectors-qa-engine/" AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER = "airbytehq" AIRBYTE_PLATFORM_INTERNAL_REPO_NAME = "airbyte-platform-internal" AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL = ( diff --git a/tools/ci_connector_ops/ci_connector_ops/qa_engine/inputs.py b/tools/ci_connector_ops/ci_connector_ops/qa_engine/inputs.py index f172b0848c3b..4af8770f0b74 100644 --- a/tools/ci_connector_ops/ci_connector_ops/qa_engine/inputs.py +++ b/tools/ci_connector_ops/ci_connector_ops/qa_engine/inputs.py @@ -2,22 +2,19 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -import os -from importlib.resources import files -import json import logging +from enum import Enum +from importlib.resources import files +from typing import Optional -from .constants import CONNECTOR_BUILD_OUTPUT_URL - -from google.oauth2 import service_account -import requests import pandas as pd -from typing import Optional +import requests -from enum import Enum +from .constants import CONNECTOR_BUILD_OUTPUT_URL LOGGER = logging.getLogger(__name__) + class BUILD_STATUSES(str, Enum): SUCCESS = "success" FAILURE = "failure" @@ -31,7 +28,6 @@ def from_string(cls, string_value: Optional[str]) -> "BUILD_STATUSES": return BUILD_STATUSES[string_value.upper()] - def get_connector_build_output_url(connector_technical_name: str, connector_version: str) -> str: """ Get the connector build output url. @@ -39,7 +35,8 @@ def get_connector_build_output_url(connector_technical_name: str, connector_vers """ return f"{CONNECTOR_BUILD_OUTPUT_URL}/{connector_technical_name}/version-{connector_version}.json" -def fetch_latest_build_status_for_connector_version(connector_technical_name: str, connector_version: str) ->BUILD_STATUSES: + +def fetch_latest_build_status_for_connector_version(connector_technical_name: str, connector_version: str) -> BUILD_STATUSES: """Fetch the latest build status for a given connector version.""" connector_build_output_url = get_connector_build_output_url(connector_technical_name, connector_version) connector_build_output_response = requests.get(connector_build_output_url) @@ -58,6 +55,7 @@ def fetch_latest_build_status_for_connector_version(connector_technical_name: st else: return BUILD_STATUSES.NOT_FOUND + def fetch_remote_catalog(catalog_url: str) -> pd.DataFrame: """Fetch a combined remote catalog and return a single DataFrame with sources and destinations defined by the connector_type column. @@ -77,6 +75,7 @@ def fetch_remote_catalog(catalog_url: str) -> pd.DataFrame: destinations["connector_definition_id"] = destinations.destinationDefinitionId return pd.concat([sources, destinations]) + def fetch_adoption_metrics_per_connector_version() -> pd.DataFrame: """Retrieve adoptions metrics for each connector version from our data warehouse. @@ -84,15 +83,16 @@ def fetch_adoption_metrics_per_connector_version() -> pd.DataFrame: pd.DataFrame: A dataframe with adoption metrics per connector version. """ connector_adoption_sql = files("ci_connector_ops.qa_engine").joinpath("connector_adoption.sql").read_text() - bq_credentials = service_account.Credentials.from_service_account_info(json.loads(os.environ["QA_ENGINE_AIRBYTE_DATA_PROD_SA"])) - adoption_metrics = pd.read_gbq(connector_adoption_sql, project_id="airbyte-data-prod", credentials=bq_credentials) - return adoption_metrics[[ - "connector_definition_id", - "connector_version", - "number_of_connections", - "number_of_users", - "succeeded_syncs_count", - "failed_syncs_count", - "total_syncs_count", - "sync_success_rate", - ]] + adoption_metrics = pd.read_gbq(connector_adoption_sql, project_id="airbyte-data-prod") + return adoption_metrics[ + [ + "connector_definition_id", + "connector_version", + "number_of_connections", + "number_of_users", + "succeeded_syncs_count", + "failed_syncs_count", + "total_syncs_count", + "sync_success_rate", + ] + ] diff --git a/tools/ci_connector_ops/ci_connector_ops/qa_engine/main.py b/tools/ci_connector_ops/ci_connector_ops/qa_engine/main.py index 1fd2d0781f80..a06199b7745d 100644 --- a/tools/ci_connector_ops/ci_connector_ops/qa_engine/main.py +++ b/tools/ci_connector_ops/ci_connector_ops/qa_engine/main.py @@ -4,15 +4,19 @@ import logging -from . import cloud_availability_updater, enrichments, inputs, validations -from .constants import CLOUD_CATALOG_URL, OSS_CATALOG_URL +import click + +from . import cloud_availability_updater, enrichments, inputs, outputs, validations +from .constants import CLOUD_CATALOG_URL, GCS_QA_REPORT_PATH, OSS_CATALOG_URL logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -def main(): +@click.command() +@click.option("--create-prs", is_flag=True) +def main(create_prs): logger.info("Fetch the OSS connectors catalog.") oss_catalog = inputs.fetch_remote_catalog(OSS_CATALOG_URL) logger.info("Fetch the Cloud connectors catalog.") @@ -23,7 +27,10 @@ def main(): enriched_catalog = enrichments.get_enriched_catalog(oss_catalog, cloud_catalog, adoption_metrics_per_connector_version) logger.info("Start the QA report generation.") qa_report = validations.get_qa_report(enriched_catalog, len(oss_catalog)) - logger.info("Start the QA report generation.") - eligible_connectors = validations.get_connectors_eligible_for_cloud(qa_report) - logger.info("Start eligible connectors deployment to Cloud.") - cloud_availability_updater.deploy_eligible_connectors_to_cloud_repo(eligible_connectors) + logger.info("Persist QA report to GCS") + outputs.persist_qa_report(qa_report, GCS_QA_REPORT_PATH, public_fields_only=False) + + if create_prs: + logger.info("Start eligible connectors deployment to Cloud.") + eligible_connectors = validations.get_connectors_eligible_for_cloud(qa_report) + cloud_availability_updater.deploy_eligible_connectors_to_cloud_repo(eligible_connectors) diff --git a/tools/ci_connector_ops/ci_connector_ops/qa_engine/outputs.py b/tools/ci_connector_ops/ci_connector_ops/qa_engine/outputs.py index 88253303a30d..7b91b28bbdb7 100644 --- a/tools/ci_connector_ops/ci_connector_ops/qa_engine/outputs.py +++ b/tools/ci_connector_ops/ci_connector_ops/qa_engine/outputs.py @@ -3,13 +3,18 @@ # +from datetime import datetime + import pandas as pd from .models import ConnectorQAReport -def persist_qa_report(qa_report: pd.DataFrame, path: str, public_fields_only: bool =True): + +def persist_qa_report(qa_report: pd.DataFrame, path: str, public_fields_only: bool = True) -> str: + report_generation_date = datetime.strftime(qa_report["report_generation_datetime"].max(), "%Y%m%d") + path = path + f"{report_generation_date}_qa_report.jsonl" final_fields = [ - field.name for field in ConnectorQAReport.__fields__.values() - if field.field_info.extra["is_public"] or not public_fields_only + field.name for field in ConnectorQAReport.__fields__.values() if field.field_info.extra["is_public"] or not public_fields_only ] - qa_report[final_fields].to_json(path, orient="records") + qa_report[final_fields].to_json(path, orient="records", lines=True) + return path diff --git a/tools/ci_connector_ops/setup.py b/tools/ci_connector_ops/setup.py index bba0ff2580ee..45295b55423c 100644 --- a/tools/ci_connector_ops/setup.py +++ b/tools/ci_connector_ops/setup.py @@ -5,6 +5,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ + "click~=8.1.3", "requests", "PyYAML~=6.0", "GitPython~=3.1.29", diff --git a/tools/ci_connector_ops/tests/test_qa_engine/test_cloud_availability_updater.py b/tools/ci_connector_ops/tests/test_qa_engine/test_cloud_availability_updater.py index ff0c2e2ae21a..b20ee8801813 100644 --- a/tools/ci_connector_ops/tests/test_qa_engine/test_cloud_availability_updater.py +++ b/tools/ci_connector_ops/tests/test_qa_engine/test_cloud_availability_updater.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. # @@ -167,11 +167,11 @@ def test_create_pr(mocker, pr_already_created): response = cloud_availability_updater.create_pr(connector, "my_awesome_branch") expected_url = "https://api.github.com/repos/airbytehq/airbyte-platform-internal/pulls" expected_body = f"""The Cloud Availability Updater decided that it's the right time to make {connector.connector_name} available on Cloud! - Technical name: {connector.connector_technical_name} - Version: {connector.connector_version} - Definition ID: {connector.connector_definition_id} - OSS sync success rate: {connector.sync_success_rate} - OSS number of connections: {connector.number_of_connections} + - Technical name: {connector.connector_technical_name} + - Version: {connector.connector_version} + - Definition ID: {connector.connector_definition_id} + - OSS sync success rate: {connector.sync_success_rate} + - OSS number of connections: {connector.number_of_connections} """ expected_data = { "title": "🤖 Add source-foobar to cloud", diff --git a/tools/ci_connector_ops/tests/test_qa_engine/test_inputs.py b/tools/ci_connector_ops/tests/test_qa_engine/test_inputs.py index 6faf7cb54322..3c2ee97af1d6 100644 --- a/tools/ci_connector_ops/tests/test_qa_engine/test_inputs.py +++ b/tools/ci_connector_ops/tests/test_qa_engine/test_inputs.py @@ -4,13 +4,13 @@ from importlib.resources import files +from unittest.mock import MagicMock, call import pandas as pd import pytest -from unittest.mock import MagicMock, call import requests +from ci_connector_ops.qa_engine import constants, inputs -from ci_connector_ops.qa_engine import inputs, constants @pytest.mark.parametrize("catalog_url", [constants.OSS_CATALOG_URL, constants.CLOUD_CATALOG_URL]) def test_fetch_remote_catalog(catalog_url): @@ -20,21 +20,24 @@ def test_fetch_remote_catalog(catalog_url): assert all(expected_column in catalog.columns for expected_column in expected_columns) assert set(catalog.connector_type.unique()) == {"source", "destination"} + def test_fetch_adoption_metrics_per_connector_version(mocker): - fake_bigquery_results = pd.DataFrame([{ - "connector_definition_id": "abcdefgh", - "connector_version": "0.0.0", - "number_of_connections": 10, - "number_of_users": 2, - "succeeded_syncs_count": 12, - "failed_syncs_count": 1, - "total_syncs_count": 3, - "sync_success_rate": .99, - "unexpected_column": "foobar" - }]) + fake_bigquery_results = pd.DataFrame( + [ + { + "connector_definition_id": "abcdefgh", + "connector_version": "0.0.0", + "number_of_connections": 10, + "number_of_users": 2, + "succeeded_syncs_count": 12, + "failed_syncs_count": 1, + "total_syncs_count": 3, + "sync_success_rate": 0.99, + "unexpected_column": "foobar", + } + ] + ) mocker.patch.object(inputs.pd, "read_gbq", mocker.Mock(return_value=fake_bigquery_results)) - mocker.patch.object(inputs.os, "environ", {"QA_ENGINE_AIRBYTE_DATA_PROD_SA": '{"type": "fake_service_account"}'}) - mocker.patch.object(inputs.service_account.Credentials, "from_service_account_info") expected_columns = { "connector_definition_id", "connector_version", @@ -50,79 +53,67 @@ def test_fetch_adoption_metrics_per_connector_version(mocker): adoption_metrics_per_connector_version = inputs.fetch_adoption_metrics_per_connector_version() assert isinstance(adoption_metrics_per_connector_version, pd.DataFrame) assert set(adoption_metrics_per_connector_version.columns) == expected_columns - inputs.service_account.Credentials.from_service_account_info.assert_called_with( - {"type": "fake_service_account"} - ) - inputs.pd.read_gbq.assert_called_with( - expected_sql_query, - project_id=expected_project_id, - credentials=inputs.service_account.Credentials.from_service_account_info.return_value - ) + inputs.pd.read_gbq.assert_called_with(expected_sql_query, project_id=expected_project_id) -@pytest.mark.parametrize("connector_name, connector_version, mocked_json_payload, mocked_status_code, expected_status", [ - ( - "connectors/source-pokeapi", - "0.1.5", - { - "link": "https://github.com/airbytehq/airbyte/actions/runs/4029659593", - "outcome": "success", - "docker_version": "0.1.5", - "timestamp": "1674872401", - "connector": "connectors/source-pokeapi" - }, - 200, - inputs.BUILD_STATUSES.SUCCESS - ), - ( - "connectors/source-pokeapi", - "0.1.5", - { - "link": "https://github.com/airbytehq/airbyte/actions/runs/4029659593", - "outcome": "failure", - "docker_version": "0.1.5", - "timestamp": "1674872401", - "connector": "connectors/source-pokeapi" - }, - 200, - inputs.BUILD_STATUSES.FAILURE - ), - ( - "connectors/source-pokeapi", - "0.1.5", - None, - 404, - inputs.BUILD_STATUSES.NOT_FOUND - ), - ( - "connectors/source-pokeapi", - "0.1.5", - { - "link": "https://github.com/airbytehq/airbyte/actions/runs/4029659593", - "docker_version": "0.1.5", - "timestamp": "1674872401", - "connector": "connectors/source-pokeapi" - }, - 200, - inputs.BUILD_STATUSES.NOT_FOUND - ), - ( - "connectors/source-pokeapi", - "0.1.5", - None, - 404, - inputs.BUILD_STATUSES.NOT_FOUND - ), -]) -def test_fetch_latest_build_status_for_connector_version(mocker, connector_name, connector_version, mocked_json_payload, mocked_status_code, expected_status): + +@pytest.mark.parametrize( + "connector_name, connector_version, mocked_json_payload, mocked_status_code, expected_status", + [ + ( + "connectors/source-pokeapi", + "0.1.5", + { + "link": "https://github.com/airbytehq/airbyte/actions/runs/4029659593", + "outcome": "success", + "docker_version": "0.1.5", + "timestamp": "1674872401", + "connector": "connectors/source-pokeapi", + }, + 200, + inputs.BUILD_STATUSES.SUCCESS, + ), + ( + "connectors/source-pokeapi", + "0.1.5", + { + "link": "https://github.com/airbytehq/airbyte/actions/runs/4029659593", + "outcome": "failure", + "docker_version": "0.1.5", + "timestamp": "1674872401", + "connector": "connectors/source-pokeapi", + }, + 200, + inputs.BUILD_STATUSES.FAILURE, + ), + ("connectors/source-pokeapi", "0.1.5", None, 404, inputs.BUILD_STATUSES.NOT_FOUND), + ( + "connectors/source-pokeapi", + "0.1.5", + { + "link": "https://github.com/airbytehq/airbyte/actions/runs/4029659593", + "docker_version": "0.1.5", + "timestamp": "1674872401", + "connector": "connectors/source-pokeapi", + }, + 200, + inputs.BUILD_STATUSES.NOT_FOUND, + ), + ("connectors/source-pokeapi", "0.1.5", None, 404, inputs.BUILD_STATUSES.NOT_FOUND), + ], +) +def test_fetch_latest_build_status_for_connector_version( + mocker, connector_name, connector_version, mocked_json_payload, mocked_status_code, expected_status +): # Mock the api call to get the latest build status for a connector version mock_response = MagicMock() mock_response.json.return_value = mocked_json_payload mock_response.status_code = mocked_status_code - mock_get = mocker.patch.object(requests, 'get', return_value=mock_response) + mock_get = mocker.patch.object(requests, "get", return_value=mock_response) assert inputs.fetch_latest_build_status_for_connector_version(connector_name, connector_version) == expected_status assert mock_get.call_args == call(f"{constants.CONNECTOR_BUILD_OUTPUT_URL}/{connector_name}/version-{connector_version}.json") + def test_fetch_latest_build_status_for_connector_version_invalid_status(mocker, caplog): connector_name = "connectors/source-pokeapi" connector_version = "0.1.5" @@ -131,13 +122,13 @@ def test_fetch_latest_build_status_for_connector_version_invalid_status(mocker, "outcome": "unknown_outcome_123", "docker_version": "0.1.5", "timestamp": "1674872401", - "connector": "connectors/source-pokeapi" + "connector": "connectors/source-pokeapi", } # Mock the api call to get the latest build status for a connector version mock_response = MagicMock() mock_response.json.return_value = mocked_json_payload mock_response.status_code = 200 - mocker.patch.object(requests, 'get', return_value=mock_response) + mocker.patch.object(requests, "get", return_value=mock_response) assert inputs.fetch_latest_build_status_for_connector_version(connector_name, connector_version) == inputs.BUILD_STATUSES.NOT_FOUND - assert 'Error: Unexpected build status value: unknown_outcome_123 for connector connectors/source-pokeapi:0.1.5' in caplog.text + assert "Error: Unexpected build status value: unknown_outcome_123 for connector connectors/source-pokeapi:0.1.5" in caplog.text diff --git a/tools/ci_connector_ops/tests/test_qa_engine/test_main.py b/tools/ci_connector_ops/tests/test_qa_engine/test_main.py index 6d918abd62ff..5e25401dcca3 100644 --- a/tools/ci_connector_ops/tests/test_qa_engine/test_main.py +++ b/tools/ci_connector_ops/tests/test_qa_engine/test_main.py @@ -1,12 +1,17 @@ # -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import pytest from ci_connector_ops.qa_engine import main +from click.testing import CliRunner -def test_main(mocker, dummy_qa_report): +@pytest.mark.parametrize("create_prs", [False, True]) +def test_main(mocker, dummy_qa_report, create_prs): + runner = CliRunner() + mock_oss_catalog = mocker.Mock(__len__=mocker.Mock(return_value=42)) mock_cloud_catalog = mocker.Mock() @@ -16,8 +21,12 @@ def test_main(mocker, dummy_qa_report): mocker.patch.object(main.validations, "get_qa_report", mocker.Mock(return_value=dummy_qa_report)) mocker.patch.object(main.validations, "get_connectors_eligible_for_cloud") mocker.patch.object(main.cloud_availability_updater, "deploy_eligible_connectors_to_cloud_repo") + mocker.patch.object(main.outputs, "persist_qa_report") - main.main() + if create_prs: + runner.invoke(main.main, ["--create-prs"]) + else: + runner.invoke(main.main) assert main.inputs.fetch_remote_catalog.call_count == 2 main.inputs.fetch_remote_catalog.assert_has_calls([mocker.call(main.OSS_CATALOG_URL), mocker.call(main.CLOUD_CATALOG_URL)]) @@ -25,7 +34,11 @@ def test_main(mocker, dummy_qa_report): mock_oss_catalog, mock_cloud_catalog, main.inputs.fetch_adoption_metrics_per_connector_version.return_value ) main.validations.get_qa_report.assert_called_with(main.enrichments.get_enriched_catalog.return_value, len(mock_oss_catalog)) - main.validations.get_connectors_eligible_for_cloud.assert_called_with(main.validations.get_qa_report.return_value) - main.cloud_availability_updater.deploy_eligible_connectors_to_cloud_repo.assert_called_with( - main.validations.get_connectors_eligible_for_cloud.return_value + main.outputs.persist_qa_report.assert_called_with( + main.validations.get_qa_report.return_value, main.GCS_QA_REPORT_PATH, public_fields_only=False ) + if create_prs: + main.validations.get_connectors_eligible_for_cloud.assert_called_with(main.validations.get_qa_report.return_value) + main.cloud_availability_updater.deploy_eligible_connectors_to_cloud_repo.assert_called_with( + main.validations.get_connectors_eligible_for_cloud.return_value + ) diff --git a/tools/ci_connector_ops/tests/test_qa_engine/test_outputs.py b/tools/ci_connector_ops/tests/test_qa_engine/test_outputs.py index 7507b6b5876c..ba0236e8ec37 100644 --- a/tools/ci_connector_ops/tests/test_qa_engine/test_outputs.py +++ b/tools/ci_connector_ops/tests/test_qa_engine/test_outputs.py @@ -5,18 +5,14 @@ import pandas as pd import pytest - from ci_connector_ops.qa_engine import outputs + @pytest.mark.parametrize("public_fields_only", [True, False]) -def test_persist_qa_report_public_fields_only(tmp_path, dummy_qa_report, public_fields_only): - output_path = tmp_path / "qa_report.json" - outputs.persist_qa_report(dummy_qa_report, output_path, public_fields_only=public_fields_only) - qa_report_from_disk = pd.read_json(output_path) - private_fields = { - field.name for field in outputs.ConnectorQAReport.__fields__.values() - if not field.field_info.extra["is_public"] - } +def test_persist_qa_report(tmp_path, dummy_qa_report, public_fields_only): + output_path = outputs.persist_qa_report(dummy_qa_report, str(tmp_path), public_fields_only=public_fields_only) + qa_report_from_disk = pd.read_json(output_path, lines=True) + private_fields = {field.name for field in outputs.ConnectorQAReport.__fields__.values() if not field.field_info.extra["is_public"]} available_fields = set(qa_report_from_disk.columns) if public_fields_only: assert not private_fields.issubset(available_fields)