Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Code locations for metrics #2526

Merged
merged 10 commits into from
Nov 24, 2023
1 change: 1 addition & 0 deletions sentry_sdk/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,4 @@
FlushedMetricValue = Union[int, float]

BucketKey = Tuple[MetricType, str, MeasurementUnit, MetricTagsInternal]
MetricMetaKey = Tuple[MetricType, str, MeasurementUnit]
8 changes: 6 additions & 2 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,15 @@ def _capture_envelope(envelope):
self.session_flusher = SessionFlusher(capture_func=_capture_envelope)

self.metrics_aggregator = None # type: Optional[MetricsAggregator]
if self.options.get("_experiments", {}).get("enable_metrics"):
experiments = self.options.get("_experiments", {})
if experiments.get("enable_metrics"):
from sentry_sdk.metrics import MetricsAggregator

self.metrics_aggregator = MetricsAggregator(
capture_func=_capture_envelope
capture_func=_capture_envelope,
enable_code_locations=bool(
experiments.get("metric_code_locations")
),
)

max_request_body_size = ("always", "never", "small", "medium")
Expand Down
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"transport_num_pools": Optional[int],
"enable_metrics": Optional[bool],
"before_emit_metric": Optional[Callable[[str, MetricTags], bool]],
"metric_code_locations": Optional[bool],
},
total=False,
)
Expand Down
138 changes: 113 additions & 25 deletions sentry_sdk/metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import io
import re
import sys
import threading
import random
import time
Expand All @@ -11,8 +12,14 @@
from contextlib import contextmanager

