diff --git a/airbyte-ci/connectors/connector_ops/pyproject.toml b/airbyte-ci/connectors/connector_ops/pyproject.toml index 0ec46a0b32dc..45277370a3ae 100644 --- a/airbyte-ci/connectors/connector_ops/pyproject.toml +++ b/airbyte-ci/connectors/connector_ops/pyproject.toml @@ -15,7 +15,7 @@ requests = "^2.31" PyYAML = "^6.0" GitPython = "^3.1.29" pydantic = "^1.9" -PyGithub = "^1.58.0" +PyGithub = "^2" rich = "^13.0.0" pydash = "^6.0.2" google-cloud-storage = "^2.8.0" diff --git a/airbyte-ci/connectors/pipelines/README.md b/airbyte-ci/connectors/pipelines/README.md index b8a1cb03abea..942d554374dc 100644 --- a/airbyte-ci/connectors/pipelines/README.md +++ b/airbyte-ci/connectors/pipelines/README.md @@ -748,6 +748,7 @@ E.G.: running Poe tasks on the modified internal packages of the current branch: | Version | PR | Description | | ------- | ---------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------- | +| 4.15.0 | [#38322](https://github.com/airbytehq/airbyte/pull/38322) | Introduce a SecretStore abstraction to fetch connector secrets from metadata files. | | 4.14.1 | [#38582](https://github.com/airbytehq/airbyte/pull/38582) | Fixed bugs in `up_to_date` flags, `pull_request` version change logic. | | 4.14.0 | [#38281](https://github.com/airbytehq/airbyte/pull/38281) | Conditionally run test suites according to `connectorTestSuitesOptions` in metadata files. | | 4.13.3 | [#38221](https://github.com/airbytehq/airbyte/pull/38221) | Add dagster cloud dev deployment pipeline options | diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/commands.py index bdf47c38d154..b5b86bab4d5f 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/commands.py @@ -54,12 +54,11 @@ async def build(ctx: click.Context, use_host_gradle_dist_tar: bool, build_archit git_repo_url=ctx.obj["git_repo_url"], ci_report_bucket=ctx.obj["ci_report_bucket_name"], report_output_prefix=ctx.obj["report_output_prefix"], - use_remote_secrets=ctx.obj["use_remote_secrets"], gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"), dagger_logs_url=ctx.obj.get("dagger_logs_url"), pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), - ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], + ci_gcp_credentials=ctx.obj["ci_gcp_credentials"], use_local_cdk=ctx.obj.get("use_local_cdk"), enable_report_auto_open=ctx.obj.get("enable_report_auto_open"), use_host_gradle_dist_tar=use_host_gradle_dist_tar, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/commands.py index ab047382ceb0..eee4923ccd65 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/commands.py @@ -33,12 +33,11 @@ async def bump_version( git_repo_url=ctx.obj["git_repo_url"], ci_report_bucket=ctx.obj["ci_report_bucket_name"], report_output_prefix=ctx.obj["report_output_prefix"], - use_remote_secrets=ctx.obj["use_remote_secrets"], gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"),
dagger_logs_url=ctx.obj.get("dagger_logs_url"), pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), - ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], + ci_gcp_credentials=ctx.obj["ci_gcp_credentials"], ci_git_user=ctx.obj["ci_git_user"], ci_github_access_token=ctx.obj["ci_github_access_token"], enable_report_auto_open=False, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py index f39f1c9c5be0..f09d536d1fc5 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py @@ -153,8 +153,9 @@ def __init__( async def get_repo_dir(self) -> Directory: if not self.repo_dir: - self.repo_dir = await self.context.get_repo_dir() - return self.repo_dir + repo_dir = await self.context.get_repo_dir() + self.repo_dir = repo_dir + return repo_dir async def _run(self) -> StepResult: result = await self.update_metadata() diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py index a4924f5e66bf..234b3bf6bb26 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py @@ -4,12 +4,13 @@ import os from pathlib import Path -from typing import List, Optional, Set, Tuple +from typing import List, Set, Tuple import asyncclick as click from connector_ops.utils import ConnectorLanguage, SupportLevelEnum, get_all_connectors_in_repo # type: ignore from pipelines import main_logger -from pipelines.cli.click_decorators import click_append_to_context_object, click_ignore_unused_kwargs, click_merge_args_into_context_obj +from pipelines.cli.airbyte_ci import wrap_in_secret +from pipelines.cli.click_decorators import click_ignore_unused_kwargs, click_merge_args_into_context_obj from pipelines.cli.lazy_group import LazyGroup from pipelines.helpers.connectors.modifed import ConnectorWithModifiedFiles, get_connector_modified_files, get_modified_connectors from pipelines.helpers.git import get_modified_files @@ -108,37 +109,6 @@ def validate_environment(is_local: bool) -> None: raise click.UsageError(f"When running in a CI context a {required_env_var} environment variable must be set.") -def should_use_remote_secrets(use_remote_secrets: Optional[bool]) -> bool: - """Check if the connector secrets should be loaded from Airbyte GSM or from the local secrets directory. - - Args: - use_remote_secrets (Optional[bool]): Whether to use remote connector secrets or local connector secrets according to user inputs. - - Raises: - click.UsageError: If the --use-remote-secrets flag was provided but no GCP_GSM_CREDENTIALS environment variable was found. - - Returns: - bool: Whether to use remote connector secrets (True) or local connector secrets (False). 
- """ - gcp_gsm_credentials_is_set = bool(os.getenv("GCP_GSM_CREDENTIALS")) - if use_remote_secrets is None: - if gcp_gsm_credentials_is_set: - main_logger.info("GCP_GSM_CREDENTIALS environment variable found, using remote connector secrets.") - return True - else: - main_logger.info("No GCP_GSM_CREDENTIALS environment variable found, using local connector secrets.") - return False - if use_remote_secrets: - if gcp_gsm_credentials_is_set: - main_logger.info("GCP_GSM_CREDENTIALS environment variable found, using remote connector secrets.") - return True - else: - raise click.UsageError("The --use-remote-secrets flag was provided but no GCP_GSM_CREDENTIALS environment variable was found.") - else: - main_logger.info("Using local connector secrets as the --use-local-secrets flag was provided") - return False - - @click.group( cls=LazyGroup, help="Commands related to connectors and connector acceptance tests.", @@ -157,12 +127,6 @@ def should_use_remote_secrets(use_remote_secrets: Optional[bool]) -> bool: "pull_request": "pipelines.airbyte_ci.connectors.pull_request.commands.pull_request", }, ) -@click.option( - "--use-remote-secrets/--use-local-secrets", - help="Use Airbyte GSM connector secrets or local connector secrets.", - type=bool, - default=None, -) @click.option( "--name", "names", @@ -223,6 +187,7 @@ def should_use_remote_secrets(use_remote_secrets: Optional[bool]) -> bool: type=click.STRING, required=False, envvar="DOCKER_HUB_USERNAME", + callback=wrap_in_secret, ) @click.option( "--docker-hub-password", @@ -230,9 +195,9 @@ def should_use_remote_secrets(use_remote_secrets: Optional[bool]) -> bool: type=click.STRING, required=False, envvar="DOCKER_HUB_PASSWORD", + callback=wrap_in_secret, ) @click_merge_args_into_context_obj -@click_append_to_context_object("use_remote_secrets", lambda ctx: should_use_remote_secrets(ctx.obj["use_remote_secrets"])) @click.pass_context @click_ignore_unused_kwargs async def connectors( diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py index ecded9c516c2..9c981bcb2005 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py @@ -14,7 +14,7 @@ import yaml # type: ignore from asyncer import asyncify -from dagger import Directory, Platform, Secret +from dagger import Directory, Platform from github import PullRequest from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID from pipelines.airbyte_ci.connectors.reports import ConnectorReport @@ -26,6 +26,7 @@ from pipelines.helpers.slack import send_message_to_webhook from pipelines.helpers.utils import METADATA_FILE_NAME from pipelines.models.contexts.pipeline_context import PipelineContext +from pipelines.models.secrets import LocalDirectorySecretStore, Secret, SecretStore if TYPE_CHECKING: from pathlib import Path as NativePath @@ -54,11 +55,10 @@ def __init__( diffed_branch: str, git_repo_url: str, report_output_prefix: str, - use_remote_secrets: bool = True, ci_report_bucket: Optional[str] = None, - ci_gcs_credentials: Optional[str] = None, + ci_gcp_credentials: Optional[Secret] = None, ci_git_user: Optional[str] = None, - ci_github_access_token: Optional[str] = None, + ci_github_access_token: Optional[Secret] = None, connector_acceptance_test_image: str = DEFAULT_CONNECTOR_ACCEPTANCE_TEST_IMAGE, gha_workflow_run_url: Optional[str] = None, dagger_logs_url: 
Optional[str] = None, @@ -72,13 +72,14 @@ def __init__( use_local_cdk: bool = False, use_host_gradle_dist_tar: bool = False, enable_report_auto_open: bool = True, - docker_hub_username: Optional[str] = None, - docker_hub_password: Optional[str] = None, - s3_build_cache_access_key_id: Optional[str] = None, - s3_build_cache_secret_key: Optional[str] = None, + docker_hub_username: Optional[Secret] = None, + docker_hub_password: Optional[Secret] = None, + s3_build_cache_access_key_id: Optional[Secret] = None, + s3_build_cache_secret_key: Optional[Secret] = None, concurrent_cat: Optional[bool] = False, run_step_options: RunStepOptions = RunStepOptions(), targeted_platforms: Sequence[Platform] = BUILD_PLATFORMS, + secret_stores: Dict[str, SecretStore] | None = None, ) -> None: """Initialize a connector context. @@ -90,7 +91,6 @@ def __init__( diffed_branch: str: The branch to compare the current branch against. git_repo_url: str: The URL of the git repository. report_output_prefix (str): The S3 key to upload the test report to. - use_remote_secrets (bool, optional): Whether to download secrets for GSM or use the local secrets. Defaults to True. connector_acceptance_test_image (Optional[str], optional): The image to use to run connector acceptance tests. Defaults to DEFAULT_CONNECTOR_ACCEPTANCE_TEST_IMAGE. gha_workflow_run_url (Optional[str], optional): URL to the github action workflow run. Only valid for CI run. Defaults to None. dagger_logs_url (Optional[str], optional): URL to the dagger logs. Only valid for CI run. Defaults to None. @@ -102,17 +102,16 @@ def __init__( code_tests_only (bool, optional): Whether to ignore non-code tests like QA and metadata checks. Defaults to False. use_host_gradle_dist_tar (bool, optional): Used when developing java connectors with gradle. Defaults to False. enable_report_auto_open (bool, optional): Open HTML report in browser window. Defaults to True. - docker_hub_username (Optional[str], optional): Docker Hub username to use to read registries. Defaults to None. - docker_hub_password (Optional[str], optional): Docker Hub password to use to read registries. Defaults to None. - s3_build_cache_access_key_id (Optional[str], optional): Gradle S3 Build Cache credentials. Defaults to None. - s3_build_cache_secret_key (Optional[str], optional): Gradle S3 Build Cache credentials. Defaults to None. + docker_hub_username (Optional[Secret], optional): Docker Hub username to use to read registries. Defaults to None. + docker_hub_password (Optional[Secret], optional): Docker Hub password to use to read registries. Defaults to None. + s3_build_cache_access_key_id (Optional[Secret], optional): Gradle S3 Build Cache credentials. Defaults to None. + s3_build_cache_secret_key (Optional[Secret], optional): Gradle S3 Build Cache credentials. Defaults to None. concurrent_cat (bool, optional): Whether to run the CAT tests in parallel. Defaults to False. targeted_platforms (Optional[Iterable[Platform]], optional): The platforms to build the connector image for. Defaults to BUILD_PLATFORMS. 
""" self.pipeline_name = pipeline_name self.connector = connector - self.use_remote_secrets = use_remote_secrets self.connector_acceptance_test_image = connector_acceptance_test_image self._secrets_dir: Optional[Directory] = None self._updated_secrets_dir: Optional[Directory] = None @@ -127,7 +126,6 @@ def __init__( self.s3_build_cache_access_key_id = s3_build_cache_access_key_id self.s3_build_cache_secret_key = s3_build_cache_secret_key self.concurrent_cat = concurrent_cat - self._connector_secrets: Optional[Dict[str, Secret]] = None self.targeted_platforms = targeted_platforms super().__init__( @@ -146,25 +144,14 @@ def __init__( reporting_slack_channel=reporting_slack_channel, pull_request=pull_request, ci_report_bucket=ci_report_bucket, - ci_gcs_credentials=ci_gcs_credentials, + ci_gcp_credentials=ci_gcp_credentials, ci_git_user=ci_git_user, ci_github_access_token=ci_github_access_token, run_step_options=self._skip_metadata_disabled_test_suites(run_step_options), enable_report_auto_open=enable_report_auto_open, + secret_stores=secret_stores, ) - @property - def s3_build_cache_access_key_id_secret(self) -> Optional[Secret]: - if self.s3_build_cache_access_key_id: - return self.dagger_client.set_secret("s3_build_cache_access_key_id", self.s3_build_cache_access_key_id) - return None - - @property - def s3_build_cache_secret_key_secret(self) -> Optional[Secret]: - if self.s3_build_cache_access_key_id and self.s3_build_cache_secret_key: - return self.dagger_client.set_secret("s3_build_cache_secret_key", self.s3_build_cache_secret_key) - return None - @property def modified_files(self) -> FrozenSet[NativePath]: return self.connector.modified_files @@ -195,7 +182,7 @@ def live_tests_dir(self) -> Directory: @property def should_save_updated_secrets(self) -> bool: - return self.use_remote_secrets and self.updated_secrets_dir is not None + return self.ci_gcp_credentials is not None and self.updated_secrets_dir is not None @property def host_image_export_dir_path(self) -> str: @@ -222,21 +209,15 @@ def docker_image(self) -> str: return f"{self.docker_repository}:{self.docker_image_tag}" @property - def docker_hub_username_secret(self) -> Optional[Secret]: - if self.docker_hub_username is None: - return None - return self.dagger_client.set_secret("docker_hub_username", self.docker_hub_username) + def local_secret_store_name(self) -> str: + return f"{self.connector.technical_name}-local" @property - def docker_hub_password_secret(self) -> Optional[Secret]: - if self.docker_hub_password is None: - return None - return self.dagger_client.set_secret("docker_hub_password", self.docker_hub_password) - - async def get_connector_secrets(self) -> Dict[str, Secret]: - if self._connector_secrets is None: - self._connector_secrets = await secrets.get_connector_secrets(self) - return self._connector_secrets + def local_secret_store(self) -> Optional[LocalDirectorySecretStore]: + connector_secrets_path = self.connector.code_directory / "secrets" + if connector_secrets_path.is_dir(): + return LocalDirectorySecretStore(connector_secrets_path) + return None async def get_connector_dir(self, exclude: Optional[List[str]] = None, include: Optional[List[str]] = None) -> Directory: """Get the connector under test source code directory. 
diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/commands.py index e0d75537e1b0..f2e95a292400 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/commands.py @@ -56,12 +56,11 @@ async def migrate_to_base_image( git_repo_url=ctx.obj["git_repo_url"], ci_report_bucket=ctx.obj["ci_report_bucket_name"], report_output_prefix=ctx.obj["report_output_prefix"], - use_remote_secrets=ctx.obj["use_remote_secrets"], gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"), dagger_logs_url=ctx.obj.get("dagger_logs_url"), pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), - ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], + ci_gcp_credentials=ctx.obj["ci_gcp_credentials"], ci_git_user=ctx.obj["ci_git_user"], ci_github_access_token=ctx.obj["ci_github_access_token"], enable_report_auto_open=False, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_poetry/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_poetry/commands.py index 2722afd97a77..edb6d1d802fd 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_poetry/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_poetry/commands.py @@ -43,12 +43,11 @@ async def migrate_to_poetry(ctx: click.Context, changelog: bool, bump: str | Non git_repo_url=ctx.obj["git_repo_url"], ci_report_bucket=ctx.obj["ci_report_bucket_name"], report_output_prefix=ctx.obj["report_output_prefix"], - use_remote_secrets=ctx.obj["use_remote_secrets"], gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"), dagger_logs_url=ctx.obj.get("dagger_logs_url"), pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), - ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], + ci_gcp_credentials=ctx.obj["ci_gcp_credentials"], ci_git_user=ctx.obj["ci_git_user"], ci_github_access_token=ctx.obj["ci_github_access_token"], enable_report_auto_open=False, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pipeline.py index b4055029a74f..3f31926874b0 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pipeline.py @@ -98,9 +98,7 @@ async def run_connectors_pipelines( docker_hub_password = contexts[0].docker_hub_password if docker_hub_username and docker_hub_password: - docker_hub_username_secret = dagger_client.set_secret("DOCKER_HUB_USERNAME", docker_hub_username) - docker_hub_password_secret = dagger_client.set_secret("DOCKER_HUB_PASSWORD", docker_hub_password) - dockerd_service = docker.with_global_dockerd_service(dagger_client, docker_hub_username_secret, docker_hub_password_secret) + dockerd_service = docker.with_global_dockerd_service(dagger_client, docker_hub_username, docker_hub_password) else: dockerd_service = docker.with_global_dockerd_service(dagger_client) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/commands.py index 
66388d4d166a..658b76857fc6 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/commands.py @@ -11,8 +11,10 @@ from pipelines.cli.click_decorators import click_ci_requirements_option from pipelines.cli.confirm_prompt import confirm from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand +from pipelines.cli.secrets import wrap_gcp_credentials_in_secret, wrap_in_secret from pipelines.consts import DEFAULT_PYTHON_PACKAGE_REGISTRY_CHECK_URL, DEFAULT_PYTHON_PACKAGE_REGISTRY_URL, ContextState from pipelines.helpers.utils import fail_if_missing_docker_hub_creds +from pipelines.models.secrets import Secret @click.command(cls=DaggerPipelineCommand, help="Publish all images for the selected connectors.") @@ -24,6 +26,7 @@ type=click.STRING, required=True, envvar="SPEC_CACHE_GCS_CREDENTIALS", + callback=wrap_gcp_credentials_in_secret, ) @click.option( "--spec-cache-bucket-name", @@ -38,6 +41,7 @@ type=click.STRING, required=True, envvar="METADATA_SERVICE_GCS_CREDENTIALS", + callback=wrap_gcp_credentials_in_secret, ) @click.option( "--metadata-service-bucket-name", @@ -64,6 +68,7 @@ help="Access token for python registry", type=click.STRING, envvar="PYTHON_REGISTRY_TOKEN", + callback=wrap_in_secret, ) @click.option( "--python-registry-url", @@ -83,20 +88,17 @@ async def publish( ctx: click.Context, pre_release: bool, - spec_cache_gcs_credentials: str, + spec_cache_gcs_credentials: Secret, spec_cache_bucket_name: str, metadata_service_bucket_name: str, - metadata_service_gcs_credentials: str, + metadata_service_gcs_credentials: Secret, slack_webhook: str, slack_channel: str, - python_registry_token: str, + python_registry_token: Secret, python_registry_url: str, python_registry_check_url: str, ) -> bool: - ctx.obj["spec_cache_gcs_credentials"] = spec_cache_gcs_credentials - ctx.obj["spec_cache_bucket_name"] = spec_cache_bucket_name - ctx.obj["metadata_service_bucket_name"] = metadata_service_bucket_name - ctx.obj["metadata_service_gcs_credentials"] = metadata_service_gcs_credentials + if ctx.obj["is_local"]: confirm( "Publishing from a local environment is not recommended and requires to be logged in Airbyte's DockerHub registry, do you want to continue?", @@ -114,8 +116,8 @@ async def publish( spec_cache_bucket_name=spec_cache_bucket_name, metadata_service_gcs_credentials=metadata_service_gcs_credentials, metadata_bucket_name=metadata_service_bucket_name, - docker_hub_username=ctx.obj["docker_hub_username"], - docker_hub_password=ctx.obj["docker_hub_password"], + docker_hub_username=Secret("docker_hub_username", ctx.obj["secret_stores"]["in_memory"]), + docker_hub_password=Secret("docker_hub_password", ctx.obj["secret_stores"]["in_memory"]), slack_webhook=slack_webhook, reporting_slack_channel=slack_channel, ci_report_bucket=ctx.obj["ci_report_bucket_name"], @@ -129,7 +131,7 @@ async def publish( dagger_logs_url=ctx.obj.get("dagger_logs_url"), pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), - ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], + ci_gcp_credentials=ctx.obj["ci_gcp_credentials"], pull_request=ctx.obj.get("pull_request"), s3_build_cache_access_key_id=ctx.obj.get("s3_build_cache_access_key_id"), s3_build_cache_secret_key=ctx.obj.get("s3_build_cache_secret_key"), diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/context.py 
b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/context.py index 35dadd969c26..250d4c51ba20 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/context.py @@ -7,29 +7,26 @@ from typing import Optional import asyncclick as click -from dagger import Secret from github import PullRequest from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.consts import ContextState from pipelines.helpers.connectors.modifed import ConnectorWithModifiedFiles -from pipelines.helpers.gcs import sanitize_gcs_credentials from pipelines.helpers.utils import format_duration +from pipelines.models.secrets import Secret class PublishConnectorContext(ConnectorContext): - docker_hub_username_secret: Secret - docker_hub_password_secret: Secret - def __init__( self, connector: ConnectorWithModifiedFiles, pre_release: bool, - spec_cache_gcs_credentials: str, + spec_cache_gcs_credentials: Secret, spec_cache_bucket_name: str, - metadata_service_gcs_credentials: str, + metadata_service_gcs_credentials: Secret, metadata_bucket_name: str, - docker_hub_username: str, - docker_hub_password: str, + docker_hub_username: Secret, + docker_hub_password: Secret, + ci_gcp_credentials: Secret, slack_webhook: str, reporting_slack_channel: str, ci_report_bucket: str, @@ -45,18 +42,17 @@ def __init__( dagger_logs_url: Optional[str] = None, pipeline_start_timestamp: Optional[int] = None, ci_context: Optional[str] = None, - ci_gcs_credentials: Optional[str] = None, pull_request: Optional[PullRequest.PullRequest] = None, - s3_build_cache_access_key_id: Optional[str] = None, - s3_build_cache_secret_key: Optional[str] = None, + s3_build_cache_access_key_id: Optional[Secret] = None, + s3_build_cache_secret_key: Optional[Secret] = None, use_local_cdk: bool = False, - python_registry_token: Optional[str] = None, + python_registry_token: Optional[Secret] = None, ) -> None: self.pre_release = pre_release self.spec_cache_bucket_name = spec_cache_bucket_name self.metadata_bucket_name = metadata_bucket_name - self.spec_cache_gcs_credentials = sanitize_gcs_credentials(spec_cache_gcs_credentials) - self.metadata_service_gcs_credentials = sanitize_gcs_credentials(metadata_service_gcs_credentials) + self.spec_cache_gcs_credentials = spec_cache_gcs_credentials + self.metadata_service_gcs_credentials = metadata_service_gcs_credentials self.python_registry_token = python_registry_token self.python_registry_url = python_registry_url self.python_registry_check_url = python_registry_check_url @@ -82,7 +78,7 @@ def __init__( ci_context=ci_context, slack_webhook=slack_webhook, reporting_slack_channel=reporting_slack_channel, - ci_gcs_credentials=ci_gcs_credentials, + ci_gcp_credentials=ci_gcp_credentials, should_save_report=True, use_local_cdk=use_local_cdk, docker_hub_username=docker_hub_username, @@ -91,13 +87,12 @@ def __init__( s3_build_cache_secret_key=s3_build_cache_secret_key, ) - @property - def metadata_service_gcs_credentials_secret(self) -> Secret: - return self.dagger_client.set_secret("metadata_service_gcs_credentials", self.metadata_service_gcs_credentials) - - @property - def spec_cache_gcs_credentials_secret(self) -> Secret: - return self.dagger_client.set_secret("spec_cache_gcs_credentials", self.spec_cache_gcs_credentials) + # Reassigning instance attributes that are required by this class + # but optional in the super class, + # to help type checking + self.docker_hub_username:
Secret = docker_hub_username + self.docker_hub_password: Secret = docker_hub_password + self.ci_gcp_credentials: Secret = ci_gcp_credentials @property def pre_release_suffix(self) -> str: diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py index 7cd7e4c0a45c..a606e7e68fb8 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py @@ -123,7 +123,7 @@ async def _run(self, built_containers_per_platform: Dict[Platform, Container]) - file, key, self.context.metadata_bucket_name, - self.context.metadata_service_gcs_credentials_secret, + self.context.metadata_service_gcs_credentials, flags=['--cache-control="no-cache"'], ) if exit_code != 0: @@ -282,7 +282,7 @@ async def _run(self, built_connector: Container) -> StepResult: file, key, self.context.spec_cache_bucket_name, - self.context.spec_cache_gcs_credentials_secret, + self.context.spec_cache_gcs_credentials, flags=['--cache-control="no-cache"'], ) if exit_code != 0: @@ -309,9 +309,9 @@ async def run_connector_publish_pipeline(context: PublishConnectorContext, semap metadata_upload_step = MetadataUpload( context=context, - metadata_service_gcs_credentials_secret=context.metadata_service_gcs_credentials_secret, - docker_hub_username_secret=context.docker_hub_username_secret, - docker_hub_password_secret=context.docker_hub_password_secret, + metadata_service_gcs_credentials=context.metadata_service_gcs_credentials, + docker_hub_username=context.docker_hub_username, + docker_hub_password=context.docker_hub_password, metadata_bucket_name=context.metadata_bucket_name, pre_release=context.pre_release, pre_release_tag=context.docker_image_tag, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pull_request/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pull_request/pipeline.py index f5f7fa62bb27..b7b8070b246a 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pull_request/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pull_request/pipeline.py @@ -10,7 +10,7 @@ from pathlib import Path from typing import TYPE_CHECKING, List, Set -from github import Github, GithubException, InputGitTreeElement, UnknownObjectException +from github import Auth, Github, GithubException, InputGitTreeElement, UnknownObjectException from pipelines import main_logger from pipelines.airbyte_ci.connectors.bump_version.pipeline import ( AddChangelogEntry, @@ -160,7 +160,7 @@ async def create_github_pull_request( if not context.ci_github_access_token: raise Exception("GitHub access token is required to create a pull request. 
Set the CI_GITHUB_ACCESS_TOKEN environment variable.") - g = Github(context.ci_github_access_token) + g = Github(auth=Auth.Token(context.ci_github_access_token.value)) connector = context.connector connector_full_name = connector.technical_name logger = main_logger diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py index adbc57dce628..3888ae8550ad 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py @@ -152,7 +152,7 @@ async def save_html_report(self) -> None: dagger_client=self.pipeline_context.dagger_client, bucket=self.pipeline_context.ci_report_bucket, # type: ignore key=self.html_report_remote_storage_key, - gcs_credentials=self.pipeline_context.ci_gcs_credentials_secret, # type: ignore + gcs_credentials=self.pipeline_context.ci_gcp_credentials, # type: ignore ) self.pipeline_context.logger.info(f"HTML report uploaded to {gcs_url}") diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py index d7745fe392f2..c153bed46edf 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py @@ -14,11 +14,12 @@ from pipelines.airbyte_ci.connectors.test.steps.common import RegressionTests from pipelines.cli.click_decorators import click_ci_requirements_option from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand -from pipelines.consts import LOCAL_BUILD_PLATFORM, ContextState +from pipelines.consts import LOCAL_BUILD_PLATFORM, MAIN_CONNECTOR_TESTING_SECRET_STORE_ALIAS, ContextState from pipelines.helpers.execution import argument_parsing from pipelines.helpers.execution.run_steps import RunStepOptions from pipelines.helpers.github import update_global_commit_status_check_for_tests from pipelines.helpers.utils import fail_if_missing_docker_hub_creds +from pipelines.models.secrets import GSMSecretStore from pipelines.models.steps import STEP_PARAMS GITHUB_GLOBAL_CONTEXT_FOR_TESTS = "Connectors CI tests" @@ -105,6 +106,11 @@ async def test( ctx.obj["global_status_check_context"] = global_status_check_context ctx.obj["global_status_check_description"] = global_status_check_description + if ctx.obj["ci_gcp_credentials"]: + ctx.obj["secret_stores"][MAIN_CONNECTOR_TESTING_SECRET_STORE_ALIAS] = GSMSecretStore(ctx.obj["ci_gcp_credentials"]) + else: + main_logger.warn(f"The credentials to connect to {MAIN_CONNECTOR_TESTING_SECRET_STORE_ALIAS} are not defined.") + if only_steps and skip_steps: raise click.UsageError("Cannot use both --only-step and --skip-step at the same time.") if not only_steps: @@ -140,13 +146,12 @@ async def test( ci_github_access_token=ctx.obj["ci_github_access_token"], ci_report_bucket=ctx.obj["ci_report_bucket_name"], report_output_prefix=ctx.obj["report_output_prefix"], - use_remote_secrets=ctx.obj["use_remote_secrets"], gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"), dagger_logs_url=ctx.obj.get("dagger_logs_url"), pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), pull_request=ctx.obj.get("pull_request"), - ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], + ci_gcp_credentials=ctx.obj["ci_gcp_credentials"],
code_tests_only=code_tests_only, use_local_cdk=ctx.obj.get("use_local_cdk"), s3_build_cache_access_key_id=ctx.obj.get("s3_build_cache_access_key_id"), @@ -156,6 +161,7 @@ async def test( concurrent_cat=concurrent_cat, run_step_options=run_step_options, targeted_platforms=[LOCAL_BUILD_PLATFORM], + secret_stores=ctx.obj["secret_stores"], ) for connector in ctx.obj["selected_connectors_with_modified_files"] ] diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py index 217b6e1b6f10..68dae9da4464 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py @@ -5,14 +5,14 @@ """This module groups steps made to run tests agnostic to a connector language.""" import datetime +import logging import os import time -import traceback from abc import ABC, abstractmethod from functools import cached_property from pathlib import Path from textwrap import dedent -from typing import ClassVar, List, Optional +from typing import ClassVar, Dict, List, Optional import requests # type: ignore import semver @@ -26,7 +26,106 @@ from pipelines.dagger.actions import secrets from pipelines.dagger.actions.python.poetry import with_poetry from pipelines.helpers.utils import METADATA_FILE_NAME, get_exec_result +from pipelines.models.secrets import Secret, SecretNotFoundError, SecretStore from pipelines.models.steps import STEP_PARAMS, MountPath, Step, StepResult, StepStatus +from pydash import find # type: ignore + + +def _handle_missing_secret_store( + secret_info: Dict[str, str | Dict[str, str]], raise_on_missing: bool, logger: Optional[logging.Logger] = None +) -> None: + assert isinstance(secret_info["secretStore"], dict), "The secretStore field must be a dict" + message = f"Secret {secret_info['name']} can't be retrieved as {secret_info['secretStore']['alias']} is not available" + if raise_on_missing: + raise SecretNotFoundError(message) + if logger is not None: + logger.warn(message) + + +def _process_secret( + secret_info: Dict[str, str | Dict[str, str]], + secret_stores: Dict[str, SecretStore], + raise_on_missing: bool, + logger: Optional[logging.Logger] = None, +) -> Optional[Secret]: + assert isinstance(secret_info["secretStore"], dict), "The secretStore field must be a dict" + secret_store_alias = secret_info["secretStore"]["alias"] + if secret_store_alias not in secret_stores: + _handle_missing_secret_store(secret_info, raise_on_missing, logger) + return None + else: + # All these asserts and casting are there to make MyPy happy + # The dict structure being nested MyPy can't figure if the values are str or dict + assert isinstance(secret_info["name"], str), "The secret name field must be a string" + if file_name := secret_info.get("fileName"): + assert isinstance(secret_info["fileName"], str), "The secret fileName must be a string" + file_name = str(secret_info["fileName"]) + else: + file_name = None + return Secret(secret_info["name"], secret_stores[secret_store_alias], file_name=file_name) + + +def get_secrets_from_connector_test_suites_option( + connector_test_suites_options: List[Dict[str, str | Dict[str, List[Dict[str, str | Dict[str, str]]]]]], + suite_name: str, + secret_stores: Dict[str, SecretStore], + raise_on_missing_secret_store: bool = True, + logger: logging.Logger | None = None, +) -> List[Secret]: + """Get secrets declared in metadata 
connectorTestSuitesOptions for a test suite name. + It will use the secret store alias declared in connectorTestSuitesOptions. + If the secret store is not available, a warning or an error is raised according to the raise_on_missing_secret_store parameter value. + We usually want to raise an error when running in a CI context and log a warning when running locally, as locally we can fall back on local secrets. + + Args: + connector_test_suites_options (List[Dict[str, str | Dict]]): The test suite options of the connector under test + suite_name (str): The test suite name + secret_stores (Dict[str, SecretStore]): The available secret stores + raise_on_missing_secret_store (bool, optional): Raise an error if the secret store declared in the connectorTestSuitesOptions is not available. Defaults to True. + logger (logging.Logger | None, optional): Logger to log a warning if the secret store declared in the connectorTestSuitesOptions is not available. Defaults to None. + + Raises: + SecretNotFoundError: Raised if the secret store declared in the connectorTestSuitesOptions is not available and raise_on_missing_secret_store is truthy. + + Returns: + List[Secret]: List of secrets declared in the connectorTestSuitesOptions for a test suite name. + """ + secrets: List[Secret] = [] + enabled_test_suite = find(connector_test_suites_options, lambda x: x["suite"] == suite_name) + + if enabled_test_suite and "testSecrets" in enabled_test_suite: + for secret_info in enabled_test_suite["testSecrets"]: + if secret := _process_secret(secret_info, secret_stores, raise_on_missing_secret_store, logger): + secrets.append(secret) + + return secrets + + +def get_connector_secrets_for_test_suite( + test_suite_name: str, context: ConnectorContext, connector_test_suites_options: List, local_secrets: List[Secret] +) -> List[Secret]: + """Get secrets to use for a test suite. + Always merge secrets declared in metadata's connectorTestSuitesOptions with secrets declared locally. + + Args: + test_suite_name (str): Name of the test suite to get secrets for + context (ConnectorContext): The current connector context + connector_test_suites_options (List): The current connector test suite options (from metadata) + local_secrets (List[Secret]): The local connector secrets. + + Returns: + List[Secret]: Secrets to use to run the passed test suite name.
+ """ + return ( + get_secrets_from_connector_test_suites_option( + connector_test_suites_options, + test_suite_name, + context.secret_stores, + raise_on_missing_secret_store=context.is_ci, + logger=context.logger, + ) + + local_secrets + ) class VersionCheck(Step, ABC): @@ -161,14 +260,9 @@ def __init__(self, context: ConnectorContext) -> None: internal_tools=[ MountPath(INTERNAL_TOOL_PATHS.CONNECTORS_QA.value), ], - secrets={ - k: v - for k, v in { - "DOCKER_HUB_USERNAME": context.docker_hub_username_secret, - "DOCKER_HUB_PASSWORD": context.docker_hub_password_secret, - }.items() - if v - }, + secret_env_variables={"DOCKER_HUB_USERNAME": context.docker_hub_username, "DOCKER_HUB_PASSWORD": context.docker_hub_password} + if context.docker_hub_username and context.docker_hub_password + else None, command=["connectors-qa", "run", f"--name={technical_name}"], ) @@ -212,14 +306,15 @@ def base_cat_command(self) -> List[str]: command += ["--numprocesses=auto"] # Using pytest-xdist to run tests in parallel, auto means using all available cores return command - def __init__(self, context: ConnectorContext, concurrent_test_run: Optional[bool] = False) -> None: + def __init__(self, context: ConnectorContext, secrets: List[Secret], concurrent_test_run: Optional[bool] = False) -> None: """Create a step to run acceptance tests for a connector if it has an acceptance test config file. Args: context (ConnectorContext): The current test context, providing a connector object, a dagger client and a repository directory. + secrets (List[Secret]): List of secrets to mount to the connector container under test. concurrent_test_run (Optional[bool], optional): Whether to run acceptance tests in parallel. Defaults to False. """ - super().__init__(context) + super().__init__(context, secrets) self.concurrent_test_run = concurrent_test_run async def get_cat_command(self, connector_dir: Directory) -> List[str]: @@ -295,7 +390,7 @@ async def _build_connector_acceptance_test(self, connector_under_test_container: .with_new_file("/tmp/container_id.txt", contents=str(connector_container_id)) .with_workdir("/test_input") .with_mounted_directory("/test_input", test_input) - .with_(await secrets.mounted_connector_secrets(self.context, self.CONTAINER_SECRETS_DIRECTORY)) + .with_(await secrets.mounted_connector_secrets(self.context, self.CONTAINER_SECRETS_DIRECTORY, self.secrets)) ) if "_EXPERIMENTAL_DAGGER_RUNNER_HOST" in os.environ: self.context.logger.info("Using experimental dagger runner host to run CAT with dagger-in-dagger") @@ -492,7 +587,7 @@ async def _build_regression_test_container(self, target_container_id: str) -> Co "config", "http-basic.airbyte-platform-internal-source", self.github_user, - self.context.ci_github_access_token or "", + self.context.ci_github_access_token.value if self.context.ci_github_access_token else "", ] ) # Add GCP credentials from the environment and point google to their location (also required for connection-retriever) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py index a4259b8f67c8..0b31a79d6411 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py @@ -17,7 +17,7 @@ from pipelines.airbyte_ci.connectors.build_image.steps.normalization import BuildOrPullNormalization from 
pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID from pipelines.airbyte_ci.connectors.context import ConnectorContext -from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests +from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, get_connector_secrets_for_test_suite from pipelines.airbyte_ci.steps.gradle import GradleTask from pipelines.consts import LOCAL_BUILD_PLATFORM from pipelines.dagger.actions.system import docker @@ -127,17 +127,27 @@ def _get_acceptance_test_steps(context: ConnectorContext) -> List[StepToRun]: """ Generate the steps to run the acceptance tests for a Java connector. """ + connector_test_suites_options = context.metadata.get("connectorTestSuitesOptions", []) + local_secrets = context.local_secret_store.get_all_secrets() if context.local_secret_store else [] + + integration_tests_secrets = get_connector_secrets_for_test_suite( + "integrationTests", context, connector_test_suites_options, local_secrets + ) + acceptance_tests_secrets = get_connector_secrets_for_test_suite( + "acceptanceTests", context, connector_test_suites_options, local_secrets + ) + # Run tests in parallel return [ StepToRun( id=CONNECTOR_TEST_STEP_ID.INTEGRATION, - step=IntegrationTests(context), + step=IntegrationTests(context, secrets=integration_tests_secrets), args=_create_integration_step_args_factory(context), depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], ), StepToRun( id=CONNECTOR_TEST_STEP_ID.ACCEPTANCE, - step=AcceptanceTests(context, True), + step=AcceptanceTests(context, secrets=acceptance_tests_secrets, concurrent_test_run=True), args=lambda results: {"connector_under_test_container": results[CONNECTOR_TEST_STEP_ID.BUILD].output[LOCAL_BUILD_PLATFORM]}, depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], ), @@ -149,9 +159,25 @@ def get_test_steps(context: ConnectorContext) -> STEP_TREE: Get all the tests steps for a Java connector. 
""" + connector_test_suites_options = context.metadata.get("connectorTestSuitesOptions", []) + local_secrets = context.local_secret_store.get_all_secrets() if context.local_secret_store else [] + + unit_tests_secrets = get_connector_secrets_for_test_suite( + "unitTests", + context, + connector_test_suites_options, + local_secrets, + ) + steps: STEP_TREE = [ [StepToRun(id=CONNECTOR_TEST_STEP_ID.BUILD_TAR, step=BuildConnectorDistributionTar(context))], - [StepToRun(id=CONNECTOR_TEST_STEP_ID.UNIT, step=UnitTests(context), depends_on=[CONNECTOR_TEST_STEP_ID.BUILD_TAR])], + [ + StepToRun( + id=CONNECTOR_TEST_STEP_ID.UNIT, + step=UnitTests(context, secrets=unit_tests_secrets), + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD_TAR], + ) + ], [ StepToRun( id=CONNECTOR_TEST_STEP_ID.BUILD, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py index 1a70d0eed5cf..a5997cb7b4b7 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py @@ -15,7 +15,7 @@ from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID from pipelines.airbyte_ci.connectors.context import ConnectorContext -from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, RegressionTests +from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, RegressionTests, get_connector_secrets_for_test_suite from pipelines.consts import LOCAL_BUILD_PLATFORM from pipelines.dagger.actions import secrets from pipelines.dagger.actions.python.poetry import with_poetry @@ -149,7 +149,7 @@ async def install_testing_environment( Returns: Container: The container with the test environment installed. """ - secret_mounting_function = await secrets.mounted_connector_secrets(self.context, "secrets") + secret_mounting_function = await secrets.mounted_connector_secrets(self.context, "secrets", self.secrets) container_with_test_deps = ( # Install the connector python package in /test_environment with the extra dependencies @@ -179,6 +179,7 @@ class UnitTests(PytestStep): title = "Unit tests" test_directory_name = "unit_tests" + common_test_dependencies = ["pytest-cov==4.1.0"] MINIMUM_COVERAGE_FOR_CERTIFIED_CONNECTORS = 90 @@ -243,6 +244,7 @@ class IntegrationTests(PytestStep): title = "Integration tests" test_directory_name = "integration_tests" + bind_to_docker_host = True @@ -250,12 +252,21 @@ def get_test_steps(context: ConnectorContext) -> STEP_TREE: """ Get all the tests steps for a Python connector. 
""" + connector_test_suites_options = context.metadata.get("connectorTestSuitesOptions", []) + local_secrets = context.local_secret_store.get_all_secrets() if context.local_secret_store else [] + unit_tests_secrets = get_connector_secrets_for_test_suite("unitTests", context, connector_test_suites_options, local_secrets) + integration_tests_secrets = get_connector_secrets_for_test_suite( + "integrationTests", context, connector_test_suites_options, local_secrets + ) + acceptance_tests_secrets = get_connector_secrets_for_test_suite( + "acceptanceTests", context, connector_test_suites_options, local_secrets + ) return [ [StepToRun(id=CONNECTOR_TEST_STEP_ID.BUILD, step=BuildConnectorImages(context))], [ StepToRun( id=CONNECTOR_TEST_STEP_ID.UNIT, - step=UnitTests(context), + step=UnitTests(context, secrets=unit_tests_secrets), args=lambda results: {"connector_under_test": results[CONNECTOR_TEST_STEP_ID.BUILD].output[LOCAL_BUILD_PLATFORM]}, depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], ) @@ -263,7 +274,7 @@ def get_test_steps(context: ConnectorContext) -> STEP_TREE: [ StepToRun( id=CONNECTOR_TEST_STEP_ID.INTEGRATION, - step=IntegrationTests(context), + step=IntegrationTests(context, secrets=integration_tests_secrets), args=lambda results: {"connector_under_test": results[CONNECTOR_TEST_STEP_ID.BUILD].output[LOCAL_BUILD_PLATFORM]}, depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], ), @@ -275,7 +286,7 @@ def get_test_steps(context: ConnectorContext) -> STEP_TREE: ), StepToRun( id=CONNECTOR_TEST_STEP_ID.ACCEPTANCE, - step=AcceptanceTests(context, context.concurrent_cat), + step=AcceptanceTests(context, secrets=acceptance_tests_secrets, concurrent_test_run=context.concurrent_cat), args=lambda results: {"connector_under_test_container": results[CONNECTOR_TEST_STEP_ID.BUILD].output[LOCAL_BUILD_PLATFORM]}, depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], ), diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_base_image/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_base_image/commands.py index 9743c949a6ae..c86cf92cfc06 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_base_image/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_base_image/commands.py @@ -29,12 +29,11 @@ async def upgrade_base_image(ctx: click.Context, set_if_not_exists: bool, docker git_repo_url=ctx.obj["git_repo_url"], ci_report_bucket=ctx.obj["ci_report_bucket_name"], report_output_prefix=ctx.obj["report_output_prefix"], - use_remote_secrets=ctx.obj["use_remote_secrets"], gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"), dagger_logs_url=ctx.obj.get("dagger_logs_url"), pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), - ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], + ci_gcp_credentials=ctx.obj["ci_gcp_credentials"], ci_git_user=ctx.obj["ci_git_user"], ci_github_access_token=ctx.obj["ci_github_access_token"], enable_report_auto_open=False, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/commands.py index 2ff88c895008..35ce460599bf 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/commands.py @@ -29,12 +29,11 @@ async def upgrade_cdk( git_repo_url=ctx.obj["git_repo_url"], 
ci_report_bucket=ctx.obj["ci_report_bucket_name"], report_output_prefix=ctx.obj["report_output_prefix"], - use_remote_secrets=ctx.obj["use_remote_secrets"], gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"), dagger_logs_url=ctx.obj.get("dagger_logs_url"), pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), - ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], + ci_gcp_credentials=ctx.obj["ci_gcp_credentials"], ci_git_user=ctx.obj["ci_git_user"], ci_github_access_token=ctx.obj["ci_github_access_token"], enable_report_auto_open=False, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py index 84318823dc2f..5a9452f19940 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py @@ -18,6 +18,7 @@ from pipelines.helpers.execution.run_steps import STEP_TREE, StepToRun, run_steps from pipelines.helpers.utils import DAGGER_CONFIG, get_secret_host_variable from pipelines.models.reports import Report +from pipelines.models.secrets import Secret from pipelines.models.steps import MountPath, Step, StepResult # STEPS @@ -36,14 +37,9 @@ def __init__(self, context: ConnectorContext) -> None: internal_tools=[ MountPath(INTERNAL_TOOL_PATHS.METADATA_SERVICE.value), ], - secrets={ - k: v - for k, v in { - "DOCKER_HUB_USERNAME": context.docker_hub_username_secret, - "DOCKER_HUB_PASSWORD": context.docker_hub_password_secret, - }.items() - if v - }, + secret_env_variables={"DOCKER_HUB_USERNAME": context.docker_hub_username, "DOCKER_HUB_PASSWORD": context.docker_hub_password} + if context.docker_hub_username and context.docker_hub_password + else None, command=[ "metadata_service", "validate", @@ -61,9 +57,9 @@ def __init__( self, context: ConnectorContext, metadata_bucket_name: str, - metadata_service_gcs_credentials_secret: dagger.Secret, - docker_hub_username_secret: dagger.Secret, - docker_hub_password_secret: dagger.Secret, + metadata_service_gcs_credentials: Secret, + docker_hub_username: Secret, + docker_hub_password: Secret, pre_release: bool = False, pre_release_tag: Optional[str] = None, ) -> None: @@ -91,10 +87,10 @@ def __init__( internal_tools=[ MountPath(INTERNAL_TOOL_PATHS.METADATA_SERVICE.value), ], - secrets={ - "DOCKER_HUB_USERNAME": docker_hub_username_secret, - "DOCKER_HUB_PASSWORD": docker_hub_password_secret, - "GCS_CREDENTIALS": metadata_service_gcs_credentials_secret, + secret_env_variables={ + "DOCKER_HUB_USERNAME": docker_hub_username, + "DOCKER_HUB_PASSWORD": docker_hub_password, + "GCS_CREDENTIALS": metadata_service_gcs_credentials, }, env_variables={ # The cache buster ensures we always run the upload command (in case of remote bucket change) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/poetry/publish/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/poetry/publish/commands.py index 1f05c7ed14ff..492aac819311 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/poetry/publish/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/poetry/publish/commands.py @@ -14,9 +14,11 @@ from pipelines.airbyte_ci.steps.python_registry import PublishToPythonRegistry from pipelines.cli.confirm_prompt import confirm from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand +from pipelines.cli.secrets import wrap_in_secret from pipelines.consts import 
DEFAULT_PYTHON_PACKAGE_REGISTRY_CHECK_URL, DEFAULT_PYTHON_PACKAGE_REGISTRY_URL from pipelines.models.contexts.click_pipeline_context import ClickPipelineContext, pass_pipeline_context from pipelines.models.contexts.python_registry_publish import PythonRegistryPublishContext +from pipelines.models.secrets import Secret from pipelines.models.steps import StepStatus @@ -45,6 +47,7 @@ def _validate_python_version(_ctx: dict, _param: dict, value: Optional[str]) -> type=click.STRING, required=True, envvar="PYTHON_REGISTRY_TOKEN", + callback=wrap_in_secret, ) @click.option( "--python-registry-url", @@ -69,7 +72,7 @@ def _validate_python_version(_ctx: dict, _param: dict, value: Optional[str]) -> async def publish( ctx: click.Context, click_pipeline_context: ClickPipelineContext, - python_registry_token: str, + python_registry_token: Secret, python_registry_url: str, publish_name: Optional[str], publish_version: Optional[str], @@ -86,7 +89,7 @@ async def publish( dagger_logs_url=ctx.obj.get("dagger_logs_url"), pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"), ci_context=ctx.obj.get("ci_context"), - ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], + ci_gcp_credentials=ctx.obj["ci_gcp_credentials"], python_registry_token=python_registry_token, registry=python_registry_url, registry_check_url=DEFAULT_PYTHON_PACKAGE_REGISTRY_CHECK_URL, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py index 41fc25981c07..ce4ec95b2611 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py @@ -2,12 +2,13 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from typing import List, Optional +from typing import Dict, List, Optional import dagger from pipelines.dagger.actions.python.pipx import with_installed_pipx_package from pipelines.dagger.containers.python import with_python_base from pipelines.models.contexts.pipeline_context import PipelineContext +from pipelines.models.secrets import Secret from pipelines.models.steps import MountPath, Step, StepResult @@ -16,9 +17,9 @@ def __init__( self, title: str, context: PipelineContext, - paths_to_mount: List[MountPath] = [], - internal_tools: List[MountPath] = [], - secrets: dict[str, dagger.Secret | None] = {}, + paths_to_mount: Optional[List[MountPath]] = None, + internal_tools: Optional[List[MountPath]] = None, + secret_env_variables: Optional[Dict[str, Secret]] = None, env_variables: dict[str, str] = {}, working_directory: str = "/", command: Optional[List[str]] = None, @@ -30,7 +31,7 @@ def __init__( context (PipelineContext): context of the step paths_to_mount (List[MountPath], optional): directory paths to mount. Defaults to []. internal_tools (List[MountPath], optional): internal tools to install. Defaults to []. - secrets (dict[str, dagger.Secret], optional): secrets to add to container. Defaults to {}. + secret_env_variables (Optional[Dict[str, Secret]], optional): secrets to add to the container as environment variables, mapping env var name to Secret object. Defaults to None. env_variables (dict[str, str], optional): env variables to set in container. Defaults to {}. working_directory (str, optional): working directory to run the command in. Defaults to "/". command (Optional[List[str]], optional): The default command to run. Defaults to None.
diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py
index 41fc25981c07..ce4ec95b2611 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py
@@ -2,12 +2,13 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #

-from typing import List, Optional
+from typing import Dict, List, Optional

 import dagger
 from pipelines.dagger.actions.python.pipx import with_installed_pipx_package
 from pipelines.dagger.containers.python import with_python_base
 from pipelines.models.contexts.pipeline_context import PipelineContext
+from pipelines.models.secrets import Secret
 from pipelines.models.steps import MountPath, Step, StepResult

@@ -16,9 +17,9 @@ def __init__(
         self,
         title: str,
         context: PipelineContext,
-        paths_to_mount: List[MountPath] = [],
-        internal_tools: List[MountPath] = [],
-        secrets: dict[str, dagger.Secret | None] = {},
+        paths_to_mount: Optional[List[MountPath]] = None,
+        internal_tools: Optional[List[MountPath]] = None,
+        secret_env_variables: Optional[Dict[str, Secret]] = None,
         env_variables: dict[str, str] = {},
         working_directory: str = "/",
         command: Optional[List[str]] = None,
@@ -30,7 +31,7 @@
             context (PipelineContext): context of the step
             paths_to_mount (List[MountPath], optional): directory paths to mount. Defaults to [].
             internal_tools (List[MountPath], optional): internal tools to install. Defaults to [].
-            secrets (dict[str, dagger.Secret], optional): secrets to add to container. Defaults to {}.
+            secret_env_variables (Optional[Dict[str, Secret]], optional): secrets to add to the container as environment variables, mapping env var name to Secret object. Defaults to None.
             env_variables (dict[str, str], optional): env variables to set in container. Defaults to {}.
             working_directory (str, optional): working directory to run the command in. Defaults to "/".
             command (Optional[List[str]], optional): The default command to run. Defaults to None.
@@ -38,10 +39,10 @@ def __init__(
         self._title = title
         super().__init__(context)
-        self.paths_to_mount = paths_to_mount
+        self.paths_to_mount = paths_to_mount if paths_to_mount else []
         self.working_directory = working_directory
-        self.internal_tools = internal_tools
-        self.secrets = secrets
+        self.internal_tools = internal_tools if internal_tools else []
+        self.secret_env_variables = secret_env_variables if secret_env_variables else {}
         self.env_variables = env_variables
         self.command = command
@@ -76,10 +77,9 @@ def _set_env_variables(self, container: dagger.Container) -> dagger.Container:
             container = container.with_env_variable(key, value)
         return container

-    def _set_secrets(self, container: dagger.Container) -> dagger.Container:
-        for key, value in self.secrets.items():
-            if value is not None:
-                container = container.with_secret_variable(key, value)
+    def _set_secret_env_variables(self, container: dagger.Container) -> dagger.Container:
+        for env_var_name, secret in self.secret_env_variables.items():
+            container = container.with_secret_variable(env_var_name, secret.as_dagger_secret(self.context.dagger_client))
         return container

     async def init_container(self) -> dagger.Container:
@@ -88,7 +88,7 @@ async def init_container(self) -> dagger.Container:
         container = self._mount_paths(container)
         container = self._set_env_variables(container)
-        container = self._set_secrets(container)
+        container = self._set_secret_env_variables(container)
         container = await self._install_internal_tools(container)
         container = self._set_workdir(container)
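The key contract in this refactor: a pipelines `Secret` is inert until `as_dagger_secret(client)` turns it into a `dagger.Secret` at container-assembly time. A hedged, self-contained sketch of that boundary (store setup mirrors `cli/secrets.py`; the container recipe is illustrative):

```python
import dagger

from pipelines.models.secrets import InMemorySecretStore, Secret

store = InMemorySecretStore()
store.add_secret("api_key", "s3cr3t-value")  # hypothetical secret
api_key = Secret("api_key", store)


async def build_container(client: dagger.Client) -> dagger.Container:
    # The plaintext never appears in the container definition; dagger injects
    # it at runtime from the secret registered with the client here.
    return (
        client.container()
        .from_("python:3.10-slim")
        .with_secret_variable("API_KEY", api_key.as_dagger_secret(client))
    )
```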
diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/gradle.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/gradle.py
index 310ea60cee55..7b4f79d1f739 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/gradle.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/gradle.py
@@ -142,13 +142,13 @@ async def _run(self, *args: Any, **kwargs: Any) -> StepResult:
         )

         # Augment the base container with S3 build cache secrets when available.
-        if self.context.s3_build_cache_access_key_id_secret:
+        if self.context.s3_build_cache_access_key_id:
             gradle_container_base = gradle_container_base.with_secret_variable(
-                "S3_BUILD_CACHE_ACCESS_KEY_ID", self.context.s3_build_cache_access_key_id_secret
+                "S3_BUILD_CACHE_ACCESS_KEY_ID", self.context.s3_build_cache_access_key_id.as_dagger_secret(self.dagger_client)
             )
-        if self.context.s3_build_cache_secret_key_secret:
+        if self.context.s3_build_cache_secret_key:
             gradle_container_base = gradle_container_base.with_secret_variable(
-                "S3_BUILD_CACHE_SECRET_KEY", self.context.s3_build_cache_secret_key_secret
+                "S3_BUILD_CACHE_SECRET_KEY", self.context.s3_build_cache_secret_key.as_dagger_secret(self.dagger_client)
             )

         # Running a gradle task like "help" with these arguments will trigger updating all dependencies.
@@ -196,7 +196,7 @@ async def _run(self, *args: Any, **kwargs: Any) -> StepResult:
         # From this point on, we add layers which are task-dependent.
         if self.mount_connector_secrets:
             secrets_dir = f"{self.context.connector.code_directory}/secrets"
-            gradle_container = gradle_container.with_(await secrets.mounted_connector_secrets(self.context, secrets_dir))
+            gradle_container = gradle_container.with_(await secrets.mounted_connector_secrets(self.context, secrets_dir, self.secrets))

         if self.bind_to_docker_host:
             # If this GradleTask subclass needs docker, then install it and bind it to the existing global docker host container.
             gradle_container = pipelines.dagger.actions.system.docker.with_bound_docker_host(self.context, gradle_container)
diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/python_registry.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/python_registry.py
index 2bfebec127b5..42b550df4fd3 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/python_registry.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/python_registry.py
@@ -101,7 +101,6 @@ async def _publish(self, package_dir_to_publish: Directory, package_type: PackageType
         return await self._poetry_publish(package_dir_to_publish)

     async def _poetry_publish(self, package_dir_to_publish: Directory) -> StepResult:
-        python_registry_token = self.context.dagger_client.set_secret("python_registry_token", self.context.python_registry_token)
         pyproject_toml = package_dir_to_publish.file(PYPROJECT_TOML_FILE_PATH)
         pyproject_toml_content = await pyproject_toml.contents()
         contents = tomli.loads(pyproject_toml_content)
@@ -112,7 +111,7 @@ async def _poetry_publish(self, package_dir_to_publish: Directory) -> StepResult
         contents["tool"]["poetry"]["authors"] = ["Airbyte <contact@airbyte.io>"]
         poetry_publish = (
             self._get_base_container()
-            .with_secret_variable("PYTHON_REGISTRY_TOKEN", python_registry_token)
+            .with_secret_variable("PYTHON_REGISTRY_TOKEN", self.context.python_registry_token.as_dagger_secret(self.dagger_client))
             .with_directory("package", package_dir_to_publish)
             .with_workdir("package")
             .with_new_file(PYPROJECT_TOML_FILE_PATH, contents=tomli_w.dumps(contents))
@@ -128,8 +127,6 @@ async def _poetry_publish(self, package_dir_to_publish: Directory) -> StepResult

     async def _pip_publish(self, package_dir_to_publish: Directory) -> StepResult:
         files = await package_dir_to_publish.entries()
-        pypi_username = self.context.dagger_client.set_secret("pypi_username", "__token__")
-        pypi_password = self.context.dagger_client.set_secret("pypi_password", self.context.python_registry_token)
         metadata: Dict[str, str] = {
             "name": str(self.context.package_metadata.name),
             "version": str(self.context.package_metadata.version),
@@ -161,8 +158,8 @@ async def _pip_publish(self, package_dir_to_publish: Directory) -> StepResult:
             # Make sure these steps are always executed and not cached as they are triggering a side-effect (calling the registry)
             # Env var setting needs to be in this block as well to make sure a change of the env var will be propagated correctly
             .with_env_variable("CACHEBUSTER", str(uuid.uuid4()))
-            .with_secret_variable("TWINE_USERNAME", pypi_username)
-            .with_secret_variable("TWINE_PASSWORD", pypi_password)
+            .with_secret_variable("TWINE_USERNAME", self.context.dagger_client.set_secret("pypi_username", "__token__"))
+            .with_secret_variable("TWINE_PASSWORD", self.context.python_registry_token.as_dagger_secret(self.dagger_client))
             .with_exec(["twine", "upload", "--verbose", "--repository-url", self.context.registry, "dist/*"])
         )
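`_pip_publish` follows PyPI's API-token convention: the username is the literal `__token__` and the token itself is the password. A sketch of that twine wiring factored into a helper (the helper name is hypothetical; the calls mirror the hunk above):

```python
import dagger

from pipelines.models.secrets import Secret


def with_twine_upload(
    container: dagger.Container, client: dagger.Client, token: Secret, registry_url: str
) -> dagger.Container:
    # "__token__" is a constant, but it is still injected as a dagger secret
    # so that the whole credential pair stays out of image layers and logs.
    return (
        container.with_secret_variable("TWINE_USERNAME", client.set_secret("pypi_username", "__token__"))
        .with_secret_variable("TWINE_PASSWORD", token.as_dagger_secret(client))
        .with_exec(["twine", "upload", "--verbose", "--repository-url", registry_url, "dist/*"])
    )
```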
pipeline_context_params["ci_github_access_token"].value, ] ) .with_exec(["poetry", "lock", "--no-update"]) diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py b/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py index d7d4bbb25f48..c0d6abf2ed98 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py +++ b/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py @@ -33,12 +33,14 @@ ) from pipelines.cli.confirm_prompt import pre_confirm_all_flag from pipelines.cli.lazy_group import LazyGroup +from pipelines.cli.secrets import wrap_gcp_credentials_in_secret, wrap_in_secret from pipelines.cli.telemetry import click_track_command from pipelines.consts import DAGGER_WRAP_ENV_VAR_NAME, LOCAL_BUILD_PLATFORM, CIContext from pipelines.dagger.actions.connector.hooks import get_dagger_sdk_version from pipelines.helpers import github from pipelines.helpers.git import get_current_git_branch, get_current_git_revision from pipelines.helpers.utils import AIRBYTE_REPO_URL, get_current_epoch_time +from pipelines.models.secrets import InMemorySecretStore def log_context_info(ctx: click.Context) -> None: @@ -72,7 +74,6 @@ def _get_pull_request(ctx: click.Context) -> Optional[PullRequest.PullRequest]: can_get_pull_request = pull_request_number and ci_github_access_token if not can_get_pull_request: return None - return github.get_pull_request(pull_request_number, ci_github_access_token) @@ -161,19 +162,20 @@ def is_current_process_wrapped_by_dagger_run() -> bool: @click.option("--pipeline-start-timestamp", default=get_current_epoch_time, envvar="CI_PIPELINE_START_TIMESTAMP", type=int) @click.option("--pull-request-number", envvar="PULL_REQUEST_NUMBER", type=int) @click.option("--ci-git-user", default="octavia-squidington-iii", envvar="CI_GIT_USER", type=str) -@click.option("--ci-github-access-token", envvar="CI_GITHUB_ACCESS_TOKEN", type=str) +@click.option("--ci-github-access-token", envvar="CI_GITHUB_ACCESS_TOKEN", type=str, callback=wrap_in_secret) @click.option("--ci-report-bucket-name", envvar="CI_REPORT_BUCKET_NAME", type=str) @click.option("--ci-artifact-bucket-name", envvar="CI_ARTIFACT_BUCKET_NAME", type=str) @click.option( - "--ci-gcs-credentials", + "--ci-gcp-credentials", help="The service account to use during CI.", type=click.STRING, required=False, # Not required for pre-release or local pipelines envvar="GCP_GSM_CREDENTIALS", + callback=wrap_gcp_credentials_in_secret, ) @click.option("--ci-job-key", envvar="CI_JOB_KEY", type=str) -@click.option("--s3-build-cache-access-key-id", envvar="S3_BUILD_CACHE_ACCESS_KEY_ID", type=str) -@click.option("--s3-build-cache-secret-key", envvar="S3_BUILD_CACHE_SECRET_KEY", type=str) +@click.option("--s3-build-cache-access-key-id", envvar="S3_BUILD_CACHE_ACCESS_KEY_ID", type=str, callback=wrap_in_secret) +@click.option("--s3-build-cache-secret-key", envvar="S3_BUILD_CACHE_SECRET_KEY", type=str, callback=wrap_in_secret) @click.option("--show-dagger-logs/--hide-dagger-logs", default=False, type=bool) @click_ci_requirements_option() @click_track_command @@ -207,6 +209,9 @@ async def airbyte_ci(ctx: click.Context) -> None: # noqa D103 if not ctx.obj["is_local"]: log_context_info(ctx) + if not ctx.obj.get("secret_stores", {}).get("in_memory"): + ctx.obj["secret_stores"] = {"in_memory": InMemorySecretStore()} + if __name__ == "__main__": airbyte_ci() diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py b/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py index 
diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py b/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py
index 234b16feda89..dfc0bd9d626f 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py
@@ -54,9 +54,12 @@ async def invoke(self, ctx: click.Context) -> None:
         finally:
             if ctx.obj.get("dagger_logs_path"):
                 main_logger.info(f"Dagger logs saved to {ctx.obj['dagger_logs_path']}")
-                if ctx.obj["is_ci"] and ctx.obj["ci_gcs_credentials"] and ctx.obj["ci_report_bucket_name"]:
+                if ctx.obj["is_ci"] and ctx.obj["ci_gcp_credentials"] and ctx.obj["ci_report_bucket_name"]:
                     gcs_uri, public_url = upload_to_gcs(
-                        ctx.obj["dagger_logs_path"], ctx.obj["ci_report_bucket_name"], dagger_logs_gcs_key, ctx.obj["ci_gcs_credentials"]
+                        ctx.obj["dagger_logs_path"],
+                        ctx.obj["ci_report_bucket_name"],
+                        dagger_logs_gcs_key,
+                        ctx.obj["ci_gcp_credentials"].value,
                     )
                     main_logger.info(f"Dagger logs saved to {gcs_uri}. Public URL: {public_url}")
diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/secrets.py b/airbyte-ci/connectors/pipelines/pipelines/cli/secrets.py
new file mode 100644
index 000000000000..84923558d3d1
--- /dev/null
+++ b/airbyte-ci/connectors/pipelines/pipelines/cli/secrets.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+
+from typing import Any, Optional
+
+import asyncclick as click
+from pipelines.helpers.gcs import sanitize_gcp_credentials
+from pipelines.models.secrets import InMemorySecretStore, Secret
+
+
+def wrap_in_secret(ctx: click.Context, param: click.Option, value: Any) -> Optional[Secret]:  # noqa
+    # Validate callback usage
+    if value is None:
+        return None
+    assert param.name is not None
+    if not isinstance(value, str):
+        raise click.BadParameter(f"{param.name} value is not a string, only strings can be wrapped in a secret.")
+
+    # Make sure the context object is set or set it with an empty dict
+    ctx.ensure_object(dict)
+
+    # Instantiate a global in memory secret store in the context object if it's not yet set
+    if "secret_stores" not in ctx.obj:
+        ctx.obj["secret_stores"] = {}
+    if "in_memory" not in ctx.obj["secret_stores"]:
+        ctx.obj["secret_stores"]["in_memory"] = InMemorySecretStore()
+
+    # Add the CLI option value to the in memory secret store and wrap it in a Secret
+    ctx.obj["secret_stores"]["in_memory"].add_secret(param.name, value)
+    return Secret(param.name, ctx.obj["secret_stores"]["in_memory"])
+
+
+def wrap_gcp_credentials_in_secret(ctx: click.Context, param: click.Option, value: Any) -> Optional[Secret]:  # noqa
+    # Validate callback usage
+    if value is None:
+        return None
+    if not isinstance(value, str):
+        raise click.BadParameter(f"{param.name} value is not a string, only strings can be wrapped in a secret.")
+
+    value = sanitize_gcp_credentials(value)
+    return wrap_in_secret(ctx, param, value)
diff --git a/airbyte-ci/connectors/pipelines/pipelines/consts.py b/airbyte-ci/connectors/pipelines/pipelines/consts.py
index f67f00854349..ecbde8c9a840 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/consts.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/consts.py
@@ -63,6 +63,7 @@
 SETUP_PY_FILE_PATH = "setup.py"
 DEFAULT_PYTHON_PACKAGE_REGISTRY_URL = "https://upload.pypi.org/legacy/"
 DEFAULT_PYTHON_PACKAGE_REGISTRY_CHECK_URL = "https://pypi.org/pypi"
+MAIN_CONNECTOR_TESTING_SECRET_STORE_ALIAS = "airbyte-connector-testing-secret-store"


 class CIContext(str, Enum):
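A minimal round trip through the in-memory store these callbacks rely on (method names as used in `cli/secrets.py` above; the token value is obviously fake, and it is assumed that `Secret.value` resolves through its backing store):

```python
from pipelines.models.secrets import InMemorySecretStore, Secret

store = InMemorySecretStore()
store.add_secret("ci_github_access_token", "ghp_fake_token")  # fake value

secret = Secret("ci_github_access_token", store)
# Reading .value is the only way to get the plaintext back out of the store.
assert secret.value == "ghp_fake_token"
```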
diff --git a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/remote_storage.py b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/remote_storage.py
index 7995ccb2f1b1..23ddcb564b97 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/remote_storage.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/remote_storage.py
@@ -8,8 +8,9 @@
 from pathlib import Path
 from typing import List, Optional, Tuple

-from dagger import Client, File, Secret
+from dagger import Client, File
 from pipelines.helpers.utils import get_exec_result, secret_host_variable, with_exit_code
+from pipelines.models.secrets import Secret

 GOOGLE_CLOUD_SDK_TAG = "425.0.0-slim"

@@ -54,7 +55,7 @@ async def upload_to_gcs(
         file_to_upload (File): The dagger File to upload.
         key (str): The key that will be written on the S3 bucket.
         bucket (str): The S3 bucket name.
-        gcs_credentials (Secret): The dagger secret holding the credentials to get and upload the targeted GCS bucket.
+        gcs_credentials (Secret): The secret holding the credentials to get and upload the targeted GCS bucket.
         flags (List[str]): Flags to be passed to the 'gcloud storage cp' command.
         cache_upload (bool): If false, the gcloud commands will be executed on each call.
     Returns:
@@ -69,7 +70,7 @@ async def upload_to_gcs(
         dagger_client.container()
         .from_(f"google/cloud-sdk:{GOOGLE_CLOUD_SDK_TAG}")
         .with_workdir("/upload")
-        .with_new_file("credentials.json", contents=await gcs_credentials.plaintext())
+        .with_mounted_secret("credentials.json", gcs_credentials.as_dagger_secret(dagger_client))
         .with_env_variable("GOOGLE_APPLICATION_CREDENTIALS", "/upload/credentials.json")
         .with_file("to_upload", file_to_upload)
     )
- """ - secrets_to_mask = [] - if secrets_to_mask_file := await get_file_contents(ci_credentials_with_downloaded_secrets, "/tmp/secrets_to_mask.txt"): - for secret_to_mask in secrets_to_mask_file.splitlines(): - if secret_to_mask in NOT_REALLY_SECRETS or secret_to_mask in connector_technical_name: - # Don't mask secrets which are also common words or connector name. - continue - # We print directly to stdout because the GHA runner will mask only if the log line starts with "::add-mask::" - # If we use the dagger logger, or context logger, the log line will start with other stuff and will not be masked - print(f"::add-mask::{secret_to_mask}") - secrets_to_mask.append(secret_to_mask) - return secrets_to_mask - - -async def download(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> Dict[str, Secret]: - """Use the ci-credentials tool to download the secrets stored for a specific connector to a Directory. - - Args: - context (ConnectorContext): The context providing a connector object. - gcp_gsm_env_variable_name (str, optional): The name of the environment variable holding credentials to connect to Google Secret Manager. Defaults to "GCP_GSM_CREDENTIALS". - - Returns: - dict[str, Secret]: A dict mapping the secret file name to the dagger Secret object. - """ - # temp - fix circular import - from pipelines.dagger.containers.internal_tools import with_ci_credentials - - gsm_secret = get_secret_host_variable(context.dagger_client, gcp_gsm_env_variable_name) - secrets_path = f"/{context.connector.code_directory}/secrets" - ci_credentials = await with_ci_credentials(context, gsm_secret) - with_downloaded_secrets = ( - ci_credentials.with_exec(["mkdir", "-p", secrets_path]) - .with_env_variable( - "CACHEBUSTER", datetime.datetime.now().isoformat() - ) # Secrets can be updated on GSM anytime, we can't cache this step... - .with_exec(["ci_credentials", context.connector.technical_name, "write-to-storage"]) - ) - # We don't want to print secrets in the logs when running locally. - if context.is_ci: - context.secrets_to_mask = await get_secrets_to_mask(with_downloaded_secrets, context.connector.technical_name) - connector_secrets = {} - for secret_file in await with_downloaded_secrets.directory(secrets_path).entries(): - secret_plaintext = await with_downloaded_secrets.directory(secrets_path).file(secret_file).contents() - # We have to namespace secrets as Dagger derives session wide secret ID from their name - unique_secret_name = f"{context.connector.technical_name}_{secret_file}" - connector_secrets[secret_file] = context.dagger_client.set_secret(unique_secret_name, secret_plaintext) - - return connector_secrets - - +# TODO deprecate to use Secret and SecretStores +# Not prioritized as few connectors have to export their secrets back to GSM +# This would require exposing a secret update interface on the SecretStore +# and a more complex structure / Logic to map container files to Secret objects async def upload(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> Container: """Use the ci-credentials tool to upload the secrets stored in the context's updated_secrets-dir. @@ -111,56 +48,18 @@ async def upload(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GC ) -async def load_from_local(context: ConnectorContext) -> Dict[str, Secret]: - """Load the secrets from the local secrets directory for a connector. - - Args: - context (ConnectorContext): The context providing the connector directory. 
- - Returns: - dict[str, Secret]: A dict mapping the secret file name to the dagger Secret object. - """ - connector_secrets: Dict[str, Secret] = {} - local_secrets_path = Path(context.connector.code_directory / "secrets") - if not await local_secrets_path.is_dir(): - context.logger.warning(f"Local secrets directory {local_secrets_path} does not exist, no secrets will be loaded.") - return connector_secrets - async for secret_file in local_secrets_path.iterdir(): - secret_plaintext = await secret_file.read_text() - unique_secret_name = f"{context.connector.technical_name}_{secret_file.name}" - connector_secrets[secret_file.name] = context.dagger_client.set_secret(unique_secret_name, secret_plaintext) - if not connector_secrets: - context.logger.warning(f"Local secrets directory {local_secrets_path} is empty, no secrets will be loaded.") - return connector_secrets - - -async def get_connector_secrets(context: ConnectorContext) -> dict[str, Secret]: - """Download the secrets from GSM or use the local secrets directory for a connector. - - Args: - context (ConnectorContext): The context providing the connector directory and the use_remote_secrets flag. - - Returns: - dict[str, Secret]: A dict mapping the secret file name to the dagger Secret object. - """ - if context.use_remote_secrets: - connector_secrets = await download(context) - else: - connector_secrets = await load_from_local(context) - return connector_secrets - - -async def mounted_connector_secrets(context: ConnectorContext, secret_directory_path: str) -> Callable[[Container], Container]: +async def mounted_connector_secrets( + context: ConnectorContext, secret_directory_path: str, connector_secrets: List[Secret] +) -> Callable[[Container], Container]: """Returns an argument for a dagger container's with_ method which mounts all connector secrets in it. Args: context (ConnectorContext): The context providing a connector object and its secrets. secret_directory_path (str): Container directory where the secrets will be mounted, as files. - + connector_secrets (List[secrets]): List of secrets to mount to the connector container. Returns: fn (Callable[[Container], Container]): A function to pass as argument to the connector container's with_ method. """ - connector_secrets = await context.get_connector_secrets() java_log_scrub_pattern_secret = context.java_log_scrub_pattern_secret def with_secrets_mounted_as_dagger_secrets(container: Container) -> Container: @@ -171,8 +70,11 @@ def with_secrets_mounted_as_dagger_secrets(container: Container) -> Container: # test reports or any other build artifacts generated by a java connector test. 
container = container.with_secret_variable("LOG_SCRUB_PATTERN", java_log_scrub_pattern_secret) container = container.with_exec(["mkdir", "-p", secret_directory_path], skip_entrypoint=True) - for secret_file_name, secret in connector_secrets.items(): - container = container.with_mounted_secret(f"{secret_directory_path}/{secret_file_name}", secret) + for secret in connector_secrets: + if secret.file_name: + container = container.with_mounted_secret( + f"{secret_directory_path}/{secret.file_name}", secret.as_dagger_secret(context.dagger_client) + ) return container return with_secrets_mounted_as_dagger_secrets diff --git a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py index a22291682e13..51a06172f935 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py +++ b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py @@ -6,7 +6,9 @@ import uuid from typing import Callable, Dict, List, Optional, Union -from dagger import Client, Container, File, Secret, Service +from dagger import Client, Container, File +from dagger import Secret as DaggerSecret +from dagger import Service from pipelines import consts from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.consts import ( @@ -19,6 +21,7 @@ STORAGE_DRIVER, ) from pipelines.helpers.utils import sh_dash_c +from pipelines.models.secrets import Secret def get_base_dockerd_container(dagger_client: Client) -> Container: @@ -83,8 +86,8 @@ def get_daemon_config_json(registry_mirror_url: Optional[str] = None) -> str: def docker_login( dockerd_container: Container, - docker_registry_username_secret: Secret, - docker_registry_password_secret: Secret, + docker_registry_username: DaggerSecret, + docker_registry_password: DaggerSecret, ) -> Container: """Login to a docker registry if the username and password secrets are provided. @@ -96,13 +99,13 @@ def docker_login( Returns: Container: The container with the docker login command executed if the username and password secrets are provided. Noop otherwise. """ - if docker_registry_username_secret and docker_registry_username_secret: + if docker_registry_username and docker_registry_username: return ( dockerd_container # We use a cache buster here to guarantee the docker login is always executed. .with_env_variable("CACHEBUSTER", str(uuid.uuid4())) - .with_secret_variable("DOCKER_REGISTRY_USERNAME", docker_registry_username_secret) - .with_secret_variable("DOCKER_REGISTRY_PASSWORD", docker_registry_password_secret) + .with_secret_variable("DOCKER_REGISTRY_USERNAME", docker_registry_username) + .with_secret_variable("DOCKER_REGISTRY_PASSWORD", docker_registry_password) .with_exec( sh_dash_c([f"docker login -u $DOCKER_REGISTRY_USERNAME -p $DOCKER_REGISTRY_PASSWORD {DOCKER_REGISTRY_ADDRESS}"]), skip_entrypoint=True, @@ -114,16 +117,16 @@ def docker_login( def with_global_dockerd_service( dagger_client: Client, - docker_hub_username_secret: Optional[Secret] = None, - docker_hub_password_secret: Optional[Secret] = None, + docker_hub_username: Optional[Secret] = None, + docker_hub_password: Optional[Secret] = None, ) -> Service: """Create a container with a docker daemon running. We expose its 2375 port to use it as a docker host for docker-in-docker use cases. It is optionally connected to a DockerHub mirror if the DOCKER_REGISTRY_MIRROR_URL env var is set. Args: dagger_client (Client): The dagger client used to create the container. 
diff --git a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py
index a22291682e13..51a06172f935 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py
@@ -6,7 +6,9 @@
 import uuid
 from typing import Callable, Dict, List, Optional, Union

-from dagger import Client, Container, File, Secret, Service
+from dagger import Client, Container, File
+from dagger import Secret as DaggerSecret
+from dagger import Service
 from pipelines import consts
 from pipelines.airbyte_ci.connectors.context import ConnectorContext
 from pipelines.consts import (
@@ -19,6 +21,7 @@
     STORAGE_DRIVER,
 )
 from pipelines.helpers.utils import sh_dash_c
+from pipelines.models.secrets import Secret


 def get_base_dockerd_container(dagger_client: Client) -> Container:
@@ -83,8 +86,8 @@ def get_daemon_config_json(registry_mirror_url: Optional[str] = None) -> str:

 def docker_login(
     dockerd_container: Container,
-    docker_registry_username_secret: Secret,
-    docker_registry_password_secret: Secret,
+    docker_registry_username: DaggerSecret,
+    docker_registry_password: DaggerSecret,
 ) -> Container:
     """Login to a docker registry if the username and password secrets are provided.

@@ -96,13 +99,13 @@
     Returns:
         Container: The container with the docker login command executed if the username and password secrets are provided. Noop otherwise.
     """
-    if docker_registry_username_secret and docker_registry_username_secret:
+    if docker_registry_username and docker_registry_password:
         return (
             dockerd_container
             # We use a cache buster here to guarantee the docker login is always executed.
             .with_env_variable("CACHEBUSTER", str(uuid.uuid4()))
-            .with_secret_variable("DOCKER_REGISTRY_USERNAME", docker_registry_username_secret)
-            .with_secret_variable("DOCKER_REGISTRY_PASSWORD", docker_registry_password_secret)
+            .with_secret_variable("DOCKER_REGISTRY_USERNAME", docker_registry_username)
+            .with_secret_variable("DOCKER_REGISTRY_PASSWORD", docker_registry_password)
             .with_exec(
                 sh_dash_c([f"docker login -u $DOCKER_REGISTRY_USERNAME -p $DOCKER_REGISTRY_PASSWORD {DOCKER_REGISTRY_ADDRESS}"]),
                 skip_entrypoint=True,
@@ -114,16 +117,16 @@

 def with_global_dockerd_service(
     dagger_client: Client,
-    docker_hub_username_secret: Optional[Secret] = None,
-    docker_hub_password_secret: Optional[Secret] = None,
+    docker_hub_username: Optional[Secret] = None,
+    docker_hub_password: Optional[Secret] = None,
 ) -> Service:
     """Create a container with a docker daemon running.
     We expose its 2375 port to use it as a docker host for docker-in-docker use cases. It is optionally connected to a DockerHub mirror if the DOCKER_REGISTRY_MIRROR_URL env var is set.

     Args:
         dagger_client (Client): The dagger client used to create the container.
-        docker_hub_username_secret (Optional[Secret]): The DockerHub username secret.
-        docker_hub_password_secret (Optional[Secret]): The DockerHub password secret.
+        docker_hub_username (Optional[Secret]): The DockerHub username secret.
+        docker_hub_password (Optional[Secret]): The DockerHub password secret.
     Returns:
         Container: The container running dockerd as a service
     """
@@ -140,9 +143,11 @@ def with_global_dockerd_service(
         daemon_config_json = get_daemon_config_json()
         dockerd_container = dockerd_container.with_new_file("/etc/docker/daemon.json", contents=daemon_config_json)

-    if docker_hub_username_secret and docker_hub_password_secret:
+    if docker_hub_username and docker_hub_password:
         # Docker login happens late because there's a cache buster in the docker login command.
-        dockerd_container = docker_login(dockerd_container, docker_hub_username_secret, docker_hub_password_secret)
+        dockerd_container = docker_login(
+            dockerd_container, docker_hub_username.as_dagger_secret(dagger_client), docker_hub_password.as_dagger_secret(dagger_client)
+        )
     return dockerd_container.with_exec(
         ["dockerd", "--log-level=error", f"--host=tcp://0.0.0.0:{DOCKER_HOST_PORT}", "--tls=false"], insecure_root_capabilities=True
     ).as_service()
@@ -221,11 +226,11 @@ def with_crane(
     # https://github.com/google/go-containerregistry/tree/main/cmd/crane#images
     base_container = context.dagger_client.container().from_("gcr.io/go-containerregistry/crane/debug:v0.15.1")

-    if context.docker_hub_username_secret and context.docker_hub_password_secret:
+    if context.docker_hub_username and context.docker_hub_password:
         base_container = (
-            base_container.with_secret_variable("DOCKER_HUB_USERNAME", context.docker_hub_username_secret).with_secret_variable(
-                "DOCKER_HUB_PASSWORD", context.docker_hub_password_secret
-            )
+            base_container.with_secret_variable(
+                "DOCKER_HUB_USERNAME", context.docker_hub_username.as_dagger_secret(context.dagger_client)
+            ).with_secret_variable("DOCKER_HUB_PASSWORD", context.docker_hub_password.as_dagger_secret(context.dagger_client))
             # We need to use skip_entrypoint=True to avoid the entrypoint to be overridden by the crane command
             # We use sh -c to be able to use environment variables in the command
             # This is a workaround as the default crane entrypoint doesn't support environment variables
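Note the `docker_login` condition has been corrected here to check both username and password; the previous code tested the username twice. A hedged usage sketch of the updated service factory, where callers pass pipelines `Secret` objects and the dagger conversion happens inside (store setup as in `cli/secrets.py`; values are hypothetical):

```python
import dagger

from pipelines.dagger.actions.system.docker import with_global_dockerd_service
from pipelines.models.secrets import InMemorySecretStore, Secret

store = InMemorySecretStore()
store.add_secret("docker_hub_username", "airbyte-bot")  # hypothetical values
store.add_secret("docker_hub_password", "hunter2")


def make_dockerd(client: dagger.Client) -> dagger.Service:
    # The service factory accepts Optional[Secret]; omitting both skips docker login.
    return with_global_dockerd_service(
        client,
        docker_hub_username=Secret("docker_hub_username", store),
        docker_hub_password=Secret("docker_hub_password", store),
    )
```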
diff --git a/airbyte-ci/connectors/pipelines/pipelines/dagger/containers/python.py b/airbyte-ci/connectors/pipelines/pipelines/dagger/containers/python.py
index e638cc2881fd..8fd842253a87 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/dagger/containers/python.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/dagger/containers/python.py
@@ -2,7 +2,6 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #

-import uuid
 from dagger import CacheSharingMode, CacheVolume, Client, Container
 from pipelines.airbyte_ci.connectors.context import PipelineContext
diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/connectors/command.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/connectors/command.py
index 7ecdc5d3c72a..cba01a042e96 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/helpers/connectors/command.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/connectors/command.py
@@ -25,12 +25,11 @@ def get_connector_contexts(ctx: click.Context, pipeline_description: str, enable_report_auto_open
             git_repo_url=ctx.obj["git_repo_url"],
             ci_report_bucket=ctx.obj["ci_report_bucket_name"],
             report_output_prefix=ctx.obj["report_output_prefix"],
-            use_remote_secrets=ctx.obj["use_remote_secrets"],
            gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"),
             dagger_logs_url=ctx.obj.get("dagger_logs_url"),
             pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"),
             ci_context=ctx.obj.get("ci_context"),
-            ci_gcs_credentials=ctx.obj["ci_gcs_credentials"],
+            ci_gcp_credentials=ctx.obj["ci_gcp_credentials"],
             ci_git_user=ctx.obj["ci_git_user"],
             ci_github_access_token=ctx.obj["ci_github_access_token"],
             enable_report_auto_open=enable_report_auto_open,
diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/gcs.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/gcs.py
index 71fbce43b44b..0917f035ea6c 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/helpers/gcs.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/gcs.py
@@ -36,7 +36,7 @@ def upload_to_gcs(file_path: Path, bucket_name: str, object_name: str, credentials
     return gcs_uri, public_url


-def sanitize_gcs_credentials(raw_value: str) -> str:
+def sanitize_gcp_credentials(raw_value: str) -> str:
     """Try to parse the raw string input that should contain a json object with the GCS credentials.
     It will raise an exception if the parsing fails and help us to fail fast on invalid credentials input.
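`sanitize_gcp_credentials` (renamed above) is what `wrap_gcp_credentials_in_secret` runs before storing the value. A hedged sketch of the round trip, assuming the function returns the normalized JSON string its docstring implies (the payload is a dummy):

```python
import json

from pipelines.helpers.gcs import sanitize_gcp_credentials

raw = '  {"type": "service_account", "project_id": "my-project"}  '  # dummy payload
clean = sanitize_gcp_credentials(raw)
# Invalid JSON would have raised here, failing fast on bad credentials input.
assert json.loads(clean)["project_id"] == "my-project"
```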
diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/github.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/github.py
index 999cbed533c9..dad959f3dcf6 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/helpers/github.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/github.py
@@ -12,11 +12,12 @@
 from connector_ops.utils import console  # type: ignore
 from pipelines import main_logger
 from pipelines.consts import CIContext
+from pipelines.models.secrets import Secret

 if TYPE_CHECKING:
     from logging import Logger

-from github import Github, PullRequest
+from github import Auth, Github, PullRequest

 AIRBYTE_GITHUB_REPO = "airbytehq/airbyte"

@@ -56,7 +57,7 @@ def update_commit_status_check(
     safe_log(logger, f"Attempting to create {state} status for commit {sha} on Github in {context} context.")
     try:
-        github_client = Github(os.environ["CI_GITHUB_ACCESS_TOKEN"])
+        github_client = Github(auth=Auth.Token(os.environ["CI_GITHUB_ACCESS_TOKEN"]))
         airbyte_repo = github_client.get_repo(AIRBYTE_GITHUB_REPO)
     except Exception as e:
         if logger:
@@ -82,16 +83,16 @@ def update_commit_status_check(
     safe_log(logger, f"Created {state} status for commit {sha} on Github in {context} context with desc: {description}.")


-def get_pull_request(pull_request_number: int, github_access_token: str) -> PullRequest.PullRequest:
+def get_pull_request(pull_request_number: int, github_access_token: Secret) -> PullRequest.PullRequest:
     """Get a pull request object from its number.

     Args:
         pull_request_number (str): The number of the pull request to get.
-        github_access_token (str): The GitHub access token to use to authenticate.
+        github_access_token (Secret): The GitHub access token to use to authenticate.

     Returns:
         PullRequest: The pull request object.
     """
-    github_client = Github(github_access_token)
+    github_client = Github(auth=Auth.Token(github_access_token.value))
     airbyte_repo = github_client.get_repo(AIRBYTE_GITHUB_REPO)
     return airbyte_repo.get_pull(pull_request_number)
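PyGithub 2.x deprecates passing the token positionally; `Auth.Token` is the supported path, which the hunk above adopts. A minimal sketch (the token value is fake):

```python
from github import Auth, Github

# Authenticate with a personal access token under the PyGithub 2.x API.
gh = Github(auth=Auth.Token("ghp_fake_token"))  # fake token
print(gh.get_repo("airbytehq/airbyte").full_name)
```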
diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/artifacts.py b/airbyte-ci/connectors/pipelines/pipelines/models/artifacts.py
index f1deafd445e7..4f8776f525fe 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/models/artifacts.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/models/artifacts.py
@@ -7,6 +7,7 @@
 import dagger
 from pipelines.consts import GCS_PUBLIC_DOMAIN
 from pipelines.dagger.actions import remote_storage
+from pipelines.models.secrets import Secret


 @dataclass(kw_only=True)
@@ -28,7 +29,7 @@ async def save_to_local_path(self, path: Path) -> Path:
         else:
             raise Exception(f"Failed to save artifact {self.name} to local path {path}")

-    async def upload_to_gcs(self, dagger_client: dagger.Client, bucket: str, key: str, gcs_credentials: dagger.Secret) -> str:
+    async def upload_to_gcs(self, dagger_client: dagger.Client, bucket: str, key: str, gcs_credentials: Secret) -> str:
         gcs_cp_flags = [f'--content-disposition=filename="{self.name}"']
         if self.content_type is not None:
             gcs_cp_flags = gcs_cp_flags + [f"--content-type={self.content_type}"]
diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py
index 02217ebe88b9..fd78c7d02c17 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py
@@ -12,19 +12,21 @@
 from functools import lru_cache
 from glob import glob
 from types import TracebackType
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Dict

 from asyncer import asyncify
-from dagger import Client, Directory, File, GitRepository, Secret, Service
+from dagger import Client, Directory, File, GitRepository
+from dagger import Secret as DaggerSecret
+from dagger import Service
 from github import PullRequest
 from pipelines.airbyte_ci.connectors.reports import ConnectorReport
 from pipelines.consts import CIContext, ContextState
 from pipelines.helpers.execution.run_steps import RunStepOptions
-from pipelines.helpers.gcs import sanitize_gcs_credentials
 from pipelines.helpers.github import update_commit_status_check
 from pipelines.helpers.slack import send_message_to_webhook
 from pipelines.helpers.utils import AIRBYTE_REPO_URL, java_log_scrub_pattern
 from pipelines.models.reports import Report
+from pipelines.models.secrets import Secret, SecretStore

 if TYPE_CHECKING:
     from typing import List, Optional
@@ -79,11 +81,12 @@ def __init__(
         reporting_slack_channel: Optional[str] = None,
         pull_request: Optional[PullRequest.PullRequest] = None,
         ci_report_bucket: Optional[str] = None,
-        ci_gcs_credentials: Optional[str] = None,
+        ci_gcp_credentials: Optional[Secret] = None,
         ci_git_user: Optional[str] = None,
-        ci_github_access_token: Optional[str] = None,
+        ci_github_access_token: Optional[Secret] = None,
         run_step_options: RunStepOptions = RunStepOptions(),
         enable_report_auto_open: bool = True,
+        secret_stores: Dict[str, SecretStore] | None = None,
     ) -> None:
         """Initialize a pipeline context.
@@ -125,7 +128,7 @@ def __init__(
         self._dagger_client = None
         self._report = None
         self.dockerd_service = None
-        self.ci_gcs_credentials = sanitize_gcs_credentials(ci_gcs_credentials) if ci_gcs_credentials else None
+        self.ci_gcp_credentials = ci_gcp_credentials
         self.ci_report_bucket = ci_report_bucket
         self.ci_git_user = ci_git_user
         self.ci_github_access_token = ci_github_access_token
@@ -134,6 +137,7 @@ def __init__(
         self.secrets_to_mask = []
         self.run_step_options = run_step_options
         self.enable_report_auto_open = enable_report_auto_open
+        self.secret_stores = secret_stores if secret_stores else {}
         update_commit_status_check(**self.github_commit_status)

     @property
@@ -166,18 +170,7 @@ def report(self, report: Report | ConnectorReport) -> None:
         self._report = report

     @property
-    def ci_gcs_credentials_secret(self) -> Secret | None:
-        if self.ci_gcs_credentials is not None:
-            return self.dagger_client.set_secret("ci_gcs_credentials", self.ci_gcs_credentials)
-        return None
-
-    @property
-    def ci_github_access_token_secret(self) -> Secret:
-        assert self.ci_github_access_token is not None, "The ci_github_access_token was not set on this PipelineContext."
-        return self.dagger_client.set_secret("ci_github_access_token", self.ci_github_access_token)
-
-    @property
-    def java_log_scrub_pattern_secret(self) -> Optional[Secret]:
+    def java_log_scrub_pattern_secret(self) -> Optional[DaggerSecret]:
         if not self.secrets_to_mask:
             return None
         return self.dagger_client.set_secret("log_scrub_pattern", java_log_scrub_pattern(self.secrets_to_mask))
@@ -223,7 +216,7 @@ def dagger_cloud_url(self) -> Optional[str]:
     @property
     def remote_storage_enabled(self) -> bool:
-        return self.is_ci and bool(self.ci_report_bucket) and bool(self.ci_gcs_credentials)
+        return self.is_ci and bool(self.ci_report_bucket) and bool(self.ci_gcp_credentials)

     def get_repo_file(self, file_path: str) -> File:
         """Get a file from the current repository.
diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/python_registry_publish.py b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/python_registry_publish.py
index b7b9e07ba879..9e8a75371b13 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/python_registry_publish.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/python_registry_publish.py
@@ -9,6 +9,7 @@
 from pipelines.airbyte_ci.connectors.context import PipelineContext
 from pipelines.airbyte_ci.connectors.publish.context import PublishConnectorContext
 from pipelines.consts import DEFAULT_PYTHON_PACKAGE_REGISTRY_URL
+from pipelines.models.secrets import Secret


 @dataclass
@@ -20,7 +21,7 @@ class PythonPackageMetadata:
 class PythonRegistryPublishContext(PipelineContext):
     def __init__(
         self,
-        python_registry_token: str,
+        python_registry_token: Secret,
         registry_check_url: str,
         package_path: str,
         report_output_prefix: str,
@@ -35,7 +36,7 @@ def __init__(
         dagger_logs_url: Optional[str] = None,
         pipeline_start_timestamp: Optional[int] = None,
         ci_context: Optional[str] = None,
-        ci_gcs_credentials: Optional[str] = None,
+        ci_gcp_credentials: Optional[Secret] = None,
         package_name: Optional[str] = None,
         version: Optional[str] = None,
     ) -> None:
@@ -60,7 +61,7 @@ def __init__(
             dagger_logs_url=dagger_logs_url,
             pipeline_start_timestamp=pipeline_start_timestamp,
             ci_context=ci_context,
-            ci_gcs_credentials=ci_gcs_credentials,
+            ci_gcp_credentials=ci_gcp_credentials,
         )

     @classmethod
@@ -90,8 +91,9 @@ async def from_publish_connector_context(
             release_candidate_tag = datetime.now().strftime("%Y%m%d%H%M")
             version = f"{version}.dev{release_candidate_tag}"

+        assert connector_context.python_registry_token is not None, "The connector context must have a python_registry_token Secret set."
         pypi_context = cls(
-            python_registry_token=str(connector_context.python_registry_token),
+            python_registry_token=connector_context.python_registry_token,
             registry=str(connector_context.python_registry_url),
             registry_check_url=str(connector_context.python_registry_check_url),
             package_path=str(connector_context.connector.code_directory),
@@ -108,7 +110,7 @@ async def from_publish_connector_context(
             dagger_logs_url=connector_context.dagger_logs_url,
             pipeline_start_timestamp=connector_context.pipeline_start_timestamp,
             ci_context=connector_context.ci_context,
-            ci_gcs_credentials=connector_context.ci_gcs_credentials,
+            ci_gcp_credentials=connector_context.ci_gcp_credentials,
         )
         pypi_context.dagger_client = connector_context.dagger_client
         return pypi_context
key=f"{self.report_output_prefix}/artifacts/{slugify(step_result.step.title)}/{upload_time}_{artifact.name}", - gcs_credentials=self.pipeline_context.ci_gcs_credentials_secret, # type: ignore + gcs_credentials=self.pipeline_context.ci_gcp_credentials, # type: ignore ) self.pipeline_context.logger.info(f"Artifact {artifact.name} for {step_result.step.title} uploaded to {gcs_url}") else: diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/secrets.py b/airbyte-ci/connectors/pipelines/pipelines/models/secrets.py new file mode 100644 index 000000000000..a0c90d83289d --- /dev/null +++ b/airbyte-ci/connectors/pipelines/pipelines/models/secrets.py @@ -0,0 +1,140 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + +from __future__ import annotations + +import hashlib +import json +import os +from abc import ABC, abstractmethod +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List + +from dagger import Client as DaggerClient +from dagger import Secret as DaggerSecret +from google.cloud import secretmanager_v1 +from google.oauth2 import service_account # type: ignore + + +class SecretNotFoundError(Exception): + pass + + +class SecretString(str): + """The use of this string subtype will prevent accidental prints of secret value to the console.""" + + @property + def _masked_value(self) -> str: + return "