Skip to content

Commit

Permalink
Discard open spans after 10 minutes (#2801)
Browse files Browse the repository at this point in the history
OTel spans that are handled in the Sentry span processor can never be finished/closed. This leads to a memory leak. This change makes sure that open spans will be removed from memory after 10 minutes to prevent memory usage from growing constantly.

Fixes #2722

---------

Co-authored-by: Daniel Szoke <szokeasaurusrex@users.noreply.github.com>
  • Loading branch information
antonpirker and szokeasaurusrex authored Mar 12, 2024
1 parent f40e27f commit 1a8db5e
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 3 deletions.
50 changes: 47 additions & 3 deletions sentry_sdk/integrations/opentelemetry/span_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from time import time

from opentelemetry.context import get_value # type: ignore
from opentelemetry.sdk.trace import SpanProcessor # type: ignore
from opentelemetry.semconv.trace import SpanAttributes # type: ignore
Expand Down Expand Up @@ -33,6 +35,7 @@
from sentry_sdk._types import Event, Hint

OPEN_TELEMETRY_CONTEXT = "otel"
SPAN_MAX_TIME_OPEN_MINUTES = 10


def link_trace_context_to_error_event(event, otel_span_map):
Expand Down Expand Up @@ -76,6 +79,9 @@ class SentrySpanProcessor(SpanProcessor): # type: ignore
# The mapping from otel span ids to sentry spans
otel_span_map = {} # type: Dict[str, Union[Transaction, SentrySpan]]

# The currently open spans. Elements will be discarded after SPAN_MAX_TIME_OPEN_MINUTES
open_spans = {} # type: dict[int, set[str]]

def __new__(cls):
# type: () -> SentrySpanProcessor
if not hasattr(cls, "instance"):
Expand All @@ -90,6 +96,24 @@ def global_event_processor(event, hint):
# type: (Event, Hint) -> Event
return link_trace_context_to_error_event(event, self.otel_span_map)

def _prune_old_spans(self):
# type: (SentrySpanProcessor) -> None
"""
Prune spans that have been open for too long.
"""
current_time_minutes = int(time() / 60)
for span_start_minutes in list(
self.open_spans.keys()
): # making a list because we change the dict
# prune empty open spans buckets
if self.open_spans[span_start_minutes] == set():
self.open_spans.pop(span_start_minutes)

# prune old buckets
elif current_time_minutes - span_start_minutes > SPAN_MAX_TIME_OPEN_MINUTES:
for span_id in self.open_spans.pop(span_start_minutes):
self.otel_span_map.pop(span_id, None)

def on_start(self, otel_span, parent_context=None):
# type: (OTelSpan, Optional[SpanContext]) -> None
hub = Hub.current
Expand Down Expand Up @@ -125,7 +149,9 @@ def on_start(self, otel_span, parent_context=None):
sentry_span = sentry_parent_span.start_child(
span_id=trace_data["span_id"],
description=otel_span.name,
start_timestamp=utc_from_timestamp(otel_span.start_time / 1e9),
start_timestamp=utc_from_timestamp(
otel_span.start_time / 1e9
), # OTel spans have nanosecond precision
instrumenter=INSTRUMENTER.OTEL,
)
else:
Expand All @@ -135,12 +161,22 @@ def on_start(self, otel_span, parent_context=None):
parent_span_id=parent_span_id,
trace_id=trace_data["trace_id"],
baggage=trace_data["baggage"],
start_timestamp=utc_from_timestamp(otel_span.start_time / 1e9),
start_timestamp=utc_from_timestamp(
otel_span.start_time / 1e9
), # OTel spans have nanosecond precision
instrumenter=INSTRUMENTER.OTEL,
)

self.otel_span_map[trace_data["span_id"]] = sentry_span

span_start_in_minutes = int(
otel_span.start_time / 1e9 / 60
) # OTel spans have nanosecond precision
self.open_spans.setdefault(span_start_in_minutes, set()).add(
trace_data["span_id"]
)
self._prune_old_spans()

def on_end(self, otel_span):
# type: (OTelSpan) -> None
hub = Hub.current
Expand Down Expand Up @@ -173,7 +209,15 @@ def on_end(self, otel_span):
else:
self._update_span_with_otel_data(sentry_span, otel_span)

