Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Slack Alert lifecycle to Dagster for Metadata publish #28759

Merged
merged 11 commits into from
Aug 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ NIGHTLY_REPORT_SLACK_WEBHOOK_URL=""
# METADATA_CDN_BASE_URL="https://connectors.airbyte.com/files"
DOCKER_HUB_USERNAME=""
DOCKER_HUB_PASSWORD=""
SLACK_TOKEN = ""
PUBLISH_UPDATE_CHANNEL="#ben-test"
# SENTRY_DSN=""
# SENTRY_ENVIRONMENT="dev"
# SENTRY_TRACES_SAMPLE_RATE=1.0
# SENTRY_TRACES_SAMPLE_RATE=1.0
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dagster import Definitions, ScheduleDefinition, load_assets_from_modules
from dagster import Definitions, ScheduleDefinition, EnvVar, load_assets_from_modules
from dagster_slack import SlackResource

from orchestrator.resources.gcp import gcp_gcs_client, gcs_directory_blobs, gcs_file_blob, gcs_file_manager
from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory, github_workflow_runs
Expand Down Expand Up @@ -55,6 +56,10 @@
]
)

SLACK_RESOURCE_TREE = {
"slack": SlackResource(token=EnvVar("SLACK_TOKEN")),
}

GITHUB_RESOURCE_TREE = {
"github_client": github_client.configured({"github_token": {"env": "GITHUB_METADATA_SERVICE_TOKEN"}}),
"github_connector_repo": github_connector_repo.configured({"connector_repo_name": CONNECTOR_REPO_NAME}),
Expand All @@ -80,6 +85,7 @@
}

METADATA_RESOURCE_TREE = {
**SLACK_RESOURCE_TREE,
**GCS_RESOURCE_TREE,
"all_metadata_file_blobs": gcs_directory_blobs.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*/{METADATA_FILE_NAME}$"}
Expand All @@ -90,6 +96,7 @@
}

REGISTRY_RESOURCE_TREE = {
**SLACK_RESOURCE_TREE,
**GCS_RESOURCE_TREE,
"latest_oss_registry_gcs_blob": gcs_file_blob.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}
Expand All @@ -100,6 +107,7 @@
}

