Skip to content

Commit

Permalink
metadata service: upload metadata files to gcs (airbytehq#25115)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored and btkcodedev committed Apr 26, 2023
1 parent 1ffe136 commit 941b5d9
Show file tree
Hide file tree
Showing 7 changed files with 782 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import click
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import pathlib

import click
from metadata_service.gcs_upload import upload_metadata_to_gcs
from metadata_service.validators.metadata_validator import validate_metadata_file
from pydantic import ValidationError


@click.group(help="Airbyte Metadata Service top-level command group.")
Expand All @@ -12,7 +17,7 @@ def metadata_service():
@metadata_service.command(help="Validate a given metadata YAML file.")
@click.argument("file_path", type=click.Path(exists=True, path_type=pathlib.Path))
def validate(file_path: pathlib.Path):
file_path = file_path if not file_path.is_dir() else file_path / "metadata.yml"
file_path = file_path if not file_path.is_dir() else file_path / "metadata.yaml"

click.echo(f"Validating {file_path}...")

Expand All @@ -23,3 +28,22 @@ def validate(file_path: pathlib.Path):
click.echo(f"{file_path} is not a valid ConnectorMetadataDefinitionV1 YAML file.")
click.echo(str(error))
exit(1)


@metadata_service.command(help="Upload a metadata YAML file to a GCS bucket.")
@click.argument("metadata-file-path", type=click.Path(exists=True, path_type=pathlib.Path))
@click.argument("bucket-name", type=click.STRING)
@click.option(
"--service-account-file-path", "-sa", type=click.Path(exists=True, path_type=pathlib.Path), envvar="GOOGLE_APPLICATION_CREDENTIALS"
)
def upload(metadata_file_path: pathlib.Path, bucket_name: str, service_account_file_path: pathlib.Path):
metadata_file_path = metadata_file_path if not metadata_file_path.is_dir() else metadata_file_path / "metadata.yaml"
try:
uploaded, blob_id = upload_metadata_to_gcs(bucket_name, metadata_file_path, service_account_file_path)
except (ValidationError, FileNotFoundError) as e:
click.secho(f"The metadata file could not be uploaded: {str(e)}", color="red")
exit(1)
if uploaded:
click.secho(f"The metadata file {metadata_file_path} was uploaded to {blob_id}.", color="green")
else:
click.secho(f"The metadata file {metadata_file_path} was not uploaded.", color="yellow")
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from pathlib import Path
from typing import Tuple

import yaml
from google.cloud import storage
from google.oauth2 import service_account
from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1


def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, service_account_file_path: Path) -> Tuple[bool, str]:
"""Upload a metadata file to a GCS bucket.
If the per 'version' key already exists it won't be overwritten.
Also updates the 'latest' key on each new version.
Args:
bucket_name (str): Name of the GCS bucket to which the metadata file will be uploade.
metadata_file_path (Path): Path to the metadata file.
service_account_file_path (Path): Path to the JSON file with the service account allowed to read and write on the bucket.
Returns:
Tuple[bool, str]: Whether the metadata file was uploaded and its blob id.
"""
uploaded = False
raw_metadata = yaml.safe_load(metadata_file_path.read_text())
metadata = ConnectorMetadataDefinitionV1.parse_obj(raw_metadata)

credentials = service_account.Credentials.from_service_account_file(service_account_file_path)
storage_client = storage.Client(credentials=credentials)
bucket = storage_client.bucket(bucket_name)

version_blob = bucket.blob(f"metadata/{metadata.data.dockerRepository}/{metadata.data.dockerImageTag}/metadata.yaml")
latest_blob = bucket.blob(f"metadata/{metadata.data.dockerRepository}/latest/metadata.yaml")
if not version_blob.exists():
version_blob.upload_from_filename(str(metadata_file_path))
uploaded = True
if version_blob.etag != latest_blob.etag:
latest_blob.upload_from_filename(str(metadata_file_path))
return uploaded, version_blob.id
Loading

0 comments on commit 941b5d9

Please sign in to comment.