Skip to content

Commit

Permalink
dekaf: Clean up and clarify some anonymous types, as well as ensuring…
Browse files Browse the repository at this point in the history
… that a session that has been used for data previews cannot subsequently be re-used for non-preview fetches
  • Loading branch information
jshearer committed Oct 16, 2024
1 parent 232dc6c commit 58dffbb
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 96 deletions.
253 changes: 166 additions & 87 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use super::{App, Collection, Read};
use crate::{
from_downstream_topic_name, from_upstream_topic_name, read::BatchResult,
to_downstream_topic_name, to_upstream_topic_name, topology::fetch_all_collection_names,
from_downstream_topic_name, from_upstream_topic_name,
read::BatchResult,
to_downstream_topic_name, to_upstream_topic_name,
topology::{fetch_all_collection_names, PartitionOffset},
Authenticated,
};
use anyhow::Context;
use anyhow::{bail, Context};
use bytes::{BufMut, Bytes, BytesMut};
use kafka_protocol::{
error::{ParseResponseErrorCode, ResponseError},
Expand All @@ -18,10 +20,7 @@ use kafka_protocol::{
protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes},
};
use std::{
collections::{
hash_map::{Entry, OccupiedEntry},
HashMap,
},
collections::{hash_map::Entry, HashMap},
time::{SystemTime, UNIX_EPOCH},
};
use std::{sync::Arc, time::Duration};
Expand All @@ -33,12 +32,19 @@ struct PendingRead {
handle: tokio_util::task::AbortOnDropHandle<anyhow::Result<(Read, BatchResult)>>,
}

/// Tracks whether a session has been classified as a "data preview" session.
///
/// A session starts as `Unknown` and is classified by its first Fetch request.
/// Once a session is `DataPreview`, a Fetch that no longer qualifies as a
/// preview fetch is rejected with an error instead of being served.
// NOTE(review): the Fetch handler matches on a clone of this value
// (`.to_owned()`), so entries inserted into the `DataPreview` map for
// additional partitions look like they are not written back to the session —
// confirm.
#[derive(Clone, Debug)]
enum SessionDataPreviewState {
/// No Fetch request has been classified yet.
Unknown,
/// The first Fetch was not a data-preview fetch; later fetches skip the check.
NotDataPreview,
/// The session is used for data preview. Holds the `PartitionOffset` loaded
/// for each (topic name, partition index) when it was classified.
DataPreview(HashMap<(TopicName, i32), PartitionOffset>),
}

/// Per-connection state used by the Kafka request handlers in this file.
pub struct Session {
// Shared `App` handle.
app: Arc<App>,
// Pending reads keyed by (topic name, partition index). The Fetch handler
// compares a pending read's offset against the requested fetch offset.
reads: HashMap<(TopicName, i32), PendingRead>,
// Supplied by the constructor; how it is used is not visible in this view.
secret: String,
// `None` until authentication succeeds; handlers that need a client fail
// with "Session not authenticated" while this is `None`.
auth: Option<Authenticated>,
// NOTE(review): this field and `data_preview_state` both appear in this diff
// view; this one looks like the pre-change line that `data_preview_state`
// replaces — confirm.
data_preview_offsets: HashMap<(TopicName, i32), Option<(i64, i64)>>,
// Data-preview classification of the session; initialised to `Unknown`.
data_preview_state: SessionDataPreviewState,
// Initialised to `None` by the constructor.
pub client_id: Option<String>,
}

Expand All @@ -50,7 +56,7 @@ impl Session {
auth: None,
secret,
client_id: None,
data_preview_offsets: HashMap::new(),
data_preview_state: SessionDataPreviewState::Unknown,
}
}

