Skip to content

Commit

Permalink
feat(consumers): Add option to enable dlq for sentry unified consumers (#58474)
Browse files Browse the repository at this point in the history

- Adds an `--enable-dlq` option for devserver to specify which consumers
to enable dlq for
- Adds configuration about dlq topic for `ingest-metrics`, and
`ingest-performance-metrics`
  • Loading branch information
john-z-yang authored Oct 23, 2023
1 parent 448ab26 commit b85278d
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3248,9 +3248,11 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
KAFKA_INGEST_ATTACHMENTS = "ingest-attachments"
KAFKA_INGEST_TRANSACTIONS = "ingest-transactions"
KAFKA_INGEST_METRICS = "ingest-metrics"
# Dead-letter queue (DLQ) topic that invalid messages from
# KAFKA_INGEST_METRICS are routed to.
KAFKA_INGEST_METRICS_DLQ = "ingest-metrics-dlq"
KAFKA_SNUBA_METRICS = "snuba-metrics"
KAFKA_PROFILES = "profiles"
KAFKA_INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics"
# NOTE(review): presumably the DLQ topic paired with
# KAFKA_INGEST_PERFORMANCE_METRICS (generic metrics) -- confirm against the
# consumer definition that references it as its dlq_topic.
KAFKA_INGEST_GENERIC_METRICS_DLQ = "ingest-generic-metrics-dlq"
KAFKA_SNUBA_GENERIC_METRICS = "snuba-generic-metrics"
KAFKA_INGEST_REPLAY_EVENTS = "ingest-replay-events"
KAFKA_INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
Expand Down Expand Up @@ -3296,12 +3298,15 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
KAFKA_INGEST_TRANSACTIONS: {"cluster": "default"},
# Topic for receiving metrics from Relay
KAFKA_INGEST_METRICS: {"cluster": "default"},
# Topic for routing invalid messages from KAFKA_INGEST_METRICS
KAFKA_INGEST_METRICS_DLQ: {"cluster": "default"},
# Topic for indexer translated metrics
KAFKA_SNUBA_METRICS: {"cluster": "default"},
# Topic for receiving profiles from Relay
KAFKA_PROFILES: {"cluster": "default"},
KAFKA_INGEST_PERFORMANCE_METRICS: {"cluster": "default"},
KAFKA_SNUBA_GENERIC_METRICS: {"cluster": "default"},
KAFKA_INGEST_GENERIC_METRICS_DLQ: {"cluster": "default"},
KAFKA_INGEST_REPLAY_EVENTS: {"cluster": "default"},
KAFKA_INGEST_REPLAYS_RECORDINGS: {"cluster": "default"},
KAFKA_INGEST_OCCURRENCES: {"cluster": "default"},
Expand Down
14 changes: 14 additions & 0 deletions src/sentry/conf/types/consumer_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,17 @@ class ConsumerDefinition(TypedDict, total=False):
require_synchronization: bool
synchronize_commit_group_default: str
synchronize_commit_log_topic_default: str

dlq_topic: str
dlq_max_invalid_ratio: float | None
dlq_max_consecutive_count: int | None


def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None:
    """Reject a consumer definition that sets DLQ limits without a DLQ topic.

    A definition is accepted when it contains ``dlq_topic``, or when it
    contains neither ``dlq_max_invalid_ratio`` nor
    ``dlq_max_consecutive_count``. Only the presence of the keys is checked,
    not their values.

    Raises:
        ValueError: if either limit key is present while ``dlq_topic`` is
            absent.
    """
    # With a topic configured, any combination of limits is acceptable.
    if "dlq_topic" in consumer_definition:
        return

    dlq_limit_keys = ("dlq_max_invalid_ratio", "dlq_max_consecutive_count")
    if any(limit_key in consumer_definition for limit_key in dlq_limit_keys):
        raise ValueError(
            "Invalid consumer definition, dlq_max_invalid_ratio/dlq_max_consecutive_count is configured, but dlq_topic is not"
        )
44 changes: 43 additions & 1 deletion src/sentry/consumers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
from __future__ import annotations

import logging
import uuid
from typing import List, Mapping, Optional, Sequence

import click
from arroyo.backends.abstract import Consumer
from arroyo.backends.kafka import KafkaProducer
from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer
from arroyo.processing.processor import StreamProcessor
from arroyo.processing.strategies import Healthcheck
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from django.conf import settings

from sentry.conf.types.consumer_definition import ConsumerDefinition
from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition
from sentry.consumers.validate_schema import ValidateSchema
from sentry.utils.imports import import_string
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

logger = logging.getLogger(__name__)

DEFAULT_BLOCK_SIZE = int(32 * 1e6)

Expand Down Expand Up @@ -283,6 +289,7 @@ def get_stream_processor(
synchronize_commit_log_topic: Optional[str],
synchronize_commit_group: Optional[str],
healthcheck_file_path: Optional[str],
enable_dlq: bool,
validate_schema: bool = False,
group_instance_id: Optional[str] = None,
) -> StreamProcessor:
Expand All @@ -294,6 +301,12 @@ def get_stream_processor(
f"Most likely there is another subcommand in 'sentry run' "
f"responsible for this consumer"
)
try:
validate_consumer_definition(consumer_definition)
except ValueError as e:
raise click.ClickException(
f"Invalid consumer definition configured for {consumer_name}"
) from e

strategy_factory_cls = import_string(consumer_definition["strategy_factory"])
logical_topic = consumer_definition["topic"]
Expand Down Expand Up @@ -396,12 +409,41 @@ def build_consumer_config(group_id: str):
healthcheck_file_path, strategy_factory
)

