Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support routing stale messages to lowpri topic #82322

Merged
merged 16 commits
Dec 20, 2024
2 changes: 2 additions & 0 deletions src/sentry/conf/types/kafka_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class ConsumerDefinition(TypedDict, total=False):
dlq_max_invalid_ratio: float | None
dlq_max_consecutive_count: int | None

stale_topic: Topic


def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None:
if "dlq_topic" not in consumer_definition and (
Expand Down
42 changes: 23 additions & 19 deletions src/sentry/consumers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@

import click
from arroyo.backends.abstract import Consumer
from arroyo.backends.kafka import KafkaProducer
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer
from arroyo.commit import ONCE_PER_SECOND
from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer
from arroyo.dlq import DlqLimit, DlqPolicy
from arroyo.processing.processor import StreamProcessor
from arroyo.processing.strategies import Healthcheck
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
Expand All @@ -22,11 +21,12 @@
Topic,
validate_consumer_definition,
)
from sentry.consumers.dlq import DlqStaleMessagesStrategyFactoryWrapper, maybe_build_dlq_producer
from sentry.consumers.validate_schema import ValidateSchema
from sentry.eventstream.types import EventStreamEventType
from sentry.ingest.types import ConsumerType
from sentry.utils.imports import import_string
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
from sentry.utils.kafka_config import get_topic_definition

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -371,6 +371,7 @@ def ingest_transactions_options() -> list[click.Option]:
"strategy_factory": "sentry.ingest.consumer.factory.IngestTransactionsStrategyFactory",
"click_options": ingest_transactions_options(),
"dlq_topic": Topic.INGEST_TRANSACTIONS_DLQ,
"stale_topic": Topic.INGEST_TRANSACTIONS_DLQ,
},
"ingest-metrics": {
"topic": Topic.INGEST_METRICS,
Expand Down Expand Up @@ -469,6 +470,8 @@ def get_stream_processor(
synchronize_commit_group: str | None = None,
healthcheck_file_path: str | None = None,
enable_dlq: bool = True,
# If set, messages above this age will be rerouted to the stale topic if one is configured
stale_threshold_sec: int | None = None,
enforce_schema: bool = False,
group_instance_id: str | None = None,
) -> StreamProcessor:
Expand Down Expand Up @@ -578,37 +581,38 @@ def build_consumer_config(group_id: str):
consumer_topic.value, enforce_schema, strategy_factory
)

if stale_threshold_sec:
strategy_factory = DlqStaleMessagesStrategyFactoryWrapper(
stale_threshold_sec, strategy_factory
)

if healthcheck_file_path is not None:
strategy_factory = HealthcheckStrategyFactoryWrapper(
healthcheck_file_path, strategy_factory
)

if enable_dlq and consumer_definition.get("dlq_topic"):
try:
dlq_topic = consumer_definition["dlq_topic"]
except KeyError as e:
raise click.BadParameter(
f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it"
) from e
try:
dlq_topic_defn = get_topic_definition(dlq_topic)
cluster_setting = dlq_topic_defn["cluster"]
except ValueError as e:
raise click.BadParameter(
f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment"
) from e
dlq_topic = consumer_definition["dlq_topic"]
else:
dlq_topic = None

if stale_threshold_sec and consumer_definition.get("stale_topic"):
stale_topic = consumer_definition["stale_topic"]
else:
stale_topic = None
kneeyo1 marked this conversation as resolved.
Show resolved Hide resolved

producer_config = get_kafka_producer_cluster_options(cluster_setting)
dlq_producer = KafkaProducer(producer_config)
dlq_producer = maybe_build_dlq_producer(dlq_topic=dlq_topic, stale_topic=stale_topic)

if dlq_producer:
dlq_policy = DlqPolicy(
KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])),
dlq_producer,
DlqLimit(
max_invalid_ratio=consumer_definition.get("dlq_max_invalid_ratio"),
max_consecutive_count=consumer_definition.get("dlq_max_consecutive_count"),
),
None,
)

else:
dlq_policy = None

Expand Down
154 changes: 154 additions & 0 deletions src/sentry/consumers/dlq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import logging
import time
from collections.abc import Mapping, MutableMapping
from concurrent.futures import Future
from datetime import datetime, timedelta, timezone
from enum import Enum

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.dlq import InvalidMessage, KafkaDlqProducer
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.types import FILTERED_PAYLOAD, BrokerValue, Commit, FilteredPayload, Message, Partition
from arroyo.types import Topic as ArroyoTopic
from arroyo.types import Value

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

logger = logging.getLogger(__name__)


class RejectReason(Enum):
    """Why a message is being routed away from the main pipeline."""

    # Message is older than the configured staleness threshold.
    STALE = "stale"
    # Message failed validation / processing and belongs in the regular DLQ.
    INVALID = "invalid"


