Skip to content

Commit

Permalink
dekaf: Fix sanity check to also bail out if a data-preview session is…
Browse files Browse the repository at this point in the history
… then used to ask for documents newer than the write-head
  • Loading branch information
jshearer committed Oct 18, 2024
1 parent 22636f4 commit db73eeb
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use kafka_protocol::{
},
protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes},
};
use std::{cmp::max, sync::Arc, time::Duration};
use std::{
collections::{hash_map::Entry, HashMap},
time::{SystemTime, UNIX_EPOCH},
};
use std::{sync::Arc, time::Duration, cmp::max};
use tracing::instrument;

struct PendingRead {
Expand Down Expand Up @@ -430,7 +430,9 @@ impl Session {
// so long as the request is still a data preview request. If not, bail out
Entry::Occupied(entry) => {
let data_preview_state = entry.get();
if data_preview_state.offset - fetch_offset > 12 {
if fetch_offset > data_preview_state.offset
|| data_preview_state.offset - fetch_offset > 12
{
bail!("Session was used for fetching preview data, cannot be used for fetching non-preview data.")
}
Some(data_preview_state.to_owned())
Expand Down

0 comments on commit db73eeb

Please sign in to comment.