-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(spans): Add a new recombiner consumer (#66804)
We're moving away from Celery tasks + RabbitMQ and instead want a second consumer that processes segments. This PR adds the new topic and consumer and refactors code away from tasks. This code doesn't run in prod yet. In a follow-up, the `process-spans` consumer will fetch segments that are ready to be processed and push them to the `buffered-segments` topic.
- Loading branch information
1 parent
a8d8013
commit ff45ba4
Showing
12 changed files
with
172 additions
and
165 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
53 changes: 53 additions & 0 deletions
53
src/sentry/spans/consumers/detect_performance_issues/factory.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import logging | ||
from collections.abc import Mapping | ||
from typing import Any | ||
|
||
from arroyo.backends.kafka.consumer import KafkaPayload | ||
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory | ||
from arroyo.processing.strategies.commit import CommitOffsets | ||
from arroyo.processing.strategies.run_task import RunTask | ||
from arroyo.types import BrokerValue, Commit, Message, Partition | ||
from sentry_kafka_schemas import get_codec | ||
from sentry_kafka_schemas.codecs import Codec, ValidationError | ||
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import BufferedSegment | ||
|
||
from sentry.spans.consumers.detect_performance_issues.message import process_segment | ||
|
||
BUFFERED_SEGMENT_SCHEMA: Codec[BufferedSegment] = get_codec("buffered-segments") | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def _deserialize_segment(value: bytes) -> Mapping[str, Any]:
    """Decode a raw message body with the ``buffered-segments`` codec.

    Args:
        value: Serialized message body.

    Returns:
        The mapping produced by ``BUFFERED_SEGMENT_SCHEMA.decode``.
    """
    decoded_segment = BUFFERED_SEGMENT_SCHEMA.decode(value)
    return decoded_segment
|
||
|
||
def process_message(message: Message[KafkaPayload]):
    """Decode one message and pass its ``"spans"`` entry to ``process_segment``.

    A payload that raises ``ValidationError`` while being decoded is logged
    and dropped; the function then returns without calling
    ``process_segment``.

    Args:
        message: Message whose ``payload.value`` holds the serialized segment.
    """
    raw_payload = message.payload.value
    try:
        decoded = _deserialize_segment(raw_payload)
    except ValidationError:
        logger.exception("Failed to deserialize segment payload")
    else:
        # Runs outside the ``try`` so errors raised while processing are not
        # mistaken for deserialization failures.
        process_segment(decoded["spans"])
|
||
|
||
def _process_message(message: Message[KafkaPayload]):
    """Run ``process_message`` and log, rather than propagate, any exception it raises.

    Args:
        message: Message to process; its ``value`` must be a ``BrokerValue``.
    """
    broker_value = message.value
    # Checked before the ``try`` so a failed assertion is not swallowed by the
    # catch-all handler below.
    assert isinstance(broker_value, BrokerValue)

    try:
        process_message(message)
    except Exception:
        # Catch-all boundary: record the traceback and return normally.
        logger.exception("Failed to process segment payload")
|
||
|
||
class DetectPerformanceIssuesStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    """Builds the processing strategy that runs ``_process_message`` on each message."""

    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        """Return a ``RunTask`` step followed by a ``CommitOffsets`` step.

        Args:
            commit: Commit callable handed to ``CommitOffsets``.
            partitions: Not used by this factory.

        Returns:
            A ``RunTask`` wrapping ``_process_message`` whose next step is
            ``CommitOffsets(commit)``.
        """
        commit_step = CommitOffsets(commit)
        return RunTask(function=_process_message, next_step=commit_step)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
54 changes: 54 additions & 0 deletions
54
tests/sentry/spans/consumers/detect_performance_issues/test_factory.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
from datetime import datetime | ||
from unittest import mock | ||
|
||
from arroyo.backends.kafka import KafkaPayload | ||
from arroyo.types import BrokerValue, Message, Partition | ||
from arroyo.types import Topic as ArroyoTopic | ||
|
||
from sentry.conf.types.kafka_definition import Topic | ||
from sentry.spans.consumers.detect_performance_issues.factory import ( | ||
DetectPerformanceIssuesStrategyFactory, | ||
) | ||
from sentry.utils import json | ||
from sentry.utils.kafka_config import get_topic_definition | ||
from tests.sentry.spans.consumers.process.test_factory import build_mock_span | ||
|
||
|
||
def build_mock_message(data, topic=None):
    """Return a ``Mock`` whose ``value()`` yields ``data`` serialized as JSON.

    Args:
        data: Object to serialize with ``json.dumps``.
        topic: When truthy, the value returned by the mock's ``topic()``.
            Otherwise ``topic`` is left as an auto-created mock attribute.
    """
    mocked = mock.Mock()
    mocked.value.return_value = json.dumps(data)
    if not topic:
        return mocked
    mocked.topic.return_value = topic
    return mocked
|
||
|
||
@mock.patch("sentry.spans.consumers.detect_performance_issues.factory.process_segment")
def test_consumer_processes_segment(mock_process_segment):
    """A submitted segment reaches ``process_segment`` exactly once with its spans."""
    arroyo_topic = ArroyoTopic(get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"])
    factory = DetectPerformanceIssuesStrategyFactory()
    strategy = factory.create_with_partitions(commit=mock.Mock(), partitions={})

    spans = [build_mock_span(project_id=1)]
    segment_data = {"spans": spans}
    mocked_message = build_mock_message(segment_data, arroyo_topic)

    # Build the broker message step by step instead of as one nested expression.
    payload = KafkaPayload(b"key", mocked_message.value().encode("utf-8"), [])
    broker_value = BrokerValue(payload, Partition(arroyo_topic, 0), 1, datetime.now())
    strategy.submit(Message(broker_value))

    strategy.poll()
    strategy.join(1)
    strategy.terminate()

    mock_process_segment.assert_called_once_with(spans)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.