class MultipleDestinationDlqProducer(KafkaDlqProducer):
    """
    Routes rejected messages to either the DLQ or the stale-message topic,
    selected by the ``reason`` string passed to :meth:`produce`.
    """

    def __init__(
        self,
        producers: Mapping[RejectReason, KafkaDlqProducer | None],
    ) -> None:
        # Map of reject reason -> producer; an entry may be None when no
        # topic is configured for that reason.
        self.producers = producers

    def produce(
        self,
        value: BrokerValue[KafkaPayload],
        reason: str | None = None,
    ) -> Future[BrokerValue[KafkaPayload]]:
        # A missing reason is treated as an ordinary invalid message.
        reject_reason = RejectReason(reason) if reason else RejectReason.INVALID
        destination = self.producers.get(reject_reason)

        if not destination:
            # No DLQ producer configured for the reason. Log it and hand back
            # an already-completed future so the caller's flow still proceeds.
            logger.error("No DLQ producer configured for reason %s", reason)
            placeholder: Future[BrokerValue[KafkaPayload]] = Future()
            placeholder.set_running_or_notify_cancel()
            placeholder.set_result(value)
            return placeholder

        return destination.produce(value)


def _get_dlq_producer(topic: Topic | None) -> KafkaDlqProducer | None:
    """Build a ``KafkaDlqProducer`` for *topic*, or ``None`` when no topic is set."""
    if topic is None:
        return None

    defn = get_topic_definition(topic)
    producer = KafkaProducer(get_kafka_producer_cluster_options(defn["cluster"]))
    return KafkaDlqProducer(producer, ArroyoTopic(defn["real_topic_name"]))


def maybe_build_dlq_producer(
    dlq_topic: Topic | None,
    stale_topic: Topic | None,
) -> MultipleDestinationDlqProducer | None:
    """
    Return a producer that routes by reject reason, or ``None`` when
    neither a DLQ topic nor a stale topic is configured.
    """
    if dlq_topic is None and stale_topic is None:
        return None

    return MultipleDestinationDlqProducer(
        {
            RejectReason.INVALID: _get_dlq_producer(dlq_topic),
            RejectReason.STALE: _get_dlq_producer(stale_topic),
        }
    )


class DlqStaleMessages(ProcessingStrategy[KafkaPayload]):
    """
    Strategy that rejects messages older than ``stale_threshold_sec``.

    A stale message is rejected by raising :class:`InvalidMessage` with the
    ``"stale"`` reason so the DLQ producer can route it to the stale topic
    rather than the regular DLQ. Offsets of rejected messages are remembered
    and periodically flushed downstream as a filtered message so commits keep
    advancing even when every incoming message is stale.
    """

    def __init__(
        self,
        stale_threshold_sec: int,
        next_step: ProcessingStrategy[KafkaPayload | FilteredPayload],
    ) -> None:
        self.stale_threshold_sec = stale_threshold_sec
        self.next_step = next_step

        # A filtered message is created so we commit periodically if all are stale.
        self.last_forwarded_offsets = time.time()
        self.offsets_to_forward: MutableMapping[Partition, int] = {}

    def submit(self, message: Message[KafkaPayload]) -> None:
        min_accepted_timestamp = datetime.now(timezone.utc) - timedelta(
            seconds=self.stale_threshold_sec
        )

        if isinstance(message.value, BrokerValue):
            # NOTE(review): assumes the broker timestamp is timezone-aware; a
            # naive timestamp would raise TypeError here — confirm against arroyo.
            if message.value.timestamp < min_accepted_timestamp:
                # Record the next offset so poll() can still commit past this
                # message even though it never reaches the next step.
                self.offsets_to_forward[message.value.partition] = message.value.next_offset
                raise InvalidMessage(
                    message.value.partition, message.value.offset, reason=RejectReason.STALE.value
                )

        # If we get a valid message for a partition later, don't emit a filtered
        # message for it. Pop with a default: the valid message may belong to a
        # partition that never had a stale offset recorded, which previously
        # raised KeyError on multi-partition assignments.
        if self.offsets_to_forward:
            for partition in message.committable:
                self.offsets_to_forward.pop(partition, None)

        self.next_step.submit(message)

    def poll(self) -> None:
        self.next_step.poll()

        # Ensure we commit frequently even if all messages are invalid:
        # flush pending stale offsets at most once per second.
        if self.offsets_to_forward:
            if time.time() > self.last_forwarded_offsets + 1:
                filtered_message = Message(Value(FILTERED_PAYLOAD, self.offsets_to_forward))
                self.next_step.submit(filtered_message)
                self.offsets_to_forward = {}
                self.last_forwarded_offsets = time.time()

    def join(self, timeout: float | None = None) -> None:
        self.next_step.join(timeout)

    def close(self) -> None:
        self.next_step.close()

    def terminate(self) -> None:
        self.next_step.terminate()


