Dagster: Add sentry logging (#28822)
* Add sentry

* add sentry decorator

* Add traces

* Use sentry trace

* Improve duplicate logging

* Add comments

* DNC

* Fix up issues

* Move to scopes

* Remove breadcrumb

* Update lock
bnchrch authored Aug 3, 2023
1 parent 641a65a commit 464409a
Showing 13 changed files with 344 additions and 16 deletions.
@@ -4,4 +4,7 @@ GITHUB_METADATA_SERVICE_TOKEN=""
NIGHTLY_REPORT_SLACK_WEBHOOK_URL=""
# METADATA_CDN_BASE_URL="https://connectors.airbyte.com/files"
DOCKER_HUB_USERNAME=""
DOCKER_HUB_PASSWORD=""
# SENTRY_DSN=""
# SENTRY_ENVIRONMENT="dev"
# SENTRY_TRACES_SAMPLE_RATE=1.0
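The three commented-out variables above are the only configuration the change needs: a DSN, an environment name, and a trace sample rate. The new orchestrator/logging/sentry.py module that consumes them is not part of this excerpt, so the following is only a sketch of how its setup_dagster_sentry() helper (imported and called further down) might read them; the options passed to sentry_sdk.init are standard SDK options, but the body is an assumption, not the merged code.

# Hypothetical sketch; the real setup_dagster_sentry() lives in a file not shown in this diff.
import os

import sentry_sdk


def setup_dagster_sentry():
    dsn = os.environ.get("SENTRY_DSN")
    if not dsn:
        # With no DSN the SDK is left uninitialized, so local runs report nothing.
        return
    sentry_sdk.init(
        dsn=dsn,
        environment=os.environ.get("SENTRY_ENVIRONMENT", "dev"),
        traces_sample_rate=float(os.environ.get("SENTRY_TRACES_SAMPLE_RATE", "1.0")),
    )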
@@ -26,6 +26,7 @@
from orchestrator.jobs.connector_test_report import generate_nightly_reports, generate_connector_test_summary_reports
from orchestrator.sensors.registry import registry_updated_sensor
from orchestrator.sensors.gcs import new_gcs_blobs_sensor
from orchestrator.logging.sentry import setup_dagster_sentry

from orchestrator.config import (
REPORT_FOLDER,
@@ -175,6 +176,9 @@
This is the entry point for the orchestrator.
It is a list of all the jobs, assets, resources, schedules, and sensors that are available to the orchestrator.
"""

setup_dagster_sentry()

defn = Definitions(
jobs=JOBS,
assets=ASSETS,
@@ -20,6 +20,7 @@
render_connector_test_badge,
)
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.logging import sentry


T = TypeVar("T")
@@ -126,6 +127,7 @@ def compute_connector_nightly_report_history(


@asset(required_resource_keys={"latest_nightly_complete_file_blobs", "latest_nightly_test_output_file_blobs"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def generate_nightly_report(context: OpExecutionContext) -> Output[pd.DataFrame]:
"""
Generate the Connector Nightly Report from the latest 10 nightly runs
@@ -154,6 +156,7 @@ def generate_nightly_report(context: OpExecutionContext) -> Output[pd.DataFrame]


@asset(required_resource_keys={"all_connector_test_output_file_blobs"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def last_10_connector_test_results(context: OpExecutionContext) -> OutputDataFrame:
gcs_file_blobs = context.resources.all_connector_test_output_file_blobs

@@ -194,6 +197,7 @@ def last_10_connector_test_results(context: OpExecutionContext) -> OutputDataFra


@asset(required_resource_keys={"registry_report_directory_manager"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def persist_connectors_test_summary_files(context: OpExecutionContext, last_10_connector_test_results: OutputDataFrame) -> OutputDataFrame:
registry_report_directory_manager = context.resources.registry_report_directory_manager

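The @sentry.instrument_asset_op decorator applied to the assets above comes from the new orchestrator/logging/sentry.py module, which this excerpt does not show. Going by the commit messages ("add sentry decorator", "Add traces", "Move to scopes"), a minimal sketch could look like the following; the decorator name and its placement between @asset and the function are taken from the diff, everything else is an assumption and is simplified (the real wrapper presumably preserves the asset's signature so Dagster's input inference still works).

# Hypothetical sketch of instrument_asset_op; not the merged implementation.
import functools

import sentry_sdk


def instrument_asset_op(asset_fn):
    @functools.wraps(asset_fn)
    def wrapper(context, *args, **kwargs):
        # Assumes the Dagster OpExecutionContext is always the first argument.
        op_name = context.op_def.name
        with sentry_sdk.push_scope() as scope:
            scope.set_tag("dagster_op", op_name)
            scope.set_tag("dagster_run_id", context.run_id)
            with sentry_sdk.start_transaction(op="dagster_op", name=op_name):
                try:
                    return asset_fn(context, *args, **kwargs)
                except Exception:
                    sentry_sdk.capture_exception()
                    raise
    return wrapper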
@@ -1,12 +1,14 @@
from dagster import Output, asset, OpExecutionContext
import pandas as pd
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.logging import sentry


GROUP_NAME = "github"


@asset(required_resource_keys={"github_connectors_directory"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def github_connector_folders(context):
"""
Return a list of all the folders in the github connectors directory.
@@ -18,6 +20,7 @@ def github_connector_folders(context):


@asset(required_resource_keys={"github_connector_nightly_workflow_successes"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def github_connector_nightly_workflow_successes(context: OpExecutionContext) -> OutputDataFrame:
"""
Return a list of all the latest nightly workflow runs for the connectors repo.
@@ -11,6 +11,7 @@
from orchestrator.utils.object_helpers import are_values_equal, merge_values
from orchestrator.models.metadata import PartialMetadataDefinition, MetadataDefinition, LatestMetadataEntry
from orchestrator.config import get_public_url_for_gcs_file
from orchestrator.logging import sentry

GROUP_NAME = "metadata"

@@ -176,6 +177,7 @@ def validate_metadata(metadata: PartialMetadataDefinition) -> tuple[bool, str]:


@asset(required_resource_keys={"latest_metadata_file_blobs"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def metadata_definitions(context: OpExecutionContext) -> List[LatestMetadataEntry]:
latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs

@@ -2,21 +2,25 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
import sentry_sdk
from google.cloud import storage

from dagster import asset, OpExecutionContext, MetadataValue, Output
from dagster_gcp.gcs.file_manager import GCSFileManager, GCSFileHandle

from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0
from metadata_service.utils import to_json_sanitized_dict

from orchestrator.assets.registry_entry import read_registry_entry_blob
from orchestrator.logging import sentry

from typing import List


GROUP_NAME = "registry"


@sentry_sdk.trace
def persist_registry_to_json(
registry: ConnectorRegistryV0, registry_name: str, registry_directory_manager: GCSFileManager
) -> GCSFileHandle:
@@ -37,6 +41,7 @@ def persist_registry_to_json(
return file_handle


@sentry_sdk.trace
def generate_and_persist_registry(
registry_entry_file_blobs: List[storage.Blob],
registry_directory_manager: GCSFileManager,
@@ -77,6 +82,7 @@ def generate_and_persist_registry(


@asset(required_resource_keys={"registry_directory_manager", "latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
"""
This asset is used to generate the oss registry from the registry entries.
@@ -93,6 +99,7 @@ def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegis


@asset(required_resource_keys={"registry_directory_manager", "latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
"""
This asset is used to generate the cloud registry from the registry entries.
@@ -112,16 +119,19 @@ def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorReg


@asset(required_resource_keys={"latest_cloud_registry_gcs_blob"}, group_name=GROUP_NAME)
def latest_cloud_registry(latest_cloud_registry_dict: dict) -> ConnectorRegistryV0:
@sentry.instrument_asset_op
def latest_cloud_registry(_context: OpExecutionContext, latest_cloud_registry_dict: dict) -> ConnectorRegistryV0:
return ConnectorRegistryV0.parse_obj(latest_cloud_registry_dict)


@asset(required_resource_keys={"latest_oss_registry_gcs_blob"}, group_name=GROUP_NAME)
def latest_oss_registry(latest_oss_registry_dict: dict) -> ConnectorRegistryV0:
@sentry.instrument_asset_op
def latest_oss_registry(_context: OpExecutionContext, latest_oss_registry_dict: dict) -> ConnectorRegistryV0:
return ConnectorRegistryV0.parse_obj(latest_oss_registry_dict)


@asset(required_resource_keys={"latest_cloud_registry_gcs_blob"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def latest_cloud_registry_dict(context: OpExecutionContext) -> dict:
oss_registry_file = context.resources.latest_cloud_registry_gcs_blob
json_string = oss_registry_file.download_as_string().decode("utf-8")
@@ -130,6 +140,7 @@ def latest_cloud_registry_dict(context: OpExecutionContext) -> dict:


@asset(required_resource_keys={"latest_oss_registry_gcs_blob"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def latest_oss_registry_dict(context: OpExecutionContext) -> dict:
oss_registry_file = context.resources.latest_oss_registry_gcs_blob
json_string = oss_registry_file.download_as_string().decode("utf-8")
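The bare @sentry_sdk.trace used on the helper functions in this file is different from instrument_asset_op: it is a stock decorator from the Sentry Python SDK that wraps the call in a child span of whatever transaction is currently active (and does effectively nothing when there is none), so helpers such as persist_registry_to_json show up as separate rows inside the asset's trace. Roughly, and for illustration only:

# Illustrative only: @sentry_sdk.trace ships with the Sentry SDK, not with this repo.
import sentry_sdk


@sentry_sdk.trace
def download_blob_text(blob) -> str:
    # Recorded as a "function" span under the active transaction, if any.
    return blob.download_as_string().decode("utf-8")


# Approximately equivalent to wrapping the body by hand:
def download_blob_text_unrolled(blob) -> str:
    with sentry_sdk.start_span(op="function", description="download_blob_text"):
        return blob.download_as_string().decode("utf-8")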
@@ -3,6 +3,7 @@
import pandas as pd
import os
import copy
import sentry_sdk

from pydantic import ValidationError
from google.cloud import storage
@@ -19,7 +20,10 @@
from orchestrator.utils.dagster_helpers import OutputDataFrame
from orchestrator.models.metadata import MetadataDefinition, LatestMetadataEntry
from orchestrator.config import get_public_url_for_gcs_file, VALID_REGISTRIES, MAX_METADATA_PARTITION_RUN_REQUEST
from orchestrator.logging import sentry

import orchestrator.hacks as HACKS

from typing import List, Optional, Tuple, Union

PolymorphicRegistryEntry = Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition]
@@ -39,6 +43,7 @@ class MissingCachedSpecError(Exception):
# HELPERS


@sentry_sdk.trace
def apply_spec_to_registry_entry(registry_entry: dict, cached_specs: OutputDataFrame) -> dict:
cached_connector_version = {
(cached_spec["docker_repository"], cached_spec["docker_image_tag"]): cached_spec["spec_cache_path"]
@@ -115,6 +120,7 @@ def apply_overrides_from_registry(metadata_data: dict, override_registry_key: st


@deep_copy_params
@sentry_sdk.trace
def metadata_to_registry_entry(metadata_entry: LatestMetadataEntry, override_registry_key: str) -> dict:
"""Convert the metadata definition to a registry entry.
@@ -164,6 +170,7 @@ def metadata_to_registry_entry(metadata_entry: LatestMetadataEntry, override_reg
return overridden_metadata_data


@sentry_sdk.trace
def read_registry_entry_blob(registry_entry_blob: storage.Blob) -> TaggedRegistryEntry:
json_string = registry_entry_blob.download_as_string().decode("utf-8")
registry_entry_dict = json.loads(json_string)
@@ -192,6 +199,7 @@ def get_registry_entry_write_path(metadata_entry: LatestMetadataEntry, registry_
return os.path.join(metadata_folder, registry_name)


@sentry_sdk.trace
def persist_registry_entry_to_json(
registry_entry: PolymorphicRegistryEntry,
registry_name: str,
@@ -216,6 +224,7 @@ def persist_registry_entry_to_json(
return file_handle


@sentry_sdk.trace
def generate_and_persist_registry_entry(
metadata_entry: LatestMetadataEntry,
cached_specs: OutputDataFrame,
@@ -282,6 +291,7 @@ def delete_registry_entry(registry_name, registry_entry: LatestMetadataEntry, me
return file_handle.public_url if file_handle else None


@sentry_sdk.trace
def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[MetadataDefinition]:
"""
Safely parse the metadata definition from the given metadata entry.
@@ -311,6 +321,7 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[Meta
output_required=False,
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]:
"""Parse and compute the LatestMetadataEntry for the given metadata file."""
etag = context.partition_key
@@ -365,6 +376,7 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
partitions_def=metadata_partitions_def,
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]:
"""
Generate the registry entry files from the given metadata file, and persist it to GCS.
@@ -1,3 +1,4 @@
import sentry_sdk
import pandas as pd
from dagster import MetadataValue, Output, asset
from typing import List
@@ -11,6 +12,7 @@
)
from orchestrator.config import CONNECTOR_REPO_NAME, CONNECTOR_TEST_SUMMARY_FOLDER, REPORT_FOLDER, get_public_metadata_service_url
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.logging import sentry

from metadata_service.utils import to_json_sanitized_dict
from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0
@@ -84,6 +86,7 @@ def test_summary_url(row: pd.DataFrame) -> str:
# 📊 Dataframe Augmentation


@sentry_sdk.trace
def augment_and_normalize_connector_dataframes(
cloud_df: pd.DataFrame, oss_df: pd.DataFrame, primary_key: str, connector_type: str, github_connector_folders: List[str]
) -> pd.DataFrame:
@@ -130,34 +133,39 @@ def augment_and_normalize_connector_dataframes(


@asset(group_name=GROUP_NAME)
@sentry_sdk.trace
def cloud_sources_dataframe(latest_cloud_registry: ConnectorRegistryV0) -> OutputDataFrame:
latest_cloud_registry_dict = to_json_sanitized_dict(latest_cloud_registry)
sources = latest_cloud_registry_dict["sources"]
return output_dataframe(pd.DataFrame(sources))


@asset(group_name=GROUP_NAME)
@sentry_sdk.trace
def oss_sources_dataframe(latest_oss_registry: ConnectorRegistryV0) -> OutputDataFrame:
latest_oss_registry_dict = to_json_sanitized_dict(latest_oss_registry)
sources = latest_oss_registry_dict["sources"]
return output_dataframe(pd.DataFrame(sources))


@asset(group_name=GROUP_NAME)
@sentry_sdk.trace
def cloud_destinations_dataframe(latest_cloud_registry: ConnectorRegistryV0) -> OutputDataFrame:
latest_cloud_registry_dict = to_json_sanitized_dict(latest_cloud_registry)
destinations = latest_cloud_registry_dict["destinations"]
return output_dataframe(pd.DataFrame(destinations))


@asset(group_name=GROUP_NAME)
@sentry_sdk.trace
def oss_destinations_dataframe(latest_oss_registry: ConnectorRegistryV0) -> OutputDataFrame:
latest_oss_registry_dict = to_json_sanitized_dict(latest_oss_registry)
destinations = latest_oss_registry_dict["destinations"]
return output_dataframe(pd.DataFrame(destinations))


@asset(group_name=GROUP_NAME)
@sentry_sdk.trace
def all_sources_dataframe(cloud_sources_dataframe, oss_sources_dataframe, github_connector_folders) -> pd.DataFrame:
"""
Merge the cloud and oss sources registries into a single dataframe.
Expand All @@ -173,6 +181,7 @@ def all_sources_dataframe(cloud_sources_dataframe, oss_sources_dataframe, github


@asset(group_name=GROUP_NAME)
@sentry_sdk.trace
def all_destinations_dataframe(cloud_destinations_dataframe, oss_destinations_dataframe, github_connector_folders) -> pd.DataFrame:
"""
Merge the cloud and oss destinations registries into a single dataframe.
Expand All @@ -188,6 +197,7 @@ def all_destinations_dataframe(cloud_destinations_dataframe, oss_destinations_da


@asset(required_resource_keys={"registry_report_directory_manager"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def connector_registry_report(context, all_destinations_dataframe, all_sources_dataframe):
"""
Generate a report of the connector registry.
@@ -5,14 +5,19 @@

import dpath.util
import yaml
from dagster import MetadataValue, Output, asset
import sentry_sdk

from dagster import MetadataValue, Output, asset, OpExecutionContext

from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0
from orchestrator.logging import sentry

GROUP_NAME = "specs_secrets_mask"

# HELPERS


@sentry_sdk.trace
def get_secrets_properties_from_registry_entry(registry_entry: dict) -> List[str]:
"""Traverse a registry entry to spot properties in a spec that have the "airbyte_secret" field set to true.
Expand Down Expand Up @@ -46,7 +51,10 @@ def get_secrets_properties_from_registry_entry(registry_entry: dict) -> List[str


@asset(group_name=GROUP_NAME)
def all_specs_secrets(persisted_oss_registry: ConnectorRegistryV0, persisted_cloud_registry: ConnectorRegistryV0) -> Set[str]:
@sentry.instrument_asset_op
def all_specs_secrets(
context: OpExecutionContext, persisted_oss_registry: ConnectorRegistryV0, persisted_cloud_registry: ConnectorRegistryV0
) -> Set[str]:
oss_registry_from_metadata_dict = persisted_oss_registry.dict()
cloud_registry_from_metadata_dict = persisted_cloud_registry.dict()

Expand All @@ -63,7 +71,8 @@ def all_specs_secrets(persisted_oss_registry: ConnectorRegistryV0, persisted_clo


@asset(required_resource_keys={"registry_directory_manager"}, group_name=GROUP_NAME)
def specs_secrets_mask_yaml(context, all_specs_secrets: Set[str]) -> Output:
@sentry.instrument_asset_op
def specs_secrets_mask_yaml(context: OpExecutionContext, all_specs_secrets: Set[str]) -> Output:
yaml_string = yaml.dump({"properties": list(all_specs_secrets)})
registry_directory_manager = context.resources.registry_directory_manager
file_handle = registry_directory_manager.write_data(yaml_string.encode(), ext="yaml", key="specs_secrets_mask")
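One knock-on effect of the decorator is visible in this last file and in the latest_cloud_registry / latest_oss_registry assets earlier: assets that previously took only their upstream inputs now also accept an explicit OpExecutionContext, even when it is unused (_context). That fits a wrapper that expects the Dagster context as its first argument so it can name the transaction and tag the scope per run, which is an inference from the "Move to scopes" commit message rather than something the diff states. Under those assumptions, an instrumented asset calling traced helpers yields one Sentry transaction per materialization with the helpers nested as child spans; a hypothetical end-to-end shape (my_asset and load_registry are made up for illustration):

import json

import sentry_sdk
from dagster import OpExecutionContext, asset

from orchestrator.logging import sentry


@sentry_sdk.trace
def load_registry(blob) -> dict:
    # Child span inside the asset's transaction.
    return json.loads(blob.download_as_string().decode("utf-8"))


@asset(required_resource_keys={"latest_oss_registry_gcs_blob"})
@sentry.instrument_asset_op
def my_asset(context: OpExecutionContext) -> dict:
    # One transaction per materialization of this asset.
    return load_registry(context.resources.latest_oss_registry_gcs_blob)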