Skip to content

Commit

Permalink
bug(registry): fix auto materialize (#38094)
Browse files Browse the repository at this point in the history
  • Loading branch information
bnchrch authored May 9, 2024
1 parent 9d95dc0 commit a42532a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ def safe_get_commit_sha(metadata_dict: Union[dict, BaseModel]) -> Optional[str]:
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFrame) -> Output[Optional[LatestMetadataEntry]]:
def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]:
"""Parse and compute the LatestMetadataEntry for the given metadata file."""
etag = context.partition_key
context.log.info(f"Processing metadata file with etag {etag}")
Expand All @@ -475,6 +475,8 @@ def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFram
if not matching_blob:
raise Exception(f"Could not find blob with etag {etag}")

airbyte_slack_users = HACKS.get_airbyte_slack_users_from_graph(context)

metadata_dict = yaml_blob_to_dict(matching_blob)
user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_dict)
commit_sha = safe_get_commit_sha(metadata_dict)
Expand Down Expand Up @@ -548,16 +550,16 @@ def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFram
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def registry_entry(
context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry], airbyte_slack_users: pd.DataFrame
) -> Output[Optional[dict]]:
def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]:
"""
Generate the registry entry files from the given metadata file, and persist it to GCS.
"""
if not metadata_entry:
# if there is no valid metadata entry, return an Output whose value is None (flagged via the "empty_metadata" metadata key)
return Output(metadata={"empty_metadata": True}, value=None)

airbyte_slack_users = HACKS.get_airbyte_slack_users_from_graph(context)

user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_entry.metadata_definition)
commit_sha = safe_get_commit_sha(metadata_entry.metadata_definition)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Union
from typing import Optional, Union

import pandas as pd
from dagster import OpExecutionContext
from metadata_service.constants import METADATA_FILE_NAME
from metadata_service.gcs_upload import get_metadata_remote_file_path
from metadata_service.models.generated.ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition
Expand Down Expand Up @@ -109,3 +111,29 @@ def sanitize_docker_repo_name_for_dependency_file(docker_repo_name: str) -> str:
"""

return docker_repo_name.replace("airbyte/", "")


def get_airbyte_slack_users_from_graph(context: OpExecutionContext) -> Optional[pd.DataFrame]:
    """
    Load the value of the `airbyte_slack_users` asset straight from the graph.

    Important:
        This is tightly coupled to the `airbyte_slack_users` asset and only works
        once that asset has been materialized.

    Problem:
        Dynamically partitioned assets that auto-materialize apparently do not
        cope well with also depending on another asset.

    Solution:
        Read the asset value from the graph at run time instead of declaring it
        as a dependency of the calling asset.

    Context:
        https://airbytehq-team.slack.com/archives/C048P9GADFW/p1715276222825929

    Args:
        context: Execution context; its `instance` is used to load the asset
            value and its `log` receives the info/error messages.

    Returns:
        The loaded asset value, or None if anything goes wrong while importing
        the definitions, loading the value, or logging it (best effort: the
        failure is logged as an error and never re-raised).
    """
    try:
        # Imported inside the function rather than at module level
        # (presumably to avoid a circular import with `orchestrator` - TODO confirm).
        from orchestrator import defn as definitions

        slack_users = definitions.load_asset_value("airbyte_slack_users", instance=context.instance)
        context.log.info(f"Got airbyte slack users from graph: {slack_users}")
    except Exception as error:
        # Deliberate catch-all: callers are expected to handle a None result.
        context.log.error(f"Failed to get airbyte slack users from graph: {error}")
        return None

    return slack_users

0 comments on commit a42532a

Please sign in to comment.