class DlqStaleMessagesStrategyFactoryWrapper(ProcessingStrategyFactory[KafkaPayload]):
    """
    Factory wrapper that prepends a staleness filter to the wrapped pipeline.

    Messages older than ``stale_threshold_sec`` are rejected via
    ``InvalidMessage`` with the ``"stale"`` reason before reaching the inner
    strategy, allowing the DLQ producer to route them to a separate stale topic.
    """

    def __init__(
        self,
        stale_threshold_sec: int,
        inner: ProcessingStrategyFactory[KafkaPayload | FilteredPayload],
    ) -> None:
        self.stale_threshold_sec = stale_threshold_sec
        self.inner = inner

    def create_with_partitions(
        self, commit: Commit, partitions: Mapping[Partition, int]
    ) -> ProcessingStrategy[KafkaPayload]:
        # Build the wrapped pipeline first, then place the stale filter in front.
        downstream = self.inner.create_with_partitions(commit, partitions)
        return DlqStaleMessages(self.stale_threshold_sec, downstream)
6 changes: 6 additions & 0 deletions src/sentry/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ def cron(**options: Any) -> None:
is_flag=True,
default=True,
)
@click.option(
"--stale-threshold-sec",
type=click.IntRange(min=300),
help="Routes stale messages to stale topic if provided. This feature is currently being tested, do not pass in production yet.",
)
@click.option(
"--log-level",
type=click.Choice(["debug", "info", "warning", "error", "critical"], case_sensitive=False),
Expand Down Expand Up @@ -500,6 +505,7 @@ def dev_consumer(consumer_names: tuple[str, ...]) -> None:
synchronize_commit_group=None,
synchronize_commit_log_topic=None,
enable_dlq=False,
stale_threshold_sec=None,
healthcheck_file_path=None,
enforce_schema=True,
)
Expand Down
Empty file.
73 changes: 73 additions & 0 deletions tests/sentry/consumers/test_dlq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import time
from datetime import datetime, timedelta, timezone
from unittest.mock import Mock

import msgpack
import pytest
from arroyo.backends.kafka import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message, Partition, Topic

from sentry.consumers.dlq import DlqStaleMessagesStrategyFactoryWrapper
from sentry.testutils.pytest.fixtures import django_db_all


def make_message(
    payload: bytes, partition: Partition, offset: int, timestamp: datetime | None = None
) -> Message:
    """Wrap *payload* in a ``BrokerValue``-backed ``Message`` for strategy tests."""
    return Message(
        BrokerValue(
            KafkaPayload(None, payload, []),
            partition,
            offset,
            # Default to an aware timestamp: DlqStaleMessages compares against
            # datetime.now(timezone.utc), and a naive default would raise a
            # TypeError on comparison.
            timestamp if timestamp else datetime.now(timezone.utc),
        )
    )


@pytest.mark.parametrize("stale_threshold_sec", [300])
@django_db_all
def test_dlq_stale_messages(factories, stale_threshold_sec) -> None:
    # Tests messages that have gotten stale (default longer than 5 minutes)

    # Reference a real project so the payload resembles what the ingest
    # consumer would actually receive.
    organization = factories.create_organization()
    project = factories.create_project(organization=organization)

    empty_event_payload = msgpack.packb(
        {
            "type": "event",
            "project_id": project.id,
            "payload": b"{}",
            "start_time": int(time.time()),
            "event_id": "aaa",
        }
    )

    partition = Partition(Topic("topic"), 0)
    offset = 10
    # The wrapped (inner) strategy is mocked so we can count how many
    # messages survive the staleness filter.
    inner_factory_mock = Mock()
    inner_strategy_mock = Mock()
    inner_factory_mock.create_with_partitions = Mock(return_value=inner_strategy_mock)
    factory = DlqStaleMessagesStrategyFactoryWrapper(
        stale_threshold_sec=stale_threshold_sec,
        inner=inner_factory_mock,
    )
    strategy = factory.create_with_partitions(Mock(), Mock())

    # Messages aged 10..1 minutes: with a 300s threshold the six oldest
    # (time_diff >= 5) are rejected, the four newest pass through.
    # NOTE(review): the time_diff == 5 boundary relies on the threshold being
    # computed slightly after the timestamp, landing it just past the cutoff.
    for time_diff in range(10, 0, -1):
        message = make_message(
            empty_event_payload,
            partition,
            offset - time_diff,
            timestamp=datetime.now(timezone.utc) - timedelta(minutes=time_diff),
        )
        if time_diff < 5:
            strategy.submit(message)
        else:
            with pytest.raises(InvalidMessage) as exc_info:
                strategy.submit(message)

            assert exc_info.value.partition == partition
            assert exc_info.value.offset == offset - time_diff

    # Only the four fresh messages reached the wrapped strategy.
    assert inner_strategy_mock.submit.call_count == 4
Loading