if enable_dlq:
try:
dlq_topic = consumer_definition["dlq_topic"]
except KeyError as e:
raise click.BadParameter(
f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it"
) from e
try:
cluster_setting = get_topic_definition(dlq_topic)["cluster"]
except ValueError as e:
raise click.BadParameter(
f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment"
) from e

producer_config = get_kafka_producer_cluster_options(cluster_setting)
dlq_producer = KafkaProducer(producer_config)

dlq_policy = DlqPolicy(
KafkaDlqProducer(dlq_producer, Topic(dlq_topic)),
DlqLimit(
max_invalid_ratio=consumer_definition["dlq_max_invalid_ratio"],
max_consecutive_count=consumer_definition["dlq_max_consecutive_count"],
),
None,
)
else:
dlq_policy = None

return StreamProcessor(
consumer=consumer,
topic=Topic(topic),
processor_factory=strategy_factory,
commit_policy=ONCE_PER_SECOND,
join_timeout=join_timeout,
dlq_policy=dlq_policy,
)


Expand Down
7 changes: 7 additions & 0 deletions src/sentry/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,12 @@ def profiles_consumer(**options):
"--healthcheck-file-path",
help="A file to touch roughly every second to indicate that the consumer is still alive. See https://getsentry.github.io/arroyo/strategies/healthcheck.html for more information.",
)
@click.option(
"--enable-dlq",
help="Enable dlq to route invalid messages to. See https://getsentry.github.io/arroyo/dlqs.html#arroyo.dlq.DlqPolicy for more information.",
is_flag=True,
default=False,
)
@click.option(
"--log-level",
type=click.Choice(["debug", "info", "warning", "error", "critical"], case_sensitive=False),
Expand Down Expand Up @@ -748,6 +754,7 @@ def dev_consumer(consumer_names):
max_poll_interval_ms=None,
synchronize_commit_group=None,
synchronize_commit_log_topic=None,
enable_dlq=False,
healthcheck_file_path=None,
validate_schema=True,
)
Expand Down
29 changes: 29 additions & 0 deletions tests/sentry/conf/test_consumer_definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import List

import pytest

from sentry.conf.types.consumer_definition import ConsumerDefinition, validate_consumer_definition
from sentry.consumers import KAFKA_CONSUMERS
from sentry.testutils.cases import TestCase


class ConsumersDefinitionTest(TestCase):
    """Checks for ``validate_consumer_definition``."""

    def test_exception_on_invalid_consumer_definition(self):
        """Definitions with DLQ limits but no ``dlq_topic`` raise ValueError."""
        # Both DLQ limits are set, yet "dlq_topic" is deliberately missing.
        limits_without_topic: ConsumerDefinition = {
            "topic": "topic",
            "strategy_factory": "sentry.sentry_metrics.consumers.indexer.parallel.MetricsConsumerStrategyFactory",
            "static_args": {
                "ingest_profile": "release-health",
            },
            "dlq_max_invalid_ratio": 0.01,
            "dlq_max_consecutive_count": 1000,
        }
        invalid_definitions: List[ConsumerDefinition] = [limits_without_topic]

        for bad_definition in invalid_definitions:
            with pytest.raises(ValueError):
                validate_consumer_definition(bad_definition)

    def test_kafka_consumer_definition_validity(self):
        """Every definition registered in KAFKA_CONSUMERS passes validation."""
        for registered_definition in KAFKA_CONSUMERS.values():
            validate_consumer_definition(registered_definition)

0 comments on commit b85278d

Please sign in to comment.