Refactor the PusherConsumer interactions #9133

Conversation
```diff
@@ -24,21 +24,28 @@ import (
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 )

 const shardForSeriesBuffer = 2000 // TODO dimitarvdimitrov 2000 is arbitrary; the idea is that we don't block the goroutine calling PushToStorage while we're flushing. A linked list with a sync.Cond or something different would also work

 type Pusher interface {
 	PushToStorage(context.Context, *mimirpb.WriteRequest) error
 }

 type PusherCloser interface {
```
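To make the TODO concrete, here is a minimal sketch of the buffered-channel idea: the goroutine calling `PushToStorage` enqueues into a channel with capacity `shardForSeriesBuffer` and only blocks once the consumer falls that far behind. The `writeRequest` type and the flush step are placeholders, not Mimir's actual code:

```go
package main

import "fmt"

const shardForSeriesBuffer = 2000 // arbitrary, mirroring the TODO above

// writeRequest is a placeholder for *mimirpb.WriteRequest.
type writeRequest struct{ series int }

func main() {
	// The producer only blocks once the consumer is more than
	// shardForSeriesBuffer requests behind, which is the non-blocking
	// behaviour the TODO describes for PushToStorage callers.
	queue := make(chan writeRequest, shardForSeriesBuffer)
	done := make(chan struct{})

	// Consumer: drains the queue and flushes each request to storage.
	go func() {
		defer close(done)
		for req := range queue {
			fmt.Println("flushing request with", req.series, "series") // stand-in for the TSDB push
		}
	}()

	// Producer: enqueues without waiting for the flush to complete.
	for i := 1; i <= 5; i++ {
		queue <- writeRequest{series: i}
	}

	close(queue) // signal "no more items are coming"
	<-done       // wait until any remaining work is drained
}
```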
I also wanted to refactor this, but it turned out to be too big of a change and I decided against it.
Nice one! I think there's a place where we can hit a deadlock and a place where checking the channel every time is too expensive. Otherwise, this is definitely an improvement.

You mentioned some race condition, but I couldn't spot any. Isn't the test failure caused by different series sharding on different CPU architectures?
I'll take another look at the whole PR in about 30m; going into a meeting now.
OK, I've reviewed the whole PR. I think there's another place where we might lose data.
Merged commit 16f8a12 into dimitar/ingester/consume-latency-push-sharding
What this PR does

- Introduce a new `BatchingQueue` to reveal the intention of a queue per shard (a hedged sketch of such a queue follows below).
- Removed the need to call `Close` as we pushed data into TSDB from the main loop. This was confusing, because we were using close as a semantic for "no more items are coming" and to ensure any incomplete batches were flushed.
- Renamed and moved the `noopPusherCloser`, which is more of an alternative way to push data without any concurrency.
- Inlined a few methods to get rid of a level of indirection that made the code harder to understand.

Which issue(s) this PR fixes or relates to

N/A
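As referenced in the list above, here is a hedged sketch of what a per-shard batching queue could look like. All names and signatures (`BatchingQueue`, `AddToBatch`, `Done`, the string payload) are illustrative assumptions, not the PR's actual API; the point is that `Done` on the queue replaces the old "close the pusher to flush" semantics, and the main loop owns the push to storage:

```go
package main

import "fmt"

// BatchingQueue is an illustrative per-shard queue: callers append items to
// the current batch, and completed batches are handed to a consumer channel.
type BatchingQueue struct {
	batchSize int
	current   []string      // placeholder for accumulated timeseries
	Channel   chan []string // completed batches, consumed by the main loop
}

func NewBatchingQueue(batchSize int) *BatchingQueue {
	return &BatchingQueue{batchSize: batchSize, Channel: make(chan []string, 1)}
}

// AddToBatch appends one item and hands off the batch once it is full.
func (q *BatchingQueue) AddToBatch(series string) {
	q.current = append(q.current, series)
	if len(q.current) >= q.batchSize {
		q.push()
	}
}

// Done signals that no more items are coming and flushes any partial batch,
// replacing the old "Close means flush" semantics on the pusher itself.
func (q *BatchingQueue) Done() {
	if len(q.current) > 0 {
		q.push()
	}
	close(q.Channel)
}

func (q *BatchingQueue) push() {
	q.Channel <- q.current
	q.current = nil
}

func main() {
	q := NewBatchingQueue(2)
	go func() {
		for _, s := range []string{"a", "b", "c"} {
			q.AddToBatch(s)
		}
		q.Done()
	}()
	// The main loop drains completed batches and pushes them to storage.
	for batch := range q.Channel {
		fmt.Println("pushing batch to TSDB:", batch)
	}
}
```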
Checklist

- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`.
- `about-versioning.md` updated with experimental features.