Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qa-engine: implement fetch_adoption_metrics_per_connector_version #21840

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/run-qa-engine.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ jobs:
- name: Install ci-connector-ops package
run: pip install --quiet -e ./tools/ci_connector_ops
- name: Run QA Engine
env:
QA_ENGINE_AIRBYTE_DATA_PROD_SA: '${{ secrets.QA_ENGINE_AIRBYTE_DATA_PROD_SA }}'
run: run-qa-engine
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- Adoption and sync-success metrics per connector version.
-- Source: sync jobs of officially published connectors in the warehouse,
-- restricted to terminal job statuses ("failed" / "succeeded").
WITH official_connector_syncs AS (
    SELECT * FROM airbyte_warehouse.connector_sync WHERE is_officially_published and (job_status = "failed" or job_status = "succeeded")
),
-- Distinct users and connections per (connector, version).
adoption_per_version AS (
    SELECT
        connector_definition_id,
        docker_repository,
        connector_version,
        COUNT(distinct(user_id)) as number_of_users,
        COUNT(distinct(connection_id)) as number_of_connections
    FROM
        official_connector_syncs
    GROUP BY connector_definition_id, docker_repository, connector_version
),
-- Raw sync counts per (connector, version, job_status); feeds the PIVOT below.
job_status_per_version AS (
    SELECT
        connector_definition_id,
        docker_repository,
        connector_version,
        job_status,
        COUNT(1) as sync_count
    FROM official_connector_syncs
    GROUP BY connector_definition_id, docker_repository, connector_version, job_status
),
-- Pivot job_status rows into failed/succeeded columns and derive totals.
-- ifnull(...) covers versions that have only one of the two statuses;
-- SAFE_DIVIDE returns NULL instead of erroring when the total is 0.
success_failure_by_connector_version AS (
    SELECT
        connector_definition_id,
        docker_repository,
        connector_version,
        ifnull(failed, 0) as failed_syncs_count,
        ifnull(succeeded, 0) as succeeded_syncs_count,
        ifnull(succeeded, 0) + ifnull(failed, 0) as total_syncs_count,
        SAFE_DIVIDE(ifnull(succeeded, 0), ifnull(succeeded, 0) + ifnull(failed, 0)) as sync_success_rate
    FROM job_status_per_version
    PIVOT
    (
        max(sync_count)
        FOR job_status in ('failed', 'succeeded')
    )
)
-- Final result: one row per (connector, version) with adoption + success metrics.
SELECT
    *
FROM
    adoption_per_version
    LEFT JOIN success_failure_by_connector_version
    USING (connector_definition_id, docker_repository, connector_version);
19 changes: 13 additions & 6 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import os
from importlib.resources import files
import json

from google.oauth2 import service_account
import requests
import pandas as pd

def fetch_adoption_metrics_per_connector_version() -> pd.DataFrame:
    """Retrieve adoption metrics for each connector version from our data warehouse.

    Reads the packaged ``connector_adoption.sql`` query and runs it against
    BigQuery (project ``airbyte-data-prod``), authenticating with the service
    account JSON stored in the ``QA_ENGINE_AIRBYTE_DATA_PROD_SA`` environment
    variable.

    Returns:
        pd.DataFrame: A dataframe with adoption metrics per connector version,
        restricted to the expected columns so downstream consumers get a
        stable schema even if the query adds columns.

    Raises:
        KeyError: If ``QA_ENGINE_AIRBYTE_DATA_PROD_SA`` is not set.
        json.JSONDecodeError: If the service account value is not valid JSON.
    """
    connector_adoption_sql = files("ci_connector_ops.qa_engine").joinpath("connector_adoption.sql").read_text()
    bq_credentials = service_account.Credentials.from_service_account_info(
        json.loads(os.environ["QA_ENGINE_AIRBYTE_DATA_PROD_SA"])
    )
    adoption_metrics = pd.read_gbq(connector_adoption_sql, project_id="airbyte-data-prod", credentials=bq_credentials)
    # Select only the expected columns: the query may return extra columns
    # (e.g. docker_repository) that callers do not rely on.
    return adoption_metrics[[
        "connector_definition_id",
        "connector_version",
        "number_of_connections",
        "number_of_users",
        "succeeded_syncs_count",
        "failed_syncs_count",
        "total_syncs_count",
        "sync_success_rate",
    ]]