REGISTRY_ENTRY_RESOURCE_TREE = {
**SLACK_RESOURCE_TREE,
**GCS_RESOURCE_TREE,
"latest_cloud_registry_entries_file_blobs": gcs_directory_blobs.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*latest/cloud.json$"}
Expand Down Expand Up @@ -140,13 +148,13 @@
job=generate_oss_registry,
resources_def=REGISTRY_ENTRY_RESOURCE_TREE,
gcs_blobs_resource_key="latest_oss_registry_entries_file_blobs",
interval=30,
interval=60,
Comment on lines -143 to +151
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we intentionally slowing things down here? What's the reasoning for doing so (just wondering)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Snuck it in to deal with a sensor resource issue.

https://airbytehq-team.slack.com/archives/C056HGD1QSW/p1690931225565009

Very minor change, figured I would save an approve-and-merge.

),
new_gcs_blobs_sensor(
job=generate_cloud_registry,
resources_def=REGISTRY_ENTRY_RESOURCE_TREE,
gcs_blobs_resource_key="latest_cloud_registry_entries_file_blobs",
interval=30,
interval=60,
),
new_gcs_blobs_sensor(
job=generate_nightly_reports,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def generate_nightly_report(context: OpExecutionContext) -> Output[pd.DataFrame]
nightly_report_complete_md = render_connector_nightly_report_md(nightly_report_connector_matrix_df, nightly_report_complete_df)
slack_webhook_url = os.getenv("NIGHTLY_REPORT_SLACK_WEBHOOK_URL")
if slack_webhook_url:
send_slack_webhook(slack_webhook_url, nightly_report_complete_md)
send_slack_webhook(slack_webhook_url, nightly_report_complete_md, wrap_in_code_block=True)

return Output(
nightly_report_connector_matrix_df,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from metadata_service.models.transform import to_json_sanitized_dict

from orchestrator.assets.registry_entry import read_registry_entry_blob
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus
from orchestrator.logging import sentry

from typing import List
Expand Down Expand Up @@ -43,6 +44,7 @@ def persist_registry_to_json(

@sentry_sdk.trace
def generate_and_persist_registry(
context: OpExecutionContext,
registry_entry_file_blobs: List[storage.Blob],
registry_directory_manager: GCSFileManager,
registry_name: str,
Expand All @@ -56,6 +58,12 @@ def generate_and_persist_registry(
Returns:
Output[ConnectorRegistryV0]: The registry.
"""
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_GENERATION,
StageStatus.IN_PROGRESS,
f"Generating {registry_name} registry...",
)
registry_dict = {"sources": [], "destinations": []}
for blob in registry_entry_file_blobs:
registry_entry, connector_type = read_registry_entry_blob(blob)
Expand All @@ -75,13 +83,20 @@ def generate_and_persist_registry(
"gcs_path": MetadataValue.url(file_handle.public_url),
}

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_GENERATION,
StageStatus.SUCCESS,
f"New {registry_name} registry available at {file_handle.public_url}",
)

return Output(metadata=metadata, value=registry_model)


# Registry Generation


@asset(required_resource_keys={"registry_directory_manager", "latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@asset(required_resource_keys={"slack", "registry_directory_manager", "latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
"""
Expand All @@ -92,13 +107,14 @@ def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegis
latest_oss_registry_entries_file_blobs = context.resources.latest_oss_registry_entries_file_blobs

return generate_and_persist_registry(
context=context,
registry_entry_file_blobs=latest_oss_registry_entries_file_blobs,
registry_directory_manager=registry_directory_manager,
registry_name=registry_name,
)


@asset(required_resource_keys={"registry_directory_manager", "latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@asset(required_resource_keys={"slack", "registry_directory_manager", "latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
"""
Expand All @@ -109,6 +125,7 @@ def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorReg
latest_cloud_registry_entries_file_blobs = context.resources.latest_cloud_registry_entries_file_blobs

return generate_and_persist_registry(
context=context,
registry_entry_file_blobs=latest_cloud_registry_entries_file_blobs,
registry_directory_manager=registry_directory_manager,
registry_name=registry_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from orchestrator.utils.dagster_helpers import OutputDataFrame
from orchestrator.models.metadata import MetadataDefinition, LatestMetadataEntry
from orchestrator.config import get_public_url_for_gcs_file, VALID_REGISTRIES, MAX_METADATA_PARTITION_RUN_REQUEST
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus
from orchestrator.logging import sentry

import orchestrator.hacks as HACKS
Expand Down Expand Up @@ -315,7 +316,7 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[Meta


@asset(
required_resource_keys={"all_metadata_file_blobs"},
required_resource_keys={"slack", "all_metadata_file_blobs"},
group_name=GROUP_NAME,
partitions_def=metadata_partitions_def,
output_required=False,
Expand All @@ -334,7 +335,12 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
raise Exception(f"Could not find blob with etag {etag}")

metadata_file_path = matching_blob.name
context.log.info(f"Found metadata file with path {metadata_file_path} for etag {etag}")
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.IN_PROGRESS,
f"Found metadata file with path {metadata_file_path} for etag {etag}",
)

# read the matching_blob into a metadata definition
metadata_def = safe_parse_metadata_definition(matching_blob)
Expand All @@ -348,7 +354,12 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat

# return only if the metadata definition is valid
if not metadata_def:
context.log.warn(f"Could not parse metadata definition for {metadata_file_path}")
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.FAILED,
f"Could not parse metadata definition for {metadata_file_path}, dont panic, this can be expected for old metadata files",
)
return Output(value=None, metadata=dagster_metadata)

icon_file_path = metadata_file_path.replace(METADATA_FILE_NAME, ICON_FILE_NAME)
Expand All @@ -367,11 +378,18 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
file_path=metadata_file_path,
)

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.SUCCESS,
f"Successfully parsed metadata definition for {metadata_file_path}",
)

return Output(value=metadata_entry, metadata=dagster_metadata)


@asset(
required_resource_keys={"root_metadata_directory_manager"},
required_resource_keys={"slack", "root_metadata_directory_manager"},
group_name=GROUP_NAME,
partitions_def=metadata_partitions_def,
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
Expand All @@ -385,6 +403,13 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
# if the metadata entry is invalid, return an empty dict
return Output(metadata={"empty_metadata": True}, value=None)

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.IN_PROGRESS,
f"Generating registry entry for {metadata_entry.file_path}",
)

cached_specs = pd.DataFrame(list_cached_specs())

root_metadata_directory_manager = context.resources.root_metadata_directory_manager
Expand Down Expand Up @@ -413,4 +438,22 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
**dagster_metadata_delete,
}

# Log the registry entries that were created
for registry_name, registry_url in persisted_registry_entries.items():
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully generated {registry_name} registry entry for {metadata_entry.file_path} at {registry_url}",
)

# Log the registry entries that were deleted
for registry_name, registry_url in deleted_registry_entries.items():
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully deleted {registry_name} registry entry for {metadata_entry.file_path}",
)

return Output(metadata=dagster_metadata, value=persisted_registry_entries)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dagster import define_asset_job, AssetSelection, job, SkipReason, op
from orchestrator.assets import registry_entry
from orchestrator.config import MAX_METADATA_PARTITION_RUN_REQUEST, HIGH_QUEUE_PRIORITY
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus

oss_registry_inclusive = AssetSelection.keys("persisted_oss_registry", "specs_secrets_mask_yaml").upstream()
generate_oss_registry = define_asset_job(name="generate_oss_registry", selection=oss_registry_inclusive)
Expand All @@ -19,29 +20,41 @@
)


@op(required_resource_keys={"all_metadata_file_blobs"})
@op(required_resource_keys={"slack", "all_metadata_file_blobs"})
def add_new_metadata_partitions_op(context):
"""
This op is responsible for polling for new metadata files and adding their etag to the dynamic partition.
"""
all_metadata_file_blobs = context.resources.all_metadata_file_blobs
partition_name = registry_entry.metadata_partitions_def.name

new_etags_found = [
blob.etag for blob in all_metadata_file_blobs if not context.instance.has_dynamic_partition(partition_name, blob.etag)
]
new_files_found = {
blob.etag: blob.name for blob in all_metadata_file_blobs if not context.instance.has_dynamic_partition(partition_name, blob.etag)
}

new_etags_found = list(new_files_found.keys())
context.log.info(f"New etags found: {new_etags_found}")

if not new_etags_found:
return SkipReason(f"No new metadata files to process in GCS bucket")

# if there are more than the MAX_METADATA_PARTITION_RUN_REQUEST, we need to split them into multiple runs
etags_to_process = new_etags_found
if len(new_etags_found) > MAX_METADATA_PARTITION_RUN_REQUEST:
new_etags_found = new_etags_found[:MAX_METADATA_PARTITION_RUN_REQUEST]
context.log.info(f"Only processing first {MAX_METADATA_PARTITION_RUN_REQUEST} new blobs: {new_etags_found}")
etags_to_process = etags_to_process[:MAX_METADATA_PARTITION_RUN_REQUEST]
context.log.info(f"Only processing first {MAX_METADATA_PARTITION_RUN_REQUEST} new blobs: {etags_to_process}")

context.instance.add_dynamic_partitions(partition_name, new_etags_found)
context.instance.add_dynamic_partitions(partition_name, etags_to_process)

# format new_files_found into a loggable string
new_metadata_log_string = "\n".join([f"{new_files_found[etag]} *{etag}* " for etag in etags_to_process])

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_SENSOR,
StageStatus.SUCCESS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd maybe make this an in progress? but not strongly opinionated on that one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point they are successfully queued! Next step is processing.

f"*Queued {len(etags_to_process)}/{len(new_etags_found)} new metadata files for processing:*\n\n {new_metadata_log_string}",
)


@job(tags={"dagster/priority": HIGH_QUEUE_PRIORITY})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import os

from enum import Enum
from dagster import OpExecutionContext
from orchestrator.ops.slack import send_slack_message


class StageStatus(str, Enum):
    """Outcome of a single stage in the connector publish lifecycle."""

    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"

    def __str__(self) -> str:
        # Human-readable, upper-cased label, e.g. "in_progress" -> "IN PROGRESS".
        return self.value.upper().replace("_", " ")

    def to_emoji(self) -> str:
        """Return the coloured-circle emoji for this status ("" if unmapped)."""
        # Built inside the method on purpose: a dict assigned in the Enum
        # class body would itself be turned into an enum member.
        emoji_by_value = {
            "in_progress": "🟡",
            "success": "🟢",
            "failed": "🔴",
        }
        return emoji_by_value.get(self.value, "")


class PublishConnectorLifecycleStage(str, Enum):
    """Named stages a connector passes through while being published."""

    METADATA_SENSOR = "metadata_sensor"
    METADATA_VALIDATION = "metadata_validation"
    REGISTRY_ENTRY_GENERATION = "registry_entry_generation"
    REGISTRY_GENERATION = "registry_generation"

    def __str__(self) -> str:
        # Title-cased label, e.g. "metadata_sensor" -> "Metadata Sensor".
        words = self.value.split("_")
        return " ".join(words).title()


class PublishConnectorLifecycle:
    """
    Logs the lifecycle of publishing a connector to the registries.

    Each message is written to the execution context's logger and, when the
    PUBLISH_UPDATE_CHANNEL environment variable is set, sent to Slack as well.

    This is necessary as this lifecycle is not a single job, asset, resource, schedule, or sensor.
    """

    @staticmethod
    def stage_to_log_level(stage_status: StageStatus) -> str:
        """Return the logger method name for a status: "error" for FAILED, otherwise "info"."""
        return "error" if stage_status == StageStatus.FAILED else "info"

    @staticmethod
    def create_log_message(
        lifecycle_stage: PublishConnectorLifecycleStage,
        stage_status: StageStatus,
        message: str,
    ) -> str:
        """Build the formatted line: status emoji, stage label, status label, then the message."""
        return f"*{stage_status.to_emoji()} _{lifecycle_stage}_ {stage_status}*: {message}"

    @staticmethod
    def log(context: OpExecutionContext, lifecycle_stage: PublishConnectorLifecycleStage, stage_status: StageStatus, message: str):
        """Publish a connector notification log to logger and slack (if enabled)."""
        formatted = PublishConnectorLifecycle.create_log_message(lifecycle_stage, stage_status, message)

        # Pick context.log.error or context.log.info based on the stage status.
        level_name = PublishConnectorLifecycle.stage_to_log_level(stage_status)
        getattr(context.log, level_name)(formatted)

        # Slack is optional: skip it when no channel is configured.
        slack_channel = os.getenv("PUBLISH_UPDATE_CHANNEL")
        if not slack_channel:
            return
        send_slack_message(context, slack_channel, f"🤖 {formatted}")
Loading