From b250a8929d9238e7d8ab30b6e5af7dc1ec1b79bd Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Wed, 29 Nov 2023 12:26:11 +0100 Subject: [PATCH] feat: metric span summaries (#2522) --- sentry_sdk/consts.py | 2 + sentry_sdk/metrics.py | 212 +++++++++++++++++++++++++++++++++--------- sentry_sdk/tracing.py | 22 +++++ tests/test_metrics.py | 200 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 388 insertions(+), 48 deletions(-) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 785dba0c9d..0158237a74 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -46,6 +46,8 @@ "transport_zlib_compression_level": Optional[int], "transport_num_pools": Optional[int], "enable_metrics": Optional[bool], + "metrics_summary_sample_rate": Optional[float], + "should_summarize_metric": Optional[Callable[[str, MetricTags], bool]], "before_emit_metric": Optional[Callable[[str, MetricTags], bool]], "metric_code_locations": Optional[bool], }, diff --git a/sentry_sdk/metrics.py b/sentry_sdk/metrics.py index a36cf7c812..fa977f6b52 100644 --- a/sentry_sdk/metrics.py +++ b/sentry_sdk/metrics.py @@ -340,6 +340,58 @@ def _encode_locations(timestamp, code_locations): } +class LocalAggregator(object): + __slots__ = ("_measurements",) + + def __init__(self): + # type: (...) -> None + self._measurements = ( + {} + ) # type: Dict[Tuple[str, MetricTagsInternal], Tuple[float, float, int, float]] + + def add( + self, + ty, # type: MetricType + key, # type: str + value, # type: float + unit, # type: MeasurementUnit + tags, # type: MetricTagsInternal + ): + # type: (...) -> None + export_key = "%s:%s@%s" % (ty, key, unit) + bucket_key = (export_key, tags) + + old = self._measurements.get(bucket_key) + if old is not None: + v_min, v_max, v_count, v_sum = old + v_min = min(v_min, value) + v_max = max(v_max, value) + v_count += 1 + v_sum += value + else: + v_min = v_max = v_sum = value + v_count = 1 + self._measurements[bucket_key] = (v_min, v_max, v_count, v_sum) + + def to_json(self): + # type: (...) -> Dict[str, Any] + rv = {} + for (export_key, tags), ( + v_min, + v_max, + v_count, + v_sum, + ) in self._measurements.items(): + rv[export_key] = { + "tags": _tags_to_dict(tags), + "min": v_min, + "max": v_max, + "count": v_count, + "sum": v_sum, + } + return rv + + class MetricsAggregator(object): ROLLUP_IN_SECONDS = 10.0 MAX_WEIGHT = 100000 @@ -455,11 +507,12 @@ def add( unit, # type: MeasurementUnit tags, # type: Optional[MetricTags] timestamp=None, # type: Optional[Union[float, datetime]] + local_aggregator=None, # type: Optional[LocalAggregator] stacklevel=0, # type: int ): # type: (...) -> None if not self._ensure_thread() or self._flusher is None: - return + return None if timestamp is None: timestamp = time.time() @@ -469,11 +522,12 @@ def add( bucket_timestamp = int( (timestamp // self.ROLLUP_IN_SECONDS) * self.ROLLUP_IN_SECONDS ) + serialized_tags = _serialize_tags(tags) bucket_key = ( ty, key, unit, - self._serialize_tags(tags), + serialized_tags, ) with self._lock: @@ -486,7 +540,8 @@ def add( metric = local_buckets[bucket_key] = METRIC_TYPES[ty](value) previous_weight = 0 - self._buckets_total_weight += metric.weight - previous_weight + added = metric.weight - previous_weight + self._buckets_total_weight += added # Store code location once per metric and per day (of bucket timestamp) if self._enable_code_locations: @@ -509,6 +564,10 @@ def add( # Given the new weight we consider whether we want to force flush. 
self._consider_force_flush() + if local_aggregator is not None: + local_value = float(added if ty == "s" else value) + local_aggregator.add(ty, key, local_value, unit, serialized_tags) + def kill(self): # type: (...) -> None if self._flusher is None: @@ -554,55 +613,87 @@ def _emit( return envelope return None - def _serialize_tags( - self, tags # type: Optional[MetricTags] - ): - # type: (...) -> MetricTagsInternal - if not tags: - return () - - rv = [] - for key, value in iteritems(tags): - # If the value is a collection, we want to flatten it. - if isinstance(value, (list, tuple)): - for inner_value in value: - if inner_value is not None: - rv.append((key, text_type(inner_value))) - elif value is not None: - rv.append((key, text_type(value))) - # It's very important to sort the tags in order to obtain the - # same bucket key. - return tuple(sorted(rv)) +def _serialize_tags( + tags, # type: Optional[MetricTags] +): + # type: (...) -> MetricTagsInternal + if not tags: + return () + + rv = [] + for key, value in iteritems(tags): + # If the value is a collection, we want to flatten it. + if isinstance(value, (list, tuple)): + for inner_value in value: + if inner_value is not None: + rv.append((key, text_type(inner_value))) + elif value is not None: + rv.append((key, text_type(value))) + + # It's very important to sort the tags in order to obtain the + # same bucket key. + return tuple(sorted(rv)) + + +def _tags_to_dict(tags): + # type: (MetricTagsInternal) -> Dict[str, Any] + rv = {} # type: Dict[str, Any] + for tag_name, tag_value in tags: + old_value = rv.get(tag_name) + if old_value is not None: + if isinstance(old_value, list): + old_value.append(tag_value) + else: + rv[tag_name] = [old_value, tag_value] + else: + rv[tag_name] = tag_value + return rv def _get_aggregator_and_update_tags(key, tags): - # type: (str, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[MetricTags]] + # type: (str, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[LocalAggregator], Optional[MetricTags]] """Returns the current metrics aggregator if there is one.""" hub = sentry_sdk.Hub.current client = hub.client if client is None or client.metrics_aggregator is None: - return None, tags + return None, None, tags + + experiments = client.options.get("_experiments", {}) updated_tags = dict(tags or ()) # type: Dict[str, MetricTagValue] updated_tags.setdefault("release", client.options["release"]) updated_tags.setdefault("environment", client.options["environment"]) scope = hub.scope + local_aggregator = None + + # We go with the low-level API here to access transaction information as + # this one is the same between just errors and errors + performance transaction_source = scope._transaction_info.get("source") if transaction_source in GOOD_TRANSACTION_SOURCES: - transaction = scope._transaction - if transaction: - updated_tags.setdefault("transaction", transaction) + transaction_name = scope._transaction + if transaction_name: + updated_tags.setdefault("transaction", transaction_name) + if scope._span is not None: + sample_rate = experiments.get("metrics_summary_sample_rate") or 0.0 + should_summarize_metric_callback = experiments.get( + "should_summarize_metric" + ) + if random.random() < sample_rate and ( + should_summarize_metric_callback is None + or should_summarize_metric_callback(key, updated_tags) + ): + local_aggregator = scope._span._get_local_aggregator() - callback = client.options.get("_experiments", {}).get("before_emit_metric") - if callback is not None: + 
before_emit_callback = experiments.get("before_emit_metric") + if before_emit_callback is not None: with recursion_protection() as in_metrics: if not in_metrics: - if not callback(key, updated_tags): - return None, updated_tags + if not before_emit_callback(key, updated_tags): + return None, None, updated_tags - return client.metrics_aggregator, updated_tags + return client.metrics_aggregator, local_aggregator, updated_tags def incr( @@ -615,9 +706,11 @@ def incr( ): # type: (...) -> None """Increments a counter.""" - aggregator, tags = _get_aggregator_and_update_tags(key, tags) + aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags) if aggregator is not None: - aggregator.add("c", key, value, unit, tags, timestamp, stacklevel) + aggregator.add( + "c", key, value, unit, tags, timestamp, local_aggregator, stacklevel + ) class _Timing(object): @@ -637,6 +730,7 @@ def __init__( self.value = value self.unit = unit self.entered = None # type: Optional[float] + self._span = None # type: Optional[sentry_sdk.tracing.Span] self.stacklevel = stacklevel def _validate_invocation(self, context): @@ -650,17 +744,37 @@ def __enter__(self): # type: (...) -> _Timing self.entered = TIMING_FUNCTIONS[self.unit]() self._validate_invocation("context-manager") + self._span = sentry_sdk.start_span(op="metric.timing", description=self.key) + if self.tags: + for key, value in self.tags.items(): + if isinstance(value, (tuple, list)): + value = ",".join(sorted(map(str, value))) + self._span.set_tag(key, value) + self._span.__enter__() return self def __exit__(self, exc_type, exc_value, tb): # type: (Any, Any, Any) -> None - aggregator, tags = _get_aggregator_and_update_tags(self.key, self.tags) + assert self._span, "did not enter" + aggregator, local_aggregator, tags = _get_aggregator_and_update_tags( + self.key, self.tags + ) if aggregator is not None: elapsed = TIMING_FUNCTIONS[self.unit]() - self.entered # type: ignore aggregator.add( - "d", self.key, elapsed, self.unit, tags, self.timestamp, self.stacklevel + "d", + self.key, + elapsed, + self.unit, + tags, + self.timestamp, + local_aggregator, + self.stacklevel, ) + self._span.__exit__(exc_type, exc_value, tb) + self._span = None + def __call__(self, f): # type: (Any) -> Any self._validate_invocation("decorator") @@ -698,9 +812,11 @@ def timing( - it can be used as a decorator """ if value is not None: - aggregator, tags = _get_aggregator_and_update_tags(key, tags) + aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags) if aggregator is not None: - aggregator.add("d", key, value, unit, tags, timestamp, stacklevel) + aggregator.add( + "d", key, value, unit, tags, timestamp, local_aggregator, stacklevel + ) return _Timing(key, tags, timestamp, value, unit, stacklevel) @@ -714,9 +830,11 @@ def distribution( ): # type: (...) -> None """Emits a distribution.""" - aggregator, tags = _get_aggregator_and_update_tags(key, tags) + aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags) if aggregator is not None: - aggregator.add("d", key, value, unit, tags, timestamp, stacklevel) + aggregator.add( + "d", key, value, unit, tags, timestamp, local_aggregator, stacklevel + ) def set( @@ -729,21 +847,25 @@ def set( ): # type: (...) 
-> None """Emits a set.""" - aggregator, tags = _get_aggregator_and_update_tags(key, tags) + aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags) if aggregator is not None: - aggregator.add("s", key, value, unit, tags, timestamp, stacklevel) + aggregator.add( + "s", key, value, unit, tags, timestamp, local_aggregator, stacklevel + ) def gauge( key, # type: str value, # type: float - unit="none", # type: MetricValue + unit="none", # type: MeasurementUnit tags=None, # type: Optional[MetricTags] timestamp=None, # type: Optional[Union[float, datetime]] stacklevel=0, # type: int ): # type: (...) -> None """Emits a gauge.""" - aggregator, tags = _get_aggregator_and_update_tags(key, tags) + aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(key, tags) if aggregator is not None: - aggregator.add("g", key, value, unit, tags, timestamp, stacklevel) + aggregator.add( + "g", key, value, unit, tags, timestamp, local_aggregator, stacklevel + ) diff --git a/sentry_sdk/tracing.py b/sentry_sdk/tracing.py index 26c413a34e..e5860250c4 100644 --- a/sentry_sdk/tracing.py +++ b/sentry_sdk/tracing.py @@ -102,6 +102,7 @@ class Span(object): "hub", "_context_manager_state", "_containing_transaction", + "_local_aggregator", ) def __new__(cls, **kwargs): @@ -162,6 +163,7 @@ def __init__( self.timestamp = None # type: Optional[datetime] self._span_recorder = None # type: Optional[_SpanRecorder] + self._local_aggregator = None # type: Optional[LocalAggregator] # TODO this should really live on the Transaction class rather than the Span # class @@ -170,6 +172,13 @@ def init_span_recorder(self, maxlen): if self._span_recorder is None: self._span_recorder = _SpanRecorder(maxlen) + def _get_local_aggregator(self): + # type: (...) -> LocalAggregator + rv = self._local_aggregator + if rv is None: + rv = self._local_aggregator = LocalAggregator() + return rv + def __repr__(self): # type: () -> str return ( @@ -501,6 +510,11 @@ def to_json(self): if self.status: self._tags["status"] = self.status + if self._local_aggregator is not None: + metrics_summary = self._local_aggregator.to_json() + if metrics_summary: + rv["_metrics_summary"] = metrics_summary + tags = self._tags if tags: rv["tags"] = tags @@ -724,6 +738,13 @@ def finish(self, hub=None, end_timestamp=None): event["measurements"] = self._measurements + # This is here since `to_json` is not invoked. This really should + # be gone when we switch to onlyspans. 
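+        # The attached summary matches what `Span.to_json` emits: a mapping
+        # keyed by "<type>:<key>@<unit>" with min/max/count/sum and the
+        # metric's serialized tags.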
+ if self._local_aggregator is not None: + metrics_summary = self._local_aggregator.to_json() + if metrics_summary: + event["_metrics_summary"] = metrics_summary + return hub.capture_event(event) def set_measurement(self, name, value, unit=""): @@ -1005,3 +1026,4 @@ async def my_async_function(): has_tracing_enabled, maybe_create_breadcrumbs_from_span, ) +from sentry_sdk.metrics import LocalAggregator diff --git a/tests/test_metrics.py b/tests/test_metrics.py index 15cfb9d37f..b821785214 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -3,14 +3,15 @@ import sys import time +from sentry_sdk import Hub, metrics, push_scope, start_transaction +from sentry_sdk.tracing import TRANSACTION_SOURCE_ROUTE +from sentry_sdk.envelope import parse_json + try: from unittest import mock # python 3.3 and above except ImportError: import mock # python < 3.3 -from sentry_sdk import Hub, metrics, push_scope -from sentry_sdk.envelope import parse_json - def parse_metrics(bytes): rv = [] @@ -509,6 +510,199 @@ def test_transaction_name(sentry_init, capture_envelopes): } +def test_metric_summaries(sentry_init, capture_envelopes): + sentry_init( + release="fun-release@1.0.0", + environment="not-fun-env", + enable_tracing=True, + _experiments={"enable_metrics": True, "metrics_summary_sample_rate": 1.0}, + ) + ts = time.time() + envelopes = capture_envelopes() + + with start_transaction( + op="stuff", name="/foo", source=TRANSACTION_SOURCE_ROUTE + ) as transaction: + metrics.incr("root-counter", timestamp=ts) + with metrics.timing("my-timer-metric", tags={"a": "b"}, timestamp=ts): + for x in range(10): + metrics.distribution("my-dist", float(x), timestamp=ts) + + Hub.current.flush() + + (transaction, envelope) = envelopes + + # Metrics Emission + assert envelope.items[0].headers["type"] == "statsd" + m = parse_metrics(envelope.items[0].payload.get_bytes()) + + assert len(m) == 3 + + assert m[0][1] == "my-dist@none" + assert m[0][2] == "d" + assert len(m[0][3]) == 10 + assert sorted(m[0][3]) == list(map(str, map(float, range(10)))) + assert m[0][4] == { + "transaction": "/foo", + "release": "fun-release@1.0.0", + "environment": "not-fun-env", + } + + assert m[1][1] == "my-timer-metric@second" + assert m[1][2] == "d" + assert len(m[1][3]) == 1 + assert m[1][4] == { + "a": "b", + "transaction": "/foo", + "release": "fun-release@1.0.0", + "environment": "not-fun-env", + } + + assert m[2][1] == "root-counter@none" + assert m[2][2] == "c" + assert m[2][3] == ["1.0"] + assert m[2][4] == { + "transaction": "/foo", + "release": "fun-release@1.0.0", + "environment": "not-fun-env", + } + + # Measurement Attachment + t = transaction.items[0].get_transaction_event() + + assert t["_metrics_summary"] == { + "c:root-counter@none": { + "count": 1, + "min": 1.0, + "max": 1.0, + "sum": 1.0, + "tags": { + "transaction": "/foo", + "release": "fun-release@1.0.0", + "environment": "not-fun-env", + }, + } + } + + assert t["spans"][0]["_metrics_summary"]["d:my-dist@none"] == { + "count": 10, + "min": 0.0, + "max": 9.0, + "sum": 45.0, + "tags": { + "environment": "not-fun-env", + "release": "fun-release@1.0.0", + "transaction": "/foo", + }, + } + + assert t["spans"][0]["tags"] == {"a": "b"} + timer = t["spans"][0]["_metrics_summary"]["d:my-timer-metric@second"] + assert timer["count"] == 1 + assert timer["max"] == timer["min"] == timer["sum"] + assert timer["sum"] > 0 + assert timer["tags"] == { + "a": "b", + "environment": "not-fun-env", + "release": "fun-release@1.0.0", + "transaction": "/foo", + } + + +def 
test_metrics_summary_disabled(sentry_init, capture_envelopes): + sentry_init( + release="fun-release@1.0.0", + environment="not-fun-env", + enable_tracing=True, + _experiments={"enable_metrics": True}, + ) + ts = time.time() + envelopes = capture_envelopes() + + with start_transaction( + op="stuff", name="/foo", source=TRANSACTION_SOURCE_ROUTE + ) as transaction: + with metrics.timing("my-timer-metric", tags={"a": "b"}, timestamp=ts): + pass + + Hub.current.flush() + + (transaction, envelope) = envelopes + + # Metrics Emission + assert envelope.items[0].headers["type"] == "statsd" + m = parse_metrics(envelope.items[0].payload.get_bytes()) + + assert len(m) == 1 + assert m[0][1] == "my-timer-metric@second" + assert m[0][2] == "d" + assert len(m[0][3]) == 1 + assert m[0][4] == { + "a": "b", + "transaction": "/foo", + "release": "fun-release@1.0.0", + "environment": "not-fun-env", + } + + # Measurement Attachment + t = transaction.items[0].get_transaction_event() + assert "_metrics_summary" not in t + assert "_metrics_summary" not in t["spans"][0] + + +def test_metrics_summary_filtered(sentry_init, capture_envelopes): + def should_summarize_metric(key, tags): + return key == "foo" + + sentry_init( + release="fun-release@1.0.0", + environment="not-fun-env", + enable_tracing=True, + _experiments={ + "enable_metrics": True, + "metrics_summary_sample_rate": 1.0, + "should_summarize_metric": should_summarize_metric, + }, + ) + ts = time.time() + envelopes = capture_envelopes() + + with start_transaction( + op="stuff", name="/foo", source=TRANSACTION_SOURCE_ROUTE + ) as transaction: + metrics.timing("foo", value=1.0, tags={"a": "b"}, timestamp=ts) + metrics.timing("bar", value=1.0, tags={"a": "b"}, timestamp=ts) + + Hub.current.flush() + + (transaction, envelope) = envelopes + + # Metrics Emission + assert envelope.items[0].headers["type"] == "statsd" + m = parse_metrics(envelope.items[0].payload.get_bytes()) + + assert len(m) == 2 + assert m[0][1] == "bar@second" + assert m[1][1] == "foo@second" + + # Measurement Attachment + t = transaction.items[0].get_transaction_event()["_metrics_summary"] + assert t == { + "d:foo@second": { + "tags": { + "a": "b", + "environment": "not-fun-env", + "release": "fun-release@1.0.0", + "transaction": "/foo", + }, + "min": 1.0, + "max": 1.0, + "count": 1, + "sum": 1.0, + } + } + + def test_tag_normalization(sentry_init, capture_envelopes): sentry_init( release="fun-release@1.0.0",
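
A minimal end-to-end sketch of the new behavior, mirroring the tests above; the
metric names, the "/process" route, and the prefix filter are illustrative
placeholders, and summaries only attach inside transactions whose source is in
GOOD_TRANSACTION_SOURCES (route, view, component, task):

    import time

    import sentry_sdk
    from sentry_sdk import metrics, start_transaction
    from sentry_sdk.tracing import TRANSACTION_SOURCE_ROUTE


    def should_summarize_metric(key, tags):
        # Hypothetical filter: only summarize metrics under the "api." prefix.
        return key.startswith("api.")


    sentry_sdk.init(
        enable_tracing=True,
        _experiments={
            "enable_metrics": True,
            # Fraction of metric emissions that are also recorded on the
            # active span; defaults to 0.0, i.e. span summaries are off.
            "metrics_summary_sample_rate": 1.0,
            # Optional per-metric veto, consulted only for sampled emissions.
            "should_summarize_metric": should_summarize_metric,
        },
    )

    with start_transaction(
        op="http.server", name="/process", source=TRANSACTION_SOURCE_ROUTE
    ):
        metrics.incr("api.requests")
        # As a context manager, timing() now also wraps the measured block in
        # a `metric.timing` span, so the value lands on that span's summary.
        with metrics.timing("api.latency", tags={"endpoint": "/process"}):
            time.sleep(0.01)

    sentry_sdk.flush()

The transaction event then carries a `_metrics_summary` mapping on the
transaction itself and on the timing span, keyed like "c:api.requests@none" and
"d:api.latency@second", with each entry holding min/max/count/sum plus the
serialized tags (transaction, release, and environment are added by default
when set on the client).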