import sentry_sdk
from sentry_sdk._compat import text_type
from sentry_sdk.utils import now, nanosecond_time, to_timestamp
from sentry_sdk._compat import text_type, utc_from_timestamp, iteritems
from sentry_sdk.utils import (
now,
nanosecond_time,
to_timestamp,
serialize_frame,
json_dumps,
)
from sentry_sdk.envelope import Envelope, Item
from sentry_sdk.tracing import (
TRANSACTION_SOURCE_ROUTE,
Expand All @@ -24,18 +31,21 @@

if TYPE_CHECKING:
from typing import Any
from typing import Callable
from typing import Dict
from typing import Generator
from typing import Iterable
from typing import Callable
from typing import List
from typing import Optional
from typing import Generator
from typing import Set
from typing import Tuple
from typing import Union

from sentry_sdk._types import BucketKey
from sentry_sdk._types import DurationUnit
from sentry_sdk._types import FlushedMetricValue
from sentry_sdk._types import MeasurementUnit
from sentry_sdk._types import MetricMetaKey
from sentry_sdk._types import MetricTagValue
from sentry_sdk._types import MetricTags
from sentry_sdk._types import MetricTagsInternal
Expand All @@ -46,6 +56,7 @@
_thread_local = threading.local()
_sanitize_key = partial(re.compile(r"[^a-zA-Z0-9_/.-]+").sub, "_")
_sanitize_value = partial(re.compile(r"[^\w\d_:/@\.{}\[\]$-]+", re.UNICODE).sub, "_")
_set = set # set is shadowed below

GOOD_TRANSACTION_SOURCES = frozenset(
[
Expand All @@ -57,6 +68,18 @@
)


def get_code_location(stacklevel):
# type: (int) -> Optional[Dict[str, Any]]
try:
frm = sys._getframe(stacklevel + 4)
except Exception:
return None

return serialize_frame(
frm, include_local_variables=False, include_source_context=False
)


@contextmanager
def recursion_protection():
# type: () -> Generator[bool, None, None]
Expand Down Expand Up @@ -247,7 +270,7 @@ def _encode_metrics(flushable_buckets):
# relay side emission and should not happen commonly.

for timestamp, buckets in flushable_buckets:
for bucket_key, metric in buckets.items():
for bucket_key, metric in iteritems(buckets):
metric_type, metric_name, metric_unit, metric_tags = bucket_key
metric_name = _sanitize_key(metric_name)
_write(metric_name.encode("utf-8"))
Expand Down Expand Up @@ -283,6 +306,20 @@ def _encode_metrics(flushable_buckets):
return out.getvalue()


def _encode_locations(timestamp, code_locations):
# type: (int, Iterable[Tuple[MetricMetaKey, Dict[str, Any]]]) -> bytes
mapping = {} # type: Dict[str, List[Any]]

for key, loc in code_locations:
metric_type, name, unit = key
mri = "{}:{}@{}".format(metric_type, _sanitize_key(name), unit)

loc["type"] = "location"
mapping.setdefault(mri, []).append(loc)

return json_dumps({"timestamp": timestamp, "mapping": mapping})


METRIC_TYPES = {
"c": CounterMetric,
"g": GaugeMetric,
Expand Down Expand Up @@ -311,9 +348,13 @@ class MetricsAggregator(object):
def __init__(
self,
capture_func, # type: Callable[[Envelope], None]
enable_code_locations=False, # type: bool
):
# type: (...) -> None
self.buckets = {} # type: Dict[int, Any]
self._enable_code_locations = enable_code_locations
self._seen_locations = _set() # type: Set[Tuple[int, MetricMetaKey]]
self._pending_locations = {} # type: Dict[int, List[Tuple[MetricMetaKey, Any]]]
self._buckets_total_weight = 0
self._capture_func = capture_func
self._lock = Lock()
Expand Down Expand Up @@ -366,9 +407,7 @@ def _flush_loop(self):

def _flush(self):
# type: (...) -> None
flushable_buckets = self._flushable_buckets()
if flushable_buckets:
self._emit(flushable_buckets)
self._emit(self._flushable_buckets(), self._flushable_locations())

def _flushable_buckets(self):
# type: (...) -> (Iterable[Tuple[int, Dict[BucketKey, Metric]]])
Expand All @@ -385,21 +424,28 @@ def _flushable_buckets(self):
self._force_flush = False
else:
flushable_buckets = []
for buckets_timestamp, buckets in self.buckets.items():
for buckets_timestamp, buckets in iteritems(self.buckets):
# If the timestamp of the bucket is newer that the rollup we want to skip it.
if buckets_timestamp <= cutoff:
flushable_buckets.append((buckets_timestamp, buckets))

# We will clear the elements while holding the lock, in order to avoid requesting it downstream again.
for buckets_timestamp, buckets in flushable_buckets:
for _, metric in buckets.items():
for _, metric in iteritems(buckets):
weight_to_remove += metric.weight
del self.buckets[buckets_timestamp]

self._buckets_total_weight -= weight_to_remove

return flushable_buckets

def _flushable_locations(self):
# type: (...) -> Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
with self._lock:
locations = self._pending_locations
self._pending_locations = {}
return locations

@metrics_noop
def add(
self,
Expand All @@ -409,6 +455,7 @@ def add(
unit, # type: MeasurementUnit
tags, # type: Optional[MetricTags]
timestamp=None, # type: Optional[Union[float, datetime]]
stacklevel=0, # type: int
):
# type: (...) -> None
if not self._ensure_thread() or self._flusher is None:
Expand Down Expand Up @@ -441,6 +488,24 @@ def add(

self._buckets_total_weight += metric.weight - previous_weight

# Store code location once per metric and per day (of bucket timestamp)
if self._enable_code_locations:
meta_key = (ty, key, unit)
start_of_day = utc_from_timestamp(timestamp).replace(
hour=0, minute=0, second=0, microsecond=0, tzinfo=None
)
start_of_day = int(to_timestamp(start_of_day))

if (start_of_day, meta_key) not in self._seen_locations:
self._seen_locations.add((start_of_day, meta_key))
loc = get_code_location(stacklevel)
if loc is not None:
# Group metadata by day to make flushing more efficient.
# There needs to be one envelope item per timestamp.
self._pending_locations.setdefault(start_of_day, []).append(
(meta_key, loc)
)

# Given the new weight we consider whether we want to force flush.
self._consider_force_flush()

Expand Down Expand Up @@ -471,13 +536,23 @@ def _consider_force_flush(self):
def _emit(
self,
flushable_buckets, # type: (Iterable[Tuple[int, Dict[BucketKey, Metric]]])
code_locations, # type: Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
):
# type: (...) -> Envelope
encoded_metrics = _encode_metrics(flushable_buckets)
metric_item = Item(payload=encoded_metrics, type="statsd")
envelope = Envelope(items=[metric_item])
self._capture_func(envelope)
return envelope
# type: (...) -> Optional[Envelope]
envelope = Envelope()

if flushable_buckets:
encoded_metrics = _encode_metrics(flushable_buckets)
envelope.add_item(Item(payload=encoded_metrics, type="statsd"))

for timestamp, locations in iteritems(code_locations):
encoded_locations = _encode_locations(timestamp, locations)
envelope.add_item(Item(payload=encoded_locations, type="metric_meta"))

if envelope.items:
self._capture_func(envelope)
return envelope
return None

def _serialize_tags(
self, tags # type: Optional[MetricTags]
Expand All @@ -487,7 +562,7 @@ def _serialize_tags(
return ()

rv = []
for key, value in tags.items():
for key, value in iteritems(tags):
# If the value is a collection, we want to flatten it.
if isinstance(value, (list, tuple)):
for inner_value in value:
Expand Down Expand Up @@ -536,12 +611,13 @@ def incr(
unit="none", # type: MeasurementUnit
tags=None, # type: Optional[MetricTags]
timestamp=None, # type: Optional[Union[float, datetime]]
stacklevel=0, # type: int
):
# type: (...) -> None
"""Increments a counter."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("c", key, value, unit, tags, timestamp)
aggregator.add("c", key, value, unit, tags, timestamp, stacklevel)


class _Timing(object):
Expand All @@ -552,6 +628,7 @@ def __init__(
timestamp, # type: Optional[Union[float, datetime]]
value, # type: Optional[float]
unit, # type: DurationUnit
stacklevel, # type: int
):
# type: (...) -> None
self.key = key
Expand All @@ -560,6 +637,7 @@ def __init__(
self.value = value
self.unit = unit
self.entered = None # type: Optional[float]
self.stacklevel = stacklevel

def _validate_invocation(self, context):
# type: (str) -> None
Expand All @@ -579,7 +657,9 @@ def __exit__(self, exc_type, exc_value, tb):
aggregator, tags = _get_aggregator_and_update_tags(self.key, self.tags)
if aggregator is not None:
elapsed = TIMING_FUNCTIONS[self.unit]() - self.entered # type: ignore
aggregator.add("d", self.key, elapsed, self.unit, tags, self.timestamp)
aggregator.add(
"d", self.key, elapsed, self.unit, tags, self.timestamp, self.stacklevel
)

def __call__(self, f):
# type: (Any) -> Any
Expand All @@ -589,7 +669,11 @@ def __call__(self, f):
def timed_func(*args, **kwargs):
# type: (*Any, **Any) -> Any
with timing(
key=self.key, tags=self.tags, timestamp=self.timestamp, unit=self.unit
key=self.key,
tags=self.tags,
timestamp=self.timestamp,
unit=self.unit,
stacklevel=self.stacklevel + 1,
):
return f(*args, **kwargs)

Expand All @@ -602,6 +686,7 @@ def timing(
unit="second", # type: DurationUnit
tags=None, # type: Optional[MetricTags]
timestamp=None, # type: Optional[Union[float, datetime]]
stacklevel=0, # type: int
):
# type: (...) -> _Timing
"""Emits a distribution with the time it takes to run the given code block.
Expand All @@ -615,8 +700,8 @@ def timing(
if value is not None:
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("d", key, value, unit, tags, timestamp)
return _Timing(key, tags, timestamp, value, unit)
aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)
return _Timing(key, tags, timestamp, value, unit, stacklevel)


def distribution(
Expand All @@ -625,12 +710,13 @@ def distribution(
unit="none", # type: MeasurementUnit
tags=None, # type: Optional[MetricTags]
timestamp=None, # type: Optional[Union[float, datetime]]
stacklevel=0, # type: int
):
# type: (...) -> None
"""Emits a distribution."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("d", key, value, unit, tags, timestamp)
aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)


def set(
Expand All @@ -639,12 +725,13 @@ def set(
unit="none", # type: MeasurementUnit
tags=None, # type: Optional[MetricTags]
timestamp=None, # type: Optional[Union[float, datetime]]
stacklevel=0, # type: int
):
# type: (...) -> None
"""Emits a set."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("s", key, value, unit, tags, timestamp)
aggregator.add("s", key, value, unit, tags, timestamp, stacklevel)


def gauge(
Expand All @@ -653,9 +740,10 @@ def gauge(
unit="none", # type: MetricValue
tags=None, # type: Optional[MetricTags]
timestamp=None, # type: Optional[Union[float, datetime]]
stacklevel=0, # type: int
):
# type: (...) -> None
"""Emits a gauge."""
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
if aggregator is not None:
aggregator.add("g", key, value, unit, tags, timestamp)
aggregator.add("g", key, value, unit, tags, timestamp, stacklevel)
Loading