-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ref(kafka): Validate topics on startup #3543
Conversation
relay-config/src/config.rs
Outdated
@@ -999,6 +999,9 @@ pub struct Processing { | |||
/// Kafka topic names. | |||
#[serde(default)] | |||
pub topics: TopicAssignments, | |||
/// Whether to validate the supplied topics by calling Kafka's metadata endpoints. | |||
#[serde(default)] | |||
pub validate_kafka_topics: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub validate_kafka_topics: bool, | |
pub kafka_validate_topics: bool, |
relay-kafka/src/producer/mod.rs
Outdated
fn validate_topic(&self) -> Result<(), ClientError> { | ||
let client = self.producer.client(); | ||
let metadata = client | ||
.fetch_metadata(Some(&self.topic_name), Duration::from_secs(5)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timeout?
client_builder = client_builder.add_kafka_topic_config(*topic, kafka_config)?; | ||
client_builder = client_builder | ||
.add_kafka_topic_config(*topic, kafka_config, config.validate_kafka_topics()) | ||
.context(ServiceError::Kafka)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean map_err
here?
.context(ServiceError::Kafka)?; | |
.map_err(ServiceError::Kafka)?; |
@@ -736,7 +736,7 @@ impl KafkaOutcomesProducer { | |||
for topic in &[KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] { | |||
let kafka_config = &config.kafka_config(*topic).context(ServiceError::Kafka)?; | |||
client_builder = client_builder | |||
.add_kafka_topic_config(*topic, kafka_config) | |||
.add_kafka_topic_config(*topic, kafka_config, config.kafka_validate_topics()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: To make this more builder pattern, it would be nice to have a method call outside of the loop
.add_kafka_topic_config(*topic, kafka_config)
}
builder.set_topic_validation(conig.kafka_validate_topics());
builder.build()
assert relay.wait_for_exit() != 0 | ||
|
||
error = str(mini_sentry.test_failures.pop(0)) | ||
assert "failed to validate the topic with name" in error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What topic does this fail on?
This PR adds Kafka topics validation when Relay is starting up. This is done to prevent misconfiguration of Relay leading to errors when producing events.
Closes: #3404