Expand Down Expand Up @@ -271,48 +277,46 @@ impl Session {
.await?;

// Concurrently fetch Collection instances and offsets for all requested topics and partitions.
// Map each "topic" into Vec<(Partition Index, Option<(Journal Offset, Timestamp))>.
let collections: anyhow::Result<
Vec<(TopicName, Vec<(i32, i64, Option<(i64, i64, i64)>)>)>,
> = futures::future::try_join_all(request.topics.into_iter().map(|topic| async move {
let maybe_collection = Collection::new(
client,
from_downstream_topic_name(topic.name.clone()).as_str(),
)
.await?;
// Map each "topic" into Vec<(Partition Index, Option<PartitionOffset>)>.
let collections: anyhow::Result<Vec<(TopicName, Vec<(i32, Option<PartitionOffset>)>)>> =
futures::future::try_join_all(request.topics.into_iter().map(|topic| async move {
let maybe_collection = Collection::new(
client,
from_downstream_topic_name(topic.name.clone()).as_str(),
)
.await?;

let Some(collection) = maybe_collection else {
return Ok((
topic.name,
topic
.partitions
.iter()
.map(|p| (p.partition_index, p.timestamp, None))
.collect(),
));
};
let collection = &collection;

// Concurrently fetch requested offset for each named partition.
let offsets: anyhow::Result<_> = futures::future::try_join_all(
topic.partitions.into_iter().map(|partition| async move {
Ok((
partition.partition_index,
partition.timestamp,
collection
.fetch_partition_offset(
partition.partition_index as usize,
partition.timestamp, // In millis.
)
.await?,
))
}),
)
.await;
let Some(collection) = maybe_collection else {
return Ok((
topic.name,
topic
.partitions
.iter()
.map(|p| (p.partition_index, None))
.collect(),
));
};
let collection = &collection;

// Concurrently fetch requested offset for each named partition.
let offsets: anyhow::Result<_> = futures::future::try_join_all(
topic.partitions.into_iter().map(|partition| async move {
Ok((
partition.partition_index,
collection
.fetch_partition_offset(
partition.partition_index as usize,
partition.timestamp, // In millis.
)
.await?,
))
}),
)
.await;

Ok((topic.name, offsets?))
}))
.await;
Ok((topic.name, offsets?))
}))
.await;

let collections = collections?;

Expand All @@ -326,8 +330,13 @@ impl Session {
.map(|(topic_name, offsets)| {
let partitions = offsets
.into_iter()
.map(|(partition_index, request_timestamp, maybe_offset)| {
let Some((offset, fragment_start, timestamp)) = maybe_offset else {
.map(|(partition_index, maybe_offset)| {
let Some(PartitionOffset {
offset,
mod_time: timestamp,
..
}) = maybe_offset
else {
return ListOffsetsPartitionResponse::default()
.with_partition_index(partition_index)
.with_error_code(ResponseError::UnknownTopicOrPartition.code());
Expand Down Expand Up @@ -376,7 +385,8 @@ impl Session {
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?;
.await?
.clone();

let timeout = std::time::Duration::from_millis(max_wait_ms as u64);

Expand All @@ -390,50 +400,64 @@ impl Session {
key.1 = partition_request.partition;
let fetch_offset = partition_request.fetch_offset;

let data_preview_params: Option<(i64, i64)> = match self
.data_preview_offsets
.entry(key.to_owned())
let data_preview_params: Option<PartitionOffset> = match self
.data_preview_state
.to_owned()
{
Entry::Occupied(entry) => match entry.get() {
Some((offset, fragment_start)) => {
tracing::debug!(collection=?key.0,partition=key.1, offset, fragment_start, fetch_offset, "Session already marked as data-preview for this partition");
Some((*offset, *fragment_start))
}
None => {
tracing::debug!(collection=?key.0,partition=key.1, "Session already marked as not data-preview for this partition");
None
}
},
Entry::Vacant(entry) => {
tracing::debug!(collection=?key.0,partition=key.1, fetch_offset,"Loading latest offset for this partition to check if session is data-preview");
let collection = Collection::new(&client, key.0.as_str())
.await?
.ok_or(anyhow::anyhow!("Collection {} not found", key.0.as_str()))?;

if let Some((latest_offset, fragment_start, _)) = collection
.fetch_partition_offset(key.1 as usize, -1)
// On the first Fetch call, check to see whether it is considered a data-preview
// fetch or not. If so, flag the whole session as being tainted, and also keep track
// of the necessary offset data in order to serve the rewritten data preview responses.
SessionDataPreviewState::Unknown => {
if let Some(state) = self
.is_fetch_data_preview(key.0.to_string(), key.1, fetch_offset)
.await?
{
if latest_offset - fetch_offset < 13 {
tracing::debug!(collection=?key.0,partition=key.1, fetch_offset, latest_offset, diff=latest_offset - fetch_offset, "Marking session as data-preview for this partition");
let diff = Some((latest_offset, fragment_start));
entry.insert(diff);
diff
} else {
tracing::debug!(collection=?key.0,partition=key.1, fetch_offset, latest_offset, diff=latest_offset - fetch_offset, "Marking session as not data-preview for this partition");
entry.insert(None);
None
}
let mut data_preview_state = HashMap::new();
data_preview_state.insert(key.to_owned(), state);
self.data_preview_state =
SessionDataPreviewState::DataPreview(data_preview_state);
Some(state)
} else {
self.data_preview_state = SessionDataPreviewState::NotDataPreview;
None
}
}
// If the first Fetch request in a session was not considered for data preview,
// then skip all further checks in order to avoid slowing down fetches.
SessionDataPreviewState::NotDataPreview => None,
SessionDataPreviewState::DataPreview(mut state) => {
match state.entry(key.to_owned()) {
// If a session is marked as being used for data preview, and this Fetch request
// is for a topic/partition that we've already loaded the offsets for, re-use them
// so long as the request is still a data preview request. If not, bail out
Entry::Occupied(entry) => {
let data_preview_state = entry.get();
if data_preview_state.offset - fetch_offset > 12 {
bail!("Session was used for fetching preview data, cannot be used for fetching non-preview data.")
}
Some(data_preview_state.to_owned())
}
// Otherwise, load the offsets for this new topic/partition, and also ensure that this is
// still a data-preview request. If not, bail out.
Entry::Vacant(entry) => {
if let Some(state) = self
.is_fetch_data_preview(key.0.to_string(), key.1, fetch_offset)
.await?
{
entry.insert(state);
Some(state)
} else {
bail!("Session was used for fetching preview data, cannot be used for fetching non-preview data.")
}
}
}
}
};

if matches!(self.reads.get(&key), Some(pending) if pending.offset == fetch_offset) {
continue; // Common case: fetch is at the pending offset.
}
let Some(collection) = Collection::new(client, &key.0).await? else {
let Some(collection) = Collection::new(&client, &key.0).await? else {
tracing::debug!(collection = ?&key.0, "Collection doesn't exist!");
continue; // Collection doesn't exist.
};
Expand All @@ -452,9 +476,11 @@ impl Session {
last_write_head: fetch_offset,
handle: tokio_util::task::AbortOnDropHandle::new(match data_preview_params {
// Startree: 0, Tinybird: 12
Some((latest_offset, fragment_start))
if latest_offset - fetch_offset <= 12 =>
{
Some(PartitionOffset {
fragment_start,
offset: latest_offset,
..
}) if latest_offset - fetch_offset <= 12 => {
let diff = latest_offset - fetch_offset;
tokio::spawn(
Read::new(
Expand Down Expand Up @@ -1165,4 +1191,57 @@ impl Session {
TopicName(StrBytes::from_string(name))
}
}

/// Decides whether a Fetch at `fetch_offset` counts as a data-preview fetch.
///
/// Looks up the latest offset of `partition` in the collection named
/// `collection_name`. If `fetch_offset` is no more than
/// `MAX_DATA_PREVIEW_DISTANCE` offsets behind it, the fetch is a data-preview
/// fetch.
///
/// # Returns
///
/// * `Ok(Some(offset))` with the `PartitionOffset` from the lookup when the
///   fetch is a data-preview fetch.
/// * `Ok(None)` when the fetch is too far behind the latest offset, or when
///   the lookup yields no offset for the partition.
///
/// # Errors
///
/// Fails if the session is not authenticated, if the collection does not
/// exist, or if obtaining the client, loading the collection, or fetching the
/// partition offset fails.
#[tracing::instrument(skip(self))]
async fn is_fetch_data_preview(
    &mut self,
    collection_name: String,
    partition: i32,
    fetch_offset: i64,
) -> anyhow::Result<Option<PartitionOffset>> {
    // Largest distance between the latest offset and the requested offset that
    // is still a data-preview fetch. This is the same cutoff the Fetch handler
    // applies (`> 12` / `<= 12`).
    const MAX_DATA_PREVIEW_DISTANCE: i64 = 12;

    // `ok_or_else` builds the error only on the failure path.
    let client = self
        .auth
        .as_mut()
        .ok_or_else(|| anyhow::anyhow!("Session not authenticated"))?
        .authenticated_client()
        .await?;

    tracing::debug!(
        "Loading latest offset for this partition to check if session is data-preview"
    );
    let collection = Collection::new(&client, collection_name.as_str())
        .await?
        .ok_or_else(|| anyhow::anyhow!("Collection {} not found", collection_name))?;

    // A timestamp of -1 presumably requests the latest offset (per the debug
    // message above) — confirm against `fetch_partition_offset`.
    let Some(partition_offset) = collection
        .fetch_partition_offset(partition as usize, -1)
        .await?
    else {
        return Ok(None);
    };
    let latest_offset = partition_offset.offset;

    // `fetch_offset` comes from the client request; saturate rather than
    // overflow on extreme values.
    let diff = latest_offset.saturating_sub(fetch_offset);

    if diff <= MAX_DATA_PREVIEW_DISTANCE {
        tracing::debug!(latest_offset, diff, "Marking session as data-preview");
        Ok(Some(partition_offset))
    } else {
        tracing::debug!(
            fetch_offset,
            latest_offset,
            diff,
            "Marking session as non-data-preview"
        );
        Ok(None)
    }
}
}
Loading

0 comments on commit 58dffbb

Please sign in to comment.