
Telegraf should have a way of flushing based on buffer size rather than time #666

Closed
tripledes opened this issue Feb 8, 2016 · 9 comments · Fixed by #699

Comments

@tripledes

Today we faced an issue with the Kafka consumer plugin where we lost data. It's currently not a big deal, as we're still at the POC stage.

The thing is that, having had a look at the code (and hopefully having understood it), the buffer implemented in the plugin seems to defeat the whole purpose of having Kafka as a (kind of) safety measure against losing metrics.

If I understood it properly, the buffer fills up to its size while waiting for Gather() to be called at the configured interval, so I guess that interval is the real reason such a mechanism exists.

I'm guessing that currently the call to Gather() cannot be skipped somehow, can it? I understand the method has to be present to satisfy the interface, but could it return something that keeps Telegraf happy while the plugin keeps pushing messages instead of buffering them? Or at least without dropping them?

Again, if my understanding is correct, I see a few scenarios where metrics could be dropped, rendering the whole setup a bit useless.

I'm sorry I cannot offer a better solution: buffering with another strategy might solve this issue, but it might cause others (such as memory pressure).

@sparrc
Contributor

sparrc commented Feb 8, 2016

@tripledes this could be done, but you'd really just be pushing the problem off to the next buffer, which is the output buffer. (Outputs also buffer metrics until their flush interval elapses.)

The only real solution to what you're describing is to set very low collection and flush intervals, to avoid buffering a large number of metrics.

@tripledes
Author

@sparrc I see. We'll try to tweak telegraf a bit and see how it goes, we can always write our own consumer.

Thanks!

@sparrc
Contributor

sparrc commented Feb 10, 2016

@tripledes How did the tweaking go? I think I'm going to re-open this because you bring up a good point: maybe Telegraf should have ways of bypassing the typical serialized Gather -> buffer -> Flush flow.

Mainly I'm thinking that this would be useful for the message broker consumers (we have Kafka now and will have MQTT soon).

One possibility might be to provide a "max_flush_buffer" option, so that Telegraf simply flushes whenever that buffer fills up, rather than waiting for flush_interval.

@sparrc sparrc reopened this Feb 10, 2016
@sparrc sparrc changed the title Kafka (input) consumer plugin feels rather inflexible Telegraf should have a way of flushing based on buffer size rather than time Feb 10, 2016
@sparrc
Contributor

sparrc commented Feb 10, 2016

Another question is: Should "ServiceInputs" have access to the "accumulator" when they are started, rather than only when Gather(acc) is called? If this were the case, then they could continuously add to the accumulator, and Gather would just be a dummy function, as you suggested earlier.
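In interface terms, the change being discussed might look like this. A minimal sketch, assuming simplified stand-ins for Telegraf's real `Accumulator` and service input interfaces; `kafkaLikeConsumer` and `memAccumulator` are illustrative names:

```go
package main

import "fmt"

// Accumulator is a simplified stand-in for Telegraf's accumulator.
type Accumulator interface {
	Add(measurement string, value float64)
}

// ServiceInput sketches the proposal: the accumulator is handed over at
// Start(), so the plugin can add metrics continuously.
type ServiceInput interface {
	Start(acc Accumulator) error // receives the accumulator up front
	Gather(acc Accumulator) error
	Stop()
}

// memAccumulator collects points in memory for the example.
type memAccumulator struct{ points []float64 }

func (a *memAccumulator) Add(m string, v float64) { a.points = append(a.points, v) }

// kafkaLikeConsumer stands in for a message-broker consumer plugin.
type kafkaLikeConsumer struct{ acc Accumulator }

func (c *kafkaLikeConsumer) Start(acc Accumulator) error {
	c.acc = acc // keep the accumulator; a real plugin adds from its receive loop
	return nil
}

// Gather becomes a dummy: metrics are already flowing into the accumulator.
func (c *kafkaLikeConsumer) Gather(acc Accumulator) error { return nil }

func (c *kafkaLikeConsumer) Stop() {}

func main() {
	acc := &memAccumulator{}
	c := &kafkaLikeConsumer{}
	c.Start(acc)
	c.acc.Add("kafka_consumer", 42) // simulates a message arriving between intervals
	fmt.Println(len(acc.points))
}
```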

@tripledes
Author

I'm sorry to say that last night we had a storage issue on the test environment, so the tweaking will have to wait a bit. Please keep in mind that the environment is rather small at the moment; our thoughts on the issue were more about the flexibility of the implementation at a bigger scale.

We really think that somehow bypassing the interval could be beneficial for Kafka and, as you suggested, for more systems. Still, your point was a good one: buffering somewhere else can bring other issues, I guess, like memory pressure or limits on the output system.

BTW, who generates the timestamps? Sorry, I don't have a computer right now. Is it the output? The accumulator?

@sparrc
Contributor

sparrc commented Feb 10, 2016

If there is a timestamp in the parsed data then that is used; otherwise it's set when accumulator.Add() is called (which is usually called within Gather()).

@tripledes
Author

I see. I checked today to verify that we were already getting the timestamp on Kafka, and it's there. I just wanted to be sure the timestamps were generated at collection time, so that even if the metrics are pushed to the output with a slight delay, they carry the proper timestamp.

I think the idea of a max_flush_buffer would come closer to what we had in mind, because pushing to the accumulator and waiting for the points to be pushed to the output on the next interval might not be the desired behaviour. I guess some users (like ourselves) might want something more like a stream of data flowing from Kafka to the output.

Would it be crazy/stupid to have the concept of services as real services? I mean something that runs independently and has access to the outputs (or anything required, for that matter).

@sparrc
Contributor

sparrc commented Feb 11, 2016

We could, but I think some users may actually prefer to batch points and write them together, because that is at least better for InfluxDB write performance and network traffic load. I'm not sure how it affects other outputs, though.

@netixen
Contributor

netixen commented Feb 13, 2016

@sparrc

Another question is: Should "ServiceInputs" have access to the "accumulator" when they are started, rather than only when Gather(acc) is called?

That would be very useful. When writing the NATS consumer I noticed we would lose consumed metrics if Telegraf is shut down, even gracefully. If we had access to the accumulator, we could flush it inside Stop(). We could also use that access for different batching strategies based on the topic/subject. I can contribute to a PR if there is interest.
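The graceful-shutdown idea could be sketched like this. A minimal illustration, assuming the plugin holds the accumulator and a slice of consumed-but-unwritten messages; `natsLikeConsumer` and `accumulator` are simplified stand-ins, not the real NATS consumer:

```go
package main

import "fmt"

// accumulator collects delivered metrics for the example.
type accumulator struct{ delivered []string }

func (a *accumulator) Add(m string) { a.delivered = append(a.delivered, m) }

// natsLikeConsumer stands in for a message-broker service input.
type natsLikeConsumer struct {
	acc     *accumulator
	pending []string // messages consumed but not yet handed to the accumulator
}

// Stop drains anything still pending, so a graceful shutdown loses nothing.
func (c *natsLikeConsumer) Stop() {
	for _, m := range c.pending {
		c.acc.Add(m)
	}
	c.pending = nil
}

func main() {
	c := &natsLikeConsumer{acc: &accumulator{}, pending: []string{"a", "b"}}
	c.Stop()
	fmt.Println(len(c.acc.delivered), len(c.pending))
}
```

Without accumulator access in the plugin, those pending messages would simply be dropped on shutdown, which is the data loss described above.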

sparrc added a commit that referenced this issue Feb 16, 2016
this includes:
- Add Accumulator to the Start() function of service inputs
- For message consumer plugins, use the Accumulator to constantly add
  metrics and make Gather a dummy function
- rework unit tests to match this new behavior.
- make "flush_buffer_when_full" a config option that defaults to true

closes #666
geodimm pushed a commit to miketonks/telegraf that referenced this issue Mar 10, 2016