Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce soft and hard limits for memory limiter #2250

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 48 additions & 28 deletions processor/memorylimiter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,45 @@
Supported pipeline types: metrics, traces

The memory limiter processor is used to prevent out of memory situations on
the collector. Given that the amount and type of data a collector processes is
the collector. Given that the amount and type of data the collector processes is
environment specific and resource utilization of the collector is also dependent
on the configured processors, it is important to put checks in place regarding
memory usage. The memory_limiter processor offers the follow safeguards:
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved

- Ability to define an interval when memory usage will be checked and if memory
usage exceeds a defined limit will trigger GC to reduce memory consumption.
- Ability to define an interval when memory usage will be compared against the
previous interval's value and if the delta exceeds a defined limit will trigger
GC to reduce memory consumption.

In addition, there is a command line option (`mem-ballast-size-mib`) which can be
used to define a ballast, which allocates memory and provides stability to the
heap. If defined, the ballast increases the base size of the heap so that GC
triggers are delayed and the number of GC cycles over time is reduced. While the
ballast is configured via the command line, today the same value configured on the
command line must also be defined in the memory_limiter processor.

Note that while these configuration options can help mitigate out of memory
situations, they are not a replacement for properly sizing and configuring the
collector. For example, if the limit or spike thresholds are crossed, the collector
will return errors to all receive operations until enough memory is freed. This may
memory usage.

The memory_limiter processor allows to perform periodic checks of memory
usage if it exceeds defined limits will begin dropping data and forcing GC to reduce
memory consumption.

The memory_limiter uses soft and hard memory limits. Hard limit is always above or equal
the soft limit.

When the memory usage exceeds the soft limit the processor will start dropping the data and
return errors to the preceding component it in the pipeline (which should be normally a
receiver).

When the memory usage is above the hard limit in addition to dropping the data the
processor will forcedly perform garbage collection in order to try to free memory.

When the memory usage drop below the soft limit, the normal operation is resumed (data
will not longer be dropped and no forced garbage collection will be performed).

The difference between the soft limit and hard limits is defined via `spike_limit_mib`
configuration option. The value of this option should be selected in a way that ensures
that between the memory check intervals the memory usage cannot increase by more than this
value (otherwise memory usage may exceed the hard limit - even if temporarily).
A good starting point for `spike_limit_mib` is 20% of the hard limit. Bigger
`spike_limit_mib` values may be necessary for spiky traffic or for longer check intervals.

In addition, if the command line option `mem-ballast-size-mib` is used to specify a
ballast (see command line help for details), the same value that is provided via the
command line must also be defined in the memory_limiter processor using `ballast_size_mib`
config option. If the command line option value and config option value don't match
the behavior of the memory_limiter processor will be unpredictable.

Note that while the processor can help mitigate out of memory situations,
it is not a replacement for properly sizing and configuring the
collector. Keep in mind that if the soft limit is crossed, the collector will
return errors to all receive operations until enough memory is freed. This will
result in dropped data.

It is highly recommended to configure the ballast command line option as well as the
Expand All @@ -39,13 +56,16 @@ Please refer to [config.go](./config.go) for the config spec.