sentry_span.finish(end_timestamp=utc_from_timestamp(otel_span.end_time / 1e9))
sentry_span.finish(
end_timestamp=utc_from_timestamp(otel_span.end_time / 1e9)
) # OTel spans have nanosecond precision

span_start_in_minutes = int(
otel_span.start_time / 1e9 / 60
) # OTel spans have nanosecond precision
self.open_spans.setdefault(span_start_in_minutes, set()).discard(span_id)
self._prune_old_spans()

def _is_sentry_span(self, hub, otel_span):
# type: (Hub, OTelSpan) -> bool
Expand Down
92 changes: 92 additions & 0 deletions tests/integrations/opentelemetry/test_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,95 @@ def test_link_trace_context_to_error_event():
assert "contexts" in event
assert "trace" in event["contexts"]
assert event["contexts"]["trace"] == fake_trace_context


def test_pruning_old_spans_on_start():
otel_span = MagicMock()
otel_span.name = "Sample OTel Span"
otel_span.start_time = time.time_ns()
span_context = SpanContext(
trace_id=int("1234567890abcdef1234567890abcdef", 16),
span_id=int("1234567890abcdef", 16),
is_remote=True,
)
otel_span.get_span_context.return_value = span_context
otel_span.parent = MagicMock()
otel_span.parent.span_id = int("abcdef1234567890", 16)

parent_context = {}
fake_client = MagicMock()
fake_client.options = {"instrumenter": "otel"}
fake_client.dsn = "https://1234567890abcdef@o123456.ingest.sentry.io/123456"

current_hub = MagicMock()
current_hub.client = fake_client

fake_hub = MagicMock()
fake_hub.current = current_hub

with mock.patch(
"sentry_sdk.integrations.opentelemetry.span_processor.Hub", fake_hub
):
span_processor = SentrySpanProcessor()

span_processor.otel_span_map = {
"111111111abcdef": MagicMock(), # should stay
"2222222222abcdef": MagicMock(), # should go
"3333333333abcdef": MagicMock(), # should go
}
current_time_minutes = int(time.time() / 60)
span_processor.open_spans = {
current_time_minutes - 3: {"111111111abcdef"}, # should stay
current_time_minutes
- 11: {"2222222222abcdef", "3333333333abcdef"}, # should go
}

span_processor.on_start(otel_span, parent_context)
assert sorted(list(span_processor.otel_span_map.keys())) == [
"111111111abcdef",
"1234567890abcdef",
]
assert sorted(list(span_processor.open_spans.values())) == [
{"111111111abcdef"},
{"1234567890abcdef"},
]


def test_pruning_old_spans_on_end():
otel_span = MagicMock()
otel_span.name = "Sample OTel Span"
otel_span.start_time = time.time_ns()
span_context = SpanContext(
trace_id=int("1234567890abcdef1234567890abcdef", 16),
span_id=int("1234567890abcdef", 16),
is_remote=True,
)
otel_span.get_span_context.return_value = span_context
otel_span.parent = MagicMock()
otel_span.parent.span_id = int("abcdef1234567890", 16)

fake_sentry_span = MagicMock(spec=Span)
fake_sentry_span.set_context = MagicMock()
fake_sentry_span.finish = MagicMock()

span_processor = SentrySpanProcessor()
span_processor._get_otel_context = MagicMock()
span_processor._update_span_with_otel_data = MagicMock()

span_processor.otel_span_map = {
"111111111abcdef": MagicMock(), # should stay
"2222222222abcdef": MagicMock(), # should go
"3333333333abcdef": MagicMock(), # should go
"1234567890abcdef": fake_sentry_span, # should go (because it is closed)
}
current_time_minutes = int(time.time() / 60)
span_processor.open_spans = {
current_time_minutes: {"1234567890abcdef"}, # should go (because it is closed)
current_time_minutes - 3: {"111111111abcdef"}, # should stay
current_time_minutes
- 11: {"2222222222abcdef", "3333333333abcdef"}, # should go
}

span_processor.on_end(otel_span)
assert sorted(list(span_processor.otel_span_map.keys())) == ["111111111abcdef"]
assert sorted(list(span_processor.open_spans.values())) == [{"111111111abcdef"}]

0 comments on commit 1a8db5e

Please sign in to comment.