high number of allocations in kgo.recordToRecord function #823

Open
ortuman opened this issue Sep 19, 2024 · 4 comments

Comments

ortuman commented Sep 19, 2024

Because of the large volume of records generated, we’ve observed a high number of allocations in one of our consumers, originating in the kgo.recordToRecord function (~65% of total allocations according to the attached screenshot, specifically in this line). This degrades performance through GC overhead.

[Image: Screenshot 2024-09-19 at 09 59 14]

Has the possibility of reusing the generated records through a pool ever been considered in order to minimize this effect? Perhaps it could be offered as an optional parameter? I could propose a PR for your review if you consider it.

Author

ortuman commented Sep 19, 2024

Alternatively, perhaps the recordToRecord function may simply return a kgo.Record object by value instead of by reference, although I'm not sure if this might result in a breaking change.

Author

ortuman commented Sep 23, 2024

In #827 I've proposed a possible solution to the problem.

Owner

twmb commented Oct 8, 2024

I see you took your proposal a good bit further in grafana#3. I used to have a sync.Pool for records internally, but removed it because it was completely ineffective. Specifically, with my prior internal usage, I'd pull thousands of records out of the pool (allocating them all) and then put them back at a different, independent time. GC would run, all records would be collected anyway, and the pool just wasn't useful.

PR 3 there I think is sound but is tricky to follow. I think it's trying to accomplish a few goals:

  1. Reuse the temporary byte slice that is created while decompressing
  2. Reuse the temporary []kmsg.Record slice that is created while processing a fetch response
  3. Reuse the end-user *kgo.Record

I think the implementation does the job, but the code is pretty sketchy to analyze. As well, I think it's not working 100% as intended in the case where you are consuming uncompressed data. Currently, if consuming uncompressed data, nothing is being acquired from the DecompressBufferPool, but reuse still releases back into the pool. If consuming compressed and uncompressed data, this byte slice that was never acquired originally (via non-compressed batches) is being acquired for decompressing batches, and the slice is persisting randomly around. If this is what's actually happening (i.e. my reading comprehension is working right now), it's fine, but certainly not the intent.

I think some of the buffers could be put back into the pool a bit more aggressively? e.g., I don't know why rcRawRecordsBuffer is on the kgo.Record field. The point (AFAICT) is to allow the []kmsg.Record slice to be reused -- which I think can be done immediately after a partition is processed (rather than holding on until the end-user processes the kgo.Record).
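To illustrate the early-release idea above, here is a minimal sketch, not code from franz-go or the fork. The types `rawRecord` and `Record` are hypothetical stand-ins for kmsg.Record and kgo.Record, and `processPartition` is an invented helper. It assumes the user-facing record only copies slice headers out of the intermediate slice, so the intermediate slice can be handed back for reuse as soon as the partition is converted.

```go
package main

import "fmt"

// rawRecord is a hypothetical stand-in for kmsg.Record.
type rawRecord struct {
	Key, Value []byte
}

// Record is a hypothetical stand-in for kgo.Record.
type Record struct {
	Key, Value []byte
}

// processPartition decodes payloads into the scratch slice, converts them
// into user-facing Records, and returns the scratch slice so the caller can
// reuse it for the next partition right away.
func processPartition(scratch []rawRecord, payloads [][2]string) ([]Record, []rawRecord) {
	scratch = scratch[:0] // keep the backing array, drop old contents
	for _, p := range payloads {
		scratch = append(scratch, rawRecord{Key: []byte(p[0]), Value: []byte(p[1])})
	}
	out := make([]Record, len(scratch))
	for i, r := range scratch {
		// Only the slice headers are copied; the key/value bytes are
		// separate allocations and do not depend on scratch.
		out[i] = Record{Key: r.Key, Value: r.Value}
	}
	return out, scratch
}

func main() {
	var scratch []rawRecord
	first, scratch := processPartition(scratch, [][2]string{{"a", "1"}, {"b", "2"}})
	second, scratch := processPartition(scratch, [][2]string{{"c", "3"}})

	// The first partition's records are still intact even though the
	// scratch slice has been overwritten by the second partition.
	fmt.Println(string(first[0].Key), string(first[1].Key), string(second[0].Key))
	fmt.Println("scratch capacity:", cap(scratch))
}
```

In this sketch nothing on the user-facing record refers to the scratch slice, so no per-record bookkeeping is needed for it.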

I'm open to a caching API, but I can't think of a great one yet. I don't think it's something that should be added always for everybody, especially not via globals.

ortuman added a commit to grafana/franz-go that referenced this issue Oct 9, 2024
ref: twmb#823 (comment)

Signed-off-by: Miguel Ángel Ortuño <ortuman@gmail.com>
@twmb twmb added the waiting label Oct 11, 2024
@twmb twmb mentioned this issue Jan 15, 2025
14 tasks
twmb added a commit that referenced this issue Feb 7, 2025
This slab allocates the records we will be creating, reducing the number
of allocs (and thus gc pressure), while keeping alloc size the sameish
(or reducing it a bit). This should address the main problem for 823,
but we can still add caching in a bit.

For #823.
twmb added a commit that referenced this issue Feb 7, 2025
This adds pools in the same manner that hooks exist: users can opt in to
any pool they wish. The decompress pool is the trickiest to implement --
and the trickiest to use if you implement your own decompressor.

TODO remains to add this to the request buffer.

Closes #803, #823.
Owner

twmb commented Feb 7, 2025

I've read through the PRs in the forked repo, and I have a proposal that is a bit different.

There are three areas that the fork introduces caching to:

  • The []kmsg.Record slice (and all values inside it) that is allocated when processing a fetch response batch
  • The []byte slice that is allocated when decompressing a batch before processing
  • A pool for *kgo.Records that is taken from when creating a kgo.Record (in recordToRecord)

As well, already internally there exists one per-client cache of []byte that is used for two things: serializing requests before writing the slice to a connection (and then putting the slice back in the cache), and for formatting logs for the BasicLogger.

The fork supports caching by adding a few private fields to the kgo.Record type. These fields are effectively reference counted slices: a slice is taken from the pool, and then every record in that slice that will be returned to the end user increments the reference to that slice by one. Once you release the record (Reuse), the reference is decremented, and once the reference count hits zero, the slice is put back in the pool. As well, every record that is reused is put back in the package-global pool it is taken from.

The first problem this issue raises -- many small allocs -- can be fixed by switching to two slab allocations (allocate a slice once and then each kmsg.Record or kgo.Record is an index in that slice). That's easy.
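A minimal sketch of the slab idea, under stated assumptions: `rawRecord` and `Record` are simplified, hypothetical stand-ins for kmsg.Record and kgo.Record, and `convertPerRecord` / `convertSlab` are invented names rather than franz-go functions. The first function allocates each record separately; the second allocates one backing slice and hands out pointers into it.

```go
package main

import "fmt"

// rawRecord is a simplified, hypothetical stand-in for kmsg.Record.
type rawRecord struct {
	Key, Value  []byte
	OffsetDelta int32
}

// Record is a simplified, hypothetical stand-in for kgo.Record.
type Record struct {
	Key, Value []byte
	Offset     int64
}

// convertPerRecord performs one heap allocation per record.
func convertPerRecord(base int64, raw []rawRecord) []*Record {
	out := make([]*Record, 0, len(raw))
	for i := range raw {
		out = append(out, &Record{
			Key:    raw[i].Key,
			Value:  raw[i].Value,
			Offset: base + int64(raw[i].OffsetDelta),
		})
	}
	return out
}

// convertSlab allocates all records in a single slice (the slab) and
// returns pointers to its elements.
func convertSlab(base int64, raw []rawRecord) []*Record {
	slab := make([]Record, len(raw))
	out := make([]*Record, len(raw))
	for i := range raw {
		slab[i] = Record{
			Key:    raw[i].Key,
			Value:  raw[i].Value,
			Offset: base + int64(raw[i].OffsetDelta),
		}
		out[i] = &slab[i]
	}
	return out
}

func main() {
	raw := []rawRecord{
		{Key: []byte("a"), Value: []byte("1"), OffsetDelta: 0},
		{Key: []byte("b"), Value: []byte("2"), OffsetDelta: 1},
	}
	for _, r := range convertPerRecord(100, raw) {
		fmt.Println("per-record:", string(r.Key), string(r.Value), r.Offset)
	}
	for _, r := range convertSlab(100, raw) {
		fmt.Println("slab:      ", string(r.Key), string(r.Value), r.Offset)
	}
}
```

One trade-off of this layout is that a single retained pointer keeps the entire slab reachable, so the whole batch is collected together rather than record by record.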

For caching, I have a bit of a different idea.

If you have time, please take a look at my most recent commit in this PR: #904

I know I put this off for a very long time, so I imagine you may not take a look at 904 too promptly. I'm going to test it a bit and work through other issues on my 1.19 release checklist; if y'all have no opinion by the time I make it through, I'll play with the PR a bit and eventually merge it anyway.

So -- if you have time -- please take a look and let me know if the APIs introduced are usable. It leaves the implementation of caching entirely to the end user and allows the end user to opt into what caching they'd like. This also addresses #803 at the same time and exports the compressor & decompressor.
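As a rough illustration of what an opt-in pool could look like when it follows the hook pattern (a marker interface plus type assertions), here is a hypothetical sketch. None of these names (`Pool`, `BytesPool`, `client`, `newClient`, `freeList`) are taken from #904 or from franz-go; they are assumptions made only to show the shape of the design.

```go
package main

import "fmt"

// Pool is a hypothetical marker interface, analogous to how a hook can be
// any value that implements one or more specific hook interfaces.
type Pool interface{}

// BytesPool is a hypothetical opt-in interface for byte-slice reuse.
type BytesPool interface {
	GetBytes(n int) []byte
	PutBytes(b []byte)
}

// client is a stand-in for a library client that accepts optional pools.
type client struct {
	bytesPool BytesPool
}

// newClient keeps only the pool interfaces the user chose to implement.
func newClient(pools ...Pool) *client {
	c := &client{}
	for _, p := range pools {
		if bp, ok := p.(BytesPool); ok {
			c.bytesPool = bp
		}
	}
	return c
}

// scratch returns a buffer from the user's pool, or allocates if none is set.
func (c *client) scratch(n int) []byte {
	if c.bytesPool != nil {
		return c.bytesPool.GetBytes(n)
	}
	return make([]byte, n)
}

// done hands a buffer back to the user's pool, if there is one.
func (c *client) done(b []byte) {
	if c.bytesPool != nil {
		c.bytesPool.PutBytes(b)
	}
}

// freeList is a user-supplied pool. It is not safe for concurrent use; a
// real implementation would need locking or a sync.Pool.
type freeList struct {
	free [][]byte
	hits int
}

func (f *freeList) GetBytes(n int) []byte {
	for i, b := range f.free {
		if cap(b) >= n {
			f.free = append(f.free[:i], f.free[i+1:]...)
			f.hits++
			return b[:n]
		}
	}
	return make([]byte, n)
}

func (f *freeList) PutBytes(b []byte) { f.free = append(f.free, b) }

func main() {
	fl := &freeList{}
	c := newClient(fl)

	b := c.scratch(16)
	c.done(b)
	b2 := c.scratch(8) // served from the free list
	fmt.Println("len:", len(b2), "cap:", cap(b2), "pool hits:", fl.hits)

	plain := newClient() // no pools: plain allocation
	fmt.Println("len without pool:", len(plain.scratch(4)))
}
```

With this shape the library holds no global state, and users who pass no pool get ordinary allocation behavior.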
