
*kgo.Record pooling support #1

Merged
merged 1 commit into master from ortuman/reduce-kgo-record-alloc on Sep 23, 2024
Conversation

@ortuman (Collaborator) commented on Sep 19, 2024

Allow reusing *kgo.Record objects returned from PollFetches/PollRecords methods.

@ortuman changed the title from "kgo.Record pooling support" to "*kgo.Record pooling support" on Sep 19, 2024
@ortuman force-pushed the ortuman/reduce-kgo-record-alloc branch 2 times, most recently from 1ab7084 to 0fcbe13 on September 19, 2024
@ortuman marked this pull request as draft on September 20, 2024
@ortuman force-pushed the ortuman/reduce-kgo-record-alloc branch from 0fcbe13 to fc5a7a3 on September 20, 2024
Signed-off-by: Miguel Ángel Ortuño <ortuman@gmail.com>
@ortuman force-pushed the ortuman/reduce-kgo-record-alloc branch from a85f7a4 to 7dbbd49 on September 23, 2024

```go
//
// This option is particularly useful for use cases where the volume of generated records
// is very high and the extra GC overhead can negatively impact performance.
func EnableRecordsPool() ConsumerOpt {
```
@ortuman (author) commented:
Truth is, I'm not 100% convinced about adding this option. For simplicity, we could always fetch records directly from the pool: if the user doesn't explicitly invoke the Reuse method, they will essentially always end up being allocated from the heap anyway.
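To make the trade-off concrete, here is a minimal, self-contained sketch of the pooling pattern under discussion. The `Record`, `getRecord`, and `Reuse` names here are hypothetical stand-ins (the real types live in `kgo`); the point is that a `sync.Pool` only helps when callers hand records back via `Reuse`, otherwise each record is still effectively a fresh heap allocation collected by the GC.

```go
package main

import (
	"fmt"
	"sync"
)

// Record is a hypothetical stand-in for *kgo.Record.
type Record struct {
	Key   []byte
	Value []byte
}

// recordPool recycles records that callers explicitly return.
// Records the caller never returns are simply reclaimed by the GC.
var recordPool = sync.Pool{
	New: func() any { return new(Record) },
}

// getRecord draws a record from the pool (or allocates a fresh one).
func getRecord(key, value []byte) *Record {
	r := recordPool.Get().(*Record)
	r.Key, r.Value = key, value
	return r
}

// Reuse resets the record and hands it back to the pool.
func Reuse(r *Record) {
	r.Key, r.Value = nil, nil
	recordPool.Put(r)
}

func main() {
	r := getRecord([]byte("k"), []byte("v"))
	fmt.Printf("%s=%s\n", r.Key, r.Value)
	Reuse(r) // only safe once the caller is fully done with the record
}
```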

@ortuman ortuman marked this pull request as ready for review September 23, 2024 08:04
@replay (Collaborator) left a comment:
thx!

@ortuman ortuman merged commit 133b3a2 into master Sep 23, 2024
1 check passed
ortuman added a commit that referenced this pull request Oct 3, 2024
* fetching: export utilities for decompressing and parsing partition fetch responses

### Background

In grafana/mimir we are working towards making fetch requests ourselves. The primary reason is that individual requests to the Kafka backend are slow, so issuing them sequentially per partition becomes the bottleneck in our application. We therefore want to fetch records in parallel to speed up consumption.
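The parallel-fetch idea sketched above can be illustrated with plain goroutines. This is a hypothetical simplification, not Mimir's actual implementation: `fetchPartition` stands in for a per-partition `FetchRequest`, and `fetchAll` fans the requests out concurrently instead of issuing them one after another.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchPartition is a hypothetical stand-in for issuing one
// per-partition fetch request against the Kafka backend.
func fetchPartition(partition int) []string {
	return []string{fmt.Sprintf("record-from-p%d", partition)}
}

// fetchAll issues the per-partition fetches concurrently, so total
// latency approaches the slowest single fetch rather than the sum.
func fetchAll(partitions []int) [][]string {
	out := make([][]string, len(partitions))
	var wg sync.WaitGroup
	for i, p := range partitions {
		wg.Add(1)
		go func(i, p int) {
			defer wg.Done()
			out[i] = fetchPartition(p) // each goroutine writes its own slot
		}(i, p)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(fetchAll([]int{0, 1, 2}))
}
```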

One difficulty I met when issuing `FetchRequest`s ourselves is that parsing the response is non-trivial. That's why I'm proposing to export these functions for downstream projects to use.

Alternatively, I could also try contributing the concurrent fetching logic itself. But I believe that is much more nuanced, with more trade-offs around fetched bytes and latency, so I wasn't sure whether it's a good fit for a general-purpose library. I'm open to discussing this further.

### What this PR does

Moves `(*kgo.cursorOffsetNext).processRespPartition` from being a method to being a standalone function, `kgo.processRespPartition`. A few small changes were also necessary to make the interface suitable for public use (such as removing the `*broker` parameter).
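The shape of that refactor, in miniature: a method whose behaviour hangs off an unexported receiver becomes a standalone function taking its inputs explicitly, so it can be exported for downstream use. The types and names below are hypothetical illustrations, not the real `kgo` internals.

```go
package main

import "fmt"

// cursor is a hypothetical stand-in for an unexported cursor type.
type cursor struct{ offset int64 }

// Before: logic tied to the unexported receiver, unusable downstream.
func (c *cursor) process(data []byte) string {
	return fmt.Sprintf("offset=%d len=%d", c.offset, len(data))
}

// After: a standalone function with explicit parameters, exportable
// for downstream projects to call directly.
func ProcessRespPartition(offset int64, data []byte) string {
	return fmt.Sprintf("offset=%d len=%d", offset, len(data))
}

func main() {
	c := &cursor{offset: 7}
	fmt.Println(c.process([]byte("abc")))
	fmt.Println(ProcessRespPartition(7, []byte("abc")))
}
```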

### Side effects

To minimize the necessary changes and the API surface of the package, I opted to use a single global decompressor for all messages. Previously, there was one decompressor per client, passed down to `(*cursorOffsetNext).processRespPartition`. My understanding is that sharing pooled readers (lz4, zstd, gzip) shouldn't hurt performance, because usage patterns do not affect a reader's behaviour (for example, a consistent size of decompressed data doesn't make the reader more or less efficient). I have not thoroughly verified or tested this; let me know if you think that's important.

An alternative to this is to also export the `decompressor` along with `newDecompressor()` and the auxiliary types for decompression.

* Restore multiline processV0OuterMessage

* `*kgo.Records` pooling support

Signed-off-by: Miguel Ángel Ortuño <ortuman@gmail.com>

* Merge pull request #1 from grafana/ortuman/reduce-kgo-record-alloc

`*kgo.Record` pooling support

* fetching: export utilities for decompressing and parsing partition fetch responses

* Merge pull request #4 from dimitarvdimitrov/dimitar/grafana-master-with-export-partition-parsing-utils

fetching: export utilities for decompressing and parsing partition fetch responses

* Merge pull request #3 from ortuman/reduce-decompression-buffer-allocations

Signed-off-by: Miguel Ángel Ortuño <ortuman@gmail.com>

---------

Signed-off-by: Miguel Ángel Ortuño <ortuman@gmail.com>
Co-authored-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
2 participants