From 1c6d6248dc2cf5961dcd4f0be5d55db866b58f3f Mon Sep 17 00:00:00 2001
From: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com>
Date: Tue, 15 Dec 2020 12:56:10 -0500
Subject: [PATCH] Introduce soft and hard limits for memory limiter (#2250)

Contributes to https://github.com/open-telemetry/opentelemetry-collector/issues/1121

The memory limiter processor previously had only one limit. When that limit was exceeded it continuously called GC. This resulted in huge CPU consumption if the check interval was small, which forced the use of large check intervals. That in turn resulted in a lethargic response to growing memory usage, and the memory limiter was not very effective in situations where memory usage was growing rapidly (e.g. when there was a big spike or when the backend was down).

I changed the logic of the memory limiter to be based on 2 thresholds: soft and hard. While below the soft threshold the memory limiter is fully disabled. Between the soft and hard limits the limiter begins dropping incoming data but does not perform GC. Only when the hard limit is exceeded do we perform GC. The net result is that the actually used memory is limited at the level set by the soft limit and fluctuates between the soft and hard limits as garbage is created and collected. Correspondingly, GC runs much more infrequently, only when the hard limit is reached, and each such GC immediately collects a significant amount of garbage (bringing memory usage back close to the soft limit) and thus does not require subsequent GC calls for quite some time.

I did some performance tests of the old and new approaches with a 4000 MiB limit, 100,000 spans per second, and the exporter completely blocked (no backend). With the old approach an interval of 100 ms causes about 450% CPU usage once the memory limit is hit (while below the limit the CPU usage is around 50%).
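Before the measurement extracts, here is a condensed sketch of the new per-check decision described above (illustrative Go only, not the exact code from the memorylimiter.go diff below; the real implementation additionally enforces a minimum interval between forced GCs while soft-limited):

```go
package main

import (
	"fmt"
	"runtime"
)

// checkMemLimits sketches the two-threshold decision: above the hard limit a GC
// is forced; between the soft and hard limits data is dropped without forcing
// GC; below the soft limit the limiter is fully disabled.
func checkMemLimits(softLimit, hardLimit uint64) (dropData bool) {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)

	if ms.Alloc >= hardLimit {
		// Hard limit exceeded: force a GC and re-read usage.
		runtime.GC()
		runtime.ReadMemStats(&ms)
	}

	// Drop data while usage remains at or above the soft limit.
	return ms.Alloc >= softLimit
}

func main() {
	const mib = 1024 * 1024
	// Illustrative thresholds: soft limit 3200 MiB, hard limit 4000 MiB.
	fmt.Println("drop:", checkMemLimits(3200*mib, 4000*mib))
}
```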
Here is an extract of performance test output showing the moment when the limiter is hit: ``` 2020/12/03 20:20:47 Agent RAM (RES):3296 MiB, CPU:44.4% | Sent: 7022700 items | Received: 0 items (0/sec) 2020/12/03 20:20:50 Agent RAM (RES):3481 MiB, CPU:43.0% | Sent: 7322500 items | Received: 0 items (0/sec) 2020/12/03 20:20:53 Agent RAM (RES):3681 MiB, CPU:41.6% | Sent: 7614100 items | Received: 0 items (0/sec) 2020/12/03 20:20:56 Agent RAM (RES):3703 MiB, CPU:47.7% | Sent: 7863600 items | Received: 0 items (0/sec) 2020/12/03 20:20:59 Agent RAM (RES):3028 MiB, CPU:47.0% | Sent: 8062700 items | Received: 0 items (0/sec) 2020/12/03 20:21:02 Agent RAM (RES):3644 MiB, CPU:246.9% | Sent: 8331600 items | Received: 0 items (0/sec) <-- likely a regular GC, not at limit yet 2020/12/03 20:21:05 Agent RAM (RES):3555 MiB, CPU:72.8% | Sent: 8620500 items | Received: 0 items (0/sec) 2020/12/03 20:21:08 Agent RAM (RES):3717 MiB, CPU:57.5% | Sent: 8895500 items | Received: 0 items (0/sec) 2020/12/03 20:21:11 Agent RAM (RES):3877 MiB, CPU:126.9% | Sent: 9172900 items | Received: 0 items (0/sec) <-- hit limit 2020/12/03 20:21:14 Agent RAM (RES):3900 MiB, CPU:127.6% | Sent: 9461100 items | Received: 0 items (0/sec) 2020/12/03 20:21:17 Agent RAM (RES):3918 MiB, CPU:201.7% | Sent: 9728900 items | Received: 0 items (0/sec) 2020/12/03 20:21:20 Agent RAM (RES):3938 MiB, CPU:326.0% | Sent: 9994700 items | Received: 0 items (0/sec) 2020/12/03 20:21:23 Agent RAM (RES):3951 MiB, CPU:470.8% | Sent: 10253200 items | Received: 0 items (0/sec) 2020/12/03 20:21:26 Agent RAM (RES):3955 MiB, CPU:440.0% | Sent: 10504400 items | Received: 0 items (0/sec) 2020/12/03 20:21:29 Agent RAM (RES):3961 MiB, CPU:451.0% | Sent: 10766200 items | Received: 0 items (0/sec) 2020/12/03 20:21:32 Agent RAM (RES):3965 MiB, CPU:465.8% | Sent: 11008400 items | Received: 0 items (0/sec) 2020/12/03 20:21:35 Agent RAM (RES):3974 MiB, CPU:423.6% | Sent: 11272700 items | Received: 0 items (0/sec) ``` Even the interval of 1 second was unusable with the old approach and we had to choose a longer interval to avoid performance degradation. With the new approach under the exact same conditions when using 100ms check interval the CPU usage is 50% when below memory limits and when the hard memory limits are hit the CPU usage increases to 68%. With 1 second check interval there is no measurable increase in CPU usage when memory limiter is hit (unlike 9x CPU increase with the old approach). 
Here is an extract of performance test output showing the moment when the limiter is hit: ``` 2020/12/03 20:28:35 Agent RAM (RES):1888 MiB, CPU:48.2% | Sent: 3796400 items | Received: 0 items (0/sec) 2020/12/03 20:28:38 Agent RAM (RES):2029 MiB, CPU:47.1% | Sent: 4088600 items | Received: 0 items (0/sec) 2020/12/03 20:28:41 Agent RAM (RES):2197 MiB, CPU:48.3% | Sent: 4388200 items | Received: 0 items (0/sec) 2020/12/03 20:28:44 Agent RAM (RES):2370 MiB, CPU:45.7% | Sent: 4679900 items | Received: 0 items (0/sec) 2020/12/03 20:28:47 Agent RAM (RES):2558 MiB, CPU:49.0% | Sent: 4972200 items | Received: 0 items (0/sec) 2020/12/03 20:28:50 Agent RAM (RES):2771 MiB, CPU:47.4% | Sent: 5260700 items | Received: 0 items (0/sec) 2020/12/03 20:28:53 Agent RAM (RES):2921 MiB, CPU:133.3% | Sent: 5547500 items | Received: 0 items (0/sec) 2020/12/03 20:28:56 Agent RAM (RES):2922 MiB, CPU:50.1% | Sent: 5846700 items | Received: 0 items (0/sec) 2020/12/03 20:28:59 Agent RAM (RES):2957 MiB, CPU:43.6% | Sent: 6131700 items | Received: 0 items (0/sec) 2020/12/03 20:29:02 Agent RAM (RES):3144 MiB, CPU:50.0% | Sent: 6419400 items | Received: 0 items (0/sec) 2020/12/03 20:29:05 Agent RAM (RES):3328 MiB, CPU:49.0% | Sent: 6719100 items | Received: 0 items (0/sec) 2020/12/03 20:29:08 Agent RAM (RES):3488 MiB, CPU:38.6% | Sent: 7007300 items | Received: 0 items (0/sec) 2020/12/03 20:29:11 Agent RAM (RES):3667 MiB, CPU:42.0% | Sent: 7306700 items | Received: 0 items (0/sec) 2020/12/03 20:29:14 Agent RAM (RES):3813 MiB, CPU:37.4% | Sent: 7577700 items | Received: 0 items (0/sec) 2020/12/03 20:29:17 Agent RAM (RES):3802 MiB, CPU:170.9% | Sent: 7860100 items | Received: 0 items (0/sec) <-- hit hard limit 2020/12/03 20:29:20 Agent RAM (RES):3882 MiB, CPU:68.1% | Sent: 8160000 items | Received: 0 items (0/sec) 2020/12/03 20:29:23 Agent RAM (RES):4007 MiB, CPU:42.3% | Sent: 8447900 items | Received: 0 items (0/sec) 2020/12/03 20:29:26 Agent RAM (RES):4007 MiB, CPU:39.3% | Sent: 8747800 items | Received: 0 items (0/sec) 2020/12/03 20:29:29 Agent RAM (RES):4008 MiB, CPU:34.3% | Sent: 9038400 items | Received: 0 items (0/sec) 2020/12/03 20:29:32 Agent RAM (RES):4009 MiB, CPU:39.9% | Sent: 9317200 items | Received: 0 items (0/sec) ``` This is a dramatically better picture compared to the old approach. With 1 second interval memory limiter's impact on CPU is not measurable with the new approach, whereas with the old approach it was still showing several times higher CPU when limit was hit. This makes small check intervals practically useful and allows to effectively suppress incoming surges of data. --- processor/memorylimiter/README.md | 76 ++++++----- processor/memorylimiter/memorylimiter.go | 110 ++++++++++++---- processor/memorylimiter/memorylimiter_test.go | 119 +++++++++--------- testbed/tests/trace_test.go | 70 +++-------- 4 files changed, 213 insertions(+), 162 deletions(-) diff --git a/processor/memorylimiter/README.md b/processor/memorylimiter/README.md index 1e649a770e3..0b5008c5463 100644 --- a/processor/memorylimiter/README.md +++ b/processor/memorylimiter/README.md @@ -3,28 +3,45 @@ Supported pipeline types: metrics, traces The memory limiter processor is used to prevent out of memory situations on -the collector. Given that the amount and type of data a collector processes is +the collector. 
Given that the amount and type of data the collector processes is environment specific and resource utilization of the collector is also dependent on the configured processors, it is important to put checks in place regarding -memory usage. The memory_limiter processor offers the follow safeguards: - -- Ability to define an interval when memory usage will be checked and if memory -usage exceeds a defined limit will trigger GC to reduce memory consumption. -- Ability to define an interval when memory usage will be compared against the -previous interval's value and if the delta exceeds a defined limit will trigger -GC to reduce memory consumption. - -In addition, there is a command line option (`mem-ballast-size-mib`) which can be -used to define a ballast, which allocates memory and provides stability to the -heap. If defined, the ballast increases the base size of the heap so that GC -triggers are delayed and the number of GC cycles over time is reduced. While the -ballast is configured via the command line, today the same value configured on the -command line must also be defined in the memory_limiter processor. - -Note that while these configuration options can help mitigate out of memory -situations, they are not a replacement for properly sizing and configuring the -collector. For example, if the limit or spike thresholds are crossed, the collector -will return errors to all receive operations until enough memory is freed. This may +memory usage. + +The memory_limiter processor performs periodic checks of memory usage and, if the defined limits are exceeded, begins dropping data and forcing GC to reduce memory consumption. + +The memory_limiter uses soft and hard memory limits. The hard limit is always above or equal to the soft limit. + +When the memory usage exceeds the soft limit the processor starts dropping data and returning errors to the preceding component in the pipeline (which should normally be a receiver). + +When the memory usage is above the hard limit, in addition to dropping data the processor forces garbage collection in order to try to free memory. + +When the memory usage drops below the soft limit, normal operation resumes (data is no longer dropped and no forced garbage collection is performed). + +The difference between the soft and hard limits is defined via the `spike_limit_mib` configuration option (see the worked example below). The value of this option should be selected so that memory usage cannot increase by more than this value between memory check intervals (otherwise memory usage may exceed the hard limit, even if temporarily). A good starting point for `spike_limit_mib` is 20% of the hard limit. Bigger `spike_limit_mib` values may be necessary for spiky traffic or for longer check intervals. + +In addition, if the command line option `mem-ballast-size-mib` is used to specify a ballast (see command line help for details), the same value that is provided via the command line must also be defined in the memory_limiter processor using the `ballast_size_mib` config option. If the command line option value and the config option value don't match, the behavior of the memory_limiter processor will be unpredictable. + +Note that while the processor can help mitigate out of memory situations, it is not a replacement for properly sizing and configuring the collector. Keep in mind that if the soft limit is crossed, the collector will return errors to all receive operations until enough memory is freed.
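For a worked example of the soft/hard relationship (values taken from the first example further down, ballast omitted for brevity): with `limit_mib: 4000` as the hard limit and `spike_limit_mib: 800`, the soft limit works out to 4000 - 800 = 3200 MiB.

```yaml
processors:
  memory_limiter:
    check_interval: 1s
    limit_mib: 4000        # hard limit
    spike_limit_mib: 800   # soft limit = limit_mib - spike_limit_mib = 3200 MiB
```

Once heap usage crosses the 3200 MiB soft limit in this example, the processor starts refusing incoming data and returning errors to the preceding receivers.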
This will result in dropped data. It is highly recommended to configure the ballast command line option as well as the @@ -39,13 +56,16 @@ Please refer to [config.go](./config.go) for the config spec. The following configuration options **must be changed**: - `check_interval` (default = 0s): Time between measurements of memory -usage. Values below 1 second are not recommended since it can result in -unnecessary CPU consumption. +usage. The recommended value is 1 second. +If the expected traffic to the Collector is very spiky then decrease the `check_interval` +or increase `spike_limit_mib` to avoid memory usage going over the hard limit. - `limit_mib` (default = 0): Maximum amount of memory, in MiB, targeted to be allocated by the process heap. Note that typically the total memory usage of -process will be about 50MiB higher than this value. -- `spike_limit_mib` (default = 0): Maximum spike expected between the -measurements of memory usage. The value must be less than `limit_mib`. +process will be about 50MiB higher than this value. This defines the hard limit. +- `spike_limit_mib` (default = 20% of `limit_mib`): Maximum spike expected between the +measurements of memory usage. The value must be less than `limit_mib`. The soft limit +value will be equal to (limit_mib - spike_limit_mib). +The recommended value for `spike_limit_mib` is about 20% `limit_mib`. - `limit_percentage` (default = 0): Maximum amount of total memory targeted to be allocated by the process heap. This configuration is supported on Linux systems with cgroups and it's intended to be used in dynamic platforms like docker. @@ -69,16 +89,16 @@ Examples: processors: memory_limiter: ballast_size_mib: 2000 - check_interval: 5s + check_interval: 1s limit_mib: 4000 - spike_limit_mib: 500 + spike_limit_mib: 800 ``` ```yaml processors: memory_limiter: ballast_size_mib: 2000 - check_interval: 5s + check_interval: 1s limit_percentage: 50 spike_limit_percentage: 30 ``` diff --git a/processor/memorylimiter/memorylimiter.go b/processor/memorylimiter/memorylimiter.go index fcba9833b4e..5954e27f038 100644 --- a/processor/memorylimiter/memorylimiter.go +++ b/processor/memorylimiter/memorylimiter.go @@ -61,7 +61,7 @@ var ( var getMemoryFn = iruntime.TotalMemory type memoryLimiter struct { - decision dropDecision + usageChecker memUsageChecker memCheckWait time.Duration ballastSize uint64 @@ -71,6 +71,8 @@ type memoryLimiter struct { ticker *time.Ticker + lastGCDone time.Time + // The function to read the mem values is set as a reference to help with // testing different values. readMemStatsFn func(m *runtime.MemStats) @@ -83,6 +85,10 @@ type memoryLimiter struct { obsrep *obsreport.ProcessorObsReport } +// Minimum interval between forced GC when in soft limited mode. We don't want to +// do GCs too frequently since it is a CPU-heavy operation. +const minGCIntervalWhenSoftLimited = 10 * time.Second + // newMemoryLimiter returns a new memorylimiter processor. 
func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) { ballastSize := uint64(cfg.BallastSizeMiB) * mibBytes @@ -94,18 +100,18 @@ func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) { return nil, errLimitOutOfRange } - decision, err := getDecision(cfg, logger) + usageChecker, err := getMemUsageChecker(cfg, logger) if err != nil { return nil, err } logger.Info("Memory limiter configured", - zap.Uint64("limit_mib", decision.memAllocLimit), - zap.Uint64("spike_limit_mib", decision.memSpikeLimit), + zap.Uint64("limit_mib", usageChecker.memAllocLimit), + zap.Uint64("spike_limit_mib", usageChecker.memSpikeLimit), zap.Duration("check_interval", cfg.CheckInterval)) ml := &memoryLimiter{ - decision: *decision, + usageChecker: *usageChecker, memCheckWait: cfg.CheckInterval, ballastSize: ballastSize, ticker: time.NewTicker(cfg.CheckInterval), @@ -120,11 +126,11 @@ func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) { return ml, nil } -func getDecision(cfg *Config, logger *zap.Logger) (*dropDecision, error) { +func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, error) { memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes if cfg.MemoryLimitMiB != 0 { - return newFixedDecision(memAllocLimit, memSpikeLimit) + return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit) } totalMemory, err := getMemoryFn() if err != nil { @@ -134,7 +140,7 @@ func getDecision(cfg *Config, logger *zap.Logger) (*dropDecision, error) { zap.Int64("total_memory", totalMemory), zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage), zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage)) - return newPercentageDecision(totalMemory, int64(cfg.MemoryLimitPercentage), int64(cfg.MemorySpikePercentage)) + return newPercentageMemUsageChecker(totalMemory, int64(cfg.MemoryLimitPercentage), int64(cfg.MemorySpikePercentage)) } func (ml *memoryLimiter) shutdown(context.Context) error { @@ -220,6 +226,7 @@ func (ml *memoryLimiter) readMemStats() *runtime.MemStats { ml.logger.Warn(typeStr + " is likely incorrectly configured. " + ballastSizeMibKey + " must be set equal to --mem-ballast-size-mib command line option.") } + return ms } @@ -228,7 +235,7 @@ func (ml *memoryLimiter) readMemStats() *runtime.MemStats { func (ml *memoryLimiter) startMonitoring() { go func() { for range ml.ticker.C { - ml.memCheck() + ml.checkMemLimits() } }() } @@ -238,44 +245,95 @@ func (ml *memoryLimiter) forcingDrop() bool { return atomic.LoadInt64(&ml.forceDrop) != 0 } -func (ml *memoryLimiter) memCheck() { +func (ml *memoryLimiter) setForcingDrop(b bool) { + var i int64 + if b { + i = 1 + } + atomic.StoreInt64(&ml.forceDrop, i) +} + +func memstatToZapField(ms *runtime.MemStats) zap.Field { + return zap.Uint64("cur_mem_mib", ms.Alloc/1024/1024) +} + +func (ml *memoryLimiter) doGCandReadMemStats() *runtime.MemStats { + runtime.GC() + ml.lastGCDone = time.Now() ms := ml.readMemStats() - ml.memLimiting(ms) + ml.logger.Info("Memory usage after GC.", memstatToZapField(ms)) + return ms } -func (ml *memoryLimiter) memLimiting(ms *runtime.MemStats) { - if !ml.decision.shouldDrop(ms) { - atomic.StoreInt64(&ml.forceDrop, 0) - } else { - atomic.StoreInt64(&ml.forceDrop, 1) - // Force a GC at this point and see if this is enough to get to - // the desired level. 
- runtime.GC() +func (ml *memoryLimiter) checkMemLimits() { + ms := ml.readMemStats() + + ml.logger.Debug("Currently used memory.", memstatToZapField(ms)) + + if ml.usageChecker.aboveHardLimit(ms) { + ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms)) + ms = ml.doGCandReadMemStats() } + + // Remember current dropping state. + wasForcingDrop := ml.forcingDrop() + + // Check if the memory usage is above the soft limit. + mustForceDrop := ml.usageChecker.aboveSoftLimit(ms) + + if wasForcingDrop && !mustForceDrop { + // Was previously dropping but enough memory is available now, no need to limit. + ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms)) + } + + if !wasForcingDrop && mustForceDrop { + // We are above soft limit, do a GC if it wasn't done recently and see if + // it brings memory usage below the soft limit. + if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited { + ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms)) + ms = ml.doGCandReadMemStats() + // Check the limit again to see if GC helped. + mustForceDrop = ml.usageChecker.aboveSoftLimit(ms) + } + + if mustForceDrop { + ml.logger.Warn("Memory usage is above soft limit. Dropping data.", memstatToZapField(ms)) + } + } + + ml.setForcingDrop(mustForceDrop) } -type dropDecision struct { +type memUsageChecker struct { memAllocLimit uint64 memSpikeLimit uint64 } -func (d dropDecision) shouldDrop(ms *runtime.MemStats) bool { - return d.memAllocLimit <= ms.Alloc || d.memAllocLimit-ms.Alloc <= d.memSpikeLimit +func (d memUsageChecker) aboveSoftLimit(ms *runtime.MemStats) bool { + return ms.Alloc >= d.memAllocLimit-d.memSpikeLimit } -func newFixedDecision(memAllocLimit, memSpikeLimit uint64) (*dropDecision, error) { +func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool { + return ms.Alloc >= d.memAllocLimit +} + +func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChecker, error) { if memSpikeLimit >= memAllocLimit { return nil, errMemSpikeLimitOutOfRange } - return &dropDecision{ + if memSpikeLimit == 0 { + // If spike limit is unspecified use 20% of mem limit. 
+ memSpikeLimit = memAllocLimit / 5 + } + return &memUsageChecker{ memAllocLimit: memAllocLimit, memSpikeLimit: memSpikeLimit, }, nil } -func newPercentageDecision(totalMemory int64, percentageLimit, percentageSpike int64) (*dropDecision, error) { +func newPercentageMemUsageChecker(totalMemory int64, percentageLimit, percentageSpike int64) (*memUsageChecker, error) { if percentageLimit > 100 || percentageLimit <= 0 || percentageSpike > 100 || percentageSpike <= 0 { return nil, errPercentageLimitOutOfRange } - return newFixedDecision(uint64(percentageLimit*totalMemory)/100, uint64(percentageSpike*totalMemory)/100) + return newFixedMemUsageChecker(uint64(percentageLimit*totalMemory)/100, uint64(percentageSpike*totalMemory)/100) } diff --git a/processor/memorylimiter/memorylimiter_test.go b/processor/memorylimiter/memorylimiter_test.go index 244f4e9f67d..396038e5f9d 100644 --- a/processor/memorylimiter/memorylimiter_test.go +++ b/processor/memorylimiter/memorylimiter_test.go @@ -104,13 +104,14 @@ func TestNew(t *testing.T) { func TestMetricsMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 ml := &memoryLimiter{ - decision: dropDecision{ + usageChecker: memUsageChecker{ memAllocLimit: 1024, }, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, obsrep: obsreport.NewProcessorObsReport(configtelemetry.LevelNone, ""), + logger: zap.NewNop(), } mp, err := processorhelper.NewMetricsProcessor( &Config{ @@ -130,12 +131,12 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Below memAllocLimit. currentMemAlloc = 800 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, mp.ConsumeMetrics(ctx, md)) // Above memAllocLimit. currentMemAlloc = 1800 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) // Check ballast effect @@ -143,28 +144,28 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Below memAllocLimit accounting for ballast. currentMemAlloc = 800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, mp.ConsumeMetrics(ctx, md)) // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) // Restore ballast to default. ml.ballastSize = 0 // Check spike limit - ml.decision.memSpikeLimit = 512 + ml.usageChecker.memSpikeLimit = 512 // Below memSpikeLimit. currentMemAlloc = 500 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, mp.ConsumeMetrics(ctx, md)) // Above memSpikeLimit. currentMemAlloc = 550 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) } @@ -174,13 +175,14 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { func TestTraceMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 ml := &memoryLimiter{ - decision: dropDecision{ + usageChecker: memUsageChecker{ memAllocLimit: 1024, }, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, obsrep: obsreport.NewProcessorObsReport(configtelemetry.LevelNone, ""), + logger: zap.NewNop(), } tp, err := processorhelper.NewTraceProcessor( &Config{ @@ -200,12 +202,12 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Below memAllocLimit. currentMemAlloc = 800 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, tp.ConsumeTraces(ctx, td)) // Above memAllocLimit. 
currentMemAlloc = 1800 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) // Check ballast effect @@ -213,28 +215,28 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Below memAllocLimit accounting for ballast. currentMemAlloc = 800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, tp.ConsumeTraces(ctx, td)) // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) // Restore ballast to default. ml.ballastSize = 0 // Check spike limit - ml.decision.memSpikeLimit = 512 + ml.usageChecker.memSpikeLimit = 512 // Below memSpikeLimit. currentMemAlloc = 500 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, tp.ConsumeTraces(ctx, td)) // Above memSpikeLimit. currentMemAlloc = 550 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) } @@ -244,13 +246,14 @@ func TestTraceMemoryPressureResponse(t *testing.T) { func TestLogMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 ml := &memoryLimiter{ - decision: dropDecision{ + usageChecker: memUsageChecker{ memAllocLimit: 1024, }, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, obsrep: obsreport.NewProcessorObsReport(configtelemetry.LevelNone, ""), + logger: zap.NewNop(), } lp, err := processorhelper.NewLogsProcessor( &Config{ @@ -270,12 +273,12 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Below memAllocLimit. currentMemAlloc = 800 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, lp.ConsumeLogs(ctx, ld)) // Above memAllocLimit. currentMemAlloc = 1800 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) // Check ballast effect @@ -283,42 +286,42 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Below memAllocLimit accounting for ballast. currentMemAlloc = 800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, lp.ConsumeLogs(ctx, ld)) // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) // Restore ballast to default. ml.ballastSize = 0 // Check spike limit - ml.decision.memSpikeLimit = 512 + ml.usageChecker.memSpikeLimit = 512 // Below memSpikeLimit. currentMemAlloc = 500 - ml.memCheck() + ml.checkMemLimits() assert.NoError(t, lp.ConsumeLogs(ctx, ld)) // Above memSpikeLimit. 
currentMemAlloc = 550 - ml.memCheck() + ml.checkMemLimits() assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) } func TestGetDecision(t *testing.T) { t.Run("fixed_limit", func(t *testing.T) { - d, err := getDecision(&Config{MemoryLimitMiB: 100, MemorySpikeLimitMiB: 20}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 100, MemorySpikeLimitMiB: 20}, zap.NewNop()) require.NoError(t, err) - assert.Equal(t, &dropDecision{ + assert.Equal(t, &memUsageChecker{ memAllocLimit: 100 * mibBytes, memSpikeLimit: 20 * mibBytes, }, d) }) t.Run("fixed_limit_error", func(t *testing.T) { - d, err := getDecision(&Config{MemoryLimitMiB: 20, MemorySpikeLimitMiB: 100}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 20, MemorySpikeLimitMiB: 100}, zap.NewNop()) require.Error(t, err) assert.Nil(t, d) }) @@ -330,55 +333,55 @@ func TestGetDecision(t *testing.T) { return 100 * mibBytes, nil } t.Run("percentage_limit", func(t *testing.T) { - d, err := getDecision(&Config{MemoryLimitPercentage: 50, MemorySpikePercentage: 10}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 50, MemorySpikePercentage: 10}, zap.NewNop()) require.NoError(t, err) - assert.Equal(t, &dropDecision{ + assert.Equal(t, &memUsageChecker{ memAllocLimit: 50 * mibBytes, memSpikeLimit: 10 * mibBytes, }, d) }) t.Run("percentage_limit_error", func(t *testing.T) { - d, err := getDecision(&Config{MemoryLimitPercentage: 101, MemorySpikePercentage: 10}, zap.NewNop()) + d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 101, MemorySpikePercentage: 10}, zap.NewNop()) require.Error(t, err) assert.Nil(t, d) - d, err = getDecision(&Config{MemoryLimitPercentage: 99, MemorySpikePercentage: 101}, zap.NewNop()) + d, err = getMemUsageChecker(&Config{MemoryLimitPercentage: 99, MemorySpikePercentage: 101}, zap.NewNop()) require.Error(t, err) assert.Nil(t, d) }) } func TestDropDecision(t *testing.T) { - decison1000Limit30Spike30, err := newPercentageDecision(1000, 60, 30) + decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30) require.NoError(t, err) - decison1000Limit60Spike50, err := newPercentageDecision(1000, 60, 50) + decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50) require.NoError(t, err) - decison1000Limit40Spike20, err := newPercentageDecision(1000, 40, 20) + decison1000Limit40Spike20, err := newPercentageMemUsageChecker(1000, 40, 20) require.NoError(t, err) - decison1000Limit40Spike60, err := newPercentageDecision(1000, 40, 60) + decison1000Limit40Spike60, err := newPercentageMemUsageChecker(1000, 40, 60) require.Error(t, err) assert.Nil(t, decison1000Limit40Spike60) tests := []struct { - name string - decision dropDecision - ms *runtime.MemStats - shouldDrop bool + name string + usageChecker memUsageChecker + ms *runtime.MemStats + shouldDrop bool }{ { - name: "should drop over limit", - decision: *decison1000Limit30Spike30, - ms: &runtime.MemStats{Alloc: 600}, - shouldDrop: true, + name: "should drop over limit", + usageChecker: *decison1000Limit30Spike30, + ms: &runtime.MemStats{Alloc: 600}, + shouldDrop: true, }, { - name: "should not drop", - decision: *decison1000Limit30Spike30, - ms: &runtime.MemStats{Alloc: 100}, - shouldDrop: false, + name: "should not drop", + usageChecker: *decison1000Limit30Spike30, + ms: &runtime.MemStats{Alloc: 100}, + shouldDrop: false, }, { - name: "should not drop spike, fixed decision", - decision: dropDecision{ + name: "should not drop spike, fixed usageChecker", + usageChecker: 
memUsageChecker{ memAllocLimit: 600, memSpikeLimit: 500, }, @@ -386,21 +389,21 @@ func TestDropDecision(t *testing.T) { shouldDrop: true, }, { - name: "should drop, spike, percentage decision", - decision: *decison1000Limit60Spike50, - ms: &runtime.MemStats{Alloc: 300}, - shouldDrop: true, + name: "should drop, spike, percentage usageChecker", + usageChecker: *decison1000Limit60Spike50, + ms: &runtime.MemStats{Alloc: 300}, + shouldDrop: true, }, { - name: "should drop, spike, percentage decision", - decision: *decison1000Limit40Spike20, - ms: &runtime.MemStats{Alloc: 250}, - shouldDrop: true, + name: "should drop, spike, percentage usageChecker", + usageChecker: *decison1000Limit40Spike20, + ms: &runtime.MemStats{Alloc: 250}, + shouldDrop: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - shouldDrop := test.decision.shouldDrop(test.ms) + shouldDrop := test.usageChecker.aboveSoftLimit(test.ms) assert.Equal(t, test.shouldDrop, shouldDrop) }) } diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index 7424d78dd3e..b65ea31fca5 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -22,7 +22,6 @@ package tests import ( "context" - "fmt" "path" "path/filepath" "testing" @@ -120,8 +119,8 @@ func TestTraceNoBackend10kSPS(t *testing.T) { limitProcessors := map[string]string{ "memory_limiter": ` memory_limiter: - check_interval: 1s - limit_mib: 10 + check_interval: 100ms + limit_mib: 20 `, } @@ -131,60 +130,31 @@ func TestTraceNoBackend10kSPS(t *testing.T) { { Name: "NoMemoryLimit", Processor: noLimitProcessors, - ExpectedMaxRAM: 200, - ExpectedMinFinalRAM: 30, + ExpectedMaxRAM: 150, + ExpectedMinFinalRAM: 100, }, { Name: "MemoryLimit", Processor: limitProcessors, - ExpectedMaxRAM: 60, - ExpectedMinFinalRAM: 10, + ExpectedMaxRAM: 70, + ExpectedMinFinalRAM: 40, }, } - var testSenders = []struct { - name string - sender testbed.DataSender - receiver testbed.DataReceiver - resourceSpec testbed.ResourceSpec - configuration []processorConfig - }{ - { - "JaegerGRPC", - testbed.NewJaegerGRPCDataSender(testbed.DefaultHost, testbed.DefaultJaegerPort), - testbed.NewOCDataReceiver(testbed.DefaultOCPort), - testbed.ResourceSpec{ - ExpectedMaxCPU: 70, - ExpectedMaxRAM: 198, - }, - processorsConfig, - }, - { - "Zipkin", - testbed.NewZipkinDataSender(testbed.DefaultHost, testbed.DefaultZipkinAddressPort), - testbed.NewOCDataReceiver(testbed.DefaultOCPort), - testbed.ResourceSpec{ - ExpectedMaxCPU: 120, - ExpectedMaxRAM: 198, - }, - processorsConfig, - }, - } - - for _, test := range testSenders { - for _, testConf := range test.configuration { - testName := fmt.Sprintf("%s/%s", test.name, testConf.Name) - t.Run(testName, func(t *testing.T) { - ScenarioTestTraceNoBackend10kSPS( - t, - test.sender, - test.receiver, - test.resourceSpec, - performanceResultsSummary, - testConf, - ) - }) - } + for _, testConf := range processorsConfig { + t.Run(testConf.Name, func(t *testing.T) { + ScenarioTestTraceNoBackend10kSPS( + t, + testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), + testbed.ResourceSpec{ + ExpectedMaxCPU: 50, + ExpectedMaxRAM: testConf.ExpectedMaxRAM, + }, + performanceResultsSummary, + testConf, + ) + }) } }
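For readers skimming the diff, the following standalone snippet re-states the new threshold arithmetic in condensed form (an illustrative sketch, not the processor's public API; the 8192 MiB host in the percentage case is hypothetical):

```go
package main

import "fmt"

const mibBytes = 1024 * 1024

// memUsageChecker mirrors the struct introduced above: memAllocLimit is the
// hard limit and memAllocLimit-memSpikeLimit is the soft limit.
type memUsageChecker struct {
	memAllocLimit uint64
	memSpikeLimit uint64
}

func (d memUsageChecker) aboveSoftLimit(alloc uint64) bool {
	return alloc >= d.memAllocLimit-d.memSpikeLimit
}

func (d memUsageChecker) aboveHardLimit(alloc uint64) bool {
	return alloc >= d.memAllocLimit
}

func main() {
	// Fixed configuration: limit_mib = 4000, spike_limit_mib = 800.
	fixed := memUsageChecker{memAllocLimit: 4000 * mibBytes, memSpikeLimit: 800 * mibBytes}
	// At 3300 MiB usage: above the soft limit (3200 MiB), below the hard limit.
	fmt.Println(fixed.aboveSoftLimit(3300*mibBytes), fixed.aboveHardLimit(3300*mibBytes)) // true false

	// Percentage configuration (limit_percentage = 50, spike_limit_percentage = 30)
	// on a hypothetical host with 8192 MiB of total memory.
	total := uint64(8192) * mibBytes
	pct := memUsageChecker{memAllocLimit: total * 50 / 100, memSpikeLimit: total * 30 / 100}
	fmt.Println(pct.memAllocLimit/mibBytes, (pct.memAllocLimit-pct.memSpikeLimit)/mibBytes) // 4096 1638
}
```

With the fixed configuration the processor drops data between the soft and hard limits (the `true false` case above) and only forces GC once the hard limit itself is reached.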