From daed90d43a7f00d38afe7448e7167045049bcecf Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 22 Nov 2024 16:25:13 -0800 Subject: [PATCH 1/2] fix(ingest): ensure sentry is initialized with graph tags --- .../src/datahub/ingestion/graph/client.py | 2 ++ .../src/datahub/ingestion/run/pipeline.py | 9 ++++---- .../src/datahub/telemetry/telemetry.py | 23 ++++++++++++++----- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 759aebcfd46b0a..4aa937639e9590 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -67,6 +67,7 @@ SystemMetadataClass, TelemetryClientIdClass, ) +from datahub.telemetry.telemetry import telemetry_instance from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.str_enum import StrEnum from datahub.utilities.urns.urn import Urn, guess_entity_type @@ -1819,4 +1820,5 @@ def get_default_graph() -> DataHubGraph: graph_config = config_utils.load_client_config() graph = DataHubGraph(graph_config) graph.test_connection() + telemetry_instance.set_context(server=graph) return graph diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 7c3a42c3e08931..667129ff83584a 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -44,7 +44,8 @@ ) from datahub.ingestion.transformer.transform_registry import transform_registry from datahub.metadata.schema_classes import MetadataChangeProposalClass -from datahub.telemetry import stats, telemetry +from datahub.telemetry import stats +from datahub.telemetry.telemetry import telemetry_instance from datahub.utilities._custom_package_loader import model_version_name from datahub.utilities.global_warning_util import ( clear_global_warnings, @@ -273,8 +274,9 @@ def __init__( if self.graph is None and isinstance(self.sink, DatahubRestSink): with _add_init_error_context("setup default datahub client"): self.graph = self.sink.emitter.to_graph() + self.graph.test_connection() self.ctx.graph = self.graph - telemetry.telemetry_instance.update_capture_exception_context(server=self.graph) + telemetry_instance.set_context(server=self.graph) with set_graph_context(self.graph): with _add_init_error_context("configure reporters"): @@ -615,7 +617,7 @@ def log_ingestion_stats(self) -> None: sink_warnings = len(self.sink.get_report().warnings) global_warnings = len(get_global_warnings()) - telemetry.telemetry_instance.ping( + telemetry_instance.ping( "ingest_stats", { "source_type": self.source_type, @@ -637,7 +639,6 @@ def log_ingestion_stats(self) -> None: ), "has_pipeline_name": bool(self.config.pipeline_name), }, - self.ctx.graph, ) def _approx_all_vals(self, d: LossyList[Any]) -> int: diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py index 4faf04ee2d2c76..6a9091c4bf2f3a 100644 --- a/metadata-ingestion/src/datahub/telemetry/telemetry.py +++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py @@ -117,7 +117,11 @@ class Telemetry: tracking_init: bool = False sentry_enabled: bool = False + context_properties: Dict[str, Any] = {} + def __init__(self): + self.context_properties = {} + if SENTRY_DSN: self.sentry_enabled = True try: @@ -157,6 +161,9 @@ def __init__(self): except Exception as e: logger.debug(f"Error connecting to mixpanel: {e}") + # Initialize the default properties for all events. + self.set_context() + def update_config(self) -> bool: """ Update the config file with the current client ID and enabled status. @@ -238,18 +245,22 @@ def load_config(self) -> bool: return False - def update_capture_exception_context( + def set_context( self, server: Optional[DataHubGraph] = None, properties: Optional[Dict[str, Any]] = None, ) -> None: + self.context_properties = { + **self._server_props(server), + **(properties or {}), + } + if self.sentry_enabled: from sentry_sdk import set_tag properties = { **_default_telemetry_properties(), - **self._server_props(server), - **(properties or {}), + **self.context_properties, } for key in properties: @@ -297,7 +308,6 @@ def ping( self, event_name: str, properties: Optional[Dict[str, Any]] = None, - server: Optional[DataHubGraph] = None, ) -> None: """ Send a single telemetry event. @@ -323,14 +333,15 @@ def ping( properties = { **_default_telemetry_properties(), - **self._server_props(server), + **self.context_properties, **properties, } self.mp.track(self.client_id, event_name, properties) except Exception as e: logger.debug(f"Error reporting telemetry: {e}") - def _server_props(self, server: Optional[DataHubGraph]) -> Dict[str, str]: + @classmethod + def _server_props(cls, server: Optional[DataHubGraph]) -> Dict[str, str]: if not server: return { "server_type": "n/a", From 502f8fdaddd2bf86c96f072bc139835a279f04a0 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 25 Nov 2024 14:44:38 -0500 Subject: [PATCH 2/2] fix import cycle --- metadata-ingestion/src/datahub/telemetry/telemetry.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py index 6a9091c4bf2f3a..22b2cb6a101af9 100644 --- a/metadata-ingestion/src/datahub/telemetry/telemetry.py +++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py @@ -7,7 +7,7 @@ import uuid from functools import wraps from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, TypeVar +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar from mixpanel import Consumer, Mixpanel from typing_extensions import ParamSpec @@ -16,10 +16,12 @@ from datahub.cli.config_utils import DATAHUB_ROOT_FOLDER from datahub.cli.env_utils import get_boolean_env_variable from datahub.configuration.common import ExceptionWithProps -from datahub.ingestion.graph.client import DataHubGraph from datahub.metadata.schema_classes import _custom_package_path from datahub.utilities.perf_timer import PerfTimer +if TYPE_CHECKING: + from datahub.ingestion.graph.client import DataHubGraph + logger = logging.getLogger(__name__) DATAHUB_FOLDER = Path(DATAHUB_ROOT_FOLDER) @@ -247,7 +249,7 @@ def load_config(self) -> bool: def set_context( self, - server: Optional[DataHubGraph] = None, + server: Optional["DataHubGraph"] = None, properties: Optional[Dict[str, Any]] = None, ) -> None: self.context_properties = { @@ -341,7 +343,7 @@ def ping( logger.debug(f"Error reporting telemetry: {e}") @classmethod - def _server_props(cls, server: Optional[DataHubGraph]) -> Dict[str, str]: + def _server_props(cls, server: Optional["DataHubGraph"]) -> Dict[str, str]: if not server: return { "server_type": "n/a", @@ -446,6 +448,7 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T: **call_props, "status": "error", **_error_props(e), + "code": e.code, }, ) telemetry_instance.capture_exception(e)