# Module-level inputs, resolved once at import time and shared by consumers.
# NOTE(review): importing this module triggers external calls (remote catalog
# fetches and a BigQuery query) — presumably intentional for the QA engine's
# batch usage, but heavy as an import side effect; confirm callers expect it.
CLOUD_CATALOG = fetch_remote_catalog(CLOUD_CATALOG_URL)
OSS_CATALOG = fetch_remote_catalog(OSS_CATALOG_URL)
ADOPTION_METRICS_PER_CONNECTOR_VERSION = fetch_adoption_metrics_per_connector_version()
4 changes: 3 additions & 1 deletion tools/ci_connector_ops/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"PyYAML~=6.0",
"GitPython~=3.1.29",
"pandas~=1.5.3",
"pandas-gbq~=0.19.0",
"pydantic~=1.10.4",
"fsspec~=2023.1.0",
"gcsfs~=2023.1.0"
Expand All @@ -22,7 +23,7 @@


setup(
version="0.1.4",
version="0.1.5",
Copy link
Contributor Author

@alafanechere alafanechere Jan 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anticipating a version bump. 0.1.4 is in #21776 and I expect #21776 to be merged before this PR.

name="ci_connector_ops",
description="Packaged maintained by the connector operations team to perform CI for connectors",
author="Airbyte",
Expand All @@ -33,6 +34,7 @@
"tests": TEST_REQUIREMENTS,
},
python_requires=">=3.9",
package_data={"ci_connector_ops.qa_engine": ["connector_adoption.sql"]},
entry_points={
"console_scripts": [
"check-test-strictness-level = ci_connector_ops.sat_config_checks:check_test_strictness_level",
Expand Down
34 changes: 31 additions & 3 deletions tools/ci_connector_ops/tests/test_qa_engine/test_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#


from importlib.resources import files

import pandas as pd
import pytest

Expand All @@ -16,15 +18,41 @@ def test_fetch_remote_catalog(catalog_url):
assert all(expected_column in catalog.columns for expected_column in expected_columns)
assert set(catalog.connector_type.unique()) == {"source", "destination"}

def test_fetch_adoption_metrics_per_connector_version():
def test_fetch_adoption_metrics_per_connector_version(mocker):
    """fetch_adoption_metrics_per_connector_version should run the packaged SQL
    against BigQuery with the service-account credentials and return only the
    expected columns (dropping any extras the query happens to return)."""
    fake_bigquery_results = pd.DataFrame([{
        "connector_definition_id": "abcdefgh",
        "connector_version": "0.0.0",
        "number_of_connections": 10,
        "number_of_users": 2,
        "succeeded_syncs_count": 12,
        "failed_syncs_count": 1,
        "total_syncs_count": 3,
        "sync_success_rate": .99,
        # Extra column: must be filtered out of the returned dataframe.
        "unexpected_column": "foobar"
    }])
    mocker.patch.object(inputs.pd, "read_gbq", mocker.Mock(return_value=fake_bigquery_results))
    mocker.patch.object(inputs.os, "environ", {"QA_ENGINE_AIRBYTE_DATA_PROD_SA": '{"type": "fake_service_account"}'})
    mocker.patch.object(inputs.service_account.Credentials, "from_service_account_info")
    expected_columns = {
        "connector_definition_id",
        "connector_version",
        "number_of_connections",
        "number_of_users",
        "succeeded_syncs_count",
        "failed_syncs_count",
        "total_syncs_count",
        "sync_success_rate",
    }
    expected_sql_query = files("ci_connector_ops.qa_engine").joinpath("connector_adoption.sql").read_text()
    expected_project_id = "airbyte-data-prod"

    adoption_metrics_per_connector_version = inputs.fetch_adoption_metrics_per_connector_version()

    assert isinstance(adoption_metrics_per_connector_version, pd.DataFrame)
    assert set(adoption_metrics_per_connector_version.columns) == expected_columns
    # Credentials must be built from the env var JSON and forwarded to read_gbq
    # together with the packaged query and the expected project id.
    inputs.service_account.Credentials.from_service_account_info.assert_called_with(
        {"type": "fake_service_account"}
    )
    inputs.pd.read_gbq.assert_called_with(
        expected_sql_query,
        project_id=expected_project_id,
        credentials=inputs.service_account.Credentials.from_service_account_info.return_value
    )