
kafka replay speed: add bytes limit for inflight fetch requests #9892

Merged

Conversation

dimitarvdimitrov (Contributor) opened this pull request:

Problem

OOMs happen because the product of concurrency and records-per-fetch is too large: we hold too many records in memory and don't consume them as fast as we fetch them. The goal is to have a setting that controls the memory consumption of the process. In the future it could be scaled with the ingester's memory request, for example.

Proposal

Instead of configuring records-per-fetch, we should configure the maximum memory for all fetchers. We can work backwards to derive the number of records per fetchWant from the number of concurrent fetchers.

I propose to remove these flags

-ingest-storage.kafka.ongoing-records-per-fetch=30
-ingest-storage.kafka.startup-records-per-fetch=512

and replace them with a single flag

-ingest-storage.kafka.max-buffered-bytes=100000000 # 100MB

Each fetchWant still needs a well-defined startOffset and endOffset. The number of records in each fetchWant will be ($max_inflight_bytes / $startup_concurrency) / $bytes_per_record. We already track the average bytes per record over the last few fetches, but that value can be volatile. Maybe we can also invest in better tracking of the average record size.
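
As a rough sketch (illustrative Go; maxInflightBytes, startupConcurrency, and avgBytesPerRecord are stand-in names, not actual config fields), the per-fetchWant record count could be derived like this:

package main

import "fmt"

// recordsPerFetchWant derives how many records a single fetchWant should request,
// given a global inflight-bytes budget, the number of concurrent fetchers, and the
// observed average record size. All names here are illustrative.
func recordsPerFetchWant(maxInflightBytes, startupConcurrency, avgBytesPerRecord int64) int64 {
	if startupConcurrency <= 0 || avgBytesPerRecord <= 0 {
		return 1
	}
	records := (maxInflightBytes / startupConcurrency) / avgBytesPerRecord
	if records < 1 {
		records = 1
	}
	return records
}

func main() {
	// 100MB budget, 10 concurrent fetchers, ~2KB average records => 5000 records per fetchWant.
	fmt.Println(recordsPerFetchWant(100_000_000, 10, 2_000))
}

A smoothed estimate (for example an exponentially weighted moving average) of bytes per record would make this sizing less volatile than relying only on the last few fetches.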

What this PR does

This PR is the first step towards the above: adding a limit on the inflight bytes.

-ingest-storage.kafka.max-buffered-bytes

The limit works by controlling the fetchWants: we don't dispatch a fetchWant if its MaxBytes would push us over the limit. The idea is that the MaxBytes of a request is usually a good estimate of how much data we will actually fetch.

One caveat: if that estimate is bad, we'd keep dispatching and still risk OOMing.
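
For illustration only, the gating could look roughly like the sketch below. fetchWant, MaxBytes, and the limiter around them are simplified stand-ins for the fetcher internals, not the code in this PR.

package ingestsketch

// fetchWant describes a range of offsets to fetch; MaxBytes is the estimated
// size of the response. The names mirror the PR description but are simplified.
type fetchWant struct {
	startOffset, endOffset int64
	MaxBytes               int64
}

// inflightLimiter tracks how many estimated bytes are currently in flight,
// bounded by -ingest-storage.kafka.max-buffered-bytes.
type inflightLimiter struct {
	limit    int64
	inflight int64
}

// tryDispatch reports whether w fits in the remaining budget and, if so,
// reserves its MaxBytes. A false return means: wait until earlier fetches
// have been consumed before dispatching this fetchWant.
func (l *inflightLimiter) tryDispatch(w fetchWant) bool {
	if l.inflight+w.MaxBytes > l.limit {
		return false
	}
	l.inflight += w.MaxBytes
	return true
}

// release returns the budget once the fetched records have been consumed.
func (l *inflightLimiter) release(w fetchWant) {
	l.inflight -= w.MaxBytes
}

If the data actually fetched diverges from the MaxBytes estimate, the reserved budget diverges too, which is the caveat mentioned above.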

I didn't add two separate limits for startup and ongoing because we're currently setting them both to the same value and the short/medium-term plan is to only have a single config option.

Next steps

The next step would be to remove the records per fetch config options altogether and only rely on max bytes.

Note to reviewers

This is based on #9891 because that PR fixes flaky tests. The changes aren't otherwise dependent.

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
  • about-versioning.md updated with experimental features.

@dimitarvdimitrov dimitarvdimitrov requested review from a team and tacole02 as code owners November 13, 2024 17:16
@tacole02 (Contributor) left a comment:

Thanks so much, @dimitarvdimitrov!

Resolved review threads: pkg/storage/ingest/config.go (outdated), pkg/storage/ingest/reader.go, and pkg/storage/ingest/fetcher.go (five threads, all outdated).
@@ -1043,6 +1084,12 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, bufferedReco
		lastConsumedOffset: lastConsumedOffset,
		kprom:              NewKafkaReaderClientMetrics(component, reg),
	}

	m.Service = services.NewTimerService(100*time.Millisecond, nil, func(context.Context) error {
A collaborator commented:

I can't see anyone starting/stopping this service.

dimitarvdimitrov (Contributor, Author) replied:

It's started here:

r.dependencies, err = services.NewManager(r.committer, r.offsetReader, r.consumedOffsetWatcher, startOffsetReader, r.metrics)
if err != nil {
	return errors.Wrap(err, "creating service manager")
}
// Use context.Background() because we want to stop all dependencies when the PartitionReader stops
// instead of stopping them when ctx is cancelled and while the PartitionReader is still running.
err = services.StartManagerAndAwaitHealthy(context.Background(), r.dependencies)
if err != nil {
	return errors.Wrap(err, "starting service manager")
}

and then stopped here:

func (r *PartitionReader) stopDependencies() error {
	if r.dependencies != nil {
		if err := services.StopManagerAndAwaitStopped(context.Background(), r.dependencies); err != nil {
			return errors.Wrap(err, "stopping service manager")
		}
	}

@pracucci (Collaborator) left a comment:

Nice work and nice tests! Pre-approving, but please remember to start/stop the reader metrics service.

	nextFetch = nextFetch.Next(recordsPerFetch)

case result, moreLeft := <-refillBufferedResult:
	if !moreLeft {
		if pendingResults.Len() > 0 {
A collaborator commented:

We've lost this if check (removeNextResult assumes the list is non-empty). Is it a problem?

To my understanding it's not a problem because, with the new logic, we have the guarantee that if refillBufferedResult is set, there's at least 1 item in the list (the fetch we're currently reading from, which is what refillBufferedResult is set to). Is my understanding correct?

dimitarvdimitrov (Contributor, Author) replied:

That's right. Now, instead of keeping nextResult as state, we compute it on every iteration. The invariants from before still hold.
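
A minimal, illustrative sketch of the "recompute the head every iteration" pattern is below, assuming a pendingResults list and a per-fetch records channel; it is not the actual fetcher code. The receive case is disabled via a nil channel whenever the list is empty, which is what makes the non-empty invariant hold.

package ingestsketch

import (
	"container/list"
	"context"
)

// fetchResult is a hypothetical stand-in for a buffered fetch whose records are
// delivered on a channel.
type fetchResult struct {
	records chan []byte
}

// consumeLoop recomputes the head of the pending list on every iteration. Receiving
// from a nil channel blocks forever, so the refill case can only fire when the list
// is non-empty.
func consumeLoop(ctx context.Context, pending *list.List, consume func([]byte)) {
	for {
		var refill chan []byte // nil => the refill case below is disabled
		if pending.Len() > 0 {
			refill = pending.Front().Value.(*fetchResult).records
		}
		select {
		case recs, moreLeft := <-refill:
			consume(recs)
			if !moreLeft {
				pending.Remove(pending.Front()) // safe: refill was non-nil, so the list is non-empty
			}
		case <-ctx.Done():
			return
		}
	}
}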

Further resolved review threads: pkg/storage/ingest/fetcher.go and pkg/storage/ingest/fetcher_test.go (two threads, one outdated).
@dimitarvdimitrov dimitarvdimitrov changed the base branch from dimitar/ingest/replay-speed/avoid-double-fetching to main November 14, 2024 14:52
dimitarvdimitrov and others added 18 commits November 14, 2024 16:52
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

Conflicts:
	pkg/storage/ingest/fetcher.go
	pkg/storage/ingest/fetcher_test.go
	pkg/storage/ingest/reader.go

Co-authored-by: Marco Pracucci <marco@pracucci.com>
@dimitarvdimitrov dimitarvdimitrov force-pushed the dimitar/ingest/replay-speed/inflight-fetch-bytes-limit branch from 07ed98b to 65b3769 on November 14, 2024 14:53
@dimitarvdimitrov dimitarvdimitrov enabled auto-merge (squash) November 14, 2024 14:53
@dimitarvdimitrov dimitarvdimitrov merged commit cacf8c5 into main Nov 14, 2024
30 checks passed
@dimitarvdimitrov dimitarvdimitrov deleted the dimitar/ingest/replay-speed/inflight-fetch-bytes-limit branch November 14, 2024 16:09