-
Notifications
You must be signed in to change notification settings - Fork 532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ingest storage: proper shutdown of partitionCommitter #9436
ingest storage: proper shutdown of partitionCommitter #9436
Conversation
This `partitionCommitter` would be shut down via the services manager as soon as the service context is cancelled. This means that they shut down in parallel with the `PartitionReader`. The race comes when the `partitionCommitter` has already shut down while the `PartitionReader` is still processing some records. Then when the `PartitionReader` tries to `enqueueCommit`, that sets the atomic, but does not send this to Kafka. As a result we may not always persist the latest commit to Kafka. Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
err = services.StartManagerAndAwaitHealthy(ctx, r.dependencies) | ||
// Use context.Background() because we want to stop all dependencies when the PartitionReader stops | ||
// instead of stopping them when ctx is cancelled and while the PartitionReader is still running. | ||
err = services.StartManagerAndAwaitHealthy(context.Background(), r.dependencies) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the tradeoff here is that if we get interrupted during startup, we wouldn't respect that and we'd wait for the full startup to finish
not sure if I should try to solve that (maybe with something like context.AfterFunc()
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the PartitionReader
gets terminated (context canceled) while running PartitionReader.start()
, do we evern call stopDependencies()
at all? If I remember correctly, PartitionReader.stop()
will get called only if PartitionReader.start()
has successfully terminated, that is if it has returned nil
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we take care of it a few lines above
mimir/pkg/storage/ingest/reader.go
Lines 105 to 110 in 01d32ac
// Stop dependencies if the start() fails. | |
defer func() { | |
if returnErr != nil { | |
_ = r.stopDependencies() | |
} | |
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh right, I missed it. LGTM Then.
@pracucci should we wait on your to review this or can we proceed with the merge? |
What this PR does
This
partitionCommitter
would be shut down via the services manager as soon as the service context is cancelled. This means that they shut down in parallel with thePartitionReader
. The race comes when thepartitionCommitter
has already shut down while thePartitionReader
is still processing some records. Then when thePartitionReader
tries toenqueueCommit
, that sets the atomic, but does not send this to Kafka.As a result we may not always persist the latest commit to Kafka on shutdown.
Which issue(s) this PR fixes or relates to
Fixes #8697
Checklist
CHANGELOG.md
updated - the order of entries should be[CHANGE]
,[FEATURE]
,[ENHANCEMENT]
,[BUGFIX]
.about-versioning.md
updated with experimental features.