From a42532a30564cc5a6a6f0214d0c64459a3f45ccd Mon Sep 17 00:00:00 2001 From: Ben Church Date: Thu, 9 May 2024 13:18:16 -0700 Subject: [PATCH] bug(registry): fix auto materialize (#38094) --- .../orchestrator/assets/registry_entry.py | 10 ++++--- .../orchestrator/orchestrator/hacks.py | 30 ++++++++++++++++++- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py index b42b21416954..e2299ca899b6 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py @@ -464,7 +464,7 @@ def safe_get_commit_sha(metadata_dict: Union[dict, BaseModel]) -> Optional[str]: auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST), ) @sentry.instrument_asset_op -def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFrame) -> Output[Optional[LatestMetadataEntry]]: +def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]: """Parse and compute the LatestMetadataEntry for the given metadata file.""" etag = context.partition_key context.log.info(f"Processing metadata file with etag {etag}") @@ -475,6 +475,8 @@ def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFram if not matching_blob: raise Exception(f"Could not find blob with etag {etag}") + airbyte_slack_users = HACKS.get_airbyte_slack_users_from_graph(context) + metadata_dict = yaml_blob_to_dict(matching_blob) user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_dict) commit_sha = safe_get_commit_sha(metadata_dict) @@ -548,9 +550,7 @@ def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFram auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST), ) @sentry.instrument_asset_op -def registry_entry( - context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry], airbyte_slack_users: pd.DataFrame -) -> Output[Optional[dict]]: +def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]: """ Generate the registry entry files from the given metadata file, and persist it to GCS. """ @@ -558,6 +558,8 @@ def registry_entry( # if the metadata entry is invalid, return an empty dict return Output(metadata={"empty_metadata": True}, value=None) + airbyte_slack_users = HACKS.get_airbyte_slack_users_from_graph(context) + user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_entry.metadata_definition) commit_sha = safe_get_commit_sha(metadata_entry.metadata_definition) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/hacks.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/hacks.py index 0ec38d096db3..bcc55fe1b81a 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/hacks.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/hacks.py @@ -2,8 +2,10 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from typing import Union +from typing import Optional, Union +import pandas as pd +from dagster import OpExecutionContext from metadata_service.constants import METADATA_FILE_NAME from metadata_service.gcs_upload import get_metadata_remote_file_path from metadata_service.models.generated.ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition @@ -109,3 +111,29 @@ def sanitize_docker_repo_name_for_dependency_file(docker_repo_name: str) -> str: """ return docker_repo_name.replace("airbyte/", "") + + +def get_airbyte_slack_users_from_graph(context: OpExecutionContext) -> Optional[pd.DataFrame]: + """ + Get the airbyte slack users from the graph. + + Important: Directly relates to the airbyte_slack_users asset. Requires the asset to be materialized in the graph. + + Problem: + I guess having dynamic partitioned assets that automatically materialize depending on another asset is a bit too much to ask for. + + Solution: + Just get the asset from the graph, but dont declare it as a dependency. + + Context: + https://airbytehq-team.slack.com/archives/C048P9GADFW/p1715276222825929 + """ + try: + from orchestrator import defn + + airbyte_slack_users = defn.load_asset_value("airbyte_slack_users", instance=context.instance) + context.log.info(f"Got airbyte slack users from graph: {airbyte_slack_users}") + return airbyte_slack_users + except Exception as e: + context.log.error(f"Failed to get airbyte slack users from graph: {e}") + return None