Skip to content

Commit

Permalink
dekaf: Random delay when retrying transient error
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Sep 16, 2024
1 parent 8e7e279 commit f763d27
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ metrics-prometheus = { workspace = true }
percent-encoding = { workspace = true }
postgrest = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
rsasl = { workspace = true }
rustls = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use doc::AsNode;
use futures::StreamExt;
use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
use std::time::Duration;

pub struct Read {
/// Journal offset to be served by this Read.
Expand Down Expand Up @@ -112,7 +113,11 @@ impl Read {
}
},
Err(err) if err.is_transient() => {
use rand::Rng;

tracing::warn!(%err, "Retrying transient read error");
let delay = Duration::from_millis(rand::thread_rng().gen_range(300..2000));
tokio::time::sleep(delay).await;
// We can retry transient errors just by continuing to poll the stream
// TODO: We might have a counter here and give up after a few attempts
continue;
Expand Down

0 comments on commit f763d27

Please sign in to comment.