Skip to content

Commit

Permalink
ref(kafka): Validate topics on startup (#3543)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde authored May 22, 2024
1 parent 4e61bcc commit 5c63716
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- Extract status for spans. ([#3606](https://github.com/getsentry/relay/pull/3606))
- Forward `received_at` timestamp for buckets sent to Kafka. ([#3561](https://github.com/getsentry/relay/pull/3561))
- Limit metric name to 150 characters. ([#3628](https://github.com/getsentry/relay/pull/3628))
- Add validation of Kafka topics on startup. ([#3543](https://github.com/getsentry/relay/pull/3543))

## 24.5.0

Expand Down
9 changes: 9 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,9 @@ pub struct Processing {
/// Kafka topic names.
#[serde(default)]
pub topics: TopicAssignments,
/// Whether to validate the supplied topics by calling Kafka's metadata endpoints.
#[serde(default)]
pub kafka_validate_topics: bool,
/// Redis hosts to connect to for storing state for rate limits.
#[serde(default)]
pub redis: Option<RedisConfig>,
Expand All @@ -1037,6 +1040,7 @@ impl Default for Processing {
kafka_config: Vec::new(),
secondary_kafka_configs: BTreeMap::new(),
topics: TopicAssignments::default(),
kafka_validate_topics: false,
redis: None,
attachment_chunk_size: default_chunk_size(),
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
Expand Down Expand Up @@ -2252,6 +2256,11 @@ impl Config {
)
}

    /// Whether to validate the configured topics against Kafka on startup.
    ///
    /// Returns the value of the `processing.kafka_validate_topics` config flag
    /// (defaults to `false`). When enabled, each producer fetches topic
    /// metadata from Kafka and startup fails if a topic is invalid.
    pub fn kafka_validate_topics(&self) -> bool {
        self.values.processing.kafka_validate_topics
    }

/// All unused but configured topic assignments.
pub fn unused_topic_assignments(&self) -> &BTreeMap<String, TopicAssignment> {
&self.values.processing.topics.unused
Expand Down
43 changes: 37 additions & 6 deletions relay-kafka/src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use utils::{Context, ThreadedProducer};
mod schemas;

const REPORT_FREQUENCY: Duration = Duration::from_secs(1);
const KAFKA_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);

/// Kafka producer errors.
#[derive(Error, Debug)]
Expand Down Expand Up @@ -55,6 +56,14 @@ pub enum ClientError {
/// Configuration is wrong and it cannot be used to identify the number of a shard.
#[error("invalid kafka shard")]
InvalidShard,

/// Failed to fetch the metadata of Kafka.
#[error("failed to fetch the metadata of Kafka")]
MetadataFetchError(rdkafka::error::KafkaError),

/// Failed to validate the topic.
#[error("failed to validate the topic with name {0}: {1:?}")]
TopicError(String, rdkafka_sys::rd_kafka_resp_err_t),
}

/// Describes the type which can be sent using kafka producer provided by this crate.
Expand Down Expand Up @@ -94,6 +103,22 @@ impl Producer {
producer,
}
}

/// Validates the topic by fetching the metadata of the topic directly from Kafka.
fn validate_topic(&self) -> Result<(), ClientError> {
let client = self.producer.client();
let metadata = client
.fetch_metadata(Some(&self.topic_name), KAFKA_FETCH_METADATA_TIMEOUT)
.map_err(ClientError::MetadataFetchError)?;

for topic in metadata.topics() {
if let Some(error) = topic.error() {
return Err(ClientError::TopicError(topic.name().to_string(), error));
}
}

Ok(())
}
}

impl fmt::Debug for Producer {
Expand Down Expand Up @@ -188,6 +213,7 @@ impl KafkaClientBuilder {
mut self,
topic: KafkaTopic,
params: &KafkaParams,
validate_topic: bool,
) -> Result<Self, ClientError> {
let mut client_config = ClientConfig::new();

Expand All @@ -200,10 +226,11 @@ impl KafkaClientBuilder {
let config_name = config_name.map(str::to_string);

if let Some(producer) = self.reused_producers.get(&config_name) {
self.producers.insert(
topic,
Producer::new((*topic_name).to_string(), Arc::clone(producer)),
);
let producer = Producer::new((*topic_name).to_string(), Arc::clone(producer));
if validate_topic {
producer.validate_topic()?;
}
self.producers.insert(topic, producer);
return Ok(self);
}

Expand All @@ -219,8 +246,12 @@ impl KafkaClientBuilder {

self.reused_producers
.insert(config_name, Arc::clone(&producer));
self.producers
.insert(topic, Producer::new((*topic_name).to_string(), producer));

let producer = Producer::new((*topic_name).to_string(), producer);
if validate_topic {
producer.validate_topic()?;
}
self.producers.insert(topic, producer);

Ok(self)
}
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ impl KafkaOutcomesProducer {
for topic in &[KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
let kafka_config = &config.kafka_config(*topic).context(ServiceError::Kafka)?;
client_builder = client_builder
.add_kafka_topic_config(*topic, kafka_config)
.add_kafka_topic_config(*topic, kafka_config, config.kafka_validate_topics())
.context(ServiceError::Kafka)?;
}

Expand Down
6 changes: 5 additions & 1 deletion relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This module contains the service that forwards events and attachments to the Sentry store.
//! The service uses Kafka topics to forward data to Sentry
use anyhow::Context;
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::error::Error;
Expand Down Expand Up @@ -30,6 +31,7 @@ use uuid::Uuid;
use crate::envelope::{AttachmentType, Envelope, Item, ItemType};

use crate::metrics::MetricOutcomes;
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
Expand Down Expand Up @@ -65,7 +67,9 @@ impl Producer {
**t != KafkaTopic::Outcomes && **t != KafkaTopic::OutcomesBilling
}) {
let kafka_config = &config.kafka_config(*topic)?;
client_builder = client_builder.add_kafka_topic_config(*topic, kafka_config)?;
client_builder = client_builder
.add_kafka_topic_config(*topic, kafka_config, config.kafka_validate_topics())
.context(ServiceError::Kafka)?;
}

Ok(Self {
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,13 @@ def test_invalid_kafka_config_should_fail(mini_sentry, relay_with_processing):
assert "__unknown" in error
error = str(mini_sentry.test_failures.pop(0))
assert "profiles" in error.lower()


def test_invalid_topics_raise_error(mini_sentry, relay_with_processing):
    # With topic validation enabled and no valid Kafka topics available,
    # Relay must refuse to start instead of reporting healthy.
    relay = relay_with_processing(
        options={"processing": {"kafka_validate_topics": True}},
        wait_health_check=False,
    )
    assert relay.wait_for_exit() != 0

    failure = str(mini_sentry.test_failures.pop(0))
    assert "failed to validate the topic with name" in failure

0 comments on commit 5c63716

Please sign in to comment.