From d47a60c8c7deb44229e1edaa5a13642f9c76bb17 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 27 Nov 2024 18:01:28 +0000 Subject: [PATCH] [processor/lsminterval] Define cardinality limits and handle overflows --- .../{ => config}/config.go | 22 +- processor/lsmintervalprocessor/factory.go | 7 +- processor/lsmintervalprocessor/go.mod | 2 + processor/lsmintervalprocessor/go.sum | 4 + .../internal/merger/limits/store.go | 542 ++++++++++++++++++ .../internal/merger/limits/tracker.go | 166 ++++++ .../internal/merger/merger.go | 13 +- .../internal/merger/model.go | 266 +++------ processor/lsmintervalprocessor/processor.go | 28 +- .../lsmintervalprocessor/processor_test.go | 166 ++++-- .../input.yaml | 74 +++ .../next.yaml | 1 + .../output.yaml | 67 +++ .../exphistogram_delta_overflow/input.yaml | 74 +++ .../exphistogram_delta_overflow/next.yaml | 1 + .../exphistogram_delta_overflow/output.yaml | 68 +++ .../histogram_cumulative_overflow/input.yaml | 56 ++ .../histogram_cumulative_overflow/next.yaml | 1 + .../histogram_cumulative_overflow/output.yaml | 63 ++ .../histogram_delta_overflow/input.yaml | 56 ++ .../histogram_delta_overflow/next.yaml | 1 + .../histogram_delta_overflow/output.yaml | 63 ++ .../sum_cumulative_overflow/input.yaml | 50 ++ .../sum_cumulative_overflow/next.yaml | 1 + .../sum_cumulative_overflow/output.yaml | 50 ++ .../testdata/sum_delta_overflow/input.yaml | 51 ++ .../testdata/sum_delta_overflow/next.yaml | 2 + .../testdata/sum_delta_overflow/output.yaml | 50 ++ 28 files changed, 1665 insertions(+), 280 deletions(-) rename processor/lsmintervalprocessor/{ => config}/config.go (78%) create mode 100644 processor/lsmintervalprocessor/internal/merger/limits/store.go create mode 100644 processor/lsmintervalprocessor/internal/merger/limits/tracker.go create mode 100644 processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/input.yaml create mode 100644 processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/next.yaml create mode 100644 processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/output.yaml create mode 100644 processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/input.yaml create mode 100644 processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/next.yaml create mode 100644 processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/output.yaml create mode 100644 processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/input.yaml create mode 100644 processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/next.yaml create mode 100644 processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/output.yaml create mode 100644 processor/lsmintervalprocessor/testdata/histogram_delta_overflow/input.yaml create mode 100644 processor/lsmintervalprocessor/testdata/histogram_delta_overflow/next.yaml create mode 100644 processor/lsmintervalprocessor/testdata/histogram_delta_overflow/output.yaml create mode 100644 processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/input.yaml create mode 100644 processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/next.yaml create mode 100644 processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/output.yaml create mode 100644 processor/lsmintervalprocessor/testdata/sum_delta_overflow/input.yaml create mode 100644 processor/lsmintervalprocessor/testdata/sum_delta_overflow/next.yaml create mode 100644 processor/lsmintervalprocessor/testdata/sum_delta_overflow/output.yaml diff --git 
a/processor/lsmintervalprocessor/config.go b/processor/lsmintervalprocessor/config/config.go similarity index 78% rename from processor/lsmintervalprocessor/config.go rename to processor/lsmintervalprocessor/config/config.go index ad04b42b..efee4ea7 100644 --- a/processor/lsmintervalprocessor/config.go +++ b/processor/lsmintervalprocessor/config/config.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package lsmintervalprocessor // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor" +package config // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config" import ( "time" @@ -39,7 +39,10 @@ type Config struct { // TODO (lahsivjar): Make specifying interval easier. We can just // optimize the timer to run on differnt times and remove any // restriction on different interval configuration. - Intervals []IntervalConfig `mapstructure:"intervals"` + Intervals []IntervalConfig `mapstructure:"intervals"` + ResourceLimits LimitConfig `mapstructure:"resource_limit"` + ScopeLimits LimitConfig `mapstructure:"scope_limit"` + DatapointLimits LimitConfig `mapstructure:"datapoint_limit"` } // PassThrough determines whether metrics should be passed through as they @@ -63,6 +66,21 @@ type IntervalConfig struct { Statements []string `mapstructure:"statements"` } +type LimitConfig struct { + Attributes map[string]struct{} `mapstructure:"attributes"` + MaxCardinality uint64 `mapstructure:"max_cardinality"` + Overflow OverflowConfig `mapstructure:"overflow"` +} + +type OverflowConfig struct { + Attributes []Attribute `mapstructure:"attributes"` +} + +type Attribute struct { + Key string `mapstructure:"key"` + Value any `mapstructure:"value"` +} + func (config *Config) Validate() error { // TODO (lahsivjar): Add validation for interval duration return nil diff --git a/processor/lsmintervalprocessor/factory.go b/processor/lsmintervalprocessor/factory.go index c3a1f4a4..adda884a 100644 --- a/processor/lsmintervalprocessor/factory.go +++ b/processor/lsmintervalprocessor/factory.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" + "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config" "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" @@ -41,8 +42,8 @@ func NewFactory() processor.Factory { } func createDefaultConfig() component.Config { - return &Config{ - Intervals: []IntervalConfig{ + return &config.Config{ + Intervals: []config.IntervalConfig{ {Duration: 60 * time.Second}, }, } @@ -54,7 +55,7 @@ func createMetricsProcessor( cfg component.Config, nextConsumer consumer.Metrics, ) (processor.Metrics, error) { - processorConfig, ok := cfg.(*Config) + processorConfig, ok := cfg.(*config.Config) if !ok { return nil, fmt.Errorf("configuration parsing error") } diff --git a/processor/lsmintervalprocessor/go.mod b/processor/lsmintervalprocessor/go.mod index 1147ab6d..26b482aa 100644 --- a/processor/lsmintervalprocessor/go.mod +++ b/processor/lsmintervalprocessor/go.mod @@ -3,6 +3,7 @@ module github.com/elastic/opentelemetry-collector-components/processor/lsminterv go 1.22.0 require ( + github.com/axiomhq/hyperloglog v0.2.0 github.com/cockroachdb/pebble v1.1.2 
github.com/google/go-cmp v0.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.114.0 @@ -35,6 +36,7 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect github.com/elastic/go-grok v0.3.1 // indirect github.com/elastic/lunes v0.1.0 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect diff --git a/processor/lsmintervalprocessor/go.sum b/processor/lsmintervalprocessor/go.sum index 56852173..f0c775db 100644 --- a/processor/lsmintervalprocessor/go.sum +++ b/processor/lsmintervalprocessor/go.sum @@ -10,6 +10,8 @@ github.com/antchfx/xmlquery v1.4.2 h1:MZKd9+wblwxfQ1zd1AdrTsqVaMjMCwow3IqkCSe00K github.com/antchfx/xmlquery v1.4.2/go.mod h1:QXhvf5ldTuGqhd1SHNvvtlhhdQLks4dD0awIVhXIDTA= github.com/antchfx/xpath v1.3.2 h1:LNjzlsSjinu3bQpw9hWMY9ocB80oLOWuQqFvO6xt51U= github.com/antchfx/xpath v1.3.2/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs= +github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo= +github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -32,6 +34,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/elastic/go-grok v0.3.1 h1:WEhUxe2KrwycMnlvMimJXvzRa7DoByJB4PVUIE1ZD/U= github.com/elastic/go-grok v0.3.1/go.mod h1:n38ls8ZgOboZRgKcjMY8eFeZFMmcL9n2lP0iHhIDk64= github.com/elastic/lunes v0.1.0 h1:amRtLPjwkWtzDF/RKzcEPMvSsSseLDLW+bnhfNSLRe4= diff --git a/processor/lsmintervalprocessor/internal/merger/limits/store.go b/processor/lsmintervalprocessor/internal/merger/limits/store.go new file mode 100644 index 00000000..631e9c94 --- /dev/null +++ b/processor/lsmintervalprocessor/internal/merger/limits/store.go @@ -0,0 +1,542 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
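The three new limit blocks (resource_limit, scope_limit and datapoint_limit) all share config.LimitConfig. As a rough sketch of how the knobs fit together in Go — the field names come from the config changes above, while the concrete values and the host.name key are purely hypothetical:

package main

import (
	"time"

	"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
)

func main() {
	// Hypothetical values; only the field names are taken from config.LimitConfig.
	limit := config.LimitConfig{
		// Attributes narrows which entries participate in the cardinality check.
		Attributes:     map[string]struct{}{"host.name": {}},
		MaxCardinality: 1000,
		Overflow: config.OverflowConfig{
			// Attributes stamped onto the synthetic overflow bucket.
			Attributes: []config.Attribute{{Key: "overflow", Value: true}},
		},
	}
	_ = config.Config{
		Intervals:       []config.IntervalConfig{{Duration: 60 * time.Second}},
		ResourceLimits:  limit,
		ScopeLimits:     limit,
		DatapointLimits: limit,
	}
}

MaxCardinality caps how many distinct entries a tracker admits; anything beyond that is folded into a single overflow bucket decorated with Overflow.Attributes.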
+ +package limits // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger/limits" + +import ( + "fmt" + + "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config" + "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/identity" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +const ( + resourceLimitsAttrKey = "_resource_limits" + scopeLimitsAttrKey = "_scope_limits" + datapointsLimitsAttrKey = "_datapoints_limits" +) + +// The source metric must be modified only by the store once the store is created. +type Store struct { + cfg *config.Config + source pmetric.Metrics + + // Keeps track of resource metrics overflow + resourceLimits *Tracker[identity.Resource] + + // Lookup tables created from source + resLookup map[identity.Resource]resourceMetrics + scopeLookup map[identity.Scope]scopeMetrics + metricLookup map[identity.Metric]metric + numberLookup map[identity.Stream]numberDataPoint + summaryLookup map[identity.Stream]summaryDataPoint + histoLookup map[identity.Stream]histogramDataPoint + expHistoLookup map[identity.Stream]exponentialHistogramDataPoint +} + +func NewStore(cfg *config.Config) *Store { + s := &Store{ + cfg: cfg, + source: pmetric.NewMetrics(), + resourceLimits: NewTracker[identity.Resource](cfg.ResourceLimits), + resLookup: make(map[identity.Resource]resourceMetrics), + scopeLookup: make(map[identity.Scope]scopeMetrics), + metricLookup: make(map[identity.Metric]metric), + numberLookup: make(map[identity.Stream]numberDataPoint), + summaryLookup: make(map[identity.Stream]summaryDataPoint), + histoLookup: make(map[identity.Stream]histogramDataPoint), + expHistoLookup: make(map[identity.Stream]exponentialHistogramDataPoint), + } + return s +} + +// init initializes the lookup maps from the source. It assumes that the source +// will NOT overflow or if it has overflowed it already has the overflow +// buckets initialized. +// TODO: Propogate error +func (s *Store) init(source pmetric.Metrics) error { + s.source = source + s.resourceLimits.SetOverflowBucketID(getOverflowResourceBucketID(s.resourceLimits)) + rms := s.source.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + rm := rms.At(i) + rmID := identity.OfResource(rm.Resource()) + scopeLimits := NewTracker[identity.Scope](s.cfg.ScopeLimits) + scopeLimits.SetOverflowBucketID(getOverflowScopeBucketID(scopeLimits, rmID)) + rm.Resource().Attributes().RemoveIf(func(k string, v pcommon.Value) bool { + switch k { + case resourceLimitsAttrKey: + s.resourceLimits.UnmarshalBinary(v.Bytes().AsRaw()) + return true + case scopeLimitsAttrKey: + scopeLimits.UnmarshalBinary(v.Bytes().AsRaw()) + return true + } + return false + }) + s.resLookup[rmID] = resourceMetrics{ + ResourceMetrics: rm, + scopeLimits: scopeLimits, + } + sms := rm.ScopeMetrics() + for j := 0; j < sms.Len(); j++ { + sm := sms.At(j) + scope := sm.Scope() + smID := identity.OfScope(rmID, scope) + datapointsLimits := NewTracker[identity.Stream](s.cfg.DatapointLimits) + scope.Attributes().RemoveIf(func(k string, v pcommon.Value) bool { + if k == datapointsLimitsAttrKey { + datapointsLimits.UnmarshalBinary(v.Bytes().AsRaw()) + return true + } + return false + }) + + // TODO: Updating trackers + // TODO: How do we deal with overflow bucket IDs in trackers? Are they deterministic based on configs? 
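What init relies on here (and what MarshalProto below produces) is a small side channel: tracker state travels as raw bytes under reserved attribute keys and is stripped again on decode so it never leaks to the next consumer. A minimal sketch of that round trip with pdata, assuming some previously marshalled tracker bytes (the service.name attribute is arbitrary):

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	state := []byte{0, 0, 0, 0, 0, 0, 0, 42} // e.g. the output of Tracker.MarshalBinary

	// Encode: stash the tracker bytes under the reserved key.
	attrs := pcommon.NewMap()
	attrs.PutStr("service.name", "checkout")
	attrs.PutEmptyBytes("_resource_limits").FromRaw(state)

	// Decode: recover the bytes and drop the bookkeeping attribute.
	var restored []byte
	attrs.RemoveIf(func(k string, v pcommon.Value) bool {
		if k != "_resource_limits" {
			return false
		}
		restored = v.Bytes().AsRaw()
		return true
	})
	fmt.Println(len(restored), attrs.Len()) // 8 1
}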
+ s.scopeLookup[smID] = scopeMetrics{ + ScopeMetrics: sm, + datapointsLimits: datapointsLimits, + } + metrics := sm.Metrics() + for k := 0; k < metrics.Len(); k++ { + metric := metrics.At(k) + metricID := identity.OfMetric(smID, metric) + s.metricLookup[metricID] = metric + + switch metric.Type() { + case pmetric.MetricTypeSum: + dps := metric.Sum().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + streamID := identity.OfStream(metricID, dp) + s.numberLookup[streamID] = dp + } + case pmetric.MetricTypeSummary: + dps := metric.Summary().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + streamID := identity.OfStream(metricID, dp) + s.summaryLookup[streamID] = dp + } + case pmetric.MetricTypeHistogram: + dps := metric.Histogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + streamID := identity.OfStream(metricID, dp) + s.histoLookup[streamID] = dp + } + case pmetric.MetricTypeExponentialHistogram: + dps := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + streamID := identity.OfStream(metricID, dp) + s.expHistoLookup[streamID] = dp + } + } + } + } + } + return nil +} + +func (s *Store) MarshalProto() ([]byte, error) { + rms := s.source.ResourceMetrics() + if rms.Len() > 0 { + bLimits, err := s.resourceLimits.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("failed to marshal resource limits: %w", err) + } + // Encode resource tracker at the 0th resource metrics + // TODO (lahsivjar): Is this safe? We don't ever remove + // resource metrics so it should be but best to check. + // Also, the limits checker should ensure max cardinality + // is greater than zero. + rmAttr := rms.At(0).Resource().Attributes() + b := rmAttr.PutEmptyBytes(resourceLimitsAttrKey) + b.FromRaw(bLimits) + + // Encode scope trackers in resource attributes + for _, res := range s.resLookup { + bLimits, err := res.scopeLimits.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("failed to marshal scope limits: %w", err) + } + resAttrs := res.ResourceMetrics.Resource().Attributes() + b := resAttrs.PutEmptyBytes(scopeLimitsAttrKey) + b.FromRaw(bLimits) + } + + // Encode datapoints trackers in scope attributes + for _, scope := range s.scopeLookup { + bLimits, err := scope.datapointsLimits.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("failed to marshal datapoints limits: %w", err) + } + scopeAttrs := scope.ScopeMetrics.Scope().Attributes() + b := scopeAttrs.PutEmptyBytes(datapointsLimitsAttrKey) + b.FromRaw(bLimits) + } + } + var marshaler pmetric.ProtoMarshaler + return marshaler.MarshalMetrics(s.source) +} + +func (s *Store) UnmarshalProto(data []byte) error { + var unmarshaler pmetric.ProtoUnmarshaler + m, err := unmarshaler.UnmarshalMetrics(data) + if err != nil { + return fmt.Errorf("failed to unmarshal data: %w", err) + } + return s.init(m) +} + +// Get returns the current pmetric.Metrics. The returned metric will +// include datapoint overflow metrics if `Finalize` is called before +// and will not include these metrics if `Finalize` is not called. +func (s *Store) Get() pmetric.Metrics { + return s.source +} + +// Finalize finalizes all overflows in the metrics to prepare it for +// harvest. This method should be called for harvest and only once. +func (s *Store) Finalize() (pmetric.Metrics, error) { + // At this point we need to assume that the metrics are returned + // as a final step in the store, thus, prepare the final metric. 
+ // In the final metric we have to add datapoint limits. + for _, sm := range s.scopeLookup { + if sm.datapointsLimits.overflowCounts == nil { + continue + } + // Add overflow metric to the scope + overflowMetric := sm.ScopeMetrics.Metrics().AppendEmpty() + overflowMetric.SetName("_other") + overflowMetric.SetDescription("Overflow count due to datapoints limit") + overflowSum := overflowMetric.SetEmptySum() + overflowSum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + overflowDP := overflowSum.DataPoints().AppendEmpty() + if err := sm.datapointsLimits.Decorate(overflowDP.Attributes()); err != nil { + return pmetric.Metrics{}, fmt.Errorf("failed to finalize merged metric: %w", err) + } + overflowDP.SetIntValue(int64(sm.datapointsLimits.overflowCounts.Estimate())) + } + // Remove any hanging resource or scope which failed to have any entries + // due to children reaching their limits. + // TODO (lahsivjar): We can probably optimize to not require this loop by + // adding to source metric only at finalize. + s.source.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { + rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { + sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { + switch m.Type() { + case pmetric.MetricTypeGauge: + return m.Gauge().DataPoints().Len() == 0 + case pmetric.MetricTypeSum: + return m.Sum().DataPoints().Len() == 0 + case pmetric.MetricTypeHistogram: + return m.Histogram().DataPoints().Len() == 0 + case pmetric.MetricTypeExponentialHistogram: + return m.ExponentialHistogram().DataPoints().Len() == 0 + case pmetric.MetricTypeSummary: + return m.Summary().DataPoints().Len() == 0 + } + return false + }) + return sm.Metrics().Len() == 0 + }) + return rm.ScopeMetrics().Len() == 0 + }) + return s.source, nil +} + +// AddResourceMetrics adds a new resource metrics to the store while also +// applying resource limiters. If a limit is configured and breached by +// adding the provided resource metric, then, a new overflow resource +// metric is created and returned. 
+func (s *Store) AddResourceMetrics( + otherRm pmetric.ResourceMetrics, +) identity.Resource { + resID := identity.OfResource(otherRm.Resource()) + if _, ok := s.resLookup[resID]; ok { + return resID + } + if s.resourceLimits.CheckOverflow( + resID.Hash().Sum64(), + otherRm.Resource().Attributes(), + ) { + // Overflow, get/prepare an overflow bucket + overflowResID := s.resourceLimits.GetOverflowBucketID() + if overflowResID == (identity.Resource{}) { + overflowRm := s.source.ResourceMetrics().AppendEmpty() + s.resourceLimits.Decorate(overflowRm.Resource().Attributes()) + overflowResID = identity.OfResource(overflowRm.Resource()) + s.resourceLimits.SetOverflowBucketID(overflowResID) + s.resLookup[overflowResID] = resourceMetrics{ + ResourceMetrics: overflowRm, + scopeLimits: NewTracker[identity.Scope](s.cfg.ScopeLimits), + } + } + return overflowResID + } + + // Clone it *without* the ScopeMetricsSlice data + rm := s.source.ResourceMetrics().AppendEmpty() + rm.SetSchemaUrl(otherRm.SchemaUrl()) + otherRm.Resource().CopyTo(rm.Resource()) + s.resLookup[resID] = resourceMetrics{ + ResourceMetrics: rm, + scopeLimits: NewTracker[identity.Scope](s.cfg.ScopeLimits), + } + return resID +} + +func (s *Store) AddScopeMetrics( + resID identity.Resource, + otherSm pmetric.ScopeMetrics, +) identity.Scope { + scopeID := identity.OfScope(resID, otherSm.Scope()) + if _, ok := s.scopeLookup[scopeID]; ok { + return scopeID + } + res := s.resLookup[resID] + if res.scopeLimits.CheckOverflow( + scopeID.Hash().Sum64(), + otherSm.Scope().Attributes(), + ) { + // Overflow, get/prepare an overflow bucket + overflowScopeID := res.scopeLimits.GetOverflowBucketID() + if overflowScopeID == (identity.Scope{}) { + overflowScope := res.ScopeMetrics().AppendEmpty() + res.scopeLimits.Decorate(overflowScope.Scope().Attributes()) + overflowScopeID = identity.OfScope(resID, overflowScope.Scope()) + res.scopeLimits.SetOverflowBucketID(overflowScopeID) + s.scopeLookup[overflowScopeID] = scopeMetrics{ + ScopeMetrics: overflowScope, + datapointsLimits: NewTracker[identity.Stream](s.cfg.DatapointLimits), + } + } + return overflowScopeID + } + + // Clone it *without* the MetricSlice data + sm := res.ScopeMetrics().AppendEmpty() + otherSm.Scope().CopyTo(sm.Scope()) + sm.SetSchemaUrl(otherSm.SchemaUrl()) + s.scopeLookup[scopeID] = scopeMetrics{ + ScopeMetrics: sm, + datapointsLimits: NewTracker[identity.Stream](s.cfg.DatapointLimits), + } + return scopeID +} + +func (s *Store) AddMetric( + scopeID identity.Scope, + otherMetric pmetric.Metric, +) identity.Metric { + metricID := identity.OfMetric(scopeID, otherMetric) + if _, ok := s.metricLookup[metricID]; ok { + return metricID + } + scope := s.scopeLookup[scopeID] + + // Metrics doesn't have overflows (only datapoints have) + // Clone it *without* the datapoint data + m := scope.Metrics().AppendEmpty() + m.SetName(otherMetric.Name()) + m.SetDescription(otherMetric.Description()) + m.SetUnit(otherMetric.Unit()) + switch otherMetric.Type() { + case pmetric.MetricTypeGauge: + m.SetEmptyGauge() + case pmetric.MetricTypeSummary: + m.SetEmptySummary() + case pmetric.MetricTypeSum: + otherSum := otherMetric.Sum() + + sum := m.SetEmptySum() + sum.SetAggregationTemporality(otherSum.AggregationTemporality()) + sum.SetIsMonotonic(otherSum.IsMonotonic()) + case pmetric.MetricTypeHistogram: + otherHist := otherMetric.Histogram() + + hist := m.SetEmptyHistogram() + hist.SetAggregationTemporality(otherHist.AggregationTemporality()) + case pmetric.MetricTypeExponentialHistogram: + otherExp := 
otherMetric.ExponentialHistogram() + + exp := m.SetEmptyExponentialHistogram() + exp.SetAggregationTemporality(otherExp.AggregationTemporality()) + } + s.metricLookup[metricID] = m + return metricID +} + +// AddSumDataPoint returns a data point entry in the store for the given metric +// and the external data point if it is present. If the data point is not +// present then either a new data point is added or if the data point overflows +// due to configured limit then an empty data point is returned. The returned +// bool value is `true` if a new data point is created and `false` otherwise. +func (s *Store) AddSumDataPoint( + metricID identity.Metric, + otherDP pmetric.NumberDataPoint, +) (pmetric.NumberDataPoint, bool) { + streamID := identity.OfStream(metricID, otherDP) + if dp, ok := s.numberLookup[streamID]; ok { + return dp, false + } + sm := s.scopeLookup[metricID.Scope()] + metric := s.metricLookup[metricID] + if sm.datapointsLimits.CheckOverflow( + metricID.Hash().Sum64(), + otherDP.Attributes(), + ) { + // Datapoints overflow detected. In this case no action has to be + // done at this point since data point overflow should create a new + // overflow metric of sum type recording the number of unique + // datapoints. This number will be recorded in the limit tracker + // and the metric will be populated on demand. + return pmetric.NumberDataPoint{}, false + } + dp := metric.Sum().DataPoints().AppendEmpty() + s.numberLookup[streamID] = dp + return dp, true +} + +// AddSummaryDataPoint returns a data point entry in the store for the given +// metric and the external data point if it is present. If the data point is +// not present then either a new data point is added or if the data point +// overflows due to configured limit then an empty data point is returned. +// The returned bool value is `true` if a new data point is created and +// `false` otherwise. +func (s *Store) AddSummaryDataPoint( + metricID identity.Metric, + otherDP pmetric.SummaryDataPoint, +) (pmetric.SummaryDataPoint, bool) { + streamID := identity.OfStream(metricID, otherDP) + if dp, ok := s.summaryLookup[streamID]; ok { + return dp, false + } + sm := s.scopeLookup[metricID.Scope()] + metric := s.metricLookup[metricID] + if sm.datapointsLimits.CheckOverflow( + metricID.Hash().Sum64(), + otherDP.Attributes(), + ) { + // Datapoints overflow detected. In this case no action has to be + // done at this point since data point overflow should create a new + // overflow metric of sum type recording the number of unique + // datapoints. This number will be recorded in the limit tracker + // and the metric will be populated on demand. + return pmetric.SummaryDataPoint{}, false + } + dp := metric.Summary().DataPoints().AppendEmpty() + s.summaryLookup[streamID] = dp + return dp, true +} + +// AddHistogramDataPoint returns a data point entry in the store for the given +// metric and the external data point if it is present. If the data point is +// not present then either a new data point is added or if the data point +// overflows due to configured limit then an empty data point is returned. +// The returned bool value is `true` if a new data point is created and +// `false` otherwise. 
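All four Add*DataPoint helpers share the contract spelled out in these comments: a zero-value data point signals overflow (the stream is only counted, never stored), while the boolean distinguishes a freshly created stream from an existing one. A rough caller-side sketch for the sum case — the function name and the test-package wrapper are hypothetical, the calls are the ones defined in this file:

package limits_test

import (
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/identity"
	"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger/limits"
)

// applySum mirrors how the merge code later in this patch consumes the contract.
func applySum(s *limits.Store, metricID identity.Metric, from pmetric.NumberDataPoint) {
	to, isNew := s.AddSumDataPoint(metricID, from)
	switch {
	case to == (pmetric.NumberDataPoint{}):
		// Overflow: the stream was recorded in the datapoint tracker and dropped here.
	case isNew:
		// First sighting of this stream: take the incoming point wholesale.
		from.CopyTo(to)
	default:
		// Existing stream: merge according to the metric's temporality
		// (latest timestamp wins for cumulative, values are added for delta).
	}
}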
+func (s *Store) AddHistogramDataPoint( + metricID identity.Metric, + otherDP pmetric.HistogramDataPoint, +) (pmetric.HistogramDataPoint, bool) { + streamID := identity.OfStream(metricID, otherDP) + if dp, ok := s.histoLookup[streamID]; ok { + return dp, false + } + sm := s.scopeLookup[metricID.Scope()] + metric := s.metricLookup[metricID] + if sm.datapointsLimits.CheckOverflow( + metricID.Hash().Sum64(), + otherDP.Attributes(), + ) { + // Datapoints overflow detected. In this case no action has to be + // done at this point since data point overflow should create a new + // overflow metric of sum type recording the number of unique + // datapoints. This number will be recorded in the limit tracker + // and the metric will be populated on demand. + return pmetric.HistogramDataPoint{}, false + } + dp := metric.Histogram().DataPoints().AppendEmpty() + s.histoLookup[streamID] = dp + return dp, true +} + +// AddExponentialHistogramDataPoint returns a data point entry in the store +// for the given metric and the external data point if it is present. If the +// data point is not present then either a new data point is added or if the +// data point overflows due to configured limit then an empty data point is +// returned. The returned bool value is `true` if a new data point is created +// and `false` otherwise. +func (s *Store) AddExponentialHistogramDataPoint( + metricID identity.Metric, + otherDP pmetric.ExponentialHistogramDataPoint, +) (pmetric.ExponentialHistogramDataPoint, bool) { + streamID := identity.OfStream(metricID, otherDP) + if dp, ok := s.expHistoLookup[streamID]; ok { + return dp, false + } + sm := s.scopeLookup[metricID.Scope()] + metric := s.metricLookup[metricID] + if sm.datapointsLimits.CheckOverflow( + metricID.Hash().Sum64(), + otherDP.Attributes(), + ) { + // Datapoints overflow detected. In this case no action has to be + // done at this point since data point overflow should create a new + // overflow metric of sum type recording the number of unique + // datapoints. This number will be recorded in the limit tracker + // and the metric will be populated on demand. + return pmetric.ExponentialHistogramDataPoint{}, false + } + dp := metric.ExponentialHistogram().DataPoints().AppendEmpty() + s.expHistoLookup[streamID] = dp + return dp, true +} + +func getOverflowResourceBucketID(s *Tracker[identity.Resource]) identity.Resource { + r := pcommon.NewResource() + s.Decorate(r.Attributes()) + return identity.OfResource(r) +} + +func getOverflowScopeBucketID(s *Tracker[identity.Scope], resID identity.Resource) identity.Scope { + is := pcommon.NewInstrumentationScope() + s.Decorate(is.Attributes()) + return identity.OfScope(resID, is) +} + +type resourceMetrics struct { + pmetric.ResourceMetrics + + // Keeps track of scope overflows within each resource metric + scopeLimits *Tracker[identity.Scope] +} +type scopeMetrics struct { + pmetric.ScopeMetrics + datapointsLimits *Tracker[identity.Stream] +} +type metric = pmetric.Metric +type numberDataPoint = pmetric.NumberDataPoint +type summaryDataPoint = pmetric.SummaryDataPoint +type histogramDataPoint = pmetric.HistogramDataPoint +type exponentialHistogramDataPoint = pmetric.ExponentialHistogramDataPoint diff --git a/processor/lsmintervalprocessor/internal/merger/limits/tracker.go b/processor/lsmintervalprocessor/internal/merger/limits/tracker.go new file mode 100644 index 00000000..eed43b92 --- /dev/null +++ b/processor/lsmintervalprocessor/internal/merger/limits/tracker.go @@ -0,0 +1,166 @@ +// Licensed to Elasticsearch B.V. 
under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package limits // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger/limits" + +import ( + "encoding/binary" + "errors" + "fmt" + + "github.com/axiomhq/hyperloglog" + "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// Tracker tracks the configured limits while merging. It records the +// observed count as well as the unique overflow counts. +type Tracker[K any] struct { + cfg config.LimitConfig + + // TODO: Can we make this deterministic? + overflowBucketID K + // Note that overflow buckets will NOT be counted in observed count + // though, overflow buckets can have overflow of their own. + observedCount uint64 + // TODO (lahsivjar): This needs to be encoded as an attribute maybe? + overflowCounts *hyperloglog.Sketch +} + +func NewTracker[K any](cfg config.LimitConfig) *Tracker[K] { + return &Tracker[K]{cfg: cfg} +} + +// CheckOverflow matches the passed attributes to the configured +// attributes for the limit. Returns a boolean indicating overflow. +// It assumes that any entry passed to this method is a NEW entry +// and the check for this is left to the caller. +func (t *Tracker[K]) CheckOverflow( + hash uint64, + attrs pcommon.Map, +) bool { + if !t.match(attrs) || t.cfg.MaxCardinality == 0 { + return false + } + if t.observedCount == t.cfg.MaxCardinality { + t.recordOverflow(hash) + return true + } + t.observedCount++ + return false +} + +func (t *Tracker[K]) Decorate(m pcommon.Map) error { + if len(t.cfg.Overflow.Attributes) == 0 { + return nil + } + + var errs []error + m.EnsureCapacity(len(t.cfg.Overflow.Attributes)) + for _, attr := range t.cfg.Overflow.Attributes { + v := m.PutEmpty(attr.Key) + if err := v.FromRaw(attr.Value); err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return fmt.Errorf("failed to decorate overflow bucket: %w", errors.Join(errs...)) + } + return nil +} + +func (t *Tracker[K]) SetOverflowBucketID(bucketID K) { + t.overflowBucketID = bucketID +} + +func (t *Tracker[K]) GetOverflowBucketID() K { + return t.overflowBucketID +} + +// MarshalBinary encodes the tracker to a byte slice. Note that only the +// observed count and the overflow estimator are encoded. 
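The overflow estimator behind overflowCounts is an axiomhq/hyperloglog sketch keyed by the entity's 64-bit hash, so re-observing the same overflowing stream does not inflate the count that Finalize (in store.go above) reports on the _other metric. A minimal, standalone sketch of that behaviour (the hash values are arbitrary):

package main

import (
	"fmt"

	"github.com/axiomhq/hyperloglog"
)

func main() {
	sk := hyperloglog.New14() // the same precision the tracker uses

	// Two distinct overflowing streams, one of them observed twice.
	sk.InsertHash(0xdeadbeefcafef00d)
	sk.InsertHash(0xdeadbeefcafef00d)
	sk.InsertHash(0x0123456789abcdef)
	fmt.Println(sk.Estimate()) // ~2: approximate count of distinct hashes

	// The sketch round-trips through MarshalBinary/UnmarshalBinary, which is
	// what allows a tracker to be persisted between merges.
	b, _ := sk.MarshalBinary()
	restored := hyperloglog.New14()
	_ = restored.UnmarshalBinary(b)
	fmt.Println(restored.Estimate())
}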
+func (t *Tracker[K]) MarshalBinary() ([]byte, error) { + var ( + hll []byte + err error + ) + if t.overflowCounts != nil { + hll, err = t.overflowCounts.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("failed to marshal overflow estimator: %w", err) + } + } + + var offset int + // 8 bytes for observed count + HLL binary encoded size + data := make([]byte, 8+len(hll)) + binary.BigEndian.PutUint64(data[offset:], t.observedCount) + offset += 8 + + if len(hll) > 0 { + copy(data[offset:], hll) + } + return data, nil +} + +// UnmarshalBinary unmarshals binary encoded tracker to the go struct. Note +// that limit config and the overflow bucket ID are not encoded in the binary +// format as they could be recreated. Example usage: +// +// t := NewTracker[identity.Resource](cfg) +// if err := t.UnmarshalBinary(data); err != nil { +// panic(err) +// } +// t.SetOverflowBucketID(identity.OfResource(decodedResource)) +func (t *Tracker[K]) UnmarshalBinary(data []byte) error { + if len(data) < 8 { + return fmt.Errorf("failed to unmarshal observed count, invalid data of length %d received", len(data)) + } + + t.observedCount = binary.BigEndian.Uint64(data[:8]) + if len(data) == 8 { + return nil + } + t.overflowCounts = hyperloglog.New14() + if err := t.overflowCounts.UnmarshalBinary(data[8:]); err != nil { + return fmt.Errorf("failed to unmarshal overflow estimator hll sketch: %w", err) + } + return nil +} + +func (t *Tracker[K]) match(attrs pcommon.Map) bool { + if len(t.cfg.Attributes) == 0 { + // If no attributes are defined then it is a match by default + return true + } + var match int + attrs.Range(func(k string, v pcommon.Value) bool { + if _, ok := t.cfg.Attributes[k]; ok { + match++ + } + return match == len(t.cfg.Attributes) + }) + return match == len(t.cfg.Attributes) +} + +func (t *Tracker[K]) recordOverflow(hash uint64) { + if t.overflowCounts == nil { + // Creates an overflow with 14 precision + t.overflowCounts = hyperloglog.New14() + } + t.overflowCounts.InsertHash(hash) +} diff --git a/processor/lsmintervalprocessor/internal/merger/merger.go b/processor/lsmintervalprocessor/internal/merger/merger.go index 3f4faa2e..3070d79a 100644 --- a/processor/lsmintervalprocessor/internal/merger/merger.go +++ b/processor/lsmintervalprocessor/internal/merger/merger.go @@ -21,20 +21,25 @@ import ( "io" "github.com/cockroachdb/pebble" + "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config" ) var _ pebble.ValueMerger = (*Merger)(nil) type Merger struct { current Value + cfg *config.Config } -func New(v Value) *Merger { - return &Merger{current: v} +func New(v Value, cfg *config.Config) *Merger { + return &Merger{ + current: v, + cfg: cfg, + } } func (m *Merger) MergeNewer(value []byte) error { - var op Value + op := NewValue(m.cfg) if err := op.UnmarshalProto(value); err != nil { return err } @@ -42,7 +47,7 @@ func (m *Merger) MergeNewer(value []byte) error { } func (m *Merger) MergeOlder(value []byte) error { - var op Value + op := NewValue(m.cfg) if err := op.UnmarshalProto(value); err != nil { return err } diff --git a/processor/lsmintervalprocessor/internal/merger/model.go b/processor/lsmintervalprocessor/internal/merger/model.go index b63e5966..7fa25e8b 100644 --- a/processor/lsmintervalprocessor/internal/merger/model.go +++ b/processor/lsmintervalprocessor/internal/merger/model.go @@ -24,8 +24,10 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" + "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config" 
"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/data" "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/identity" + "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger/limits" ) // TODO (lahsivjar): Think about multitenancy, should be part of the key @@ -82,51 +84,43 @@ func (k *Key) Unmarshal(d []byte) error { // Not safe for concurrent use. type Value struct { - Metrics pmetric.Metrics - - dynamicMapBuilt bool - resLookup map[identity.Resource]pmetric.ResourceMetrics - scopeLookup map[identity.Scope]pmetric.ScopeMetrics - metricLookup map[identity.Metric]pmetric.Metric - numberLookup map[identity.Stream]pmetric.NumberDataPoint - summaryLookup map[identity.Stream]pmetric.SummaryDataPoint - histoLookup map[identity.Stream]pmetric.HistogramDataPoint - expHistoLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint + store *limits.Store } -func (v *Value) SizeBinary() int { - // TODO (lahsivjar): Possible optimization, can take marshaler - // as input and reuse with MarshalProto if this causes allocations. - var marshaler pmetric.ProtoMarshaler - return marshaler.MetricsSize(v.Metrics) +func NewValue(cfg *config.Config) Value { + return Value{ + store: limits.NewStore(cfg), + } +} + +func (v *Value) Get() pmetric.Metrics { + return v.store.Get() +} + +func (v *Value) Finalize() (pmetric.Metrics, error) { + return v.store.Finalize() } func (v *Value) MarshalProto() ([]byte, error) { - var marshaler pmetric.ProtoMarshaler - return marshaler.MarshalMetrics(v.Metrics) + return v.store.MarshalProto() } -func (v *Value) UnmarshalProto(data []byte) (err error) { - var unmarshaler pmetric.ProtoUnmarshaler - v.Metrics, err = unmarshaler.UnmarshalMetrics(data) - return +func (v *Value) UnmarshalProto(data []byte) error { + return v.store.UnmarshalProto(data) } func (v *Value) Merge(op Value) error { - // Dynamic maps allow quick lookups to aid merging. - // We build the map only once and maintain it while - // merging by updating as required. - v.buildDynamicMaps() - - rms := op.Metrics.ResourceMetrics() + rms := op.Get().ResourceMetrics() for i := 0; i < rms.Len(); i++ { rm := rms.At(i) + resID := v.store.AddResourceMetrics(rm) sms := rm.ScopeMetrics() for j := 0; j < sms.Len(); j++ { sm := sms.At(j) + scopeID := v.store.AddScopeMetrics(resID, sm) metrics := sm.Metrics() for k := 0; k < metrics.Len(); k++ { - v.MergeMetric(rm, sm, metrics.At(k)) + v.mergeMetric(resID, scopeID, metrics.At(k)) } } } @@ -138,211 +132,82 @@ func (v *Value) MergeMetric( sm pmetric.ScopeMetrics, m pmetric.Metric, ) { - // Dynamic maps allow quick lookups to aid merging. - // We build the map only once and maintain it while - // merging by updating as required. 
- v.buildDynamicMaps() + resID := v.store.AddResourceMetrics(rm) + scopeID := v.store.AddScopeMetrics(resID, sm) + v.mergeMetric(resID, scopeID, m) +} + +func (v *Value) mergeMetric( + resID identity.Resource, + scopeID identity.Scope, + m pmetric.Metric, +) { + metricID := v.store.AddMetric(scopeID, m) switch m.Type() { case pmetric.MetricTypeSum: - mClone, metricID := v.getOrCloneMetric(rm, sm, m) merge( m.Sum().DataPoints(), - mClone.Sum().DataPoints(), metricID, - v.numberLookup, + v.store.AddSumDataPoint, m.Sum().AggregationTemporality(), ) case pmetric.MetricTypeSummary: - mClone, metricID := v.getOrCloneMetric(rm, sm, m) merge( m.Summary().DataPoints(), - mClone.Summary().DataPoints(), metricID, - v.summaryLookup, + v.store.AddSummaryDataPoint, // Assume summary to be cumulative temporality pmetric.AggregationTemporalityCumulative, ) case pmetric.MetricTypeHistogram: - mClone, metricID := v.getOrCloneMetric(rm, sm, m) merge( m.Histogram().DataPoints(), - mClone.Histogram().DataPoints(), metricID, - v.histoLookup, + v.store.AddHistogramDataPoint, m.Histogram().AggregationTemporality(), ) case pmetric.MetricTypeExponentialHistogram: - mClone, metricID := v.getOrCloneMetric(rm, sm, m) merge( m.ExponentialHistogram().DataPoints(), - mClone.ExponentialHistogram().DataPoints(), metricID, - v.expHistoLookup, + v.store.AddExponentialHistogramDataPoint, m.ExponentialHistogram().AggregationTemporality(), ) } } -func (v *Value) buildDynamicMaps() { - if v.dynamicMapBuilt { - return - } - v.dynamicMapBuilt = true - - v.resLookup = make(map[identity.Resource]pmetric.ResourceMetrics) - v.scopeLookup = make(map[identity.Scope]pmetric.ScopeMetrics) - v.metricLookup = make(map[identity.Metric]pmetric.Metric) - v.numberLookup = make(map[identity.Stream]pmetric.NumberDataPoint) - v.summaryLookup = make(map[identity.Stream]pmetric.SummaryDataPoint) - v.histoLookup = make(map[identity.Stream]pmetric.HistogramDataPoint) - v.expHistoLookup = make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint) - - rms := v.Metrics.ResourceMetrics() - for i := 0; i < rms.Len(); i++ { - rm := rms.At(i) - res := identity.OfResource(rm.Resource()) - v.resLookup[res] = rm - - sms := rm.ScopeMetrics() - for j := 0; j < sms.Len(); j++ { - sm := sms.At(j) - iscope := identity.OfScope(res, sm.Scope()) - v.scopeLookup[iscope] = sm - - metrics := sm.Metrics() - for k := 0; k < metrics.Len(); k++ { - metric := metrics.At(k) - imetric := identity.OfMetric(iscope, metric) - - switch metric.Type() { - case pmetric.MetricTypeSum: - dps := metric.Sum().DataPoints() - for l := 0; l < dps.Len(); l++ { - dp := dps.At(l) - v.numberLookup[identity.OfStream(imetric, dp)] = dp - } - case pmetric.MetricTypeSummary: - dps := metric.Summary().DataPoints() - for l := 0; l < dps.Len(); l++ { - dp := dps.At(l) - v.summaryLookup[identity.OfStream(imetric, dp)] = dp - } - case pmetric.MetricTypeHistogram: - dps := metric.Histogram().DataPoints() - for l := 0; l < dps.Len(); l++ { - dp := dps.At(l) - v.histoLookup[identity.OfStream(imetric, dp)] = dp - } - case pmetric.MetricTypeExponentialHistogram: - dps := metric.ExponentialHistogram().DataPoints() - for l := 0; l < dps.Len(); l++ { - dp := dps.At(l) - v.expHistoLookup[identity.OfStream(imetric, dp)] = dp - } - } - } - } - } -} - -func (v *Value) getOrCloneMetric( - rm pmetric.ResourceMetrics, - sm pmetric.ScopeMetrics, - m pmetric.Metric, -) (pmetric.Metric, identity.Metric) { - // Find the ResourceMetrics - resID := identity.OfResource(rm.Resource()) - rmClone, ok := v.resLookup[resID] - 
if !ok { - // We need to clone it *without* the ScopeMetricsSlice data - rmClone = v.Metrics.ResourceMetrics().AppendEmpty() - rm.Resource().CopyTo(rmClone.Resource()) - rmClone.SetSchemaUrl(rm.SchemaUrl()) - v.resLookup[resID] = rmClone - } - - // Find the ScopeMetrics - scopeID := identity.OfScope(resID, sm.Scope()) - smClone, ok := v.scopeLookup[scopeID] - if !ok { - // We need to clone it *without* the MetricSlice data - smClone = rmClone.ScopeMetrics().AppendEmpty() - sm.Scope().CopyTo(smClone.Scope()) - smClone.SetSchemaUrl(sm.SchemaUrl()) - v.scopeLookup[scopeID] = smClone - } - - // Find the Metric - metricID := identity.OfMetric(scopeID, m) - mClone, ok := v.metricLookup[metricID] - if !ok { - // We need to clone it *without* the datapoint data - mClone = smClone.Metrics().AppendEmpty() - mClone.SetName(m.Name()) - mClone.SetDescription(m.Description()) - mClone.SetUnit(m.Unit()) - - switch m.Type() { - case pmetric.MetricTypeGauge: - mClone.SetEmptyGauge() - case pmetric.MetricTypeSummary: - mClone.SetEmptySummary() - case pmetric.MetricTypeSum: - src := m.Sum() - - dest := mClone.SetEmptySum() - dest.SetAggregationTemporality(src.AggregationTemporality()) - dest.SetIsMonotonic(src.IsMonotonic()) - case pmetric.MetricTypeHistogram: - src := m.Histogram() - - dest := mClone.SetEmptyHistogram() - dest.SetAggregationTemporality(src.AggregationTemporality()) - case pmetric.MetricTypeExponentialHistogram: - src := m.ExponentialHistogram() - - dest := mClone.SetEmptyExponentialHistogram() - dest.SetAggregationTemporality(src.AggregationTemporality()) - } - - v.metricLookup[metricID] = mClone - } - - return mClone, metricID -} - func merge[DPS DataPointSlice[DP], DP DataPoint[DP]]( - from, to DPS, - mID identity.Metric, - lookup map[identity.Stream]DP, + from DPS, + toMetricID identity.Metric, + addDP func(identity.Metric, DP) (DP, bool), temporality pmetric.AggregationTemporality, ) { switch temporality { case pmetric.AggregationTemporalityCumulative: - mergeCumulative(from, to, mID, lookup) + mergeCumulative(from, toMetricID, addDP) case pmetric.AggregationTemporalityDelta: - mergeDelta(from, to, mID, lookup) + mergeDelta(from, toMetricID, addDP) } } func mergeCumulative[DPS DataPointSlice[DP], DP DataPoint[DP]]( - from, to DPS, - mID identity.Metric, - lookup map[identity.Stream]DP, + from DPS, + toMetricID identity.Metric, + addDP func(identity.Metric, DP) (DP, bool), ) { + var zero DP for i := 0; i < from.Len(); i++ { fromDP := from.At(i) - - streamID := identity.OfStream(mID, fromDP) - toDP, ok := lookup[streamID] - if !ok { - toDP = to.AppendEmpty() - fromDP.CopyTo(toDP) - lookup[streamID] = toDP + toDP, ok := addDP(toMetricID, fromDP) + if toDP == zero { + // Overflow, discard the datapoint continue } - + if ok { + // New data point is created so we can copy the old data directly + fromDP.CopyTo(toDP) + } if fromDP.Timestamp() > toDP.Timestamp() { fromDP.CopyTo(toDP) } @@ -350,19 +215,21 @@ func mergeCumulative[DPS DataPointSlice[DP], DP DataPoint[DP]]( } func mergeDelta[DPS DataPointSlice[DP], DP DataPoint[DP]]( - from, to DPS, - mID identity.Metric, - lookup map[identity.Stream]DP, + from DPS, + toMetricID identity.Metric, + addDP func(identity.Metric, DP) (DP, bool), ) { + var zero DP for i := 0; i < from.Len(); i++ { fromDP := from.At(i) - - streamID := identity.OfStream(mID, fromDP) - toDP, ok := lookup[streamID] - if !ok { - toDP = to.AppendEmpty() + toDP, ok := addDP(toMetricID, fromDP) + if toDP == zero { + // Overflow, discard the datapoint + continue + } + if ok { + // 
New data point is created so we can copy the old data directly fromDP.CopyTo(toDP) - lookup[streamID] = toDP continue } @@ -374,11 +241,6 @@ func mergeDelta[DPS DataPointSlice[DP], DP DataPoint[DP]]( case pmetric.ExponentialHistogramDataPoint: mergeDeltaExponentialHistogramDP(fromDP, any(toDP).(pmetric.ExponentialHistogramDataPoint)) } - - // Keep the highest timestamp for the aggregated metric - if fromDP.Timestamp() > toDP.Timestamp() { - toDP.SetTimestamp(fromDP.Timestamp()) - } } } diff --git a/processor/lsmintervalprocessor/processor.go b/processor/lsmintervalprocessor/processor.go index a6e6ff70..20552996 100644 --- a/processor/lsmintervalprocessor/processor.go +++ b/processor/lsmintervalprocessor/processor.go @@ -33,6 +33,7 @@ import ( "go.opentelemetry.io/collector/processor" "go.uber.org/zap" + "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config" "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger" ) @@ -43,7 +44,7 @@ var zeroTime = time.Unix(0, 0).UTC() const batchCommitThreshold = 16 << 20 // 16MB type Processor struct { - passthrough PassThrough + cfg *config.Config db *pebble.DB dataDir string @@ -63,16 +64,16 @@ type Processor struct { logger *zap.Logger } -func newProcessor(cfg *Config, ivlDefs []intervalDef, log *zap.Logger, next consumer.Metrics) (*Processor, error) { +func newProcessor(cfg *config.Config, ivlDefs []intervalDef, log *zap.Logger, next consumer.Metrics) (*Processor, error) { dbOpts := &pebble.Options{ Merger: &pebble.Merger{ Name: "pmetrics_merger", Merge: func(key, value []byte) (pebble.ValueMerger, error) { - var v merger.Value + v := merger.NewValue(cfg) if err := v.UnmarshalProto(value); err != nil { return nil, fmt.Errorf("failed to unmarshal value from db: %w", err) } - return merger.New(v), nil + return merger.New(v, cfg), nil }, }, } @@ -88,7 +89,7 @@ func newProcessor(cfg *Config, ivlDefs []intervalDef, log *zap.Logger, next cons ctx, cancel := context.WithCancel(context.Background()) return &Processor{ - passthrough: cfg.PassThrough, + cfg: cfg, dataDir: dataDir, dbOpts: dbOpts, wOpts: writeOpts, @@ -209,7 +210,7 @@ func (p *Processor) Capabilities() consumer.Capabilities { func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { var errs []error - v := merger.Value{Metrics: pmetric.NewMetrics()} + v := merger.NewValue(p.cfg) md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { @@ -218,7 +219,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro // TODO (lahsivjar): implement support for gauges return false case pmetric.MetricTypeSummary: - if p.passthrough.Summary { + if p.cfg.PassThrough.Summary { return false } v.MergeMetric(rm, sm, m) @@ -366,12 +367,16 @@ func (p *Processor) exportForInterval( var errs []error var exportedDPCount int for iter.First(); iter.Valid(); iter.Next() { - var v merger.Value + v := merger.NewValue(p.cfg) if err := v.UnmarshalProto(iter.Value()); err != nil { errs = append(errs, fmt.Errorf("failed to decode binary from database: %w", err)) continue } - resourceMetrics := v.Metrics.ResourceMetrics() + finalMetrics, err := v.Finalize() + if err != nil { + errs = append(errs, fmt.Errorf("failed to finalize merged metric: %w", err)) + } + resourceMetrics := finalMetrics.ResourceMetrics() if ivl.Statements != nil { for i := 0; i < 
resourceMetrics.Len(); i++ { res := resourceMetrics.At(i) @@ -419,11 +424,12 @@ func (p *Processor) exportForInterval( } } } - if err := p.next.ConsumeMetrics(ctx, v.Metrics); err != nil { + metrics := v.Get() + if err := p.next.ConsumeMetrics(ctx, metrics); err != nil { errs = append(errs, fmt.Errorf("failed to consume the decoded value: %w", err)) continue } - exportedDPCount += v.Metrics.DataPointCount() + exportedDPCount += metrics.DataPointCount() } if err := p.db.DeleteRange(lb, ub, p.wOpts); err != nil { errs = append(errs, fmt.Errorf("failed to delete exported entries: %w", err)) diff --git a/processor/lsmintervalprocessor/processor_test.go b/processor/lsmintervalprocessor/processor_test.go index 658086af..d86196db 100644 --- a/processor/lsmintervalprocessor/processor_test.go +++ b/processor/lsmintervalprocessor/processor_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -56,8 +57,8 @@ func TestAggregation(t *testing.T) { } for _, tc := range testCases { - config := &Config{ - Intervals: []IntervalConfig{ + config := &config.Config{ + Intervals: []config.IntervalConfig{ { Duration: time.Second, Statements: []string{ @@ -68,59 +69,121 @@ func TestAggregation(t *testing.T) { }, }, }, - PassThrough: PassThrough{ + PassThrough: config.PassThrough{ Summary: tc.passThrough, }, } t.Run(tc.name, func(t *testing.T) { t.Parallel() + testRunHelper(t, tc.name, config) + }) + } +} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - next := &consumertest.MetricsSink{} +func TestAggregationOverflow(t *testing.T) { + t.Parallel() - factory := NewFactory() - settings := processortest.NewNopSettings() - settings.TelemetrySettings.Logger = zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel)) - mgp, err := factory.CreateMetrics( - context.Background(), - settings, - config, - next, - ) - require.NoError(t, err) - require.IsType(t, &Processor{}, mgp) - t.Cleanup(func() { require.NoError(t, mgp.Shutdown(context.Background())) }) + oneCardinalityLimitConfig := config.LimitConfig{ + MaxCardinality: 1, + Overflow: config.OverflowConfig{ + Attributes: []config.Attribute{{Key: "test_overflow", Value: any(true)}}, + }, + } - dir := filepath.Join("testdata", tc.name) - md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml")) - require.NoError(t, err) + testCases := []struct { + name string + }{ + {name: "sum_cumulative_overflow"}, + {name: "sum_delta_overflow"}, + {name: "histogram_cumulative_overflow"}, + {name: "histogram_delta_overflow"}, + {name: "exphistogram_cumulative_overflow"}, + {name: "exphistogram_delta_overflow"}, + } - // Start the processor and feed the metrics - err = mgp.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err) - err = mgp.ConsumeMetrics(ctx, md) - require.NoError(t, err) - - var allMetrics []pmetric.Metrics - require.Eventually(t, func() bool { - // 1 from calling next on the input and 1 from the export - allMetrics = next.AllMetrics() - return len(allMetrics) == 2 - }, 5*time.Second, 100*time.Millisecond) - - expectedNextData, err := golden.ReadMetrics(filepath.Join(dir, "next.yaml")) - require.NoError(t, err) - assert.NoError(t, pmetrictest.CompareMetrics(expectedNextData, allMetrics[0])) - - expectedExportData, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml")) - 
require.NoError(t, err) - assert.NoError(t, pmetrictest.CompareMetrics(expectedExportData, allMetrics[1])) + for _, tc := range testCases { + config := &config.Config{ + Intervals: []config.IntervalConfig{ + { + Duration: time.Second, + Statements: []string{ + `set(resource.attributes["custom_res_attr"], "res")`, + `set(instrumentation_scope.attributes["custom_scope_attr"], "scope")`, + `set(attributes["custom_dp_attr"], "dp")`, + }, + }, + }, + ResourceLimits: oneCardinalityLimitConfig, + ScopeLimits: oneCardinalityLimitConfig, + DatapointLimits: oneCardinalityLimitConfig, + } + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + testRunHelper(t, tc.name, config) }) } } +func testRunHelper(t *testing.T, name string, config *config.Config) { + t.Helper() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + next := &consumertest.MetricsSink{} + + factory := NewFactory() + settings := processortest.NewNopSettings() + settings.TelemetrySettings.Logger = zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel)) + mgp, err := factory.CreateMetrics( + context.Background(), + settings, + config, + next, + ) + require.NoError(t, err) + require.IsType(t, &Processor{}, mgp) + t.Cleanup(func() { require.NoError(t, mgp.Shutdown(context.Background())) }) + + dir := filepath.Join("testdata", name) + md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml")) + require.NoError(t, err) + + // Start the processor and feed the metrics + err = mgp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + err = mgp.ConsumeMetrics(ctx, md) + require.NoError(t, err) + + var allMetrics []pmetric.Metrics + require.Eventually(t, func() bool { + // 1 from calling next on the input and 1 from the export + allMetrics = next.AllMetrics() + return len(allMetrics) == 2 + }, 5*time.Second, 100*time.Millisecond) + + expectedNextData, err := golden.ReadMetrics(filepath.Join(dir, "next.yaml")) + require.NoError(t, err) + assert.NoError(t, pmetrictest.CompareMetrics(expectedNextData, allMetrics[0])) + + expectedExportData, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml")) + require.NoError(t, err) + assert.NoError(t, pmetrictest.CompareMetrics(expectedExportData, allMetrics[1])) +} + +func BenchmarkAggregation(b *testing.B) { + benchmarkAggregation(b, nil) +} + +func BenchmarkAggregationWithOTTL(b *testing.B) { + benchmarkAggregation(b, []string{ + `set(resource.attributes["custom_res_attr"], "res")`, + `set(instrumentation_scope.attributes["custom_scope_attr"], "scope")`, + `set(attributes["custom_dp_attr"], "dp")`, + `set(resource.attributes["dependent_attr"], Concat([attributes["aaa"], "dependent"], "-"))`, + }) +} + func benchmarkAggregation(b *testing.B, ottlStatements []string) { testCases := []struct { name string @@ -137,14 +200,14 @@ func benchmarkAggregation(b *testing.B, ottlStatements []string) { } for _, tc := range testCases { - config := &Config{ - Intervals: []IntervalConfig{ + config := &config.Config{ + Intervals: []config.IntervalConfig{ { Duration: time.Hour, Statements: ottlStatements, }, }, - PassThrough: PassThrough{ + PassThrough: config.PassThrough{ Summary: tc.passThrough, }, } @@ -191,16 +254,3 @@ func benchmarkAggregation(b *testing.B, ottlStatements []string) { }) } } - -func BenchmarkAggregation(b *testing.B) { - benchmarkAggregation(b, nil) -} - -func BenchmarkAggregationWithOTTL(b *testing.B) { - benchmarkAggregation(b, []string{ - `set(resource.attributes["custom_res_attr"], "res")`, - 
`set(instrumentation_scope.attributes["custom_scope_attr"], "scope")`, - `set(attributes["custom_dp_attr"], "dp")`, - `set(resource.attributes["dependent_attr"], Concat([attributes["aaa"], "dependent"], "-"))`, - }) -} diff --git a/processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/input.yaml b/processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/input.yaml new file mode 100644 index 00000000..538ee1ad --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/input.yaml @@ -0,0 +1,74 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.exphistogram.test.1 + exponentialHistogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 5000000 + scale: 4 + zeroCount: 5 + sum: 2.5 + count: 94 + positive: + offset: 2 + bucketCounts: [4, 7, 9, 6, 25] + negative: + offset: 6 + bucketCounts: [2, 13, 7, 12, 4] + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.exphistogram.test.1 + exponentialHistogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 7000000 + scale: 4 + zeroCount: 2 + sum: -1.5 + count: 59 + positive: + offset: 2 + bucketCounts: [2, 3, 7, 4, 20] + negative: + offset: 7 + bucketCounts: [8, 3, 9, 1] + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.exphistogram.test.2 + exponentialHistogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 8000000 + scale: 4 + zeroCount: 5 + sum: 2.1 + count: 147 + positive: + offset: 2 + bucketCounts: [9, 12, 17, 8, 34] + negative: + offset: 6 + bucketCounts: [6, 21, 9, 19, 7] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/next.yaml b/processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/next.yaml new file mode 100644 index 00000000..3949e7c5 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/output.yaml b/processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/output.yaml new file mode 100644 index 00000000..96ab94e4 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/exphistogram_cumulative_overflow/output.yaml @@ -0,0 +1,67 @@ +resourceMetrics: + - resource: + attributes: + - key: asdf + value: + stringValue: foo + - key: custom_res_attr + value: + stringValue: res + schemaUrl: https://test-res-schema.com/schema + scopeMetrics: + - metrics: + - exponentialHistogram: + aggregationTemporality: 2 + dataPoints: + - attributes: + - key: aaa + value: + stringValue: bbb + - key: custom_dp_attr + value: + stringValue: dp + count: "59" + negative: + bucketCounts: + - "8" + - "3" + - "9" + - "1" + offset: 7 + positive: + bucketCounts: + - "2" + - "3" + - "7" + - "4" + - "20" + offset: 2 + scale: 4 + sum: -1.5 + timeUnixNano: "7000000" + zeroCount: "2" + name: cumulative.exphistogram.test.1 + - description: Overflow count due to datapoints limit + name: _other + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "1" + attributes: + - key: custom_dp_attr + value: + stringValue: dp + - key: test_overflow 
+ value: + boolValue: true + schemaUrl: https://test-scope-schema.com/schema + scope: + attributes: + - key: custom_scope_attr + value: + stringValue: scope + - key: foo + value: + stringValue: bar + name: MyTestInstrument + version: 1.2.3 diff --git a/processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/input.yaml b/processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/input.yaml new file mode 100644 index 00000000..d4e5a24f --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/input.yaml @@ -0,0 +1,74 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.exphistogram.test.1 + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 5000000 + scale: 4 + zeroCount: 5 + sum: 2.5 + count: 94 + positive: + offset: 2 + bucketCounts: [4, 7, 9, 6, 25] + negative: + offset: 6 + bucketCounts: [2, 13, 7, 12, 4] + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.exphistogram.test.1 + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 7000000 + scale: 4 + zeroCount: 2 + sum: -1.5 + count: 59 + positive: + offset: 2 + bucketCounts: [2, 3, 7, 4, 20] + negative: + offset: 7 + bucketCounts: [8, 3, 9, 1] + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.exphistogram.test.2 + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 8000000 + scale: 4 + zeroCount: 5 + sum: 2.1 + count: 147 + positive: + offset: 2 + bucketCounts: [9, 12, 17, 8, 34] + negative: + offset: 6 + bucketCounts: [6, 21, 9, 19, 7] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/next.yaml b/processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/next.yaml new file mode 100644 index 00000000..3949e7c5 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/output.yaml b/processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/output.yaml new file mode 100644 index 00000000..a89040ec --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/exphistogram_delta_overflow/output.yaml @@ -0,0 +1,68 @@ +resourceMetrics: + - resource: + attributes: + - key: asdf + value: + stringValue: foo + - key: custom_res_attr + value: + stringValue: res + schemaUrl: https://test-res-schema.com/schema + scopeMetrics: + - metrics: + - exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: aaa + value: + stringValue: bbb + - key: custom_dp_attr + value: + stringValue: dp + count: "153" + negative: + bucketCounts: + - "2" + - "21" + - "10" + - "21" + - "5" + offset: 6 + positive: + bucketCounts: + - "6" + - "10" + - "16" + - "10" + - "45" + offset: 2 + scale: 4 + sum: 1 + timeUnixNano: "7000000" + zeroCount: "7" + name: cumulative.exphistogram.test.1 + - description: Overflow count due to datapoints limit + name: _other + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "1" + attributes: + - key: custom_dp_attr + value: + stringValue: dp + - key: test_overflow + value: + 
boolValue: true + schemaUrl: https://test-scope-schema.com/schema + scope: + attributes: + - key: custom_scope_attr + value: + stringValue: scope + - key: foo + value: + stringValue: bar + name: MyTestInstrument + version: 1.2.3 diff --git a/processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/input.yaml b/processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/input.yaml new file mode 100644 index 00000000..f695e779 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/input.yaml @@ -0,0 +1,56 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.histogram.test.1 + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 5000000 + count: 60 + sum: 2670 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [9, 4, 7, 9, 6, 25] + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.histogram.test.1 + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 7000000 + count: 41 + sum: 2110 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [5, 2, 3, 7, 4, 20] + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.histogram.test.2 + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 8000000 + count: 91 + sum: 3600 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [11, 9, 12, 17, 8, 34] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/next.yaml b/processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/next.yaml new file mode 100644 index 00000000..3949e7c5 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/output.yaml b/processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/output.yaml new file mode 100644 index 00000000..03f87076 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/histogram_cumulative_overflow/output.yaml @@ -0,0 +1,63 @@ +resourceMetrics: + - resource: + attributes: + - key: asdf + value: + stringValue: foo + - key: custom_res_attr + value: + stringValue: res + schemaUrl: https://test-res-schema.com/schema + scopeMetrics: + - metrics: + - histogram: + aggregationTemporality: 2 + dataPoints: + - attributes: + - key: aaa + value: + stringValue: bbb + - key: custom_dp_attr + value: + stringValue: dp + bucketCounts: + - "5" + - "2" + - "3" + - "7" + - "4" + - "20" + count: "41" + explicitBounds: + - 0.01 + - 0.1 + - 1 + - 10 + - 100 + sum: 2110 + timeUnixNano: "7000000" + name: cumulative.histogram.test.1 + - description: Overflow count due to datapoints limit + name: _other + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "1" + attributes: + - key: custom_dp_attr + value: + stringValue: dp + - key: test_overflow + value: + boolValue: true + schemaUrl: https://test-scope-schema.com/schema + scope: + attributes: + - key: custom_scope_attr + value: + stringValue: scope + - key: foo + value: + stringValue: bar + name: MyTestInstrument + version: 1.2.3 diff --git 
a/processor/lsmintervalprocessor/testdata/histogram_delta_overflow/input.yaml b/processor/lsmintervalprocessor/testdata/histogram_delta_overflow/input.yaml new file mode 100644 index 00000000..ed31922c --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/histogram_delta_overflow/input.yaml @@ -0,0 +1,56 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: delta.histogram.test.1 + histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 5000000 + count: 60 + sum: 2670 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [9, 4, 7, 9, 6, 25] + attributes: + - key: aaa + value: + stringValue: bbb + - name: delta.histogram.test.1 + histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 7000000 + count: 41 + sum: 2110 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [5, 2, 3, 7, 4, 20] + attributes: + - key: aaa + value: + stringValue: bbb + - name: delta.histogram.test.2 + histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 8000000 + count: 91 + sum: 3600 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [11, 9, 12, 17, 8, 34] + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/lsmintervalprocessor/testdata/histogram_delta_overflow/next.yaml b/processor/lsmintervalprocessor/testdata/histogram_delta_overflow/next.yaml new file mode 100644 index 00000000..3949e7c5 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/histogram_delta_overflow/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/lsmintervalprocessor/testdata/histogram_delta_overflow/output.yaml b/processor/lsmintervalprocessor/testdata/histogram_delta_overflow/output.yaml new file mode 100644 index 00000000..42346547 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/histogram_delta_overflow/output.yaml @@ -0,0 +1,63 @@ +resourceMetrics: + - resource: + attributes: + - key: asdf + value: + stringValue: foo + - key: custom_res_attr + value: + stringValue: res + schemaUrl: https://test-res-schema.com/schema + scopeMetrics: + - metrics: + - histogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: aaa + value: + stringValue: bbb + - key: custom_dp_attr + value: + stringValue: dp + bucketCounts: + - "14" + - "6" + - "10" + - "16" + - "10" + - "45" + count: "101" + explicitBounds: + - 0.01 + - 0.1 + - 1 + - 10 + - 100 + sum: 4780 + timeUnixNano: "7000000" + name: delta.histogram.test.1 + - description: Overflow count due to datapoints limit + name: _other + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "1" + attributes: + - key: custom_dp_attr + value: + stringValue: dp + - key: test_overflow + value: + boolValue: true + schemaUrl: https://test-scope-schema.com/schema + scope: + attributes: + - key: custom_scope_attr + value: + stringValue: scope + - key: foo + value: + stringValue: bar + name: MyTestInstrument + version: 1.2.3 diff --git a/processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/input.yaml b/processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/input.yaml new file mode 100644 index 00000000..9f09295d --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/input.yaml @@ -0,0 +1,50 @@ +resourceMetrics: + - schemaUrl: 
https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum.1 + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 5000000 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.monotonic.sum.1 + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 7000000 + asDouble: 222 + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.monotonic.sum.2 + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 8000000 + asDouble: 444 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/next.yaml b/processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/next.yaml new file mode 100644 index 00000000..3949e7c5 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] diff --git a/processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/output.yaml b/processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/output.yaml new file mode 100644 index 00000000..b28d7258 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/sum_cumulative_overflow/output.yaml @@ -0,0 +1,50 @@ +resourceMetrics: + - resource: + attributes: + - key: asdf + value: + stringValue: foo + - key: custom_res_attr + value: + stringValue: res + schemaUrl: https://test-res-schema.com/schema + scopeMetrics: + - metrics: + - name: cumulative.monotonic.sum.1 + sum: + aggregationTemporality: 2 + dataPoints: + - asDouble: 222 + attributes: + - key: aaa + value: + stringValue: bbb + - key: custom_dp_attr + value: + stringValue: dp + timeUnixNano: "7000000" + isMonotonic: true + - description: Overflow count due to datapoints limit + name: _other + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "1" + attributes: + - key: custom_dp_attr + value: + stringValue: dp + - key: test_overflow + value: + boolValue: true + schemaUrl: https://test-scope-schema.com/schema + scope: + attributes: + - key: custom_scope_attr + value: + stringValue: scope + - key: foo + value: + stringValue: bar + name: MyTestInstrument + version: 1.2.3 diff --git a/processor/lsmintervalprocessor/testdata/sum_delta_overflow/input.yaml b/processor/lsmintervalprocessor/testdata/sum_delta_overflow/input.yaml new file mode 100644 index 00000000..76ad7489 --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/sum_delta_overflow/input.yaml @@ -0,0 +1,51 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: delta.monotonic.sum.1 + sum: + aggregationTemporality: 1 + isMonotonic: true + dataPoints: + - timeUnixNano: 7000000 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - name: delta.monotonic.sum.1 + sum: + aggregationTemporality: 1 + isMonotonic: true + dataPoints: + - timeUnixNano: 8000000 + asDouble: 222 + attributes: + - key: aaa + value: + stringValue: bbb + - name: 
delta.monotonic.sum.2 + sum: + aggregationTemporality: 1 + isMonotonic: true + dataPoints: + - timeUnixNano: 9000000 + asDouble: 444 + attributes: + - key: aaa + value: + stringValue: bbb + diff --git a/processor/lsmintervalprocessor/testdata/sum_delta_overflow/next.yaml b/processor/lsmintervalprocessor/testdata/sum_delta_overflow/next.yaml new file mode 100644 index 00000000..2417646d --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/sum_delta_overflow/next.yaml @@ -0,0 +1,2 @@ +resourceMetrics: [] + diff --git a/processor/lsmintervalprocessor/testdata/sum_delta_overflow/output.yaml b/processor/lsmintervalprocessor/testdata/sum_delta_overflow/output.yaml new file mode 100644 index 00000000..c021bbaf --- /dev/null +++ b/processor/lsmintervalprocessor/testdata/sum_delta_overflow/output.yaml @@ -0,0 +1,50 @@ +resourceMetrics: + - resource: + attributes: + - key: asdf + value: + stringValue: foo + - key: custom_res_attr + value: + stringValue: res + schemaUrl: https://test-res-schema.com/schema + scopeMetrics: + - metrics: + - name: delta.monotonic.sum.1 + sum: + aggregationTemporality: 1 + dataPoints: + - asDouble: 555 + attributes: + - key: aaa + value: + stringValue: bbb + - key: custom_dp_attr + value: + stringValue: dp + timeUnixNano: "8000000" + isMonotonic: true + - description: Overflow count due to datapoints limit + name: _other + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "1" + attributes: + - key: custom_dp_attr + value: + stringValue: dp + - key: test_overflow + value: + boolValue: true + schemaUrl: https://test-scope-schema.com/schema + scope: + attributes: + - key: custom_scope_attr + value: + stringValue: scope + - key: foo + value: + stringValue: bar + name: MyTestInstrument + version: 1.2.3
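
For reference, below is a minimal sketch (not part of the patch) of how the new per-level cardinality limits could be constructed programmatically, mirroring the oneCardinalityLimitConfig used in TestAggregationOverflow above. The concrete limit values and the "overflow" attribute key are illustrative assumptions only; the testdata above demonstrates the datapoint-level behaviour, where series beyond the budget are counted in a synthetic "_other" delta sum carrying the configured overflow attributes.

// Sketch only: assumed example, not code shipped by this patch.
// The limits (1000/100/10000) and the "overflow" attribute key are
// placeholders chosen for illustration.
package example

import (
	"time"

	"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
)

func exampleLimitConfig() *config.Config {
	// Attributes stamped onto the synthetic "_other" overflow series,
	// analogous to the "test_overflow" attribute used by the tests.
	overflow := config.OverflowConfig{
		Attributes: []config.Attribute{{Key: "overflow", Value: any(true)}},
	}
	return &config.Config{
		Intervals: []config.IntervalConfig{{Duration: 60 * time.Second}},
		// Independent cardinality budgets per level; once a budget is
		// exhausted, additional distinct series are folded into the
		// overflow bucket instead of creating new entries.
		ResourceLimits:  config.LimitConfig{MaxCardinality: 1000, Overflow: overflow},
		ScopeLimits:     config.LimitConfig{MaxCardinality: 100, Overflow: overflow},
		DatapointLimits: config.LimitConfig{MaxCardinality: 10000, Overflow: overflow},
	}
}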