Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): ensure sentry is initialized with graph tags #11949

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
SystemMetadataClass,
TelemetryClientIdClass,
)
from datahub.telemetry.telemetry import telemetry_instance
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.str_enum import StrEnum
from datahub.utilities.urns.urn import Urn, guess_entity_type
Expand Down Expand Up @@ -1819,4 +1820,5 @@ def get_default_graph() -> DataHubGraph:
graph_config = config_utils.load_client_config()
graph = DataHubGraph(graph_config)
graph.test_connection()
telemetry_instance.set_context(server=graph)
return graph
9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
)
from datahub.ingestion.transformer.transform_registry import transform_registry
from datahub.metadata.schema_classes import MetadataChangeProposalClass
from datahub.telemetry import stats, telemetry
from datahub.telemetry import stats
from datahub.telemetry.telemetry import telemetry_instance
from datahub.utilities._custom_package_loader import model_version_name
from datahub.utilities.global_warning_util import (
clear_global_warnings,
Expand Down Expand Up @@ -273,8 +274,9 @@ def __init__(
if self.graph is None and isinstance(self.sink, DatahubRestSink):
with _add_init_error_context("setup default datahub client"):
self.graph = self.sink.emitter.to_graph()
self.graph.test_connection()
self.ctx.graph = self.graph
telemetry.telemetry_instance.update_capture_exception_context(server=self.graph)
telemetry_instance.set_context(server=self.graph)

with set_graph_context(self.graph):
with _add_init_error_context("configure reporters"):
Expand Down Expand Up @@ -615,7 +617,7 @@ def log_ingestion_stats(self) -> None:
sink_warnings = len(self.sink.get_report().warnings)
global_warnings = len(get_global_warnings())

telemetry.telemetry_instance.ping(
telemetry_instance.ping(
"ingest_stats",
{
"source_type": self.source_type,
Expand All @@ -637,7 +639,6 @@ def log_ingestion_stats(self) -> None:
),
"has_pipeline_name": bool(self.config.pipeline_name),
},
self.ctx.graph,
)

def _approx_all_vals(self, d: LossyList[Any]) -> int:
Expand Down
32 changes: 23 additions & 9 deletions metadata-ingestion/src/datahub/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import uuid
from functools import wraps
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar

from mixpanel import Consumer, Mixpanel
from typing_extensions import ParamSpec
Expand All @@ -16,10 +16,12 @@
from datahub.cli.config_utils import DATAHUB_ROOT_FOLDER
from datahub.cli.env_utils import get_boolean_env_variable
from datahub.configuration.common import ExceptionWithProps
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import _custom_package_path
from datahub.utilities.perf_timer import PerfTimer

if TYPE_CHECKING:
from datahub.ingestion.graph.client import DataHubGraph

logger = logging.getLogger(__name__)

DATAHUB_FOLDER = Path(DATAHUB_ROOT_FOLDER)
Expand Down Expand Up @@ -117,7 +119,11 @@ class Telemetry:
tracking_init: bool = False
sentry_enabled: bool = False

context_properties: Dict[str, Any] = {}

def __init__(self):
self.context_properties = {}

if SENTRY_DSN:
self.sentry_enabled = True
try:
Expand Down Expand Up @@ -157,6 +163,9 @@ def __init__(self):
except Exception as e:
logger.debug(f"Error connecting to mixpanel: {e}")

# Initialize the default properties for all events.
self.set_context()

def update_config(self) -> bool:
"""
Update the config file with the current client ID and enabled status.
Expand Down Expand Up @@ -238,18 +247,22 @@ def load_config(self) -> bool:

return False

def update_capture_exception_context(
def set_context(
self,
server: Optional[DataHubGraph] = None,
server: Optional["DataHubGraph"] = None,
properties: Optional[Dict[str, Any]] = None,
) -> None:
self.context_properties = {
**self._server_props(server),
**(properties or {}),
}

if self.sentry_enabled:
from sentry_sdk import set_tag

properties = {
**_default_telemetry_properties(),
**self._server_props(server),
**(properties or {}),
**self.context_properties,
}

for key in properties:
Expand Down Expand Up @@ -297,7 +310,6 @@ def ping(
self,
event_name: str,
properties: Optional[Dict[str, Any]] = None,
server: Optional[DataHubGraph] = None,
) -> None:
"""
Send a single telemetry event.
Expand All @@ -323,14 +335,15 @@ def ping(

properties = {
**_default_telemetry_properties(),
**self._server_props(server),
**self.context_properties,
**properties,
}
self.mp.track(self.client_id, event_name, properties)
except Exception as e:
logger.debug(f"Error reporting telemetry: {e}")

def _server_props(self, server: Optional[DataHubGraph]) -> Dict[str, str]:
@classmethod
def _server_props(cls, server: Optional["DataHubGraph"]) -> Dict[str, str]:
if not server:
return {
"server_type": "n/a",
Expand Down Expand Up @@ -435,6 +448,7 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
**call_props,
"status": "error",
**_error_props(e),
"code": e.code,
},
)
telemetry_instance.capture_exception(e)
Expand Down
Loading