
dekaf: Detect special-case of data preview UIs, and serve documents where normally no documents would get served
jshearer committed Oct 16, 2024 · 1 parent e6a78da · commit 775e028
Showing 5 changed files with 178 additions and 80 deletions.
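The special case: data-preview UIs typically fetch just the last handful of messages in a topic and expect them at contiguous offsets ending at the high watermark. Dekaf derives Kafka offsets from journal read offsets, which are not contiguous per document, so such a fetch would normally return no documents at all. The commit wires together three pieces to handle this: the Kafka client ID captured during the ApiVersions exchange (lib.rs), a map of recently requested offsets shared across sessions (main.rs), and a document-count read target with offset rewriting (read.rs). Below is a minimal sketch of the kind of detection this enables; the constant, threshold, and function name are illustrative assumptions, not the commit's actual detection code (which lives in files not shown here).

// Hypothetical sketch only; the names and the window threshold are assumptions.
const KNOWN_PREVIEW_CLIENT_IDS: &[&str] = &["example-preview-ui"];

fn looks_like_data_preview(client_id: Option<&str>, fetch_offset: i64, high_watermark: i64) -> bool {
    // Preview UIs identify themselves via their client ID and ask for a
    // small window of the newest documents.
    let known_ui = client_id
        .map(|id| KNOWN_PREVIEW_CLIENT_IDS.iter().any(|k| id.starts_with(*k)))
        .unwrap_or(false);
    known_ui && high_watermark.saturating_sub(fetch_offset) <= 12
}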
1 change: 1 addition & 0 deletions crates/dekaf/src/lib.rs
@@ -194,6 +194,7 @@ async fn handle_api(
// https://github.com/confluentinc/librdkafka/blob/e03d3bb91ed92a38f38d9806b8d8deffe78a1de5/src/rdkafka_request.c#L2823
let (header, request) = dec_request(frame, version)?;
tracing::debug!(client_id=?header.client_id, "Got client ID!");
session.client_id = header.client_id.clone().map(|id| id.to_string());
Ok(enc_resp(out, &header, session.api_versions(request).await?))
}
ApiKey::SaslHandshakeKey => {
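This one-line change is the detection hook: Kafka clients send their client ID on the very first ApiVersions request (see the librdkafka reference above), so stashing it on the Session makes it available to every later request handler. A hypothetical later use (the field is real, the predicate is illustrative):

// Hypothetical: consult the stored ID when a later fetch arrives.
let is_known_preview_ui = session
    .client_id
    .as_deref()
    .map_or(false, |id| id.starts_with("example-preview-ui"));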
12 changes: 9 additions & 3 deletions crates/dekaf/src/main.rs
@@ -12,12 +12,16 @@ use futures::{FutureExt, TryStreamExt};
use rsasl::config::SASLConfig;
use rustls::pki_types::CertificateDer;
use std::{
collections::HashMap,
fs::File,
io,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::RwLock,
};
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
use url::Url;

@@ -113,6 +117,8 @@ async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
tracing::info!("Starting dekaf");

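// Shared across all sessions; presumably lets the preview-detection logic
// compare offsets a client has recently requested (the code that reads
// this map lives in files not shown in this diff).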
let offset_map = Arc::new(RwLock::new(HashMap::new()));

let (api_endpoint, api_key) = if cli.local {
(LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string())
} else {
@@ -219,7 +225,7 @@ async fn main() -> anyhow::Result<()> {
continue
};

tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone()));
}
_ = &mut stop => break,
}
@@ -240,7 +246,7 @@ async fn main() -> anyhow::Result<()> {
};
socket.set_nodelay(true)?;

tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone()));
}
_ = &mut stop => break,
}
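Both accept loops now hand every new Session a clone of one shared map, so sessions can observe offsets that other connections recently requested. The map's key and value types are not visible in this diff (only HashMap::new() appears); a plausible shape, with assumed types:

// Assumed shape for illustration: the diff does not show what is stored,
// so (topic, partition) -> last served offset is a guess.
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

type SessionOffsetMap = Arc<RwLock<HashMap<(String, i32), i64>>>;

Whatever the concrete types, the wiring is the point: Session::new now takes the map as a third argument, giving each per-connection session access to cross-connection fetch history.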
42 changes: 32 additions & 10 deletions crates/dekaf/src/read.rs
@@ -5,7 +5,7 @@ use doc::AsNode;
use futures::StreamExt;
use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
use kafka_protocol::records::Compression;
use kafka_protocol::records::{Compression, TimestampType};
use lz4_flex::frame::BlockMode;
use std::time::{Duration, Instant};

@@ -28,6 +28,8 @@ pub struct Read {

// Keep these details around so we can create a new ReadRequest if we need to skip forward
journal_name: String,

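// When Some, emitted records are assigned contiguous offsets counting up
// from this value instead of their journal-derived offsets (see the
// kafka_offset computation in next_batch below).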
pub(crate) rewrite_offsets_from: Option<i64>,
}

pub enum BatchResult {
@@ -39,6 +41,12 @@
TimeoutNoData,
}

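// How much to read before returning a batch: a byte budget (normal
// fetches) or an exact document count.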
#[derive(Copy, Clone)]
pub enum ReadTarget {
Bytes(usize),
Docs(usize),
}

impl Read {
pub fn new(
client: journal::Client,
@@ -47,6 +55,7 @@
offset: i64,
key_schema_id: u32,
value_schema_id: u32,
rewrite_offsets_from: Option<i64>,
) -> Self {
let (not_before_sec, _) = collection.not_before.to_unix();

@@ -79,17 +88,18 @@
value_schema_id,

journal_name: partition.spec.name.clone(),
rewrite_offsets_from,
}
}

#[tracing::instrument(skip_all,fields(journal_name=self.journal_name))]
pub async fn next_batch(
mut self,
target_bytes: usize,
target: ReadTarget,
timeout: Instant,
) -> anyhow::Result<(Self, BatchResult)> {
use kafka_protocol::records::{
Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType,
Compression, Record, RecordBatchEncoder, RecordEncodeOptions,
};

let mut records: Vec<Record> = Vec::new();
Expand All @@ -109,7 +119,10 @@ impl Read {

let mut did_timeout = false;

while records_bytes < target_bytes {
while match target {
ReadTarget::Bytes(target_bytes) => records_bytes < target_bytes,
ReadTarget::Docs(target_docs) => records.len() < target_docs,
} {
let read = match tokio::select! {
biased; // Attempt to read before yielding.

@@ -169,6 +182,8 @@
ReadJsonLine::Doc { root, next_offset } => (root, next_offset),
};

let mut record_bytes: usize = 0;

let Some(doc::ArchivedNode::String(uuid)) = self.uuid_ptr.query(root.get()) else {
let serialized_doc = root.get().to_debug_json_value();
anyhow::bail!(
@@ -215,14 +230,14 @@
buf.put_i16(9999);
// ControlMessageType: unused: i16
buf.put_i16(9999);
records_bytes += 4;
record_bytes += 4;
Some(buf.split().freeze())
} else {
tmp.push(0);
tmp.extend(self.key_schema_id.to_be_bytes());
() = avro::encode_key(&mut tmp, &self.key_schema, root.get(), &self.key_ptr)?;

records_bytes += tmp.len();
record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
tmp.clear();
Some(buf.split().freeze())
@@ -236,7 +251,7 @@
tmp.extend(self.value_schema_id.to_be_bytes());
() = avro::encode(&mut tmp, &self.value_schema, root.get())?;

records_bytes += tmp.len();
record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
tmp.clear();
Some(buf.split().freeze())
@@ -257,7 +272,11 @@
//
// Note that sequence must increment at the same rate
// as offset for efficient record batch packing.
let kafka_offset = next_offset - 1;
let kafka_offset = if let Some(rewrite_from) = self.rewrite_offsets_from {
rewrite_from + records.len() as i64
} else {
next_offset - 1
};

records.push(Record {
control: is_control,
@@ -273,6 +292,7 @@
transactional: false,
value,
});
records_bytes += record_bytes;
}

let opts = RecordEncodeOptions {
Expand All @@ -297,12 +317,14 @@ impl Read {
metrics::counter!("dekaf_bytes_read", "journal_name" => self.journal_name.to_owned())
.increment(records_bytes as u64);

let frozen = buf.freeze();

Ok((
self,
match (records.len() > 0, did_timeout) {
(false, true) => BatchResult::TimeoutNoData,
(true, true) => BatchResult::TimeoutExceededBeforeTarget(buf.freeze()),
(true, false) => BatchResult::TargetExceededBeforeTimeout(buf.freeze()),
(true, true) => BatchResult::TimeoutExceededBeforeTarget(frozen),
(true, false) => BatchResult::TargetExceededBeforeTimeout(frozen),
(false, false) => {
unreachable!("shouldn't be able to see no documents, and also not timeout")
}
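Taken together, the read.rs changes let a detected preview session request an exact number of documents and receive them at contiguous, rewritten offsets, while ordinary consumers still read by byte budget. A usage sketch against the signatures in this diff (the call site, document count, and deadline are assumptions):

// `read` is a Read constructed with rewrite_offsets_from: Some(start).
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
let (read, batch) = read.next_batch(ReadTarget::Docs(12), deadline).await?;

match batch {
    BatchResult::TargetExceededBeforeTimeout(bytes)
    | BatchResult::TimeoutExceededBeforeTarget(bytes) => {
        // `bytes` is an encoded record batch whose offsets count up from
        // `start`, so a UI fetching "the last N messages" actually sees
        // documents.
    }
    BatchResult::TimeoutNoData => {
        // Nothing new arrived before the deadline.
    }
}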
The remaining two changed files are not shown here.
