
dekaf: Detect and handle requests for documents in a limited range directly before the write-head of a collection #1701

Merged: jshearer merged 8 commits into master from jshearer/dekaf_data_preview_uis on Oct 18, 2024

Conversation

@jshearer (Contributor) commented Oct 15, 2024

Background:

Dekaf fetches documents by reading bytes from a Gazette journal starting at the requested Kafka offset, skipping past partial documents if the requested offset is not on a document boundary. It returns records, however, with offsets pointing at the last inclusive byte of each document, for a couple of good reasons:

// Map documents into a Kafka offset which is their last
// inclusive byte index within the document.
//
// Kafka adds one for its next fetch_offset, and this behavior
// means its next fetch will be a valid document begin offset.
//
// This behavior also lets us subtract one from the journal
// write head or a fragment end offset to arrive at a
// logically correct Kafka high water mark which a client
// can expect to read through.
//
// Note that sequence must increment at the same rate
// as offset for efficient record batch packing.
let kafka_offset = next_offset - 1;

Unfortunately, this breaks a (surprisingly not very important) assumption inherent in the Kafka protocol: a record at offset X should be fetchable by issuing a Fetch starting at offset X - 1.
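For concreteness, here's a small illustration of the mapping with made-up byte offsets (not taken from a real journal):

fn main() {
    // Hypothetical document occupying journal bytes [100, 120): it begins at
    // offset 100 and the next document begins at 120.
    let begin_offset: i64 = 100;
    let next_offset: i64 = 120;

    // Dekaf reports the document's last inclusive byte as its Kafka offset.
    let kafka_offset = next_offset - 1; // 119

    // The client's next fetch at kafka_offset + 1 lands exactly on the next
    // document boundary, which is the property Dekaf relies on.
    assert_eq!(kafka_offset + 1, next_offset);

    // But a Fetch issued at (or just below) the reported offset starts in the
    // middle of this document, so Dekaf skips past it rather than return it.
    assert!(kafka_offset > begin_offset);
    println!("document [{begin_offset}, {next_offset}) is reported at Kafka offset {kafka_offset}");
}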

The reason this isn't very important is that normally, consumers will either want to fetch all of the historical documents in a topic, or they'll want to poll the topic for new documents as they come in. In both of those cases, the consumer will first issue a ListOffsets request, which we always answer with an offset known to be the beginning of a document (or, more precisely, the end of the previous document, since fetches start at the returned offset + 1).

The case where this assumption becomes relevant is when consumers try to do things like ask for "the last N documents". Normally, the flow should work like this:

  • Consumer asks for the latest readable offset in a partition by issuing a ListOffsets with a timestamp of -1, which is the sentinel value for "largest readable offset"
  • The consumer will then subtract the number of latest documents they want to read from the returned offset, and issue a Fetch starting at that offset
  • Because offsets are usually densely packed, the broker will then serve documents starting at the requested offset through the last readable offset, providing the consumer with the "last N documents"

Since Dekaf uses byte offsets instead of sequential record counters, that Fetch request will almost always point at a byte offset in the middle of a document. Dekaf will then skip past that document and, since the requested offset was only a small distance before the end of the journal, will find no more documents to read and return an empty record set.
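As a concrete illustration with invented numbers:

fn main() {
    // Hypothetical journal: write head at byte 1_000, last document spans [950, 1_000).
    let write_head: i64 = 1_000;
    let last_doc_begin: i64 = 950;

    // ListOffsets with timestamp -1 is answered with the largest readable
    // offset: the write head minus one.
    let latest_offset = write_head - 1; // 999

    // A consumer wanting "the last 5 records" subtracts 5 and fetches there.
    let fetch_offset = latest_offset - 5; // 994

    // 994 falls inside the document at [950, 1_000), so Dekaf skips past it,
    // reaches the write head, and returns an empty record set.
    assert!(fetch_offset > last_doc_begin && fetch_offset < write_head);
    println!("fetch at {fetch_offset} skips the only remaining document and comes back empty");
}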

Problem:

This is fine under normal circumstances: the consumer will see no records through the end of the partition, and either finish, or start polling for new documents. Where things break down is when this request actually needs to get the last N documents. We've found two such integration partners whose data preview UIs break when unable to fetch recent documents:

  • Tinybird's data preview UI will eventually give up after failing to fetch documents, but it will result in the new dataflow/table starting out with a barebones schema, rather than the AVRO-derived schema of the documents in the collection.
  • Startree's data preview UI, as it turns out, won't even let you proceed to create the dataflow at all if it's unable to load preview documents.

Solution:

Eventually, this will be solved by implementing committed read logic in Rust, and transitioning Dekaf to use it. That'll let Dekaf return dense document offsets instead of sparse byte offsets. There's a bunch of logic to get right here, and some unanswered questions, such as: what do we do when a consumer asks for an arbitrary document offset? Do we have to read the whole preceding fragment? Do we have to keep a stateful index of document->byte offsets in order to read through less data to get to the desired offset? etc.

In the meantime, we're making the assumption that the limited subset of requests that we can reliably identify as "fetch latest N documents" don't actually need to be served the latest N documents, but rather just need to see some representative documents from the collection. As such, this PR implements logic to detect these requests and serve some easily readable documents from the beginning of the latest fragment, with their offsets rewritten to be in the requested range.

This satisfies the consumer's request and, in my testing, unblocks both Tinybird's and Startree's data preview UIs, while still allowing normal request flows to proceed as usual.
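A rough sketch of the offset rewriting just described (the function name and shape are invented for illustration; the real logic lives in crates/dekaf):

/// Illustrative sketch: given documents read from the start of the latest
/// fragment, assign them offsets that land inside the range the consumer
/// requested, ending at the latest readable offset.
fn rewrite_preview_offsets(doc_count: i64, latest_offset: i64) -> Vec<i64> {
    (0..doc_count)
        .map(|i| latest_offset - (doc_count - 1) + i)
        .collect()
}

fn main() {
    // A consumer asked for "the last 3 documents" when the latest readable
    // offset was 999; the rewritten offsets fill the requested range.
    assert_eq!(rewrite_preview_offsets(3, 999), vec![997, 998, 999]);
    println!("rewritten offsets: {:?}", rewrite_preview_offsets(3, 999));
}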



@jshearer force-pushed the jshearer/dekaf_data_preview_uis branch 5 times, most recently from 8be16c9 to 08f0870, on October 16, 2024 15:16
@jshearer changed the title from "dekaf: Add support for reading backwards in order to properly handle data preview UIs" to "dekaf: Detect and handle requests for documents in a limited range directly before the write-head of a collection" on Oct 16, 2024
@jshearer force-pushed the jshearer/dekaf_data_preview_uis branch 4 times, most recently from d2859cd to cd68e02, on October 16, 2024 16:20
@jshearer marked this pull request as ready for review on October 16, 2024 16:53
@jshearer force-pushed the jshearer/dekaf_data_preview_uis branch from cd68e02 to 775e028 on October 16, 2024 17:13
@jgraettinger (Member) left a comment

As a gut check, I worry about the newly introduced offset re-writing and the existing and new extra state tracking (e.g. latest_partition_offsets) introduced for this feature, and the potential for bugs where we inadvertently trip a non-preview request into the preview handling.

A commonality of data preview UIs is that they start a new session and the only thing they do with it is preview documents.

I'd be a lot more comfortable with a control flow where we evaluate only on the very first fetch of a session whether this might be a preview UI -- and we do expensive lookups as part of that evaluation without needing to retain them, since we do it only once -- and then tag the session as-a-whole as a "data preview" session.

From there, we have separate read paths for data preview vs regular operation that probably share some routines, but the emphasis is on keeping the contortions out of the regular operation control flow.

Thoughts?

@jshearer (Contributor, Author)

Yeah, I also wanted to isolate data preview sessions from regular sessions. Unfortunately, it looks like both Startree and Tinybird use a flow where one session issues ListOffsets requests to load offsets, and then a separate session tries to Fetch those offsets, which is why I had to introduce the shared state in latest_partition_offsets. I can certainly try the approach of loading the latest fragment offset on the first Fetch of every session and using that to compare against the fetched offset. My primary concern is timeouts, as I've seen max_wait_ms values as low as 250ms so far.

I think if I optimistically start a non-"data preview" PendingRead while also fetching the latest offset, that shouldn't slow things down in wall clock time.
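A minimal sketch of that idea, assuming a tokio runtime and using invented helper names in place of the real Dekaf read machinery:

use std::time::Duration;

// Invented stand-ins for the real Dekaf reads, just to show the shape.
async fn start_pending_read(_journal: &str, _offset: i64) -> &'static str {
    "non-preview PendingRead handle"
}
async fn fetch_latest_offset(_journal: &str) -> i64 {
    tokio::time::sleep(Duration::from_millis(10)).await;
    999 // journal write head - 1
}

#[tokio::main]
async fn main() {
    let fetch_offset = 994;
    // Start the ordinary read and the latest-offset lookup concurrently, so
    // the extra lookup adds no wall-clock latency to the first fetch.
    let (pending_read, latest_offset) = tokio::join!(
        start_pending_read("example/journal", fetch_offset),
        fetch_latest_offset("example/journal"),
    );
    // If the fetch lands within a small window just below the latest offset,
    // treat the session as a data preview and drop the optimistic read.
    if fetch_offset <= latest_offset && latest_offset - fetch_offset < 13 {
        drop(pending_read);
    }
}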

@jshearer force-pushed the jshearer/dekaf_data_preview_uis branch from 86a11c1 to 58dffbb on October 16, 2024 21:35
@jshearer (Contributor, Author)

Ok @jgraettinger, what do you think about this? I switched it to fetch the latest offset on the first fetch request of every session, and mark that session as data preview or not. It will then forbid a data-preview tainted session from being used for non-preview fetches.

Of note, testing with Startree turns up that they actually try to do this: the same session that got the Fetch to load the data preview data gets a fetch to load docs from offset 0 (or whatever we return from timestamp -2 -- earliest fetchable offset). We error and close that session, and they re-open a new session to do that load, so everything seems to actually work fine. Going to throw up the build on dekaf-dev when it passes and test with Tinybird
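A rough sketch of the session-level tagging described above (types and names are invented for illustration, not the actual Dekaf code):

/// Illustrative only: how a session might be tagged after its first Fetch.
#[allow(dead_code)]
enum SessionKind {
    Undetermined, // no Fetch seen yet on this session
    DataPreview,  // first Fetch looked like a "last N documents" preview
    Regular,      // first Fetch was an ordinary read
}

/// A data-preview-tainted session refuses ordinary fetches; the resulting
/// error closes the session and the client reconnects with a fresh one.
fn allow_fetch(kind: &SessionKind, is_preview_fetch: bool) -> Result<(), &'static str> {
    match (kind, is_preview_fetch) {
        (SessionKind::DataPreview, false) => Err("data-preview session cannot serve ordinary fetches"),
        _ => Ok(()),
    }
}

fn main() {
    let kind = SessionKind::DataPreview;
    assert!(allow_fetch(&kind, true).is_ok());
    assert!(allow_fetch(&kind, false).is_err()); // e.g. a follow-up fetch from offset 0
    println!("tainted session rejects the non-preview fetch");
}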

@jshearer (Contributor, Author)

Found an issue when testing with Tinybird: I sometimes get an error that simply says consume_failed in their data preview UI when testing with a collection that does have ongoing writes, so I'm running that down

@jgraettinger (Member) left a comment

LGTM % question below

Ok @jgraettinger, what do you think about this? I switched it to fetch the latest offset on the first fetch request of every session, and mark that session as data preview or not. It will then forbid a data-preview tainted session from being used for non-preview fetches.

👍 👍

Review thread on crates/dekaf/src/read.rs (resolved)
@jgraettinger (Member)

One more question, though: what if a collection has a slow trickle of documents? A non-preview session has been reading it and is fully caught up. It disconnects, a small handful of documents arrive, and then a new session starts again to resume reading from its last offset. How do we reliably detect that this isn't a preview? Should tainting also require that we see a fetch offsets request on the same session prior to the first fetch?

@jshearer (Contributor, Author)

My understanding, though I haven't tried to reproduce this directly, is:

A consumer is reading through the documents in a topic, and reaches the end, at offset N. N is 1 less than the journal's write-head, as it's the last inclusive byte offset of the last document in that journal. N will also be the offset reported by Dekaf for the last document. This consumer will then poll for offset N+1 and either get responses with no documents after max_wait_ms passes, or get some docs if more were written.

So... I think the check for a data-preview request actually needs to be fetch_offset <= latest_offset && latest_offset - fetch_offset < 13, as any request with fetch_offset > latest_offset (which is the journal write head - 1) should be a polling request.
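A minimal sketch of that check (the function name and surrounding setup are illustrative, not the actual Dekaf code):

/// Illustrative only: treat a fetch as a data-preview request when it starts
/// at or within a small window below the latest readable offset.
fn is_data_preview_fetch(fetch_offset: i64, latest_offset: i64) -> bool {
    fetch_offset <= latest_offset && latest_offset - fetch_offset < 13
}

fn main() {
    let latest_offset = 999; // journal write head - 1
    // A caught-up consumer polling for new documents fetches at latest + 1,
    // which the check correctly excludes:
    assert!(!is_data_preview_fetch(latest_offset + 1, latest_offset));
    // A preview UI asking for "the last N documents" fetches slightly below:
    assert!(is_data_preview_fetch(latest_offset - 5, latest_offset));
    // A full backfill from the earliest offset is also excluded:
    assert!(!is_data_preview_fetch(0, latest_offset));
    println!("data-preview check behaves as described");
}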

@jshearer force-pushed the jshearer/dekaf_data_preview_uis branch from 2b5d961 to ecad3c2 on October 18, 2024 15:40
@jshearer force-pushed the jshearer/dekaf_data_preview_uis branch from ecad3c2 to db73eeb on October 18, 2024 17:54
@jshearer (Contributor, Author)

With the changes from #1716 rolled in here, this now works in every scenario I test: (Tinybird, Startree, franz-go, kcat) * (collection with no activity, collection with sparse activity, collection with constant activity).

@jshearer merged commit 75a26e1 into master on Oct 18, 2024 (8 checks passed)