diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/commands.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/commands.py index c16512368747..fbb968d87062 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/commands.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/commands.py @@ -5,8 +5,14 @@ import pathlib import click -from metadata_service.constants import METADATA_FILE_NAME -from metadata_service.gcs_upload import MetadataUploadInfo, upload_metadata_to_gcs +from metadata_service.constants import METADATA_FILE_NAME, RELEASE_CANDIDATE_GCS_FOLDER_NAME +from metadata_service.gcs_upload import ( + MetadataDeleteInfo, + MetadataUploadInfo, + delete_release_candidate_from_gcs, + promote_release_candidate_in_gcs, + upload_metadata_to_gcs, +) from metadata_service.validators.metadata_validator import PRE_UPLOAD_VALIDATORS, ValidatorOptions, validate_and_load from pydantic import ValidationError @@ -24,6 +30,17 @@ def log_metadata_upload_info(metadata_upload_info: MetadataUploadInfo): ) +def log_metadata_deletion_info(metadata_deletion_info: MetadataDeleteInfo): + for remote_file in metadata_deletion_info.deleted_files: + if remote_file.deleted: + click.secho(f"The {remote_file.description} was deleted ({remote_file.blob_id}).", fg="green") + else: + click.secho( + f"The {remote_file.description} was not deleted ({remote_file.blob_id}).", + fg="red", + ) + + @click.group(help="Airbyte Metadata Service top-level command group.") def metadata_service(): pass @@ -63,3 +80,30 @@ def upload(metadata_file_path: pathlib.Path, docs_path: pathlib.Path, bucket_nam exit(0) else: exit(5) + + +@metadata_service.command(help="Rollback a release candidate by deleting its metadata files from a GCS bucket.") +@click.argument("connector-docker-repository", type=click.STRING) +@click.argument("connector-version", type=click.STRING) +@click.argument("bucket-name", type=click.STRING) +def rollback_release_candidate(connector_docker_repository: str, connector_version: str, bucket_name: str): + try: + deletion_info = delete_release_candidate_from_gcs(bucket_name, connector_docker_repository, connector_version) + log_metadata_deletion_info(deletion_info) + except (FileNotFoundError, ValueError) as e: + click.secho(f"The release candidate could not be deleted: {str(e)}", fg="red") + exit(1) + + +@metadata_service.command(help="Promote a release candidate by moving its metadata files to the main release folder in a GCS bucket.") +@click.argument("connector-docker-repository", type=click.STRING) +@click.argument("connector-version", type=click.STRING) +@click.argument("bucket-name", type=click.STRING) +def promote_release_candidate(connector_docker_repository: str, connector_version: str, bucket_name: str): + try: + upload_info, deletion_info = promote_release_candidate_in_gcs(bucket_name, connector_docker_repository, connector_version) + log_metadata_upload_info(upload_info) + log_metadata_deletion_info(deletion_info) + except (FileNotFoundError, ValueError) as e: + click.secho(f"The release candidate could not be promoted: {str(e)}", fg="red") + exit(1) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/gcs_upload.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/gcs_upload.py index 3624a7b9b5c6..97149cda0604 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/gcs_upload.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/gcs_upload.py @@ -44,6 +44,14 @@ class UploadedFile: blob_id: Optional[str] +@dataclass(frozen=True) +class DeletedFile: + id: str + deleted: bool + description: str + blob_id: Optional[str] + + @dataclass(frozen=True) class MetadataUploadInfo: metadata_uploaded: bool @@ -51,6 +59,12 @@ class MetadataUploadInfo: uploaded_files: List[UploadedFile] +@dataclass(frozen=True) +class MetadataDeleteInfo: + metadata_deleted: bool + deleted_files: List[DeletedFile] + + def get_metadata_remote_file_path(dockerRepository: str, version: str) -> str: """Get the path to the metadata file for a specific version of a connector. @@ -120,6 +134,14 @@ def _save_blob_to_gcs(blob_to_save: storage.blob.Blob, file_path: str, disable_c return True +def _delete_blob_from_gcs(blob_to_delete: storage.blob.Blob) -> bool: + """Deletes a blob from the bucket.""" + print(f"Deleting {blob_to_delete.name}...") + blob_to_delete.delete() + + return True + + def upload_file_if_changed( local_file_path: Path, bucket: storage.bucket.Bucket, blob_path: str, disable_cache: bool = False ) -> Tuple[bool, str]: @@ -279,6 +301,16 @@ def _apply_modifications_to_metadata_file(original_metadata_file_path: Path, val return _write_metadata_to_tmp_file(metadata) +def _get_storage_client() -> storage.Client: + gcs_creds = os.environ.get("GCS_CREDENTIALS") + if not gcs_creds: + raise ValueError("Please set the GCS_CREDENTIALS env var.") + + service_account_info = json.loads(gcs_creds) + credentials = service_account.Credentials.from_service_account_info(service_account_info) + return storage.Client(credentials=credentials) + + def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, validator_opts: ValidatorOptions) -> MetadataUploadInfo: """Upload a metadata file to a GCS bucket. @@ -304,13 +336,8 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, validator is_release_candidate = getattr(metadata.data.releases, "isReleaseCandidate", False) should_upload_release_candidate = is_release_candidate and not is_pre_release should_upload_latest = not is_release_candidate and not is_pre_release - gcs_creds = os.environ.get("GCS_CREDENTIALS") - if not gcs_creds: - raise ValueError("Please set the GCS_CREDENTIALS env var.") - service_account_info = json.loads(gcs_creds) - credentials = service_account.Credentials.from_service_account_info(service_account_info) - storage_client = storage.Client(credentials=credentials) + storage_client = _get_storage_client() bucket = storage_client.bucket(bucket_name) docs_path = Path(validator_opts.docs_path) @@ -401,3 +428,124 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, validator ), ], ) + + +def delete_release_candidate_from_gcs(bucket_name: str, docker_repository, connector_version) -> MetadataDeleteInfo: + """Delete a release candidate from a GCS bucket. + The release candidate and version metadata file will be deleted. + We first check that the release candidate metadata file hash matches the version metadata file hash. + + Args: + bucket_name (str): Name of the GCS bucket from which the release candidate will be deleted. + docker_repository (str): Name of the connector docker image. + connector_version (str): Version of the connector. + """ + storage_client = _get_storage_client() + bucket = storage_client.bucket(bucket_name) + + version_path = get_metadata_remote_file_path(docker_repository, connector_version) + rc_path = get_metadata_remote_file_path(docker_repository, RELEASE_CANDIDATE_GCS_FOLDER_NAME) + + version_blob = bucket.blob(version_path) + rc_blob = bucket.blob(rc_path) + + if not version_blob.exists(): + raise FileNotFoundError(f"Version metadata file {version_path} does not exist in the bucket. ") + if not rc_blob.exists(): + raise FileNotFoundError(f"Release candidate metadata file {rc_path} does not exist in the bucket. ") + if rc_blob.md5_hash != version_blob.md5_hash: + raise ValueError( + f"Release candidate metadata file {rc_path} hash does not match the version metadata file {version_path} hash. Unsafe to delete." + ) + + deleted_files = [] + rc_blob.delete() + deleted_files.append( + DeletedFile( + id="release_candidate_metadata", + deleted=True, + description="release candidate metadata", + blob_id=rc_blob.id, + ) + ) + version_blob.delete() + deleted_files.append( + DeletedFile( + id="version_metadata", + deleted=True, + description="versioned metadata", + blob_id=version_blob.id, + ) + ) + + return MetadataDeleteInfo( + metadata_deleted=True, + deleted_files=deleted_files, + ) + + +def promote_release_candidate_in_gcs( + bucket_name: str, docker_repository, connector_version +) -> Tuple[MetadataUploadInfo, MetadataDeleteInfo]: + """Promote a release candidate to the latest version in a GCS bucket. + The release candidate metadata file will be copied to the latest metadata file and then deleted. + We first check that the release candidate metadata file hash matches the version metadata file hash. + + Args: + bucket_name (str): Name of the GCS bucket from which the release candidate will be deleted. + docker_repository (str): Name of the connector docker image. + connector_version (str): Version of the connector. + """ + + storage_client = _get_storage_client() + bucket = storage_client.bucket(bucket_name) + + version_path = get_metadata_remote_file_path(docker_repository, connector_version) + rc_path = get_metadata_remote_file_path(docker_repository, RELEASE_CANDIDATE_GCS_FOLDER_NAME) + latest_path = get_metadata_remote_file_path(docker_repository, LATEST_GCS_FOLDER_NAME) + + version_blob = bucket.blob(version_path) + latest_blob = bucket.blob(latest_path) + rc_blob = bucket.blob(rc_path) + + if not version_blob.exists(): + raise FileNotFoundError(f"Version metadata file {version_path} does not exist in the bucket.") + if not rc_blob.exists(): + raise FileNotFoundError(f"Release candidate metadata file {rc_path} does not exist in the bucket.") + + if rc_blob.md5_hash != version_blob.md5_hash: + raise ValueError( + f"Release candidate metadata file {rc_path} hash does not match the version metadata file {version_path} hash. Unsafe to promote." + ) + + uploaded_files = [] + deleted_files = [] + + bucket.copy_blob(rc_blob, bucket, latest_blob) + uploaded_files.append( + UploadedFile( + id="latest_metadata", + uploaded=True, + description="latest metadata", + blob_id=latest_blob.id, + ) + ) + + rc_blob.delete() + deleted_files.append( + DeletedFile( + id="release_candidate_metadata", + deleted=True, + description="release candidate metadata", + blob_id=rc_blob.id, + ) + ) + + return MetadataUploadInfo( + metadata_uploaded=True, + metadata_file_path=str(version_path), + uploaded_files=uploaded_files, + ), MetadataDeleteInfo( + metadata_deleted=True, + deleted_files=deleted_files, + ) diff --git a/airbyte-ci/connectors/metadata_service/lib/pyproject.toml b/airbyte-ci/connectors/metadata_service/lib/pyproject.toml index d97f8edd7a1e..bbdeadf96d2d 100644 --- a/airbyte-ci/connectors/metadata_service/lib/pyproject.toml +++ b/airbyte-ci/connectors/metadata_service/lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metadata-service" -version = "0.13.1" +version = "0.14.0" description = "" authors = ["Ben Church "] readme = "README.md"