Skip to content

Commit

Permalink
qa-engine: implement early enrichments and validations on QA report (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored Jan 25, 2023
1 parent 438c1eb commit 734d548
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 32 deletions.
3 changes: 3 additions & 0 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
15 changes: 15 additions & 0 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


CLOUD_CATALOG_URL = "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/cloud_catalog.json"
OSS_CATALOG_URL = "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/oss_catalog.json"

INAPPROPRIATE_FOR_CLOUD_USE_CONNECTORS = [
"8be1cf83-fde1-477f-a4ad-318d23c9f3c6", # Local CSV
"a625d593-bba5-4a1c-a53d-2d246268a816", # Local JSON
"b76be0a6-27dc-4560-95f6-2623da0bd7b6" # Local SQL Lite
]

GCS_QA_REPORT_PATH = "gs://prod-airbyte-cloud-connector-metadata-service/qa_report.json"
42 changes: 42 additions & 0 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/enrichments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import pandas as pd

def get_enriched_catalog(oss_catalog: pd.DataFrame, cloud_catalog: pd.DataFrame) -> pd.DataFrame:
"""Merge OSS and Cloud catalog in a single dataframe on their definition id.
Transformations:
- Rename columns to snake case.
- Rename name column to connector_name.
- Rename docker_image_tag to connector_version.
- Replace null value for release_stage with alpha.
Enrichments:
- is_on_cloud: determined by the merge operation results.
- connector_technical_name: built from the docker repository field. airbyte/source-pokeapi -> source-pokeapi.
Args:
oss_catalog (pd.DataFrame): The open source catalog dataframe.
cloud_catalog (pd.DataFrame): The cloud catalog dataframe.
Returns:
pd.DataFrame: The enriched catalog.
"""
enriched_catalog = pd.merge(
oss_catalog,
cloud_catalog,
how="left",
on="connector_definition_id",
indicator=True,
suffixes=("", "_del"),
)
enriched_catalog.columns = enriched_catalog.columns.str.replace(
"(?<=[a-z])(?=[A-Z])", "_", regex=True
).str.lower() # column names to snake case
enriched_catalog["is_on_cloud"] = enriched_catalog["_merge"] == "both"
enriched_catalog = enriched_catalog.drop(columns="_merge")
enriched_catalog["connector_name"] = enriched_catalog["name"]
enriched_catalog["connector_technical_name"] = enriched_catalog["docker_repository"].str.replace("airbyte/", "")
enriched_catalog["connector_version"] = enriched_catalog["docker_image_tag"]
enriched_catalog["release_stage"] = enriched_catalog["release_stage"].fillna("unknown")
return enriched_catalog
4 changes: 1 addition & 3 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
import requests
import pandas as pd

CLOUD_CATALOG_URL = "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/cloud_catalog.json"
OSS_CATALOG_URL = "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/oss_catalog.json"

from .constants import CLOUD_CATALOG_URL, OSS_CATALOG_URL

def fetch_remote_catalog(catalog_url: str) -> pd.DataFrame:
"""Fetch a combined remote catalog and return a single DataFrame
Expand Down
30 changes: 7 additions & 23 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,13 @@
#


import pandas as pd
from .models import QAReport
from .constants import GCS_QA_REPORT_PATH
from .enrichments import get_enriched_catalog
from .inputs import CLOUD_CATALOG, OSS_CATALOG
from .validations import get_qa_report

GCS_QA_REPORT_PATH = "gs://prod-airbyte-cloud-connector-metadata-service/qa_report.json"
DUMMY_REPORT = pd.DataFrame([
{
"connector_type": "source",
"connector_name": "test",
"docker_image_tag": "0.0.0",
"release_stage": "alpha",
"is_on_cloud": False,
"latest_build_is_successful": False,
"documentation_is_available": False,
"number_of_connections": 0,
"number_of_users": 0,
"sync_success_rate": .99
}
])

def write_qa_report_to_gcs(qa_report: pd.DataFrame, output_file_path: str):
# Validate the report structure with pydantic QAReport model.
QAReport(connectors_qa_report=qa_report.to_dict(orient="records"))
qa_report.to_json(output_file_path, orient="records")

def main():
write_qa_report_to_gcs(DUMMY_REPORT, GCS_QA_REPORT_PATH)
enriched_catalog = get_enriched_catalog(OSS_CATALOG, CLOUD_CATALOG)
qa_report = get_qa_report(enriched_catalog)
qa_report.to_json(GCS_QA_REPORT_PATH, orient="records")
7 changes: 6 additions & 1 deletion tools/ci_connector_ops/ci_connector_ops/qa_engine/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,28 @@

from enum import Enum
from typing import List

from pydantic import BaseModel

class ConnectorTypeEnum(str, Enum):
source = "source"
destination = "destination"

class ReleaseStageEnum(str, Enum):
unknown = "unknown"
alpha = "alpha"
beta = "beta"
generally_available = "generally_available"

class ConnectorQAReport(BaseModel):
connector_type: ConnectorTypeEnum
connector_name: str
docker_image_tag: str
connector_technical_name: str
connector_definition_id: str
connector_version: str
release_stage: ReleaseStageEnum
is_on_cloud: bool
is_appropriate_for_cloud_use: bool
latest_build_is_successful: bool
documentation_is_available: bool
number_of_connections: int
Expand Down
60 changes: 60 additions & 0 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/validations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import pandas as pd
import requests

from .constants import INAPPROPRIATE_FOR_CLOUD_USE_CONNECTORS
from .inputs import OSS_CATALOG
from .models import ConnectorQAReport, QAReport

class QAReportGenerationError(Exception):
pass

def url_is_reachable(url: str) -> bool:
response = requests.get(url)
return response.status_code == 200

def is_appropriate_for_cloud_use(definition_id: str) -> bool:
return definition_id not in INAPPROPRIATE_FOR_CLOUD_USE_CONNECTORS

def get_qa_report(enriched_catalog: pd.DataFrame) -> pd.DataFrame:
"""Perform validation steps on top of the enriched catalog.
Adds the following columns:
- documentation_is_available:
GET the documentation URL and expect a 200 status code.
- is_appropriate_for_cloud_use:
Determined from an hardcoded list of definition ids inappropriate for cloud use.
- latest_build_is_successful:
Check if the latest build for the current connector version is successful.
- number_of_connections:
Get the number of connections using this connector version from our datawarehouse.
- number_of_users:
Get the number of users using this connector version from our datawarehouse.
- sync_success_rate:
Get the sync success rate of the connections with this connector version from our datawarehouse.
Args:
enriched_catalog (pd.DataFrame): The enriched catalog.
Returns:
pd.DataFrame: The final QA report.
"""
qa_report = enriched_catalog.copy(deep=True)
qa_report["documentation_is_available"] = qa_report.documentation_url.apply(url_is_reachable)
qa_report["is_appropriate_for_cloud_use"] = qa_report.connector_definition_id.apply(is_appropriate_for_cloud_use)

# TODO YET TO IMPLEMENT VALIDATIONS
qa_report["latest_build_is_successful"] = False # TODO, tracked in https://github.com/airbytehq/airbyte/issues/21720
qa_report["number_of_connections"] = 0 # TODO, tracked in https://github.com/airbytehq/airbyte/issues/21721
qa_report["number_of_users"] = 0 # TODO, tracked in https://github.com/airbytehq/airbyte/issues/21721
qa_report["sync_success_rate"] = .0 # TODO, tracked in https://github.com/airbytehq/airbyte/issues/21721

# Only select dataframe columns defined in the ConnectorQAReport model.
qa_report= qa_report[[field.name for field in ConnectorQAReport.__fields__.values()]]
# Validate the report structure with pydantic QAReport model.
QAReport(connectors_qa_report=qa_report.to_dict(orient="records"))
if len(qa_report) != len(OSS_CATALOG):
raise QAReportGenerationError("The QA report does not contain all the connectors defined in the OSS catalog.")
return qa_report
10 changes: 9 additions & 1 deletion tools/ci_connector_ops/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,23 @@
"gcsfs~=2023.1.0"
]

TEST_REQUIREMENTS = [
"pytest~=6.2.5",
"pytest-mock~=3.10.0",
]


setup(
version="0.1.3",
version="0.1.4",
name="ci_connector_ops",
description="Packaged maintained by the connector operations team to perform CI for connectors",
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=MAIN_REQUIREMENTS,
extras_require={
"tests": TEST_REQUIREMENTS,
},
python_requires=">=3.9",
entry_points={
"console_scripts": [
Expand Down
3 changes: 3 additions & 0 deletions tools/ci_connector_ops/tests/test_qa_engine/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
38 changes: 38 additions & 0 deletions tools/ci_connector_ops/tests/test_qa_engine/test_enrichments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import re

import pandas as pd
import pytest

from ci_connector_ops.qa_engine import inputs, enrichments

@pytest.fixture
def enriched_catalog() -> pd.DataFrame:
return enrichments.get_enriched_catalog(inputs.OSS_CATALOG, inputs.CLOUD_CATALOG)

@pytest.fixture
def enriched_catalog_columns(enriched_catalog: pd.DataFrame) -> set:
return set(enriched_catalog.columns)

def test_merge_performed_correctly(enriched_catalog):
assert len(enriched_catalog) == len(inputs.OSS_CATALOG)

def test_new_columns_are_added(enriched_catalog_columns):
expected_new_columns = {
"is_on_cloud",
"connector_name",
"connector_technical_name",
"connector_version"
}
assert expected_new_columns.issubset(enriched_catalog_columns)

def test_no_column_are_removed_and_lowercased(enriched_catalog_columns):
for column in inputs.OSS_CATALOG:
assert re.sub(r"(?<!^)(?=[A-Z])", "_", column).lower() in enriched_catalog_columns

def test_release_stage_not_null(enriched_catalog):
assert len(enriched_catalog["release_stage"].dropna()) == len(enriched_catalog["release_stage"])
33 changes: 29 additions & 4 deletions tools/ci_connector_ops/tests/test_qa_engine/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,35 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import pandas

import pandas as pd
import pytest

from ci_connector_ops.qa_engine import main

def test_write_qa_report_to_gcs(tmp_path):
@pytest.fixture
def dummy_report() -> pd.DataFrame:
return pd.DataFrame([
{
"connector_type": "source",
"connector_name": "test",
"docker_image_tag": "0.0.0",
"release_stage": "alpha",
"is_on_cloud": False,
"latest_build_is_successful": False,
"documentation_is_available": False,
"number_of_connections": 0,
"number_of_users": 0,
"sync_success_rate": .99
}
])

def test_main(tmp_path, mocker, dummy_report):
output_path = tmp_path / "output.json"
main.write_qa_report_to_gcs(main.DUMMY_REPORT, output_path)
assert pandas.read_json(output_path).to_dict() == main.DUMMY_REPORT.to_dict()
mocker.patch.object(main, "GCS_QA_REPORT_PATH", output_path)
mocker.patch.object(main, "get_enriched_catalog")
mocker.patch.object(main, "get_qa_report", mocker.Mock(return_value=dummy_report))
main.main()
main.get_enriched_catalog.assert_called_with(main.OSS_CATALOG, main.CLOUD_CATALOG)
main.get_qa_report.assert_called_with(main.get_enriched_catalog.return_value)
assert pd.read_json(output_path).to_dict() == dummy_report.to_dict()
34 changes: 34 additions & 0 deletions tools/ci_connector_ops/tests/test_qa_engine/test_validations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import pandas as pd
import pytest

from ci_connector_ops.qa_engine import inputs, enrichments, models, validations

@pytest.fixture
def enriched_catalog() -> pd.DataFrame:
return enrichments.get_enriched_catalog(inputs.OSS_CATALOG, inputs.CLOUD_CATALOG)

@pytest.fixture
def qa_report(enriched_catalog, mocker) -> pd.DataFrame:
mocker.patch.object(validations, "url_is_reachable", mocker.Mock(return_value=True))
return validations.get_qa_report(enriched_catalog)

@pytest.fixture
def qa_report_columns(qa_report: pd.DataFrame) -> set:
return set(qa_report.columns)

def test_all_columns_are_declared(qa_report_columns: set):
expected_columns = set([field.name for field in models.ConnectorQAReport.__fields__.values()])
assert qa_report_columns == expected_columns

def test_not_null_values_after_validation(qa_report: pd.DataFrame):
assert len(qa_report.dropna()) == len(qa_report)

def test_report_generation_error(enriched_catalog, mocker):
mocker.patch.object(validations, "url_is_reachable", mocker.Mock(return_value=True))
with pytest.raises(validations.QAReportGenerationError):
return validations.get_qa_report(enriched_catalog.sample(10))

0 comments on commit 734d548

Please sign in to comment.