(batchprocessor): forking batchprocessor #79
Conversation
errors = multierr.Append(errors, err)

bpt.timeoutTriggerSend, err = meter.Int64Counter(
    processorhelper.BuildCustomMetricName(typeStr, "timeout_trigger_send"),
I'm not very familiar with the batch processor, so my comment might be off base. According to the instrumentation, the batch processor has two trigger types: timeout and max batch size reached. I'm wondering whether those are sufficient to control the memory usage of this processor. What happens if the number of distinct metadata value combinations is very high? Could we introduce a third trigger that sends the current batches once the total number of entries across all batches reaches a specific threshold, to keep overall memory usage under control?
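To make the suggestion concrete, here is a rough, hypothetical sketch of such a total-count trigger; the names (multiBatcher, maxTotalItems, flush) are invented for illustration and do not exist in the forked code:

```go
package batchsketch

// Hypothetical illustration of a third, total-size-based trigger: flush
// every per-metadata batch once the combined number of queued entries
// crosses a threshold, capping memory even when no single batch reaches
// its own size limit.

type batch interface {
	itemCount() int
	flush()
}

type multiBatcher struct {
	batches       map[string]batch // one batch per metadata-value combination
	maxTotalItems int
}

func (mb *multiBatcher) maybeFlushAll() {
	total := 0
	for _, b := range mb.batches {
		total += b.itemCount()
	}
	if total >= mb.maxTotalItems {
		for _, b := range mb.batches {
			b.flush()
		}
	}
}
```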
Please see #80.
Thanks for the review! Yeah, this is the first in a series of PRs to enhance the processor. One way of controlling in-flight memory usage is that the processor blocks requests from being added (https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/batchprocessor/batch_processor.go#L271) when the processing queue is full. In a later PR, memory efficiency will be improved by controlling admission to the queue with a semaphore based on the size (in bytes) of each request. I'm not sure a new trigger will be necessary, but we'll keep it in mind as we implement our enhancements and observe memory usage in testing.
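As a rough illustration of that plan, here is a minimal sketch of semaphore-based admission keyed on request size in bytes, using golang.org/x/sync/semaphore; the boundedQueue, admit, and release names are hypothetical and not taken from this or any later PR:

```go
package admission

import (
	"context"

	"golang.org/x/sync/semaphore"
)

// boundedQueue caps the total size (in bytes) of requests admitted to the
// processing queue; callers block in admit until capacity frees up.
type boundedQueue struct {
	sem *semaphore.Weighted
}

func newBoundedQueue(maxBytes int64) *boundedQueue {
	return &boundedQueue{sem: semaphore.NewWeighted(maxBytes)}
}

// admit blocks until sizeBytes fits under the limit or ctx is canceled.
func (q *boundedQueue) admit(ctx context.Context, sizeBytes int64) error {
	return q.sem.Acquire(ctx, sizeBytes)
}

// release returns sizeBytes to the budget once the request has been exported.
func (q *boundedQueue) release(sizeBytes int64) {
	q.sem.Release(sizeBytes)
}
```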
"go.opentelemetry.io/collector/pdata/plog" | ||
) | ||
|
||
// splitLogs removes logrecords from the input data and returns a new data of the specified size. |
Just curious to understand: what happens to the log records that come after the specified size is exceeded?
The current behavior of this code, i.e., what's being forked in this PR by copying the core batchprocessor, is behavior we want to change. Currently, the batch processor can send at most one batch in parallel, so it requires certain downstream behavior (i.e., the exporterhelper's queue sender) to function well.
So, you'll see more about this in a subsequent PR, where we fix the behavior. The high-level picture that's missing from this dump of copied code is that it uses pdata objects to accumulate points in FIFO order. Batches are assembled in a single pending pdata object; when there are enough points/spans/records, i.e., when the pending size is too big, it is split by moving the first points into a new batch and leaving a residual pdata object. We will ensure that the points in the residual are always the first to go next, which means each point waits up to a timeout, but no more than one timeout.
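To make the FIFO split concrete, here is a simplified, hypothetical sketch of moving the first count log records out of a pending plog.Logs into a new batch; the actual splitLogs being forked also drops now-empty ResourceLogs/ScopeLogs entries and copies additional metadata such as schema URLs:

```go
package splitsketch

import "go.opentelemetry.io/collector/pdata/plog"

// splitFirst copies the oldest `count` log records from src into a new
// plog.Logs and removes them from src, leaving the residual records behind.
func splitFirst(src plog.Logs, count int) plog.Logs {
	dest := plog.NewLogs()
	moved := 0

	rls := src.ResourceLogs()
	for i := 0; i < rls.Len() && moved < count; i++ {
		srcRL := rls.At(i)
		destRL := dest.ResourceLogs().AppendEmpty()
		srcRL.Resource().CopyTo(destRL.Resource())

		sls := srcRL.ScopeLogs()
		for j := 0; j < sls.Len() && moved < count; j++ {
			srcSL := sls.At(j)
			destSL := destRL.ScopeLogs().AppendEmpty()
			srcSL.Scope().CopyTo(destSL.Scope())

			lrs := srcSL.LogRecords()
			take := count - moved
			if take > lrs.Len() {
				take = lrs.Len()
			}
			// Copy the first `take` records, then drop them from the source
			// so the residual pdata object holds only what was not sent.
			for k := 0; k < take; k++ {
				lrs.At(k).CopyTo(destSL.LogRecords().AppendEmpty())
			}
			removed := 0
			lrs.RemoveIf(func(plog.LogRecord) bool {
				removed++
				return removed <= take
			})
			moved += take
		}
	}
	return dest
}
```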
Splitting up #71 into two PRs. This first PR simply forks the batchprocessor into this repository before applying further changes on top.
Part of #80.