The following configuration options **must be changed**:
- `check_interval` (default = 0s): Time between measurements of memory
usage. Values below 1 second are not recommended since it can result in
unnecessary CPU consumption.
usage. The recommended value is 1 second.
If the expected traffic to the Collector is very spiky then decrease the `check_interval`
or increase `spike_limit_mib` to avoid memory usage going over the hard limit.
- `limit_mib` (default = 0): Maximum amount of memory, in MiB, targeted to be
allocated by the process heap. Note that typically the total memory usage of
process will be about 50MiB higher than this value.
- `spike_limit_mib` (default = 0): Maximum spike expected between the
measurements of memory usage. The value must be less than `limit_mib`.
process will be about 50MiB higher than this value. This defines the hard limit.
- `spike_limit_mib` (default = 20% of `limit_mib`): Maximum spike expected between the
measurements of memory usage. The value must be less than `limit_mib`. The soft limit
value will be equal to (limit_mib - spike_limit_mib).
The recommended value for `spike_limit_mib` is about 20% `limit_mib`.
- `limit_percentage` (default = 0): Maximum amount of total memory targeted to be
allocated by the process heap. This configuration is supported on Linux systems with cgroups
and it's intended to be used in dynamic platforms like docker.
Expand All @@ -69,16 +89,16 @@ Examples:
processors:
memory_limiter:
ballast_size_mib: 2000
check_interval: 5s
check_interval: 1s
limit_mib: 4000
spike_limit_mib: 500
spike_limit_mib: 800
```

```yaml
processors:
memory_limiter:
ballast_size_mib: 2000
check_interval: 5s
check_interval: 1s
limit_percentage: 50
spike_limit_percentage: 30
```
Expand Down
110 changes: 84 additions & 26 deletions processor/memorylimiter/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
var getMemoryFn = iruntime.TotalMemory

type memoryLimiter struct {
decision dropDecision
usageChecker memUsageChecker

memCheckWait time.Duration
ballastSize uint64
Expand All @@ -71,6 +71,8 @@ type memoryLimiter struct {

ticker *time.Ticker

lastGCDone time.Time

// The function to read the mem values is set as a reference to help with
// testing different values.
readMemStatsFn func(m *runtime.MemStats)
Expand All @@ -83,6 +85,10 @@ type memoryLimiter struct {
obsrep *obsreport.ProcessorObsReport
}

// Minimum interval between forced GC when in soft limited mode. We don't want to
// do GCs too frequently since it is a CPU-heavy operation.
const minGCIntervalWhenSoftLimited = 10 * time.Second

// newMemoryLimiter returns a new memorylimiter processor.
func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) {
ballastSize := uint64(cfg.BallastSizeMiB) * mibBytes
Expand All @@ -94,18 +100,18 @@ func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) {
return nil, errLimitOutOfRange
}

decision, err := getDecision(cfg, logger)
usageChecker, err := getMemUsageChecker(cfg, logger)
if err != nil {
return nil, err
}

logger.Info("Memory limiter configured",
zap.Uint64("limit_mib", decision.memAllocLimit),
zap.Uint64("spike_limit_mib", decision.memSpikeLimit),
zap.Uint64("limit_mib", usageChecker.memAllocLimit),
zap.Uint64("spike_limit_mib", usageChecker.memSpikeLimit),
zap.Duration("check_interval", cfg.CheckInterval))

ml := &memoryLimiter{
decision: *decision,
usageChecker: *usageChecker,
memCheckWait: cfg.CheckInterval,
ballastSize: ballastSize,
ticker: time.NewTicker(cfg.CheckInterval),
Expand All @@ -120,11 +126,11 @@ func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) {
return ml, nil
}

func getDecision(cfg *Config, logger *zap.Logger) (*dropDecision, error) {
func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, error) {
memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes
memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes
if cfg.MemoryLimitMiB != 0 {
return newFixedDecision(memAllocLimit, memSpikeLimit)
return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit)
}
totalMemory, err := getMemoryFn()
if err != nil {
Expand All @@ -134,7 +140,7 @@ func getDecision(cfg *Config, logger *zap.Logger) (*dropDecision, error) {
zap.Int64("total_memory", totalMemory),
zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage),
zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage))
return newPercentageDecision(totalMemory, int64(cfg.MemoryLimitPercentage), int64(cfg.MemorySpikePercentage))
return newPercentageMemUsageChecker(totalMemory, int64(cfg.MemoryLimitPercentage), int64(cfg.MemorySpikePercentage))
}

func (ml *memoryLimiter) shutdown(context.Context) error {
Expand Down Expand Up @@ -220,6 +226,7 @@ func (ml *memoryLimiter) readMemStats() *runtime.MemStats {
ml.logger.Warn(typeStr + " is likely incorrectly configured. " + ballastSizeMibKey +
" must be set equal to --mem-ballast-size-mib command line option.")
}

return ms
}

Expand All @@ -228,7 +235,7 @@ func (ml *memoryLimiter) readMemStats() *runtime.MemStats {
func (ml *memoryLimiter) startMonitoring() {
go func() {
for range ml.ticker.C {
ml.memCheck()
ml.checkMemLimits()
}
}()
}
Expand All @@ -238,44 +245,95 @@ func (ml *memoryLimiter) forcingDrop() bool {
return atomic.LoadInt64(&ml.forceDrop) != 0
}

func (ml *memoryLimiter) memCheck() {
func (ml *memoryLimiter) setForcingDrop(b bool) {
var i int64
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
if b {
i = 1
}
atomic.StoreInt64(&ml.forceDrop, i)
}

func memstatToZapField(ms *runtime.MemStats) zap.Field {
return zap.Uint64("cur_mem_mib", ms.Alloc/1024/1024)
}

func (ml *memoryLimiter) doGCandReadMemStats() *runtime.MemStats {
runtime.GC()
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
ml.lastGCDone = time.Now()
ms := ml.readMemStats()
ml.memLimiting(ms)
ml.logger.Info("Memory usage after GC.", memstatToZapField(ms))
return ms
}

func (ml *memoryLimiter) memLimiting(ms *runtime.MemStats) {
if !ml.decision.shouldDrop(ms) {
atomic.StoreInt64(&ml.forceDrop, 0)
} else {
atomic.StoreInt64(&ml.forceDrop, 1)
// Force a GC at this point and see if this is enough to get to
// the desired level.
runtime.GC()
func (ml *memoryLimiter) checkMemLimits() {
ms := ml.readMemStats()

ml.logger.Debug("Currently used memory.", memstatToZapField(ms))

if ml.usageChecker.aboveHardLimit(ms) {
ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms))
ms = ml.doGCandReadMemStats()
}

// Remember current dropping state.
wasForcingDrop := ml.forcingDrop()

// Check if the memory usage is above the soft limit.
mustForceDrop := ml.usageChecker.aboveSoftLimit(ms)

if wasForcingDrop && !mustForceDrop {
// Was previously dropping but enough memory is available now, no need to limit.
ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms))
}

if !wasForcingDrop && mustForceDrop {
// We are above soft limit, do a GC if it wasn't done recently and see if
// it brings memory usage below the soft limit.
if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited {
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms))
ms = ml.doGCandReadMemStats()
// Check the limit again to see if GC helped.
mustForceDrop = ml.usageChecker.aboveSoftLimit(ms)
}

if mustForceDrop {
ml.logger.Warn("Memory usage is above soft limit. Dropping data.", memstatToZapField(ms))
}
}

ml.setForcingDrop(mustForceDrop)
}

type dropDecision struct {
type memUsageChecker struct {
memAllocLimit uint64
memSpikeLimit uint64
}

func (d dropDecision) shouldDrop(ms *runtime.MemStats) bool {
return d.memAllocLimit <= ms.Alloc || d.memAllocLimit-ms.Alloc <= d.memSpikeLimit
func (d memUsageChecker) aboveSoftLimit(ms *runtime.MemStats) bool {
return ms.Alloc >= d.memAllocLimit-d.memSpikeLimit
}

func newFixedDecision(memAllocLimit, memSpikeLimit uint64) (*dropDecision, error) {
func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool {
return ms.Alloc >= d.memAllocLimit
}

func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChecker, error) {
if memSpikeLimit >= memAllocLimit {
return nil, errMemSpikeLimitOutOfRange
}
return &dropDecision{
if memSpikeLimit == 0 {
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// If spike limit is unspecified use 20% of mem limit.
memSpikeLimit = memAllocLimit / 5
}
return &memUsageChecker{
memAllocLimit: memAllocLimit,
memSpikeLimit: memSpikeLimit,
}, nil
}

func newPercentageDecision(totalMemory int64, percentageLimit, percentageSpike int64) (*dropDecision, error) {
func newPercentageMemUsageChecker(totalMemory int64, percentageLimit, percentageSpike int64) (*memUsageChecker, error) {
if percentageLimit > 100 || percentageLimit <= 0 || percentageSpike > 100 || percentageSpike <= 0 {
return nil, errPercentageLimitOutOfRange
}
return newFixedDecision(uint64(percentageLimit*totalMemory)/100, uint64(percentageSpike*totalMemory)/100)
return newFixedMemUsageChecker(uint64(percentageLimit*totalMemory)/100, uint64(percentageSpike*totalMemory)/100)
}
Loading