Clarify that memory limiter refuses data, doesn't drop it
Contributes to #1084

- Clarify what the memory limiter does.
- Set expectations for receivers: how they are supposed to react
  when the memory limiter refuses the data.
- Add a test that demonstrates that memory limiter does not lose data
  if the receiver and exporter behave according to the contract.

All receivers must adhere to this contract. See, for example, an issue
opened against the filelog receiver:
open-telemetry/opentelemetry-collector-contrib#20511

Note that there are no functional changes to the memory limiter.

Future work: one additional improvement is to implement backoff logic
in the memory limiter. When in memory limited mode, the processor
can introduce pauses before it returns from the ConsumeLogs/Traces/Metrics
call. This would slow down the inflow of data into the Collector,
giving the pipeline time to clear and memory usage time to return to
normal. This needs to be explored further.
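The retry contract (and the proposed backoff) can be sketched in isolation. The following is a stdlib-only illustration with hypothetical names (`sendWithBackoff`, `errRefused`); it is not the Collector's actual receiver code, which calls ConsumeLogs/Traces/Metrics instead:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRefused stands in for the non-permanent error the memory limiter
// returns while in memory limited mode (simplified for this sketch).
var errRefused = errors.New("data refused due to high memory usage")

// sendWithBackoff retries a refused send with exponential backoff, as a
// receiver adhering to the contract might. consume stands in for the
// pipeline call (ConsumeLogs/Traces/Metrics in the real Collector).
func sendWithBackoff(consume func() error, maxRetries int) error {
	backoff := 10 * time.Millisecond
	for i := 0; ; i++ {
		err := consume()
		if err == nil || i >= maxRetries {
			return err
		}
		// Non-permanent error: pause, then retry the same data.
		time.Sleep(backoff)
		backoff *= 2
	}
}

func main() {
	refusals := 3 // pretend the limiter refuses the first 3 attempts
	attempts := 0
	err := sendWithBackoff(func() error {
		attempts++
		if attempts <= refusals {
			return errRefused
		}
		return nil
	}, 10)
	fmt.Println(err, attempts) // <nil> 4
}
```

Because the same payload is resent until accepted, no data is lost as long as the refusals eventually stop; the growing pause is what applies backpressure to the data source.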
tigrannajaryan committed Mar 31, 2023
1 parent d4c25d4 commit a3c5432
Showing 6 changed files with 306 additions and 60 deletions.
28 changes: 20 additions & 8 deletions processor/memorylimiterprocessor/README.md
@@ -13,21 +13,32 @@ on the configured processors, it is important to put checks in place regarding
memory usage.

The memory_limiter processor allows to perform periodic checks of memory
-usage if it exceeds defined limits will begin dropping data and forcing GC to reduce
+usage and, if it exceeds the defined limits, will begin refusing data and forcing GC to reduce
memory consumption.

The memory_limiter uses soft and hard memory limits. The hard limit is always above or
equal to the soft limit.

-When the memory usage exceeds the soft limit the processor will start dropping the data and
-return errors to the preceding component it in the pipeline (which should be normally a
-receiver).
+When the memory usage exceeds the soft limit the processor will enter memory limited
+mode and will start refusing the data by returning errors to the preceding component
+in the pipeline that made the ConsumeLogs/Traces/Metrics function call.
+The preceding component should normally be a receiver.

-When the memory usage is above the hard limit in addition to dropping the data the
+In memory limited mode the error returned by the ConsumeLogs/Traces/Metrics function is a
+non-permanent error. When receivers see this error they are expected to retry sending
+the same data. The receivers may also apply backpressure to their data sources
+in order to slow down the inflow of data into the Collector and allow the memory usage
+to go below the limits.

+> Warning: if the component preceding the memory limiter in the pipeline does not correctly
+> retry and resend the data after the ConsumeLogs/Traces/Metrics functions return a
+> non-permanent error, then that data will be permanently lost. We consider such
+> components incorrectly implemented.

+When the memory usage is above the hard limit, in addition to refusing the data, the
processor will forcedly perform garbage collection in order to try to free memory.

When the memory usage drops below the soft limit, the normal operation is resumed (data
-will not longer be dropped and no forced garbage collection will be performed).
+will no longer be refused and no forced garbage collection will be performed).

The difference between the soft limit and hard limits is defined via `spike_limit_mib`
configuration option. The value of this option should be selected in a way that ensures
@@ -39,8 +50,9 @@ A good starting point for `spike_limit_mib` is 20% of the hard limit. Bigger
Note that while the processor can help mitigate out of memory situations,
it is not a replacement for properly sizing and configuring the
collector. Keep in mind that if the soft limit is crossed, the collector will
-return errors to all receive operations until enough memory is freed. This will
-result in dropped data.
+return errors to all receive operations until enough memory is freed. This may
+eventually result in dropped data since the receivers may not be able to hold back
+and retry the data indefinitely.
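The soft limit that drives the refuse decision is derived from the hard limit and `spike_limit_mib`. A minimal sketch of that arithmetic (illustrative Go with made-up names, not the processor's actual usage checker):

```go
package main

import "fmt"

// limits mirrors the idea behind the processor's usage checker: the hard
// limit comes from configuration and the soft limit sits spikeLimitMiB
// below it. Names here are illustrative, not the Collector's identifiers.
type limits struct {
	hardLimitMiB  uint64
	spikeLimitMiB uint64
}

func (l limits) softLimitMiB() uint64 { return l.hardLimitMiB - l.spikeLimitMiB }

func (l limits) aboveSoftLimit(usageMiB uint64) bool { return usageMiB >= l.softLimitMiB() }
func (l limits) aboveHardLimit(usageMiB uint64) bool { return usageMiB >= l.hardLimitMiB }

func main() {
	// A hard limit of 2000 MiB with the suggested ~20% spike limit.
	l := limits{hardLimitMiB: 2000, spikeLimitMiB: 400}
	fmt.Println(l.softLimitMiB())       // 1600
	fmt.Println(l.aboveSoftLimit(1700)) // true: refuse data
	fmt.Println(l.aboveHardLimit(1700)) // false: no forced GC yet
}
```

The spike limit is the headroom in which data can still be refused before the hard limit (and forced GC) is reached, which is why it should cover the largest expected usage spike between two checks.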

It is highly recommended to configure `ballastextension` as well as the
`memory_limiter` processor on every collector. The ballast should be configured to
3 changes: 1 addition & 2 deletions processor/memorylimiterprocessor/config.go
@@ -13,8 +13,7 @@
// limitations under the License.

// Package memorylimiterprocessor provides a processor for OpenTelemetry Service pipeline
-// that drops data on the pipeline according to the current state of memory
-// usage.
+// that refuses data on the pipeline according to the current state of memory usage.
package memorylimiterprocessor // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor"

import (
78 changes: 78 additions & 0 deletions processor/memorylimiterprocessor/internal/mock_exporter.go
@@ -0,0 +1,78 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal"

import (
"context"
"sync/atomic"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
)

type MockExporter struct {
destAvailable int64
acceptedLogCount int64
deliveredLogCount int64
Logs []plog.Logs
}

var _ consumer.Logs = (*MockExporter)(nil)

func (e *MockExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error {
atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount()))

if atomic.LoadInt64(&e.destAvailable) == 1 {
// Destination is available, immediately deliver.
atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount()))
} else {
// Destination is not available. Queue the logs in the exporter.
e.Logs = append(e.Logs, ld)
}
return nil
}

func (e *MockExporter) SetDestAvailable(available bool) {
if available {
// Pretend we delivered all queued accepted logs.
atomic.AddInt64(&e.deliveredLogCount, e.acceptedLogCount)

// Get rid of the delivered logs so that memory can be collected.
e.Logs = nil

// Now mark destination available so that subsequent ConsumeLogs
// don't queue the logs anymore.
atomic.StoreInt64(&e.destAvailable, 1)

} else {
atomic.StoreInt64(&e.destAvailable, 0)
}
}

func (e *MockExporter) AcceptedLogCount() int {
return int(atomic.LoadInt64(&e.acceptedLogCount))
}

func (e *MockExporter) DeliveredLogCount() int {
return int(atomic.LoadInt64(&e.deliveredLogCount))
}

func NewMockExporter() *MockExporter {
return &MockExporter{}
}
72 changes: 72 additions & 0 deletions processor/memorylimiterprocessor/internal/mock_receiver.go
@@ -0,0 +1,72 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal"

import (
"context"
"strings"
"sync"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog"
)

type MockReceiver struct {
ProduceCount int
NextConsumer consumer.Logs
lastConsumeResult error
mux sync.Mutex
}

func (m *MockReceiver) Start() {
go m.produce()
}

// This function demonstrates how the receivers should behave when the ConsumeLogs/Traces/Metrics
// call returns an error.
func (m *MockReceiver) produce() {
for i := 0; i < m.ProduceCount; i++ {
// Create a large log to consume some memory.
ld := plog.NewLogs()
lr := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
kiloStr := strings.Repeat("x", 10*1024)
lr.SetSeverityText(kiloStr)

retry:
// Send to the pipeline.
err := m.NextConsumer.ConsumeLogs(context.Background(), ld)

// Remember the result to be used in the tests.
m.mux.Lock()
m.lastConsumeResult = err
m.mux.Unlock()

if err != nil {
// Sending to the pipeline failed.
if !consumererror.IsPermanent(err) {
// Retryable error. Try the same data again.
goto retry
}
// Permanent error. Drop it.
}
}
}

func (m *MockReceiver) LastConsumeResult() error {
m.mux.Lock()
defer m.mux.Unlock()
return m.lastConsumeResult
}
46 changes: 23 additions & 23 deletions processor/memorylimiterprocessor/memorylimiter.go
@@ -39,9 +39,9 @@
)

var (
-	// errForcedDrop will be returned to callers of ConsumeTraceData to indicate
-	// that data is being dropped due to high memory usage.
-	errForcedDrop = errors.New("data dropped due to high memory usage")
+	// errDataRefused will be returned to callers of ConsumeTraceData to indicate
+	// that data is being refused due to high memory usage.
+	errDataRefused = errors.New("data refused due to high memory usage")

// Construction errors

@@ -70,8 +70,8 @@ type memoryLimiter struct {
memCheckWait time.Duration
ballastSize uint64

-	// forceDrop is used atomically to indicate when data should be dropped.
-	forceDrop *atomic.Bool
+	// mustRefuse is used to indicate when data should be refused.
+	mustRefuse *atomic.Bool

ticker *time.Ticker

@@ -129,7 +129,7 @@ func newMemoryLimiter(set processor.CreateSettings, cfg *Config) (*memoryLimiter
ticker: time.NewTicker(cfg.CheckInterval),
readMemStatsFn: runtime.ReadMemStats,
logger: logger,
-		forceDrop:      &atomic.Bool{},
+		mustRefuse:     &atomic.Bool{},
obsrep: obsrep,
}

@@ -180,15 +180,15 @@ func (ml *memoryLimiter) shutdown(context.Context) error {

func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
numSpans := td.SpanCount()
-	if ml.forceDrop.Load() {
+	if ml.mustRefuse.Load() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
-	// callstack.
+	// callstack and that the receiver will correctly retry the refused data again.
	ml.obsrep.TracesRefused(ctx, numSpans)

-	return td, errForcedDrop
+	return td, errDataRefused
}

// Even if the next consumer returns error record the data as accepted by
@@ -199,14 +199,14 @@ func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (p

func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
numDataPoints := md.DataPointCount()
-	if ml.forceDrop.Load() {
+	if ml.mustRefuse.Load() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
// callstack.
ml.obsrep.MetricsRefused(ctx, numDataPoints)
-	return md, errForcedDrop
+	return md, errDataRefused
}

// Even if the next consumer returns error record the data as accepted by
@@ -217,15 +217,15 @@ func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics)

func (ml *memoryLimiter) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
numRecords := ld.LogRecordCount()
-	if ml.forceDrop.Load() {
+	if ml.mustRefuse.Load() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
// callstack.
ml.obsrep.LogsRefused(ctx, numRecords)

-	return ld, errForcedDrop
+	return ld, errDataRefused
}

// Even if the next consumer returns error record the data as accepted by
@@ -288,33 +288,33 @@ func (ml *memoryLimiter) checkMemLimits() {
ms = ml.doGCandReadMemStats()
}

-	// Remember current dropping state.
-	wasForcingDrop := ml.forceDrop.Load()
+	// Remember current state.
+	wasRefusing := ml.mustRefuse.Load()

// Check if the memory usage is above the soft limit.
-	mustForceDrop := ml.usageChecker.aboveSoftLimit(ms)
+	mustRefuse := ml.usageChecker.aboveSoftLimit(ms)

-	if wasForcingDrop && !mustForceDrop {
-		// Was previously dropping but enough memory is available now, no need to limit.
+	if wasRefusing && !mustRefuse {
+		// Was previously refusing but enough memory is available now, no need to limit.
ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms))
}

-	if !wasForcingDrop && mustForceDrop {
+	if !wasRefusing && mustRefuse {
// We are above soft limit, do a GC if it wasn't done recently and see if
// it brings memory usage below the soft limit.
if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited {
ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms))
ms = ml.doGCandReadMemStats()
// Check the limit again to see if GC helped.
-			mustForceDrop = ml.usageChecker.aboveSoftLimit(ms)
+			mustRefuse = ml.usageChecker.aboveSoftLimit(ms)
}

-		if mustForceDrop {
-			ml.logger.Warn("Memory usage is above soft limit. Dropping data.", memstatToZapField(ms))
+		if mustRefuse {
+			ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms))
}
}

-	ml.forceDrop.Store(mustForceDrop)
+	ml.mustRefuse.Store(mustRefuse)
}
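Condensed, each tick of checkMemLimits makes one refuse/resume decision and forces a GC at most once per throttle interval before deciding. A stdlib-only sketch of that state transition, with the memory reads and GC stubbed out (illustrative reduction, not the actual implementation):

```go
package main

import (
	"fmt"
	"time"
)

// limiterState captures the refuse/resume decision made on each check,
// including the throttle that avoids forcing GC too often. Hypothetical
// names; the real processor reads runtime.MemStats and calls runtime.GC.
type limiterState struct {
	mustRefuse    bool
	lastGCDone    time.Time
	minGCInterval time.Duration
	forceGC       func() // stub for a forced GC; may lower reported usage
}

// check mimics one tick: aboveSoft reports whether current usage exceeds
// the soft limit; after a forced GC it is consulted a second time.
func (s *limiterState) check(aboveSoft func() bool) {
	wasRefusing := s.mustRefuse
	mustRefuse := aboveSoft()

	// Only force a GC on the transition into memory limited mode,
	// and only if one wasn't forced recently.
	if !wasRefusing && mustRefuse && time.Since(s.lastGCDone) > s.minGCInterval {
		s.forceGC()
		s.lastGCDone = time.Now()
		mustRefuse = aboveSoft() // did GC bring usage back under the limit?
	}
	s.mustRefuse = mustRefuse
}

func main() {
	usage := uint64(1700) // pretend MiB in use; soft limit is 1600 below
	s := &limiterState{
		forceGC: func() { usage = 1200 }, // pretend GC freed memory
	}
	s.check(func() bool { return usage >= 1600 })
	fmt.Println(s.mustRefuse) // false: GC brought usage under the soft limit
}
```

The second `aboveSoft()` call after the forced GC is what lets the processor avoid entering memory limited mode at all when a collection frees enough memory.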

type memUsageChecker struct {
