influxdb: Concurrent PeriodicFlusher #2190
Conversation
On a side-note, can you also change the names of the file from …
Regarding the file name, in general I agree with @inancgumus, but not in this specific case. If they look at https://pkg.go.dev/go.k6.io/k6/output, the file name doesn't matter, they mostly see the types; the filenames are at the very bottom (or if you click to see the implementation of something). But if they open https://github.com/grafana/k6/tree/master/output, they see 3 very simple files that should be almost self-explanatory:
Contrast this to something like …
Fair enough. What about:
(or without an -er suffix, so as not to confuse it with an interface). IMHO it's practical, makes more sense, and explains what it provides.
Yes, but that explanation only makes sense if someone is familiar enough with the outputs to know that these are actually helpers 😅 Otherwise, …
OK, then 😅 You know, when you call something a helper, you can put anything in it. There are no limits.
There are - code reviews 😄 As I said, in general I completely agree with you, just not about this specific case 😅 And k6 definitely suffers from the problem you're worried about in a few places, most notably with …
(force-pushed from 4615d9b to e2ecd79)
Nice work!
(force-pushed from fac5d8b to a173d72)
@na-- I also added the log warning about long execution of the flush metrics function. It could be very noisy in some situations. Do you think that's acceptable from a UX perspective? An experienced user can detect the same thing by inspecting the debug logs with the …
(force-pushed from b3a579e to 6df9b57)
(force-pushed from 6df9b57 to 46f68c8)
output/helpers.go (outdated)

```go
case <-ticker.C:
	limiter <- struct{}{}
```
Hmm, this may not be ideal 😞

It will work fine when the InfluxDB instance is able to keep up with the load, but if it ever starts slowing down, the `AsyncPeriodicFlusher` will be stuck here, since all of the allowed goroutines will be started and `limiter` will be full. But the metrics data will keep coming to the output, so the next `flushCallback()` will have more data to flush and thus have to do more work and send a bigger chunk to InfluxDB. And so on and so forth, potentially taking more and more time to free up a slot in `limiter` and sending ever bigger chunks of data.
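For context, here is a minimal, self-contained sketch of the loop shape being discussed, based only on the two quoted lines above; the function name and the surrounding structure are assumptions, not the exact code from this PR:

```go
package output

import (
	"context"
	"time"
)

// Illustrative sketch, not the PR's actual implementation: limiter is a
// buffered channel used as a semaphore, flushCallback drains and writes the
// currently buffered samples.
func runConcurrentFlusher(ctx context.Context, period time.Duration, limiter chan struct{}, flushCallback func()) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// This send blocks while all allowed goroutines are still busy,
			// so buffered samples keep piling up and the next flushCallback()
			// has an ever bigger chunk to send.
			limiter <- struct{}{}
			go func() {
				defer func() { <-limiter }()
				flushCallback()
			}()
		case <-ctx.Done():
			return
		}
	}
}
```

The key detail is that the semaphore slot is acquired before the buffered samples are taken, which is what lets the backlog grow.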
The way it was implemented before, we always started a new goroutine:
k6/stats/influxdb/collector.go, lines 102 to 113 in 2193de0:

```go
for {
	select {
	case <-ticker.C:
		c.wg.Add(1)
		go c.commit()
	case <-ctx.Done():
		c.wg.Add(1)
		go c.commit()
		c.wg.Wait()
		return
	}
}
```
And we always got the currently buffered data in a local buffer in that goroutine before we potentially waited for the semaphore to have a free slot:
k6/stats/influxdb/collector.go, lines 128 to 138 in 2193de0:

```go
func (c *Collector) commit() {
	defer c.wg.Done()
	c.bufferLock.Lock()
	samples := c.buffer
	c.buffer = nil
	c.bufferLock.Unlock()
	// let first get the data and then wait our turn
	c.semaphoreCh <- struct{}{}
	defer func() {
		<-c.semaphoreCh
	}()
```
We just waited to send the actual network requests until we had a free slot. So the chunk sizes were smaller, but we could potentially have a whole bunch of started goroutines, waiting, with big chunks of data in memory... But at least these chunks would have been roughly the same size, so InfluxDB likely wouldn't have choked on one.
Not sure which is better, and to be honest, given that InfluxDB v1 is dead, it probably doesn't really matter. But if we want to bring the old way back, we can't do it with a new helper, it needs to be done in the InfluxDB output itself. It should be easy to do: we can use the old `PeriodicFlusher` and just spin up the goroutine immediately at the start of the output's `flushMetrics()` method 🤷‍♂️
Yeah, it was done this way to keep the same logic as the current sync implementation (before we decided to split). We have the same problem with the `Stop` procedure.
With the current code, I have these suggestions:

- Add another `select` for skipping requests, so we can avoid getting stuck (not optimal for performance):

```go
select {
case limiter <- struct{}{}:
	go func() {
		// ...
	}()
default:
	continue LOOP
}
```

- Add a timeout for the requests; this way we could also guarantee that `Stop` doesn't get stuck forever (a rough sketch follows below).
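A rough sketch of that second suggestion, under the assumption that the flush could be given a context; the context-aware callback signature here is hypothetical, not the current output API:

```go
package output

import (
	"context"
	"time"
)

// Hypothetical sketch: run each flush with a per-request timeout so a slow
// backend can't block a goroutine (and eventually Stop) forever.
// The context-aware flushCallback signature is an assumption for illustration.
func flushWithTimeout(parent context.Context, timeout time.Duration, flushCallback func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	return flushCallback(ctx)
}
```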
> and to be honest, given that InfluxDB v1 is dead, it probably doesn't really matter.

This should also be used by the InfluxDB v2 output.
This was done this way in influxdbv1 for a couple of reasons, IIRC:

- influxdbv1 has an upper limit on the size of the POST body, which means that just continuously increasing it is... probably not great ;)
- influxdbv1 does at least a non-zero amount of the ingest while it reads the request, which means big requests take quite a while.
- from my experiments, pushing multiple requests in parallel was faster/better; I tried that after noticing there was unused CPU, but InfluxDB used just 1 core (from a quick look at htop, so grains of salt and all that jazz).

All of this was done by me probably 2 years ago, as far as I remember within a day, as a quick attempt to make the influxdbv1 output more... stable, as it previously was getting to writing 50MB+ requests which were taking upwards of 60s regardless of how you configured InfluxDB.

Whether the above is applicable to another output is a question I can't answer. The cloud output, for example, has parallel pushing as well, but it happens after things have been aggregated and as such can't be in the `PeriodicFlusher` (which it also doesn't use).

Now, given that I broke the concurrent writes in influxdbv1 a few months ago when I moved it to an output, it seems to me that it might not have worked all that much better, as nobody has come to complain. Also, if I have to add it again, it will literally be to add `go func() {` and `}()` around the part that pushes, after the `PeriodicFlusher` has flushed.

Arguably the additional change (that I decided I didn't want to spend the time on, IIRC) is to split the samples so there aren't more than a certain (configurable) amount per request and push those concurrently if necessary. But this will likely be terrible for some other output that ingests concurrently while we now push to it in multiple requests.
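For illustration, a standalone sketch of that chunking idea (the generic signature and all names here are mine, not anything in k6):

```go
package flushutil

import "sync"

// Hypothetical sketch: split the buffered samples into chunks of at most
// maxBatchSize and push every chunk in its own goroutine, waiting for all of
// them to finish before returning.
func pushInChunks[T any](samples []T, maxBatchSize int, push func([]T)) {
	var wg sync.WaitGroup
	for len(samples) > 0 {
		n := maxBatchSize
		if n > len(samples) {
			n = len(samples)
		}
		chunk := samples[:n]
		samples = samples[n:]
		wg.Add(1)
		go func() {
			defer wg.Done()
			push(chunk)
		}()
	}
	wg.Wait()
}
```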
To be honest, given the (IMO) ease of implementing whatever concurrent push logic on top of the current `PeriodicFlusher`, and that (IMO) it will need to be different depending on the output, I am for scrapping this additional utility type and just writing the 4 lines in each output that needs it, if any do, and I still don't know if influxdbv2 will actually be helped by this 🤷.

Additionally, arguably a lot of the problem is that if you have 1k VUs doing 5 iters/s, you need to write 5k iteration samples a second... which should probably all be combined into 1 sample; doing that will likely fix all of the problems ;)

Also, `AsyncPeriodicFlusher` seems like the wrong name... maybe `ConcurrentPeriodicFlusher`?
> Also, if I have to add it again, it will literally be to add `go func() {` and `}()` around the part that pushes, after the `PeriodicFlusher` has flushed.

> To be honest, given the (IMO) ease of implementing whatever concurrent push logic on top of the current `PeriodicFlusher`, and that (IMO) it will need to be different depending on the output, I am for scrapping this additional utility type and just writing the 4 lines in each output that needs it, if any do, and I still don't know if influxdbv2 will actually be helped by this
Yeah, you are probably right, this new helper seems to be more harm than help after all 😞 Restoring the original async logic, with smaller (even if not constant) chunk sizes, back in the influxdb code and using the old `PeriodicFlusher` seems to be the best way to go here...
> using the old PeriodicFlusher seems to be the best way to go here...

@na-- just to be sure, are we saying that we want to remove the `PeriodicFlusher` and implement the ticker with the concurrent flush directly in the `influxdb` output?
I think that we should:

- remove (i.e. not add) `NewAsyncPeriodicFlusher`
- leave `PeriodicFlusher` how it used to be
- also don't change how the old `PeriodicFlusher` was used in the `influxdb` output before this PR
- leave the semaphore code in there as well, but do everything in the `flushMetrics()` method after the `samples := o.GetBufferedSamples()` in a new goroutine (with an added waitgroup to ensure we flush everything before `Stop()` ends); a rough sketch follows below:

Lines 203 to 208 in aab12d5:

```go
samples := o.GetBufferedSamples()
o.semaphoreCh <- struct{}{}
defer func() {
	<-o.semaphoreCh
}()
```

I think this would mimic the old InfluxDB behavior, right?
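To make the last point concrete, a rough sketch of how the `flushMetrics()` method in `output/influxdb/output.go` could look; the `wg` field is the added waitgroup mentioned above, and everything else (names, the write step) is illustrative rather than the final diff:

```go
// Rough sketch, not the final diff. It assumes an added `wg sync.WaitGroup`
// field on Output, next to the existing semaphoreCh.
func (o *Output) flushMetrics() {
	// Grab the currently buffered samples synchronously, so chunks stay small...
	samples := o.GetBufferedSamples()
	if len(samples) < 1 {
		return
	}
	o.wg.Add(1)
	go func() {
		defer o.wg.Done()
		// ...and only then wait for a free slot before doing the actual write.
		o.semaphoreCh <- struct{}{}
		defer func() { <-o.semaphoreCh }()
		// ... convert samples to points and write the batch to InfluxDB ...
	}()
}
```

`Stop()` would then call `o.wg.Wait()` after stopping the periodic flusher, so in-flight writes finish before the output shuts down.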
I agree with @na-- that this should happen for the old influxdb output; whether it should be a priority is a different question, but it should be fairly straightforward.

For the new one (influxdbv2), I am still interested in a real-world test with and without the upstream influxdb library's async writer, instead of the bad benchmark I have written. I would expect that influxdbv2 will handle ingestion better than the old one, and maybe even use multiple cores/goroutines for the ingestion without needing to send multiple requests 🤞
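For reference, the upstream async writer in question is the non-blocking `WriteAPI` of `github.com/influxdata/influxdb-client-go/v2`; a minimal usage example follows (the URL, token, org, bucket, and the sample point are placeholders):

```go
package main

import (
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Placeholder connection settings.
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	defer client.Close()

	// WriteAPI is the asynchronous, non-blocking writer: points are buffered
	// and written to InfluxDB in background batches.
	writeAPI := client.WriteAPI("my-org", "my-bucket")

	p := influxdb2.NewPoint(
		"http_req_duration",                    // measurement
		map[string]string{"status": "200"},     // tags
		map[string]interface{}{"value": 123.4}, // fields
		time.Now(),
	)
	writeAPI.WritePoint(p)

	// Flush whatever is still buffered before exiting.
	writeAPI.Flush()
}
```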
> I am still interested in a real-world test with and without the upstream influxdb library's async writer instead of the bad benchmark I have written

An attempt at that can be found in the influxdbv2 PR grafana/xk6-output-influxdb#2.

Just a suggestion: once the logic of spawning in …
Turning the above ^^ into a "request changes".
(force-pushed from 46f68c8 to b4afa19)
LGTM, and sorry for the whole confusion with me suggesting the async helper 😞
LGTM, I think we might be (in general) writing too many debug messages in outputs, but this is fine and unrelated to the problem at hand so you can ignore it :)
output/influxdb/output.go (outdated)

```go
	return nil
}

func (o *Output) flushMetrics() {
	samples := o.GetBufferedSamples()
	if len(samples) < 1 {
		o.logger.Debug("Any buffered samples, skipping the flush operation")
```
No need for a message here IMO, this will just catch cases where someone is running a very light script that has really big response times.
(force-pushed from b4afa19 to dd6cef2)
@yorugac added the test for asserting concurrency and rate limiting.
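For reference, a standalone sketch of the kind of assertion such a test makes: every flush bumps a counter, the maximum observed value is recorded, and it must never exceed the semaphore size (all names and numbers here are illustrative, not the actual test added in this PR):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	const limit = 4
	limiter := make(chan struct{}, limit)

	var current, observedMax int64
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		limiter <- struct{}{}
		go func() {
			defer wg.Done()
			defer func() { <-limiter }()
			n := atomic.AddInt64(&current, 1)
			// Record the highest concurrency level seen so far.
			for {
				m := atomic.LoadInt64(&observedMax)
				if n <= m || atomic.CompareAndSwapInt64(&observedMax, m, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // stand-in for the actual flush work
			atomic.AddInt64(&current, -1)
		}()
	}
	wg.Wait()

	// A real test would assert this instead of printing it.
	fmt.Printf("max concurrent flushes: %d (limit %d)\n", observedMax, limit)
}
```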
Actually, I meant that test for the new periodic flusher type; now it's specific to the InfluxDB output... 🤔 But that's a good check to have anyway 🙂 LGTM 👍
Closes #2185