This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

feat(capture): Make overflow limiter optional via configuration #19

Closed
wants to merge 2 commits
1 change: 1 addition & 0 deletions capture-server/tests/common.rs
@@ -28,6 +28,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     redis_url: "redis://localhost:6379/".to_string(),
+    overflow_enabled: true,
     overflow_burst_limit: NonZeroU32::new(5).unwrap(),
     overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
     overflow_forced_keys: None,
3 changes: 3 additions & 0 deletions capture/src/config.rs
@@ -13,6 +13,9 @@ pub struct Config {
     pub redis_url: String,
     pub otel_url: Option<String>,
 
+    #[envconfig(default = "true")]
+    pub overflow_enabled: bool,
+
     #[envconfig(default = "100")]
     pub overflow_per_second_limit: NonZeroU32,
 
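With the envconfig default of "true", existing deployments keep the limiter unless they explicitly opt out. A minimal usage sketch, assuming envconfig >= 0.10's Envconfig::init_from_env and a capture::config::Config re-export (both assumptions, not confirmed by this diff):

    use capture::config::Config;
    use envconfig::Envconfig;

    fn main() {
        // Run with OVERFLOW_ENABLED=false to disable the overflow limiter.
        let config = Config::init_from_env().expect("failed to load config from environment");
        if !config.overflow_enabled {
            println!("overflow limiter disabled");
        }
    }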
43 changes: 25 additions & 18 deletions capture/src/server.rs
@@ -46,24 +46,31 @@
.register("rdkafka".to_string(), Duration::seconds(30))
.await;

let partition = OverflowLimiter::new(
config.overflow_per_second_limit,
config.overflow_burst_limit,
config.overflow_forced_keys,
);
if config.export_prometheus {
let partition = partition.clone();
tokio::spawn(async move {
partition.report_metrics().await;
});
}
{
// Ensure that the rate limiter state does not grow unbounded
let partition = partition.clone();
tokio::spawn(async move {
partition.clean_state().await;
});
}
let partition = match config.overflow_enabled {
true => {
let partition = OverflowLimiter::new(
config.overflow_per_second_limit,
config.overflow_burst_limit,
config.overflow_forced_keys,
);
if config.export_prometheus {
let partition = partition.clone();
tokio::spawn(async move {
partition.report_metrics().await;
});
}
{
// Ensure that the rate limiter state does not grow unbounded
let partition = partition.clone();
tokio::spawn(async move {
partition.clean_state().await;
});
}
Some(partition)
}
false => None,
};

let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

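Design note: matching on the bool keeps both branches explicit. An equivalent shape using bool::then (stable since Rust 1.50) would avoid the match; a sketch with the spawned tasks elided:

    let partition = config.overflow_enabled.then(|| {
        let partition = OverflowLimiter::new(
            config.overflow_per_second_limit,
            config.overflow_burst_limit,
            config.overflow_forced_keys,
        );
        // report_metrics / clean_state tasks would be spawned here, as above
        partition
    });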
16 changes: 11 additions & 5 deletions capture/src/sinks/kafka.rs
@@ -82,14 +82,14 @@ impl rdkafka::ClientContext for KafkaContext {
 pub struct KafkaSink {
     producer: FutureProducer<KafkaContext>,
     topic: String,
-    partition: OverflowLimiter,
+    partition: Option<OverflowLimiter>,
 }
 
 impl KafkaSink {
     pub fn new(
         config: KafkaConfig,
         liveness: HealthHandle,
-        partition: OverflowLimiter,
+        partition: Option<OverflowLimiter>,
     ) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
 
@@ -206,7 +206,10 @@ impl KafkaSink {
 impl Event for KafkaSink {
     #[instrument(skip_all)]
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
-        let limited = self.partition.is_limited(&event.key());
+        let limited = match &self.partition {
+            Some(limiter) => limiter.is_limited(&event.key()),
+            None => false,
+        };
         let ack =
             Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
         histogram!("capture_event_batch_size").record(1.0);
@@ -222,7 +225,10 @@ impl Event for KafkaSink {
         for event in events {
             let producer = self.producer.clone();
             let topic = self.topic.clone();
-            let limited = self.partition.is_limited(&event.key());
+            let limited = match &self.partition {
+                Some(limiter) => limiter.is_limited(&event.key()),
+                None => false,
+            };
 
             // We await kafka_send to get events in the producer queue sequentially
             let ack = Self::kafka_send(producer, topic, event, limited).await?;
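The same Some/None match now appears in both send and send_batch; either occurrence could be collapsed to a one-liner with Option::map_or, a sketch of the identical check:

    let limited = self
        .partition
        .as_ref()
        .map_or(false, |limiter| limiter.is_limited(&event.key()));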
@@ -295,7 +301,7 @@ mod tests {
         kafka_topic: "events_plugin_ingestion".to_string(),
         kafka_tls: false,
     };
-    let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
+    let sink = KafkaSink::new(config, handle, Some(limiter)).expect("failed to create sink");
     (cluster, sink)
 }
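This keeps the existing test exercising the enabled path. A companion test for the disabled path could pass None through the same helper (sketch only; the surrounding setup is unchanged):

    let sink = KafkaSink::new(config, handle, None).expect("failed to create sink");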
