From 181f34afdad8f58de7a0924dea0a6de32b3fa616 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 5 Mar 2024 14:01:26 +0100 Subject: [PATCH] [deltatocumulative]: limit tracked streams --- .../deltatocumulativeprocessor/README.md | 6 ++- .../deltatocumulativeprocessor/config.go | 13 ++++- .../deltatocumulativeprocessor/factory.go | 5 -- .../internal/streams/limit.go | 39 ++++++++++++++ .../internal/streams/limit_test.go | 51 +++++++++++++++++++ .../deltatocumulativeprocessor/processor.go | 3 ++ 6 files changed, 109 insertions(+), 8 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/internal/streams/limit.go create mode 100644 processor/deltatocumulativeprocessor/internal/streams/limit_test.go diff --git a/processor/deltatocumulativeprocessor/README.md b/processor/deltatocumulativeprocessor/README.md index 1a639128fca8..26e68308d115 100644 --- a/processor/deltatocumulativeprocessor/README.md +++ b/processor/deltatocumulativeprocessor/README.md @@ -25,6 +25,8 @@ processors: deltatocumulative: # how long until a series not receiving new samples is removed [ max_stale: | default = 5m ] + + # upper limit of streams to track. new streams exceeding this limit + # will be dropped + [ max_streams: | default = off] ``` - -There is no further configuration required. All delta samples are converted to cumulative. diff --git a/processor/deltatocumulativeprocessor/config.go b/processor/deltatocumulativeprocessor/config.go index b5744a9779b7..de31e8f8cfd5 100644 --- a/processor/deltatocumulativeprocessor/config.go +++ b/processor/deltatocumulativeprocessor/config.go @@ -13,12 +13,23 @@ import ( var _ component.ConfigValidator = (*Config)(nil) type Config struct { - MaxStale time.Duration `json:"max_stale"` + MaxStale time.Duration `json:"max_stale"` + MaxStreams int `json:"max_streams"` } func (c *Config) Validate() error { if c.MaxStale <= 0 { return fmt.Errorf("max_stale must be a positive duration (got %s)", c.MaxStale) } + if c.MaxStreams <= 0 { + return fmt.Errorf("max_streams must be a positive number (got %d)", c.MaxStreams) + } return nil } + +func createDefaultConfig() component.Config { + return &Config{ + MaxStale: 5 * time.Minute, + MaxStreams: 0, // disable. TODO: find good default + } +} diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go index b2fba4e00fc2..47b968d14f17 100644 --- a/processor/deltatocumulativeprocessor/factory.go +++ b/processor/deltatocumulativeprocessor/factory.go @@ -6,7 +6,6 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele import ( "context" "fmt" - "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -23,10 +22,6 @@ func NewFactory() processor.Factory { ) } -func createDefaultConfig() component.Config { - return &Config{MaxStale: 5 * time.Minute} -} - func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, next consumer.Metrics) (processor.Metrics, error) { pcfg, ok := cfg.(*Config) if !ok { diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go new file mode 100644 index 000000000000..598ec43bfeb7 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "errors" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" +) + +func Limit[T any](m Map[T], max int) Map[T] { + return LimitMap[T]{Map: m, Max: max} +} + +type LimitMap[T any] struct { + Max int + streams.Map[T] +} + +func (m LimitMap[T]) Store(id identity.Stream, v T) error { + if m.Map.Len() >= m.Max { + return ErrLimit(m.Max) + } + return m.Map.Store(id, v) +} + +type ErrLimit int + +func (e ErrLimit) Error() string { + return fmt.Sprintf("stream limit of %d reached", e) +} + +func AtLimit(err error) bool { + var errLimit ErrLimit + return errors.As(err, &errLimit) +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go new file mode 100644 index 000000000000..d0c5af6e5666 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" +) + +func TestLimit(t *testing.T) { + sum := random.Sum() + + items := make(exp.HashMap[data.Number]) + lim := streams.Limit(items, 10) + + ids := make([]identity.Stream, 10) + + // write until limit must work + for i := 0; i < 10; i++ { + id, dp := sum.Stream() + ids[i] = id + err := lim.Store(id, dp) + require.NoError(t, err) + } + + // one over limit must be rejected + { + id, dp := sum.Stream() + err := lim.Store(id, dp) + want := streams.ErrLimit(10) + require.ErrorAs(t, err, &want) + require.True(t, streams.AtLimit(err)) + } + + // after removing one, must be accepted again + { + lim.Delete(ids[0]) + + id, dp := sum.Stream() + err := lim.Store(id, dp) + require.NoError(t, err) + } +} diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 8f5941ce84b1..d1eb73d92ba6 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -53,6 +53,9 @@ func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processo proc.exp = &exp dps = &exp } + if cfg.MaxStreams > 0 { + dps = streams.Limit(dps, cfg.MaxStreams) + } proc.aggr = streams.IntoAggregator(dps) return &proc