diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 328864d77e..c81c484962 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -194,6 +194,7 @@ async fn handle_api( // https://github.com/confluentinc/librdkafka/blob/e03d3bb91ed92a38f38d9806b8d8deffe78a1de5/src/rdkafka_request.c#L2823 let (header, request) = dec_request(frame, version)?; tracing::debug!(client_id=?header.client_id, "Got client ID!"); + session.client_id = header.client_id.clone().map(|id| id.to_string()); Ok(enc_resp(out, &header, session.api_versions(request).await?)) } ApiKey::SaslHandshakeKey => { diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 20fac234ce..72ab4ccfa1 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -12,12 +12,16 @@ use futures::{FutureExt, TryStreamExt}; use rsasl::config::SASLConfig; use rustls::pki_types::CertificateDer; use std::{ + collections::HashMap, fs::File, io, path::{Path, PathBuf}, sync::Arc, }; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::{ + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, + sync::RwLock, +}; use tracing_subscriber::{filter::LevelFilter, EnvFilter}; use url::Url; @@ -113,6 +117,8 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); tracing::info!("Starting dekaf"); + let offset_map = Arc::new(RwLock::new(HashMap::new())); + let (api_endpoint, api_key) = if cli.local { (LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string()) } else { @@ -219,7 +225,7 @@ async fn main() -> anyhow::Result<()> { continue }; - tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone())); + tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone())); } _ = &mut stop => break, } @@ -240,7 +246,7 @@ async fn main() -> anyhow::Result<()> { }; socket.set_nodelay(true)?; - tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone())); + tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone())); } _ = &mut stop => break, } diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 8fa4e2dff7..8f2c1f679b 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -5,7 +5,7 @@ use doc::AsNode; use futures::StreamExt; use gazette::journal::{ReadJsonLine, ReadJsonLines}; use gazette::{broker, journal, uuid}; -use kafka_protocol::records::Compression; +use kafka_protocol::records::{Compression, TimestampType}; use lz4_flex::frame::BlockMode; use std::time::{Duration, Instant}; @@ -28,6 +28,8 @@ pub struct Read { // Keep these details around so we can create a new ReadRequest if we need to skip forward journal_name: String, + + pub(crate) rewrite_offsets_from: Option, } pub enum BatchResult { @@ -39,6 +41,12 @@ pub enum BatchResult { TimeoutNoData, } +#[derive(Copy, Clone)] +pub enum ReadTarget { + Bytes(usize), + Docs(usize), +} + impl Read { pub fn new( client: journal::Client, @@ -47,6 +55,7 @@ impl Read { offset: i64, key_schema_id: u32, value_schema_id: u32, + rewrite_offsets_from: Option, ) -> Self { let (not_before_sec, _) = collection.not_before.to_unix(); @@ -79,17 +88,18 @@ impl Read { value_schema_id, journal_name: partition.spec.name.clone(), + rewrite_offsets_from, } } #[tracing::instrument(skip_all,fields(journal_name=self.journal_name))] pub async fn next_batch( mut self, - target_bytes: usize, + target: ReadTarget, timeout: Instant, ) -> anyhow::Result<(Self, BatchResult)> { use kafka_protocol::records::{ - Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType, + Compression, Record, RecordBatchEncoder, RecordEncodeOptions, }; let mut records: Vec = Vec::new(); @@ -109,7 +119,10 @@ impl Read { let mut did_timeout = false; - while records_bytes < target_bytes { + while match target { + ReadTarget::Bytes(target_bytes) => records_bytes < target_bytes, + ReadTarget::Docs(target_docs) => records.len() < target_docs, + } { let read = match tokio::select! { biased; // Attempt to read before yielding. @@ -169,6 +182,8 @@ impl Read { ReadJsonLine::Doc { root, next_offset } => (root, next_offset), }; + let mut record_bytes: usize = 0; + let Some(doc::ArchivedNode::String(uuid)) = self.uuid_ptr.query(root.get()) else { let serialized_doc = root.get().to_debug_json_value(); anyhow::bail!( @@ -215,14 +230,14 @@ impl Read { buf.put_i16(9999); // ControlMessageType: unused: i16 buf.put_i16(9999); - records_bytes += 4; + record_bytes += 4; Some(buf.split().freeze()) } else { tmp.push(0); tmp.extend(self.key_schema_id.to_be_bytes()); () = avro::encode_key(&mut tmp, &self.key_schema, root.get(), &self.key_ptr)?; - records_bytes += tmp.len(); + record_bytes += tmp.len(); buf.extend_from_slice(&tmp); tmp.clear(); Some(buf.split().freeze()) @@ -236,7 +251,7 @@ impl Read { tmp.extend(self.value_schema_id.to_be_bytes()); () = avro::encode(&mut tmp, &self.value_schema, root.get())?; - records_bytes += tmp.len(); + record_bytes += tmp.len(); buf.extend_from_slice(&tmp); tmp.clear(); Some(buf.split().freeze()) @@ -257,7 +272,11 @@ impl Read { // // Note that sequence must increment at the same rate // as offset for efficient record batch packing. - let kafka_offset = next_offset - 1; + let kafka_offset = if let Some(rewrite_from) = self.rewrite_offsets_from { + rewrite_from + records.len() as i64 + } else { + next_offset - 1 + }; records.push(Record { control: is_control, @@ -273,6 +292,7 @@ impl Read { transactional: false, value, }); + records_bytes += record_bytes; } let opts = RecordEncodeOptions { @@ -297,12 +317,14 @@ impl Read { metrics::counter!("dekaf_bytes_read", "journal_name" => self.journal_name.to_owned()) .increment(records_bytes as u64); + let frozen = buf.freeze(); + Ok(( self, match (records.len() > 0, did_timeout) { (false, true) => BatchResult::TimeoutNoData, - (true, true) => BatchResult::TimeoutExceededBeforeTarget(buf.freeze()), - (true, false) => BatchResult::TargetExceededBeforeTimeout(buf.freeze()), + (true, true) => BatchResult::TimeoutExceededBeforeTarget(frozen), + (true, false) => BatchResult::TargetExceededBeforeTimeout(frozen), (false, false) => { unreachable!("shouldn't be able see no documents, and also not timeout") } diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index f2d483d407..49029f36e4 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -22,6 +22,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; use std::{sync::Arc, time::Duration}; +use tokio::sync::RwLock; use tracing::instrument; struct PendingRead { @@ -35,15 +36,23 @@ pub struct Session { reads: HashMap<(TopicName, i32), PendingRead>, secret: String, auth: Option, + latest_partition_offsets: Arc>>, + pub client_id: Option, } impl Session { - pub fn new(app: Arc, secret: String) -> Self { + pub fn new( + app: Arc, + secret: String, + offsets: Arc>>, + ) -> Self { Self { app, reads: HashMap::new(), auth: None, secret, + latest_partition_offsets: offsets, + client_id: None, } } @@ -265,65 +274,76 @@ impl Session { // Concurrently fetch Collection instances and offsets for all requested topics and partitions. // Map each "topic" into Vec<(Partition Index, Option<(Journal Offset, Timestamp))>. - let collections: anyhow::Result)>)>> = - futures::future::try_join_all(request.topics.into_iter().map(|topic| async move { - let maybe_collection = Collection::new( - client, - from_downstream_topic_name(topic.name.clone()).as_str(), - ) - .await?; - - let Some(collection) = maybe_collection else { - return Ok(( - topic.name, - topic - .partitions - .iter() - .map(|p| (p.partition_index, None)) - .collect(), - )); - }; - let collection = &collection; - - // Concurrently fetch requested offset for each named partition. - let offsets: anyhow::Result<_> = futures::future::try_join_all( - topic.partitions.into_iter().map(|partition| async move { - Ok(( - partition.partition_index, - collection - .fetch_partition_offset( - partition.partition_index as usize, - partition.timestamp, // In millis. - ) - .await?, - )) - }), - ) - .await; + let collections: anyhow::Result< + Vec<(TopicName, Vec<(i32, i64, Option<(i64, i64, i64)>)>)>, + > = futures::future::try_join_all(request.topics.into_iter().map(|topic| async move { + let maybe_collection = Collection::new( + client, + from_downstream_topic_name(topic.name.clone()).as_str(), + ) + .await?; - Ok((topic.name, offsets?)) - })) + let Some(collection) = maybe_collection else { + return Ok(( + topic.name, + topic + .partitions + .iter() + .map(|p| (p.partition_index, p.timestamp, None)) + .collect(), + )); + }; + let collection = &collection; + + // Concurrently fetch requested offset for each named partition. + let offsets: anyhow::Result<_> = futures::future::try_join_all( + topic.partitions.into_iter().map(|partition| async move { + Ok(( + partition.partition_index, + partition.timestamp, + collection + .fetch_partition_offset( + partition.partition_index as usize, + partition.timestamp, // In millis. + ) + .await?, + )) + }), + ) .await; + Ok((topic.name, offsets?)) + })) + .await; + let collections = collections?; use messages::list_offsets_response::{ ListOffsetsPartitionResponse, ListOffsetsTopicResponse, }; + let mut new_latest_offsets = HashMap::new(); + // Map topics, partition indices, and fetched offsets into a comprehensive response. let response = collections .into_iter() .map(|(topic_name, offsets)| { let partitions = offsets .into_iter() - .map(|(partition_index, maybe_offset)| { - let Some((offset, timestamp)) = maybe_offset else { + .map(|(partition_index, request_timestamp, maybe_offset)| { + let Some((offset, fragment_start, timestamp)) = maybe_offset else { return ListOffsetsPartitionResponse::default() .with_partition_index(partition_index) .with_error_code(ResponseError::UnknownTopicOrPartition.code()); }; + if request_timestamp == -1 { + new_latest_offsets.insert( + (topic_name.to_owned(), partition_index), + (offset, fragment_start), + ); + } + ListOffsetsPartitionResponse::default() .with_partition_index(partition_index) .with_offset(offset) @@ -337,6 +357,11 @@ impl Session { }) .collect(); + self.latest_partition_offsets + .write() + .await + .extend(new_latest_offsets); + Ok(messages::ListOffsetsResponse::default().with_topics(response)) } @@ -363,8 +388,7 @@ impl Session { .authenticated_client() .await?; - let timeout_at = - std::time::Instant::now() + std::time::Duration::from_millis(max_wait_ms as u64); + let timeout = std::time::Duration::from_millis(max_wait_ms as u64); let mut hit_timeout = false; @@ -373,6 +397,19 @@ impl Session { let mut key = (from_downstream_topic_name(topic_request.topic.clone()), 0); for partition_request in &topic_request.partitions { + let read_guard = self.latest_partition_offsets.read().await; + let fetched_offset = read_guard + .get(&(key.0.to_owned(), partition_request.partition)) + .copied(); + + drop(read_guard); + + let diff = if let Some((offset, fragment_start)) = fetched_offset { + Some((offset - partition_request.fetch_offset, fragment_start)) + } else { + None + }; + key.1 = partition_request.partition; let fetch_offset = partition_request.fetch_offset; @@ -394,21 +431,44 @@ impl Session { let (key_schema_id, value_schema_id) = collection .registered_schema_ids(&client.pg_client()) .await?; - - let read: Read = Read::new( - collection.journal_client.clone(), - &collection, - partition, - fetch_offset, - key_schema_id, - value_schema_id, - ); let pending = PendingRead { offset: fetch_offset, last_write_head: fetch_offset, - handle: tokio_util::task::AbortOnDropHandle::new(tokio::spawn( - read.next_batch(partition_request.partition_max_bytes as usize, timeout_at), - )), + handle: tokio_util::task::AbortOnDropHandle::new(match diff { + // Startree: 0, Tinybird: 12 + Some((diff, fragment_start)) if diff <= 12 => tokio::spawn( + Read::new( + collection.journal_client.clone(), + &collection, + partition, + fragment_start, + key_schema_id, + value_schema_id, + Some(partition_request.fetch_offset - 1), + ) + .next_batch( + crate::read::ReadTarget::Docs(diff as usize + 1), + std::time::Instant::now() + timeout, + ), + ), + _ => tokio::spawn( + Read::new( + collection.journal_client.clone(), + &collection, + partition, + fetch_offset, + key_schema_id, + value_schema_id, + None, + ) + .next_batch( + crate::read::ReadTarget::Bytes( + partition_request.partition_max_bytes as usize, + ), + std::time::Instant::now() + timeout, + ), + ), + }), }; tracing::info!( @@ -451,19 +511,14 @@ impl Session { }; let (read, batch) = (&mut pending.handle).await??; - pending.offset = read.offset; - pending.last_write_head = read.last_write_head; - pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn( - read.next_batch(partition_request.partition_max_bytes as usize, timeout_at), - )); - let (timeout, batch) = match batch { + let (did_timeout, batch) = match batch { BatchResult::TargetExceededBeforeTimeout(b) => (false, Some(b)), BatchResult::TimeoutExceededBeforeTarget(b) => (true, Some(b)), BatchResult::TimeoutNoData => (true, None), }; - if timeout { + if did_timeout { hit_timeout = true } @@ -477,6 +532,20 @@ impl Session { .with_high_watermark(pending.last_write_head) // Map to kafka cursor. .with_last_stable_offset(pending.last_write_head), ); + + if read.rewrite_offsets_from.is_none() { + pending.offset = read.offset; + pending.last_write_head = read.last_write_head; + pending.handle = + tokio_util::task::AbortOnDropHandle::new(tokio::spawn(read.next_batch( + crate::read::ReadTarget::Bytes( + partition_request.partition_max_bytes as usize, + ), + std::time::Instant::now() + timeout, + ))); + } else { + self.reads.remove(&key); + } } topic_responses.push( diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs index 34b885f52d..74e81e24a7 100644 --- a/crates/dekaf/src/topology.rs +++ b/crates/dekaf/src/topology.rs @@ -185,7 +185,7 @@ impl Collection { &self, partition_index: usize, timestamp_millis: i64, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let Some(partition) = self.partitions.get(partition_index) else { return Ok(None); }; @@ -212,18 +212,18 @@ impl Collection { }; let response = self.journal_client.list_fragments(request).await?; - let (offset, mod_time) = match response.fragments.get(0) { + let (offset, fragment_start, mod_time) = match response.fragments.get(0) { Some(broker::fragments_response::Fragment { spec: Some(spec), .. }) => { if timestamp_millis == -1 { // Subtract one to reflect the largest fetch-able offset of the fragment. - (spec.end - 1, spec.mod_time) + (spec.end - 1, spec.begin, spec.mod_time) } else { - (spec.begin, spec.mod_time) + (spec.begin, spec.begin, spec.mod_time) } } - _ => (0, 0), + _ => (0, 0, 0), }; tracing::debug!( @@ -235,7 +235,7 @@ impl Collection { "fetched offset" ); - Ok(Some((offset, mod_time))) + Ok(Some((offset, fragment_start, mod_time))) } /// Build a journal client by resolving the collections data-plane gateway and an access token.