Add Slack Alert lifecycle to Dagster for Metadata publish #28759

Merged · 11 commits · Aug 3, 2023
@@ -4,4 +4,6 @@ GITHUB_METADATA_SERVICE_TOKEN=""
NIGHTLY_REPORT_SLACK_WEBHOOK_URL=""
# METADATA_CDN_BASE_URL="https://connectors.airbyte.com/files"
DOCKER_HUB_USERNAME=""
DOCKER_HUB_PASSWORD=""
SLACK_TOKEN=""
PUBLISH_UPDATE_CHANNEL="#ben-test"
@@ -1,7 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dagster import Definitions, ScheduleDefinition, load_assets_from_modules
from dagster import Definitions, ScheduleDefinition, EnvVar, load_assets_from_modules
from dagster_slack import SlackResource

from orchestrator.resources.gcp import gcp_gcs_client, gcs_directory_blobs, gcs_file_blob, gcs_file_manager
from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory, github_workflow_runs
@@ -54,6 +55,10 @@
]
)

SLACK_RESOURCE_TREE = {
"slack": SlackResource(token=EnvVar("SLACK_TOKEN")),
}

GITHUB_RESOURCE_TREE = {
"github_client": github_client.configured({"github_token": {"env": "GITHUB_METADATA_SERVICE_TOKEN"}}),
"github_connector_repo": github_connector_repo.configured({"connector_repo_name": CONNECTOR_REPO_NAME}),
@@ -79,6 +84,7 @@
}

METADATA_RESOURCE_TREE = {
**SLACK_RESOURCE_TREE,
**GCS_RESOURCE_TREE,
"all_metadata_file_blobs": gcs_directory_blobs.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*/{METADATA_FILE_NAME}$"}
@@ -89,6 +95,7 @@
}

REGISTRY_RESOURCE_TREE = {
**SLACK_RESOURCE_TREE,
**GCS_RESOURCE_TREE,
"latest_oss_registry_gcs_blob": gcs_file_blob.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}
@@ -99,6 +106,7 @@
}

REGISTRY_ENTRY_RESOURCE_TREE = {
**SLACK_RESOURCE_TREE,
**GCS_RESOURCE_TREE,
"latest_cloud_registry_entries_file_blobs": gcs_directory_blobs.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*latest/cloud.json$"}
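For context (not part of the diff): once the `slack` resource is registered in a resource tree as above, any asset that declares it in `required_resource_keys` can post messages. A minimal sketch, assuming the channel comes from the `PUBLISH_UPDATE_CHANNEL` variable added to the env template:

```python
import os

from dagster import OpExecutionContext, asset


@asset(required_resource_keys={"slack"})
def example_asset(context: OpExecutionContext) -> None:
    # SlackResource.get_client() returns a slack_sdk WebClient.
    client = context.resources.slack.get_client()
    channel = os.getenv("PUBLISH_UPDATE_CHANNEL", "#ben-test")
    client.chat_postMessage(channel=channel, text="Hello from Dagster")
```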
@@ -145,7 +145,7 @@ def generate_nightly_report(context: OpExecutionContext) -> Output[pd.DataFrame]
nightly_report_complete_md = render_connector_nightly_report_md(nightly_report_connector_matrix_df, nightly_report_complete_df)
slack_webhook_url = os.getenv("NIGHTLY_REPORT_SLACK_WEBHOOK_URL")
if slack_webhook_url:
send_slack_webhook(slack_webhook_url, nightly_report_complete_md)
send_slack_webhook(slack_webhook_url, nightly_report_complete_md, wrap_in_code_block=True)

return Output(
nightly_report_connector_matrix_df,
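`send_slack_webhook` itself is defined elsewhere; a plausible sketch of the helper with the new `wrap_in_code_block` flag, its signature an assumption based on this call site:

```python
import requests


def send_slack_webhook(webhook_url: str, message: str, wrap_in_code_block: bool = False) -> None:
    # Slack renders triple-backtick text as a fixed-width code block,
    # which keeps the markdown report table readable.
    text = f"```{message}```" if wrap_in_code_block else message
    response = requests.post(webhook_url, json={"text": text})
    response.raise_for_status()
```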
@@ -10,6 +10,7 @@
from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0
from metadata_service.utils import to_json_sanitized_dict
from orchestrator.assets.registry_entry import read_registry_entry_blob
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus

from typing import List

@@ -38,6 +39,7 @@ def persist_registry_to_json(


def generate_and_persist_registry(
context: OpExecutionContext,
registry_entry_file_blobs: List[storage.Blob],
registry_directory_manager: GCSFileManager,
registry_name: str,
@@ -51,6 +53,12 @@
Returns:
Output[ConnectorRegistryV0]: The registry.
"""
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_GENERATION,
StageStatus.IN_PROGRESS,
f"Generating {registry_name} registry...",
)
registry_dict = {"sources": [], "destinations": []}
for blob in registry_entry_file_blobs:
registry_entry, connector_type = read_registry_entry_blob(blob)
@@ -70,13 +78,20 @@
"gcs_path": MetadataValue.url(file_handle.public_url),
}

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_GENERATION,
StageStatus.SUCCESS,
f"New {registry_name} registry available at {file_handle.public_url}",
)

return Output(metadata=metadata, value=registry_model)


# Registry Generation


@asset(required_resource_keys={"registry_directory_manager", "latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@asset(required_resource_keys={"slack", "registry_directory_manager", "latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
"""
This asset is used to generate the oss registry from the registry entries.
@@ -86,13 +101,14 @@ def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]
latest_oss_registry_entries_file_blobs = context.resources.latest_oss_registry_entries_file_blobs

return generate_and_persist_registry(
context=context,
registry_entry_file_blobs=latest_oss_registry_entries_file_blobs,
registry_directory_manager=registry_directory_manager,
registry_name=registry_name,
)


@asset(required_resource_keys={"registry_directory_manager", "latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@asset(required_resource_keys={"slack", "registry_directory_manager", "latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
"""
This asset is used to generate the cloud registry from the registry entries.
@@ -102,6 +118,7 @@ def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]
latest_cloud_registry_entries_file_blobs = context.resources.latest_cloud_registry_entries_file_blobs

return generate_and_persist_registry(
context=context,
registry_entry_file_blobs=latest_cloud_registry_entries_file_blobs,
registry_directory_manager=registry_directory_manager,
registry_name=registry_name,
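`orchestrator.logging.publish_connector_lifecycle` is imported throughout this PR but its implementation is not part of the diff. A minimal sketch of what it plausibly looks like, assuming string enums for stage and status and a mirror of every message to Slack via the `slack` resource and `PUBLISH_UPDATE_CHANNEL`:

```python
import os
from enum import Enum

from dagster import OpExecutionContext


class PublishConnectorLifecycleStage(str, Enum):
    METADATA_SENSOR = "metadata_sensor"
    METADATA_VALIDATION = "metadata_validation"
    REGISTRY_ENTRY_GENERATION = "registry_entry_generation"
    REGISTRY_GENERATION = "registry_generation"


class StageStatus(str, Enum):
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"


class PublishConnectorLifecycle:
    @staticmethod
    def log(
        context: OpExecutionContext,
        stage: PublishConnectorLifecycleStage,
        status: StageStatus,
        message: str,
    ) -> None:
        """Log a lifecycle message to Dagster and mirror it to Slack."""
        full_message = f"*{stage.value}* ({status.value}): {message}"
        if status == StageStatus.FAILED:
            context.log.error(full_message)
        else:
            context.log.info(full_message)

        channel = os.getenv("PUBLISH_UPDATE_CHANNEL")
        if channel and hasattr(context.resources, "slack"):
            context.resources.slack.get_client().chat_postMessage(channel=channel, text=full_message)
```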
@@ -19,7 +19,9 @@
from orchestrator.utils.dagster_helpers import OutputDataFrame
from orchestrator.models.metadata import MetadataDefinition, LatestMetadataEntry
from orchestrator.config import get_public_url_for_gcs_file, VALID_REGISTRIES, MAX_METADATA_PARTITION_RUN_REQUEST
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus
import orchestrator.hacks as HACKS

from typing import List, Optional, Tuple, Union

PolymorphicRegistryEntry = Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition]
@@ -305,7 +307,7 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[MetadataDefinition]


@asset(
required_resource_keys={"all_metadata_file_blobs"},
required_resource_keys={"slack", "all_metadata_file_blobs"},
group_name=GROUP_NAME,
partitions_def=metadata_partitions_def,
output_required=False,
@@ -323,7 +325,12 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]
raise Exception(f"Could not find blob with etag {etag}")

metadata_file_path = matching_blob.name
context.log.info(f"Found metadata file with path {metadata_file_path} for etag {etag}")
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.IN_PROGRESS,
f"Found metadata file with path {metadata_file_path} for etag {etag}",
)

# read the matching_blob into a metadata definition
metadata_def = safe_parse_metadata_definition(matching_blob)
@@ -337,7 +344,12 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]

# return only if the metadata definition is valid
if not metadata_def:
context.log.warn(f"Could not parse metadata definition for {metadata_file_path}")
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.FAILED,
f"Could not parse metadata definition for {metadata_file_path}, dont panic, this can be expected for old metadata files",
)
return Output(value=None, metadata=dagster_metadata)

icon_file_path = metadata_file_path.replace(METADATA_FILE_NAME, ICON_FILE_NAME)
@@ -356,11 +368,18 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]
file_path=metadata_file_path,
)

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.SUCCESS,
f"Successfully parsed metadata definition for {metadata_file_path}",
)

return Output(value=metadata_entry, metadata=dagster_metadata)


@asset(
required_resource_keys={"root_metadata_directory_manager"},
required_resource_keys={"slack", "root_metadata_directory_manager"},
group_name=GROUP_NAME,
partitions_def=metadata_partitions_def,
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
@@ -373,6 +392,13 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry])
# if the metadata entry is invalid, return an empty dict
return Output(metadata={"empty_metadata": True}, value=None)

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.IN_PROGRESS,
f"Generating registry entry for {metadata_entry.file_path}",
)

cached_specs = pd.DataFrame(list_cached_specs())

root_metadata_directory_manager = context.resources.root_metadata_directory_manager
@@ -401,4 +427,22 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry])
**dagster_metadata_delete,
}

# Log the registry entries that were created
for registry_name, registry_url in persisted_registry_entries.items():
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully generated {registry_name} registry entry for {metadata_entry.file_path} at {registry_url}",
)

# Log the registry entries that were deleted
for registry_name, registry_url in deleted_registry_entries.items():
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully deleted {registry_name} registry entry for {metadata_entry.file_path}",
)

return Output(metadata=dagster_metadata, value=persisted_registry_entries)
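As background (not shown in this diff): `metadata_partitions_def` is a Dagster dynamic partitions definition keyed on GCS blob etags, so each metadata file becomes its own partition of these assets. Roughly, under that assumption:

```python
from dagster import DynamicPartitionsDefinition, OpExecutionContext, asset

# Each partition key is the etag of one metadata blob in GCS.
metadata_partitions_def = DynamicPartitionsDefinition(name="metadata")


@asset(partitions_def=metadata_partitions_def)
def example_partitioned_asset(context: OpExecutionContext) -> str:
    etag = context.partition_key  # one blob etag per run
    return etag
```

The `add_new_metadata_partitions_op` further down registers new etags with `context.instance.add_dynamic_partitions`, which is what triggers per-partition materializations.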
@@ -11,12 +11,17 @@

PolymorphicRegistryEntry = Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition]

def _is_docker_repository_overridden(metadata_entry: LatestMetadataEntry, registry_entry: PolymorphicRegistryEntry,) -> bool:

def _is_docker_repository_overridden(
metadata_entry: LatestMetadataEntry,
registry_entry: PolymorphicRegistryEntry,
) -> bool:
"""Check if the docker repository is overridden in the registry entry."""
registry_entry_docker_repository = registry_entry.dockerRepository
metadata_docker_repository = metadata_entry.metadata_definition.data.dockerRepository
return registry_entry_docker_repository != metadata_docker_repository


def _get_version_specific_registry_entry_file_path(registry_entry, registry_name):
"""Get the file path for the version specific registry entry file."""
docker_repository = registry_entry.dockerRepository
@@ -26,11 +31,15 @@ def _get_version_specific_registry_entry_file_path(registry_entry, registry_name)
registry_entry_file_path = assumed_metadata_file_path.replace(METADATA_FILE_NAME, registry_name)
return registry_entry_file_path


def _check_for_invalid_write_path(write_path: str):
"""Check if the write path is valid."""

if "latest" in write_path:
raise ValueError("Cannot write to a path that contains 'latest'. That is reserved for the latest metadata file and its direct transformations")
raise ValueError(
"Cannot write to a path that contains 'latest'. That is reserved for the latest metadata file and its direct transformations"
)


def write_registry_to_overrode_file_paths(
registry_entry: PolymorphicRegistryEntry,
@@ -72,7 +81,8 @@ def write_registry_to_overrode_file_paths(
overrode_registry_entry_version_write_path = _get_version_specific_registry_entry_file_path(registry_entry, registry_name)
_check_for_invalid_write_path(overrode_registry_entry_version_write_path)
logger.info(f"Writing registry entry to {overrode_registry_entry_version_write_path}")
file_handle = registry_directory_manager.write_data(registry_entry_json.encode("utf-8"), ext="json", key=overrode_registry_entry_version_write_path)
file_handle = registry_directory_manager.write_data(
registry_entry_json.encode("utf-8"), ext="json", key=overrode_registry_entry_version_write_path
)
logger.info(f"Successfully wrote registry entry to {file_handle.public_url}")
return file_handle

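A hypothetical worked example of the path helpers above, assuming `METADATA_FILE_NAME = "metadata.yaml"` and the usual `metadata/<docker repository>/<version>/` GCS layout (the middle of `_get_version_specific_registry_entry_file_path` is elided in this diff):

```python
METADATA_FILE_NAME = "metadata.yaml"  # assumed constant


def version_specific_path(docker_repository: str, version: str, registry_name: str) -> str:
    # Mirrors the helper above under the assumed metadata/<repo>/<version>/ layout.
    assumed_metadata_file_path = f"metadata/{docker_repository}/{version}/{METADATA_FILE_NAME}"
    return assumed_metadata_file_path.replace(METADATA_FILE_NAME, registry_name)


assert version_specific_path("airbyte/source-faker", "1.0.0", "oss") == "metadata/airbyte/source-faker/1.0.0/oss"
```

A path like `metadata/airbyte/source-faker/latest/oss` would then be rejected by `_check_for_invalid_write_path`, since it contains `latest`.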
@@ -1,6 +1,7 @@
from dagster import define_asset_job, AssetSelection, job, SkipReason, op
from orchestrator.assets import registry_entry
from orchestrator.config import MAX_METADATA_PARTITION_RUN_REQUEST, HIGH_QUEUE_PRIORITY
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus

oss_registry_inclusive = AssetSelection.keys("persisted_oss_registry", "specs_secrets_mask_yaml").upstream()
generate_oss_registry = define_asset_job(name="generate_oss_registry", selection=oss_registry_inclusive)
@@ -19,29 +20,41 @@
)


@op(required_resource_keys={"all_metadata_file_blobs"})
@op(required_resource_keys={"slack", "all_metadata_file_blobs"})
def add_new_metadata_partitions_op(context):
"""
This op is responsible for polling for new metadata files and adding their etag to the dynamic partition.
"""
all_metadata_file_blobs = context.resources.all_metadata_file_blobs
partition_name = registry_entry.metadata_partitions_def.name

new_etags_found = [
blob.etag for blob in all_metadata_file_blobs if not context.instance.has_dynamic_partition(partition_name, blob.etag)
]
new_files_found = {
blob.etag: blob.name for blob in all_metadata_file_blobs if not context.instance.has_dynamic_partition(partition_name, blob.etag)
}

new_etags_found = list(new_files_found.keys())
context.log.info(f"New etags found: {new_etags_found}")

if not new_etags_found:
return SkipReason("No new metadata files to process in GCS bucket")

# if there are more than the MAX_METADATA_PARTITION_RUN_REQUEST, we need to split them into multiple runs
etags_to_process = new_etags_found
if len(new_etags_found) > MAX_METADATA_PARTITION_RUN_REQUEST:
new_etags_found = new_etags_found[:MAX_METADATA_PARTITION_RUN_REQUEST]
context.log.info(f"Only processing first {MAX_METADATA_PARTITION_RUN_REQUEST} new blobs: {new_etags_found}")
etags_to_process = etags_to_process[:MAX_METADATA_PARTITION_RUN_REQUEST]
context.log.info(f"Only processing first {MAX_METADATA_PARTITION_RUN_REQUEST} new blobs: {etags_to_process}")

context.instance.add_dynamic_partitions(partition_name, new_etags_found)
context.instance.add_dynamic_partitions(partition_name, etags_to_process)

# format new_files_found into a loggable string
new_metadata_log_string = "\n".join([f"{new_files_found[etag]} *{etag}* " for etag in etags_to_process])

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_SENSOR,
StageStatus.SUCCESS,
f"*Queued {len(etags_to_process)}/{len(new_etags_found)} new metadata files for processing:*\n\n {new_metadata_log_string}",
)

> **Contributor:** I'd maybe make this an `IN_PROGRESS`? But I'm not strongly opinionated on that one.
>
> **Contributor (Author):** At this point they are successfully queued! The next step is processing.


@job(tags={"dagster/priority": HIGH_QUEUE_PRIORITY})
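The body of the `@job` is cut off in this view; presumably it just invokes the op, roughly as below (the job name is an assumption):

```python
@job(tags={"dagster/priority": HIGH_QUEUE_PRIORITY})
def add_new_metadata_partitions():
    add_new_metadata_partitions_op()
```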