Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zstd encoder limit and raise encoder cache size #2979

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

rtreffer
Copy link
Contributor

@rtreffer rtreffer commented Sep 7, 2024

This PR addresses #2965 with a different approach.

This PR addresses 2 issues with the current implementation

  1. The number of in-use zstd encoders can exceed GOMAXPROCS if a large number of goroutines are used
  2. The number of cached encoders is too low for highly parallel sarama use, leading to repeated encoder creation and thus low throughput

The PR preserves the following property

  • Encoders are lazy created

The memory behavior of applications can change slightly.
Before applying the patch:

  • The maximum memory usage from encoders was tied to the concurrency (goroutines) but would shrink to 1 on idle

After applying the patch:

  • The maximum memory usage from encoders is tied to the peak parallelism per compression level

This should not change the worst case for the great majority of users, but it might be relevant in cases where applications were alternating between high sarama use and other uses.

There are 2 new benchmarks and a testing flag (zstdTestingDisableConcurrencyLimit) to verify the concurrency limiting.
I've also added some more information to the tests (like setting the bytes so throughput can be measured).
Here is a sample output from my machine (AMD framework 13):

# go test -benchmem -run=^$ -test.v -bench ^BenchmarkZstdMemory github.com/IBM/sarama
goos: linux
goarch: amd64
pkg: github.com/IBM/sarama
cpu: AMD Ryzen 7 7840U w/ Radeon  780M Graphics     
BenchmarkZstdMemoryConsumption
BenchmarkZstdMemoryConsumption-16                             16          68034969 ns/op        2959.16 MB/s            96.00 (gomaxprocs)               1.000 (goroutines)     21974595 B/op           815 allocs/op
BenchmarkZstdMemoryConsumptionConcurrency
BenchmarkZstdMemoryConsumptionConcurrency-16                  39          30498097 ns/op        8801.71 MB/s             4.000 (gomaxprocs)            256.0 (goroutines)       86327669 B/op          1479 allocs/op
BenchmarkZstdMemoryNoConcurrencyLimit
BenchmarkZstdMemoryNoConcurrencyLimit-16                      21          52053651 ns/op        5156.90 MB/s             4.000 (gomaxprocs)            256.0 (goroutines)       437548737 B/op         2196 allocs/op
PASS
ok      github.com/IBM/sarama   3.566s

A recent issue reported by Henry Haiying Cai showed that the current
zstd reuse logic has 2 major flaws
- There is no upper bound on created zstd encoders
- The reuse of encoders is low if many goroutines hit `zstdCompress`
  simultaniously

This is fixed by changing the original behavior in 2 ways
- There are never more than GOMAXPROCs encoders in use
- The maximum number of encoders per compression level is GOMAXPRCOS at
  some early point in time

This means we have finally an upper bound on in-use encoders and with
that a worst case memory consumption. Caching that amount of encoders
does not worsen the worst case behavior (unless many compression levels
are in use at the same time).

This should be a significant performance improvement for codebases that
generate many messages in parallel, fanning out to many partitions.

Signed-off-by: René Treffer <treffer@measite.de>
Signed-off-by: René Treffer <treffer@measite.de>
defer zstdMutex.Unlock()

limit := runtime.GOMAXPROCS(0)
for zstdCheckedOutEncoders >= limit && !zstdTestingDisableConcurrencyLimit {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Is zstdCheckedOutEncoders and the capacity of of zstdEncoderChannel are doing the same kind of control?
  2. Is seems hard to follow if the limit value from line 49 are different from line 52 or line 29, might need some comment or explanation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some process could have changed the GOMAXPROCS in between calls. The proper behavior after calling a sync.Semaphore.Wait() is to refresh all values before performing any tests.

This gives a nice speedup at high compresison levels with no changes for
message sizes at or below 1MB (the default).

Signed-off-by: René Treffer <treffer@measite.de>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants