Skip to content

Commit

Permalink
cloud availability updater: automatic PR creation (#22568)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored Feb 8, 2023
1 parent 4ab4cec commit d918385
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 126 deletions.
12 changes: 7 additions & 5 deletions .github/workflows/run-qa-engine.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ name: Run QA Engine

on:
workflow_dispatch:
schedule:
# 1pm UTC is 6am PDT.
# same time as Generate Build Report
- cron: "0 13 * * *"
# schedule:
## 1pm UTC is 6am PDT.
## same time as Generate Build Report
# - cron: "0 13 * * *"

jobs:
run-qa-engine:
Expand All @@ -28,5 +28,7 @@ jobs:
run: pip install --quiet -e ./tools/ci_connector_ops
- name: Run QA Engine
env:
QA_ENGINE_AIRBYTE_DATA_PROD_SA: '${{ secrets.QA_ENGINE_AIRBYTE_DATA_PROD_SA }}'
LOGLEVEL: INFO
QA_ENGINE_AIRBYTE_DATA_PROD_SA: "${{ secrets.QA_ENGINE_AIRBYTE_DATA_PROD_SA }}"
GITHUB_API_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
run: run-qa-engine
Original file line number Diff line number Diff line change
@@ -1,47 +1,58 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import os
import logging
from pathlib import Path
import shutil
import subprocess
from typing import Optional
import tempfile
from pathlib import Path
from typing import Iterable, Optional

import git
import requests

from .models import ConnectorQAReport
from .constants import (
AIRBYTE_CLOUD_GITHUB_REPO_URL,
AIRBYTE_CLOUD_MAIN_BRANCH_NAME
AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL,
AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME,
AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT,
AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER,
GITHUB_API_COMMON_HEADERS,
)
from .models import ConnectorQAReport

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def clone_airbyte_cloud_repo(local_repo_path: Path) -> git.Repo:
logging.info(f"Cloning {AIRBYTE_CLOUD_GITHUB_REPO_URL} to {local_repo_path}")
return git.Repo.clone_from(AIRBYTE_CLOUD_GITHUB_REPO_URL, local_repo_path, branch=AIRBYTE_CLOUD_MAIN_BRANCH_NAME)
logger.info(f"Cloning {AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL} to {local_repo_path}")
return git.Repo.clone_from(
AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL, local_repo_path, branch=AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME
)


def get_definitions_mask_path(local_repo_path, definition_type: str) -> Path:
definitions_mask_path = local_repo_path / f"cloud-config/cloud-config-seed/src/main/resources/seed/{definition_type}_definitions_mask.yaml"
definitions_mask_path = (
local_repo_path / f"cloud-config/cloud-config-seed/src/main/resources/seed/{definition_type}_definitions_mask.yaml"
)
if not definitions_mask_path.exists():
raise FileNotFoundError(f"Can't find the {definition_type} definitions mask")
return definitions_mask_path


def checkout_new_branch(airbyte_cloud_repo: git.Repo, new_branch_name: str) -> git.Head:
new_branch = airbyte_cloud_repo.create_head(new_branch_name)
new_branch.checkout()
logging.info(f"Checked out branch {new_branch_name}.")
logger.info(f"Checked out branch {new_branch_name}.")
return new_branch


def update_definitions_mask(connector: ConnectorQAReport, definitions_mask_path: Path) -> Optional[Path]:
with open(definitions_mask_path, "r") as definition_mask:
connector_already_in_mask = connector.connector_definition_id in definition_mask.read()
if connector_already_in_mask:
logging.warning(f"{connector.connector_name}'s definition id is already in {definitions_mask_path}.")
logger.warning(f"{connector.connector_name}'s definition id is already in {definitions_mask_path}.")
return None

to_append = f"""# {connector.connector_name} (from cloud availability updater)
Expand All @@ -50,31 +61,64 @@ def update_definitions_mask(connector: ConnectorQAReport, definitions_mask_path:

with open(definitions_mask_path, "a") as f:
f.write(to_append)
logging.info(f"Updated {definitions_mask_path} with {connector.connector_name}'s definition id.")
logger.info(f"Updated {definitions_mask_path} with {connector.connector_name}'s definition id.")
return definitions_mask_path


def run_generate_cloud_connector_catalog(airbyte_cloud_repo_path: Path) -> str:
result = subprocess.check_output(
f"cd {airbyte_cloud_repo_path} && ./gradlew :cloud-config:cloud-config-seed:generateCloudConnectorCatalog",
shell=True
)
logging.info("Ran generateCloudConnectorCatalog Gradle Task")
f"cd {airbyte_cloud_repo_path} && ./gradlew :cloud-config:cloud-config-seed:generateCloudConnectorCatalog", shell=True
)
logger.info("Ran generateCloudConnectorCatalog Gradle Task")
return result.decode()


def commit_all_files(airbyte_cloud_repo: git.Repo, commit_message: str):
airbyte_cloud_repo.git.add('--all')
airbyte_cloud_repo.git.add("--all")
airbyte_cloud_repo.git.commit(m=commit_message)
logging.info(f"Committed file changes.")
logger.info("Committed file changes.")


def push_branch(airbyte_cloud_repo: git.Repo, branch:str):
airbyte_cloud_repo.git.push("--set-upstream", "origin", branch)
logging.info(f"Pushed branch {branch} to origin")
def push_branch(airbyte_cloud_repo: git.Repo, branch: str):
airbyte_cloud_repo.git.push("--force", "--set-upstream", "origin", branch)
logger.info(f"Pushed branch {branch} to origin")

def deploy_new_connector_to_cloud_repo(
airbyte_cloud_repo_path: Path,
airbyte_cloud_repo: git.Repo,
connector: ConnectorQAReport
):

def pr_already_created_for_branch(head_branch: str) -> bool:
response = requests.get(
AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT,
headers=GITHUB_API_COMMON_HEADERS,
params={"head": f"{AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER}:{head_branch}", "state": "open"},
)
response.raise_for_status()
return len(response.json()) > 0


def create_pr(connector: ConnectorQAReport, branch: str) -> Optional[requests.Response]:
body = f"""The Cloud Availability Updater decided that it's the right time to make {connector.connector_name} available on Cloud!
- Technical name: {connector.connector_technical_name}
- Version: {connector.connector_version}
- Definition ID: {connector.connector_definition_id}
- OSS sync success rate: {connector.sync_success_rate}
- OSS number of connections: {connector.number_of_connections}
"""
data = {
"title": f"🤖 Add {connector.connector_technical_name} to cloud",
"body": body,
"head": branch,
"base": AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME,
}
if not pr_already_created_for_branch(branch):
response = requests.post(AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT, headers=GITHUB_API_COMMON_HEADERS, json=data)
response.raise_for_status()
pr_url = response.json().get("url")
logger.info(f"A PR was opened for {connector.connector_technical_name}: {pr_url}")
return response
else:
logger.warning(f"A PR already exists for branch {branch}")


def deploy_new_connector_to_cloud_repo(airbyte_cloud_repo_path: Path, airbyte_cloud_repo: git.Repo, connector: ConnectorQAReport):
"""Updates the local definitions mask on Airbyte cloud repo.
Calls the generateCloudConnectorCatalog gradle task.
Commits these changes on a new branch.
Expand All @@ -85,15 +129,22 @@ def deploy_new_connector_to_cloud_repo(
airbyte_cloud_repo (git.Repo): The Airbyte Cloud repo instance.
connector (ConnectorQAReport): The connector to add to a definitions mask.
"""
airbyte_cloud_repo.git.checkout(AIRBYTE_CLOUD_MAIN_BRANCH_NAME)
airbyte_cloud_repo.git.checkout(AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME)
new_branch_name = f"cloud-availability-updater/deploy-{connector.connector_technical_name}"
checkout_new_branch(airbyte_cloud_repo, new_branch_name)
definitions_mask_path = get_definitions_mask_path(airbyte_cloud_repo_path, connector.connector_type)
update_definitions_mask(connector, definitions_mask_path)
run_generate_cloud_connector_catalog(airbyte_cloud_repo_path)
commit_all_files(
airbyte_cloud_repo,
f"🤖 Add {connector.connector_name} connector to cloud"
)
push_branch(airbyte_cloud_repo, new_branch_name)
airbyte_cloud_repo.git.checkout(AIRBYTE_CLOUD_MAIN_BRANCH_NAME)
updated_files = update_definitions_mask(connector, definitions_mask_path)
if updated_files:
run_generate_cloud_connector_catalog(airbyte_cloud_repo_path)
commit_all_files(airbyte_cloud_repo, f"🤖 Add {connector.connector_name} connector to cloud")
push_branch(airbyte_cloud_repo, new_branch_name)
create_pr(connector, new_branch_name)
airbyte_cloud_repo.git.checkout(AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME)


def deploy_eligible_connectors_to_cloud_repo(eligible_connectors: Iterable):
cloud_repo_path = Path(tempfile.mkdtemp())
airbyte_cloud_repo = clone_airbyte_cloud_repo(cloud_repo_path)
for connector in eligible_connectors:
deploy_new_connector_to_cloud_repo(cloud_repo_path, airbyte_cloud_repo, connector)
shutil.rmtree(cloud_repo_path)
24 changes: 19 additions & 5 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/constants.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import os

CONNECTOR_BUILD_OUTPUT_URL = "https://dnsgjos7lj2fu.cloudfront.net/tests/history/connectors"
CLOUD_CATALOG_URL = "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/cloud_catalog.json"
OSS_CATALOG_URL = "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/oss_catalog.json"

INAPPROPRIATE_FOR_CLOUD_USE_CONNECTORS = [
"8be1cf83-fde1-477f-a4ad-318d23c9f3c6", # Local CSV
"a625d593-bba5-4a1c-a53d-2d246268a816", # Local JSON
"b76be0a6-27dc-4560-95f6-2623da0bd7b6" # Local SQL Lite
"8be1cf83-fde1-477f-a4ad-318d23c9f3c6", # Local CSV
"a625d593-bba5-4a1c-a53d-2d246268a816", # Local JSON
"b76be0a6-27dc-4560-95f6-2623da0bd7b6", # Local SQL Lite
]

GCS_QA_REPORT_PATH = "gs://prod-airbyte-cloud-connector-metadata-service/qa_report.json"
AIRBYTE_CLOUD_GITHUB_REPO_URL = "https://github.com/airbytehq/airbyte-cloud.git"
AIRBYTE_CLOUD_MAIN_BRANCH_NAME = "master"
AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER = "airbytehq"
AIRBYTE_PLATFORM_INTERNAL_REPO_NAME = "airbyte-platform-internal"
AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL = (
f"https://github.com/{AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER}/{AIRBYTE_PLATFORM_INTERNAL_REPO_NAME}.git"
)
AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME = "master"
AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT = (
f"https://api.github.com/repos/{AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER}/{AIRBYTE_PLATFORM_INTERNAL_REPO_NAME}/pulls"
)
GITHUB_API_TOKEN = os.environ.get("GITHUB_API_TOKEN")
GITHUB_API_COMMON_HEADERS = {
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
"Authorization": f"Bearer {GITHUB_API_TOKEN}",
}
24 changes: 17 additions & 7 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,28 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging

from . import cloud_availability_updater, enrichments, inputs, validations
from .constants import CLOUD_CATALOG_URL, OSS_CATALOG_URL
from . import enrichments, inputs, validations

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)


def main():
logger.info("Fetch the OSS connectors catalog.")
oss_catalog = inputs.fetch_remote_catalog(OSS_CATALOG_URL)
logger.info("Fetch the Cloud connectors catalog.")
cloud_catalog = inputs.fetch_remote_catalog(CLOUD_CATALOG_URL)
logger.info("Fetch adoption metrics.")
adoption_metrics_per_connector_version = inputs.fetch_adoption_metrics_per_connector_version()
enriched_catalog = enrichments.get_enriched_catalog(
oss_catalog,
cloud_catalog,
adoption_metrics_per_connector_version
)
validations.get_qa_report(enriched_catalog, len(oss_catalog))
logger.info("Start the enriched catalog generation.")
enriched_catalog = enrichments.get_enriched_catalog(oss_catalog, cloud_catalog, adoption_metrics_per_connector_version)
logger.info("Start the QA report generation.")
qa_report = validations.get_qa_report(enriched_catalog, len(oss_catalog))
logger.info("Start the QA report generation.")
eligible_connectors = validations.get_connectors_eligible_for_cloud(qa_report)
logger.info("Start eligible connectors deployment to Cloud.")
cloud_availability_updater.deploy_eligible_connectors_to_cloud_repo(eligible_connectors)
41 changes: 23 additions & 18 deletions tools/ci_connector_ops/ci_connector_ops/qa_engine/validations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,49 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from datetime import datetime
from typing import Iterable
from typing import List

import pandas as pd
import requests

from .constants import INAPPROPRIATE_FOR_CLOUD_USE_CONNECTORS
from .inputs import BUILD_STATUSES, fetch_latest_build_status_for_connector_version
from .models import ConnectorQAReport, QAReport
from .inputs import fetch_latest_build_status_for_connector_version, BUILD_STATUSES

TRUTHY_COLUMNS_TO_BE_ELIGIBLE = [
"documentation_is_available",
"is_appropriate_for_cloud_use",
"latest_build_is_successful"
]
logger = logging.getLogger(__name__)


TRUTHY_COLUMNS_TO_BE_ELIGIBLE = ["documentation_is_available", "is_appropriate_for_cloud_use", "latest_build_is_successful"]


class QAReportGenerationError(Exception):
pass


def url_is_reachable(url: str) -> bool:
response = requests.get(url)
return response.status_code == 200


def is_appropriate_for_cloud_use(definition_id: str) -> bool:
return definition_id not in INAPPROPRIATE_FOR_CLOUD_USE_CONNECTORS


def is_eligible_for_promotion_to_cloud(connector_qa_data: pd.Series) -> bool:
if connector_qa_data["is_on_cloud"]:
return False
return all([
connector_qa_data[col]
for col in TRUTHY_COLUMNS_TO_BE_ELIGIBLE
])
if connector_qa_data["is_on_cloud"]:
return False
return all([connector_qa_data[col] for col in TRUTHY_COLUMNS_TO_BE_ELIGIBLE])


def latest_build_is_successful(connector_qa_data: pd.Series) -> bool:
connector_technical_name = connector_qa_data["connector_technical_name"]
connector_version = connector_qa_data["connector_version"]
latest_build_status = fetch_latest_build_status_for_connector_version(connector_technical_name, connector_version)
return latest_build_status == BUILD_STATUSES.SUCCESS


def get_qa_report(enriched_catalog: pd.DataFrame, oss_catalog_length: int) -> pd.DataFrame:
"""Perform validation steps on top of the enriched catalog.
Adds the following columns:
Expand Down Expand Up @@ -74,13 +77,15 @@ def get_qa_report(enriched_catalog: pd.DataFrame, oss_catalog_length: int) -> pd
qa_report["report_generation_datetime"] = datetime.utcnow()

# Only select dataframe columns defined in the ConnectorQAReport model.
qa_report= qa_report[[field.name for field in ConnectorQAReport.__fields__.values()]]
qa_report = qa_report[[field.name for field in ConnectorQAReport.__fields__.values()]]
# Validate the report structure with pydantic QAReport model.
QAReport(connectors_qa_report=qa_report.to_dict(orient="records"))
if len(qa_report) != oss_catalog_length:
raise QAReportGenerationError("The QA report does not contain all the connectors defined in the OSS catalog.")
raise QAReportGenerationError("The QA report does not contain all the connectors defined in the OSS catalog.")
return qa_report

def get_connectors_eligible_for_cloud(qa_report: pd.DataFrame) -> Iterable[ConnectorQAReport]:
for _, row in qa_report[qa_report["is_eligible_for_promotion_to_cloud"]].iterrows():
yield ConnectorQAReport(**row)

def get_connectors_eligible_for_cloud(qa_report: pd.DataFrame) -> List[ConnectorQAReport]:
eligible_connectors = [ConnectorQAReport(**row) for _, row in qa_report[qa_report["is_eligible_for_promotion_to_cloud"]].iterrows()]
logger.info(f"{len(eligible_connectors)} connectors are eligible for Cloud.")
return eligible_connectors
6 changes: 3 additions & 3 deletions tools/ci_connector_ops/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"pandas-gbq~=0.19.0",
"pydantic~=1.10.4",
"fsspec~=2023.1.0",
"gcsfs~=2023.1.0"
"gcsfs~=2023.1.0",
]

TEST_REQUIREMENTS = [
Expand All @@ -21,7 +21,7 @@
]

setup(
version="0.1.10",
version="0.1.11",
name="ci_connector_ops",
description="Packaged maintained by the connector operations team to perform CI for connectors",
author="Airbyte",
Expand All @@ -40,7 +40,7 @@
"print-mandatory-reviewers = ci_connector_ops.acceptance_test_config_checks:print_mandatory_reviewers",
"allowed-hosts-checks = ci_connector_ops.allowed_hosts_checks:check_allowed_hosts",
"run-qa-engine = ci_connector_ops.qa_engine.main:main",
"run-qa-checks = ci_connector_ops.qa_checks:run_qa_checks"
"run-qa-checks = ci_connector_ops.qa_checks:run_qa_checks",
],
},
)
Loading

0 comments on commit d918385

Please sign in to comment.