airbyte-ci publish: promote and rollback release candidates
alafanechere committed Sep 2, 2024
Commit f54609d (1 parent: 21fddbd)
Showing 3 changed files with 201 additions and 9 deletions.
@@ -5,8 +5,14 @@
import pathlib

import click
from metadata_service.constants import METADATA_FILE_NAME
from metadata_service.gcs_upload import MetadataUploadInfo, upload_metadata_to_gcs
from metadata_service.constants import METADATA_FILE_NAME, RELEASE_CANDIDATE_GCS_FOLDER_NAME
from metadata_service.gcs_upload import (
    MetadataDeleteInfo,
    MetadataUploadInfo,
    delete_release_candidate_from_gcs,
    promote_release_candidate_in_gcs,
    upload_metadata_to_gcs,
)
from metadata_service.validators.metadata_validator import PRE_UPLOAD_VALIDATORS, ValidatorOptions, validate_and_load
from pydantic import ValidationError

@@ -24,6 +30,17 @@ def log_metadata_upload_info(metadata_upload_info: MetadataUploadInfo):
            )


def log_metadata_deletion_info(metadata_deletion_info: MetadataDeleteInfo):
    for remote_file in metadata_deletion_info.deleted_files:
        if remote_file.deleted:
            click.secho(f"The {remote_file.description} was deleted ({remote_file.blob_id}).", fg="green")
        else:
            click.secho(
                f"The {remote_file.description} was not deleted ({remote_file.blob_id}).",
                fg="red",
            )


@click.group(help="Airbyte Metadata Service top-level command group.")
def metadata_service():
    pass
@@ -63,3 +80,30 @@ def upload(metadata_file_path: pathlib.Path, docs_path: pathlib.Path, bucket_nam
        exit(0)
    else:
        exit(5)


@metadata_service.command(help="Rollback a release candidate by deleting its metadata files from a GCS bucket.")
@click.argument("connector-docker-repository", type=click.STRING)
@click.argument("connector-version", type=click.STRING)
@click.argument("bucket-name", type=click.STRING)
def rollback_release_candidate(connector_docker_repository: str, connector_version: str, bucket_name: str):
    try:
        deletion_info = delete_release_candidate_from_gcs(bucket_name, connector_docker_repository, connector_version)
        log_metadata_deletion_info(deletion_info)
    except (FileNotFoundError, ValueError) as e:
        click.secho(f"The release candidate could not be deleted: {str(e)}", fg="red")
        exit(1)


@metadata_service.command(help="Promote a release candidate by moving its metadata files to the main release folder in a GCS bucket.")
@click.argument("connector-docker-repository", type=click.STRING)
@click.argument("connector-version", type=click.STRING)
@click.argument("bucket-name", type=click.STRING)
def promote_release_candidate(connector_docker_repository: str, connector_version: str, bucket_name: str):
    try:
        upload_info, deletion_info = promote_release_candidate_in_gcs(bucket_name, connector_docker_repository, connector_version)
        log_metadata_upload_info(upload_info)
        log_metadata_deletion_info(deletion_info)
    except (FileNotFoundError, ValueError) as e:
        click.secho(f"The release candidate could not be promoted: {str(e)}", fg="red")
        exit(1)
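
For reference, a minimal, hypothetical sketch of how the two new commands could be exercised with click's test runner. It assumes the CLI group above is importable as metadata_service.commands, that click's default underscore-to-dash command naming applies, and that GCS_CREDENTIALS is exported with service-account JSON; the bucket, repository, and version values are placeholders, and the invocations perform real GCS operations.

from click.testing import CliRunner

from metadata_service.commands import metadata_service as metadata_service_cli

runner = CliRunner()

# Roll back a release candidate: deletes its release-candidate and versioned metadata files.
rollback_result = runner.invoke(
    metadata_service_cli,
    ["rollback-release-candidate", "airbyte/source-faker", "6.5.0", "my-metadata-bucket"],
)
print(rollback_result.exit_code, rollback_result.output)

# Promote a release candidate: copies its metadata to the latest folder, then deletes the RC copy.
promote_result = runner.invoke(
    metadata_service_cli,
    ["promote-release-candidate", "airbyte/source-faker", "6.5.0", "my-metadata-bucket"],
)
print(promote_result.exit_code, promote_result.output)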
@@ -44,13 +44,27 @@ class UploadedFile:
    blob_id: Optional[str]


@dataclass(frozen=True)
class DeletedFile:
    id: str
    deleted: bool
    description: str
    blob_id: Optional[str]


@dataclass(frozen=True)
class MetadataUploadInfo:
    metadata_uploaded: bool
    metadata_file_path: str
    uploaded_files: List[UploadedFile]


@dataclass(frozen=True)
class MetadataDeleteInfo:
    metadata_deleted: bool
    deleted_files: List[DeletedFile]


def get_metadata_remote_file_path(dockerRepository: str, version: str) -> str:
"""Get the path to the metadata file for a specific version of a connector.
Expand Down Expand Up @@ -120,6 +134,14 @@ def _save_blob_to_gcs(blob_to_save: storage.blob.Blob, file_path: str, disable_c
    return True


def _delete_blob_from_gcs(blob_to_delete: storage.blob.Blob) -> bool:
"""Deletes a blob from the bucket."""
print(f"Deleting {blob_to_delete.name}...")
blob_to_delete.delete()

return True


def upload_file_if_changed(
    local_file_path: Path, bucket: storage.bucket.Bucket, blob_path: str, disable_cache: bool = False
) -> Tuple[bool, str]:
Expand Down Expand Up @@ -279,6 +301,16 @@ def _apply_modifications_to_metadata_file(original_metadata_file_path: Path, val
    return _write_metadata_to_tmp_file(metadata)


def _get_storage_client() -> storage.Client:
    gcs_creds = os.environ.get("GCS_CREDENTIALS")
    if not gcs_creds:
        raise ValueError("Please set the GCS_CREDENTIALS env var.")

    service_account_info = json.loads(gcs_creds)
    credentials = service_account.Credentials.from_service_account_info(service_account_info)
    return storage.Client(credentials=credentials)


def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, validator_opts: ValidatorOptions) -> MetadataUploadInfo:
"""Upload a metadata file to a GCS bucket.
@@ -304,13 +336,8 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, validator
    is_release_candidate = getattr(metadata.data.releases, "isReleaseCandidate", False)
    should_upload_release_candidate = is_release_candidate and not is_pre_release
    should_upload_latest = not is_release_candidate and not is_pre_release
    gcs_creds = os.environ.get("GCS_CREDENTIALS")
    if not gcs_creds:
        raise ValueError("Please set the GCS_CREDENTIALS env var.")

    service_account_info = json.loads(gcs_creds)
    credentials = service_account.Credentials.from_service_account_info(service_account_info)
    storage_client = storage.Client(credentials=credentials)
    storage_client = _get_storage_client()
    bucket = storage_client.bucket(bucket_name)
    docs_path = Path(validator_opts.docs_path)

@@ -401,3 +428,124 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, validator
            ),
        ],
    )


def delete_release_candidate_from_gcs(bucket_name: str, docker_repository: str, connector_version: str) -> MetadataDeleteInfo:
    """Delete a release candidate from a GCS bucket.
    The release candidate and versioned metadata files will be deleted.
    We first check that the release candidate metadata file hash matches the versioned metadata file hash.
    Args:
        bucket_name (str): Name of the GCS bucket from which the release candidate will be deleted.
        docker_repository (str): Name of the connector docker image.
        connector_version (str): Version of the connector.
    """
    storage_client = _get_storage_client()
    bucket = storage_client.bucket(bucket_name)

    version_path = get_metadata_remote_file_path(docker_repository, connector_version)
    rc_path = get_metadata_remote_file_path(docker_repository, RELEASE_CANDIDATE_GCS_FOLDER_NAME)

    version_blob = bucket.blob(version_path)
    rc_blob = bucket.blob(rc_path)

    if not version_blob.exists():
        raise FileNotFoundError(f"Version metadata file {version_path} does not exist in the bucket.")
    if not rc_blob.exists():
        raise FileNotFoundError(f"Release candidate metadata file {rc_path} does not exist in the bucket.")
    if rc_blob.md5_hash != version_blob.md5_hash:
        raise ValueError(
            f"Release candidate metadata file {rc_path} hash does not match the version metadata file {version_path} hash. Unsafe to delete."
        )

    deleted_files = []
    rc_blob.delete()
    deleted_files.append(
        DeletedFile(
            id="release_candidate_metadata",
            deleted=True,
            description="release candidate metadata",
            blob_id=rc_blob.id,
        )
    )
    version_blob.delete()
    deleted_files.append(
        DeletedFile(
            id="version_metadata",
            deleted=True,
            description="versioned metadata",
            blob_id=version_blob.id,
        )
    )

    return MetadataDeleteInfo(
        metadata_deleted=True,
        deleted_files=deleted_files,
    )


def promote_release_candidate_in_gcs(
    bucket_name: str, docker_repository: str, connector_version: str
) -> Tuple[MetadataUploadInfo, MetadataDeleteInfo]:
    """Promote a release candidate to the latest version in a GCS bucket.
    The release candidate metadata file will be copied to the latest metadata file and then deleted.
    We first check that the release candidate metadata file hash matches the versioned metadata file hash.
    Args:
        bucket_name (str): Name of the GCS bucket in which the release candidate will be promoted.
        docker_repository (str): Name of the connector docker image.
        connector_version (str): Version of the connector.
    """

    storage_client = _get_storage_client()
    bucket = storage_client.bucket(bucket_name)

    version_path = get_metadata_remote_file_path(docker_repository, connector_version)
    rc_path = get_metadata_remote_file_path(docker_repository, RELEASE_CANDIDATE_GCS_FOLDER_NAME)
    latest_path = get_metadata_remote_file_path(docker_repository, LATEST_GCS_FOLDER_NAME)

    version_blob = bucket.blob(version_path)
    latest_blob = bucket.blob(latest_path)
    rc_blob = bucket.blob(rc_path)

    if not version_blob.exists():
        raise FileNotFoundError(f"Version metadata file {version_path} does not exist in the bucket.")
    if not rc_blob.exists():
        raise FileNotFoundError(f"Release candidate metadata file {rc_path} does not exist in the bucket.")

    if rc_blob.md5_hash != version_blob.md5_hash:
        raise ValueError(
            f"Release candidate metadata file {rc_path} hash does not match the version metadata file {version_path} hash. Unsafe to promote."
        )

    uploaded_files = []
    deleted_files = []

    bucket.copy_blob(rc_blob, bucket, latest_path)
    uploaded_files.append(
        UploadedFile(
            id="latest_metadata",
            uploaded=True,
            description="latest metadata",
            blob_id=latest_blob.id,
        )
    )

    rc_blob.delete()
    deleted_files.append(
        DeletedFile(
            id="release_candidate_metadata",
            deleted=True,
            description="release candidate metadata",
            blob_id=rc_blob.id,
        )
    )

    return MetadataUploadInfo(
        metadata_uploaded=True,
        metadata_file_path=str(version_path),
        uploaded_files=uploaded_files,
    ), MetadataDeleteInfo(
        metadata_deleted=True,
        deleted_files=deleted_files,
    )
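
A rough, hypothetical sketch of calling the new helpers outside the CLI. It assumes GCS_CREDENTIALS is exported with valid service-account JSON (as _get_storage_client requires) and that the bucket, docker repository, and version values exist; all names below are illustrative placeholders, and the calls perform real GCS operations.

import os

from metadata_service.gcs_upload import (
    delete_release_candidate_from_gcs,
    promote_release_candidate_in_gcs,
)

# _get_storage_client() reads service-account JSON from this env var; export it before running.
assert os.environ.get("GCS_CREDENTIALS"), "Set GCS_CREDENTIALS to a service-account JSON string."

# Promote: copy the release candidate metadata to the latest folder, then delete the RC copy.
upload_info, delete_info = promote_release_candidate_in_gcs(
    "my-metadata-bucket", "airbyte/source-faker", "6.5.0"
)
print(upload_info.metadata_uploaded, delete_info.metadata_deleted)

# Or roll back instead: delete both the release candidate and the versioned metadata files.
delete_info = delete_release_candidate_from_gcs(
    "my-metadata-bucket", "airbyte/source-faker", "6.5.0"
)
for deleted in delete_info.deleted_files:
    print(deleted.description, deleted.blob_id)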
airbyte-ci/connectors/metadata_service/lib/pyproject.toml (2 changes: 1 addition, 1 deletion)
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metadata-service"
version = "0.13.1"
version = "0.14.0"
description = ""
authors = ["Ben Church <ben@airbyte.io>"]
readme = "README.md"