From 45240852e86bcbd2b20738b305679ca1006fb335 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 9 May 2022 10:27:35 -0700 Subject: [PATCH] tanzuobservabilityexporter: use uber/atomic instead of sync/atomic Signed-off-by: Bogdan Drutu --- exporter/tanzuobservabilityexporter/go.mod | 2 +- .../tanzuobservabilityexporter/metrics.go | 75 ++++++++----------- 2 files changed, 31 insertions(+), 46 deletions(-) diff --git a/exporter/tanzuobservabilityexporter/go.mod b/exporter/tanzuobservabilityexporter/go.mod index efad309d2f86..16b0a6056a77 100644 --- a/exporter/tanzuobservabilityexporter/go.mod +++ b/exporter/tanzuobservabilityexporter/go.mod @@ -11,6 +11,7 @@ require ( go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/semconv v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.21.0 ) @@ -38,7 +39,6 @@ require ( go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/text v0.3.7 // indirect diff --git a/exporter/tanzuobservabilityexporter/metrics.go b/exporter/tanzuobservabilityexporter/metrics.go index 515cb32ad6fc..aacfd574876e 100644 --- a/exporter/tanzuobservabilityexporter/metrics.go +++ b/exporter/tanzuobservabilityexporter/metrics.go @@ -20,13 +20,13 @@ import ( "fmt" "math" "strconv" - "sync/atomic" "github.com/wavefronthq/wavefront-sdk-go/histogram" "github.com/wavefronthq/wavefront-sdk-go/senders" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/atomic" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -180,37 +180,20 @@ type flushCloser interface { Close() } -// counter represents an internal counter metric. The zero value is ready to use -type counter struct { - count int64 -} - -// Report reports this counter to tanzu observability. name is the name of +// report the counter to tanzu observability. name is the name of // the metric to be reported. tags is the tags for the metric. sender is what // sends the metric to tanzu observability. Any errors get added to errs. -func (c *counter) Report( - name string, tags map[string]string, sender gaugeSender, errs *[]error, -) { - err := sender.SendMetric(name, float64(c.Get()), 0, "", tags) +func report(count *atomic.Int64, name string, tags map[string]string, sender gaugeSender, errs *[]error) { + err := sender.SendMetric(name, float64(count.Load()), 0, "", tags) if err != nil { *errs = append(*errs, err) } } -// Inc increments this counter by one. -func (c *counter) Inc() { - atomic.AddInt64(&c.count, 1) -} - -// Get gets the value of this counter. -func (c *counter) Get() int64 { - return atomic.LoadInt64(&c.count) -} - // logMissingValue keeps track of metrics with missing values. metric is the // metric with the missing value. settings logs the missing value. count counts // metrics with missing values. -func logMissingValue(metric pmetric.Metric, settings component.TelemetrySettings, count *counter) { +func logMissingValue(metric pmetric.Metric, settings component.TelemetrySettings, count *atomic.Int64) { namef := zap.String(metricNameString, metric.Name()) typef := zap.String(metricTypeString, metric.DataType().String()) settings.Logger.Debug("Metric missing value", namef, typef) @@ -240,7 +223,7 @@ func pushGaugeNumberDataPoint( errs *[]error, sender gaugeSender, settings component.TelemetrySettings, - missingValues *counter, + missingValues *atomic.Int64, ) { tags := attributesToTagsForMetrics(numberDataPoint.Attributes(), mi.SourceKey) ts := numberDataPoint.Timestamp().AsTime().Unix() @@ -263,7 +246,7 @@ type gaugeSender interface { type gaugeConsumer struct { sender gaugeSender settings component.TelemetrySettings - missingValues counter + missingValues *atomic.Int64 } // newGaugeConsumer returns a typedMetricConsumer that consumes gauge metrics @@ -271,8 +254,9 @@ type gaugeConsumer struct { func newGaugeConsumer( sender gaugeSender, settings component.TelemetrySettings) typedMetricConsumer { return &gaugeConsumer{ - sender: sender, - settings: settings, + sender: sender, + settings: settings, + missingValues: atomic.NewInt64(0), } } @@ -290,18 +274,18 @@ func (g *gaugeConsumer) Consume(mi metricInfo, errs *[]error) { errs, g.sender, g.settings, - &g.missingValues) + g.missingValues) } } func (g *gaugeConsumer) PushInternalMetrics(errs *[]error) { - g.missingValues.Report(missingValueMetricName, typeIsGaugeTags, g.sender, errs) + report(g.missingValues, missingValueMetricName, typeIsGaugeTags, g.sender, errs) } type sumConsumer struct { sender senders.MetricSender settings component.TelemetrySettings - missingValues counter + missingValues *atomic.Int64 } // newSumConsumer returns a typedMetricConsumer that consumes sum metrics @@ -309,8 +293,9 @@ type sumConsumer struct { func newSumConsumer( sender senders.MetricSender, settings component.TelemetrySettings) typedMetricConsumer { return &sumConsumer{ - sender: sender, - settings: settings, + sender: sender, + settings: settings, + missingValues: atomic.NewInt64(0), } } @@ -330,20 +315,20 @@ func (s *sumConsumer) Consume(mi metricInfo, errs *[]error) { s.pushNumberDataPoint(mi, numberDataPoints.At(i), errs) } else { pushGaugeNumberDataPoint( - mi, numberDataPoints.At(i), errs, s.sender, s.settings, &s.missingValues) + mi, numberDataPoints.At(i), errs, s.sender, s.settings, s.missingValues) } } } func (s *sumConsumer) PushInternalMetrics(errs *[]error) { - s.missingValues.Report(missingValueMetricName, typeIsSumTags, s.sender, errs) + report(s.missingValues, missingValueMetricName, typeIsSumTags, s.sender, errs) } func (s *sumConsumer) pushNumberDataPoint(mi metricInfo, numberDataPoint pmetric.NumberDataPoint, errs *[]error) { tags := attributesToTagsForMetrics(numberDataPoint.Attributes(), mi.SourceKey) value, err := getValue(numberDataPoint) if err != nil { - logMissingValue(mi.Metric, s.settings, &s.missingValues) + logMissingValue(mi.Metric, s.settings, s.missingValues) return } err = s.sender.SendDeltaCounter(mi.Name(), value, mi.Source, tags) @@ -355,24 +340,28 @@ func (s *sumConsumer) pushNumberDataPoint(mi metricInfo, numberDataPoint pmetric // histogramReporting takes care of logging and internal metrics for histograms type histogramReporting struct { settings component.TelemetrySettings - malformedHistograms counter - noAggregationTemporality counter + malformedHistograms *atomic.Int64 + noAggregationTemporality *atomic.Int64 } // newHistogramReporting returns a new histogramReporting instance. func newHistogramReporting(settings component.TelemetrySettings) *histogramReporting { - return &histogramReporting{settings: settings} + return &histogramReporting{ + settings: settings, + malformedHistograms: atomic.NewInt64(0), + noAggregationTemporality: atomic.NewInt64(0), + } } // Malformed returns the number of malformed histogram data points. func (r *histogramReporting) Malformed() int64 { - return r.malformedHistograms.Get() + return r.malformedHistograms.Load() } // NoAggregationTemporality returns the number of histogram metrics that have no // aggregation temporality. func (r *histogramReporting) NoAggregationTemporality() int64 { - return r.noAggregationTemporality.Get() + return r.noAggregationTemporality.Load() } // LogMalformed logs seeing one malformed data point. @@ -392,12 +381,8 @@ func (r *histogramReporting) LogNoAggregationTemporality(metric pmetric.Metric) // Report sends the counts in this instance to wavefront. // sender is what sends to wavefront. Any errors sending get added to errs. func (r *histogramReporting) Report(sender gaugeSender, errs *[]error) { - r.malformedHistograms.Report(malformedHistogramMetricName, nil, sender, errs) - r.noAggregationTemporality.Report( - noAggregationTemporalityMetricName, - typeIsHistogramTags, - sender, - errs) + report(r.malformedHistograms, malformedHistogramMetricName, nil, sender, errs) + report(r.noAggregationTemporality, noAggregationTemporalityMetricName, typeIsHistogramTags, sender, errs) } type histogramConsumer struct {