Skip to content

Commit

Permalink
support exact kind in OTLP metrics exporter (open-telemetry#1309)
Browse files Browse the repository at this point in the history
* support exact kind in OTLP metrics exporter

* add change log

* rename function

* inline start time and end time variables

* fix test

* add test for exact int data points

* add test for exact float data points

* use newly introduced number package for numbers according to upstream change

* fix package ref
  • Loading branch information
hstan authored and Azfaar Qureshi committed Dec 3, 2020
1 parent e8a4ee0 commit 0f98e6a
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `go.opentelemetry.io/otel/api/global` packages global TextMapPropagator now delegates functionality to a globally set delegate for all previously returned propagators. (#1258)
- Fix condition in `label.Any`. (#1299)
- Fix global `TracerProvider` to pass options to its configured provider. (#1329)
- Fix missing handler for `ExactKind` aggregator in OTLP metrics transformer (#1309)

## [0.13.0] - 2020-10-08

Expand Down
60 changes: 60 additions & 0 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,71 @@ func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricp
}
return gaugePoint(r, value, time.Time{}, tm)

case aggregation.ExactKind:
e, ok := agg.(aggregation.Points)
if !ok {
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg)
}
pts, err := e.Points()
if err != nil {
return nil, err
}

return gaugeArray(r, pts)

default:
return nil, fmt.Errorf("%w: %T", ErrUnimplementedAgg, agg)
}
}

func gaugeArray(record export.Record, points []number.Number) (*metricpb.Metric, error) {
desc := record.Descriptor()
m := &metricpb.Metric{
Name: desc.Name(),
Description: desc.Description(),
Unit: string(desc.Unit()),
}

switch n := desc.NumberKind(); n {
case number.Int64Kind:
var pts []*metricpb.IntDataPoint
for _, p := range points {
pts = append(pts, &metricpb.IntDataPoint{
Labels: nil,
StartTimeUnixNano: toNanos(record.StartTime()),
TimeUnixNano: toNanos(record.EndTime()),
Value: p.CoerceToInt64(n),
})
}
m.Data = &metricpb.Metric_IntGauge{
IntGauge: &metricpb.IntGauge{
DataPoints: pts,
},
}

case number.Float64Kind:
var pts []*metricpb.DoubleDataPoint
for _, p := range points {
pts = append(pts, &metricpb.DoubleDataPoint{
Labels: nil,
StartTimeUnixNano: toNanos(record.StartTime()),
TimeUnixNano: toNanos(record.EndTime()),
Value: p.CoerceToFloat64(n),
})
}
m.Data = &metricpb.Metric_DoubleGauge{
DoubleGauge: &metricpb.DoubleGauge{
DataPoints: pts,
},
}

default:
return nil, fmt.Errorf("%w: %v", ErrUnknownValueType, n)
}

return m, nil
}

func gaugePoint(record export.Record, num number.Number, start, end time.Time) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()
Expand Down
58 changes: 55 additions & 3 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/export/metric/metrictest"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
arrAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
lvAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
Expand Down Expand Up @@ -243,6 +243,58 @@ func TestLastValueIntDataPoints(t *testing.T) {
}
}

func TestExactIntDataPoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderInstrumentKind, number.Int64Kind)
labels := label.NewSet()
e, ckpt := metrictest.Unslice2(arrAgg.New(2))
assert.NoError(t, e.Update(context.Background(), number.Number(100), &desc))
require.NoError(t, e.SynchronizedMove(ckpt, &desc))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
p, ok := ckpt.(aggregation.Points)
require.True(t, ok, "ckpt is not an aggregation.Points: %T", ckpt)
pts, err := p.Points()
require.NoError(t, err)

if m, err := gaugeArray(record, pts); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.IntDataPoint{{
Value: 100,
StartTimeUnixNano: toNanos(intervalStart),
TimeUnixNano: toNanos(intervalEnd),
}}, m.GetIntGauge().DataPoints)
assert.Nil(t, m.GetIntHistogram())
assert.Nil(t, m.GetIntSum())
assert.Nil(t, m.GetDoubleGauge())
assert.Nil(t, m.GetDoubleHistogram())
assert.Nil(t, m.GetDoubleSum())
}
}

func TestExactFloatDataPoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderInstrumentKind, number.Float64Kind)
labels := label.NewSet()
e, ckpt := metrictest.Unslice2(arrAgg.New(2))
assert.NoError(t, e.Update(context.Background(), number.NewFloat64Number(100), &desc))
require.NoError(t, e.SynchronizedMove(ckpt, &desc))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
p, ok := ckpt.(aggregation.Points)
require.True(t, ok, "ckpt is not an aggregation.Points: %T", ckpt)
pts, err := p.Points()
require.NoError(t, err)

if m, err := gaugeArray(record, pts); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.DoubleDataPoint{{
Value: 100,
StartTimeUnixNano: toNanos(intervalStart),
TimeUnixNano: toNanos(intervalEnd),
}}, m.GetDoubleGauge().DataPoints)
assert.Nil(t, m.GetIntHistogram())
assert.Nil(t, m.GetIntSum())
assert.Nil(t, m.GetIntGauge())
assert.Nil(t, m.GetDoubleHistogram())
assert.Nil(t, m.GetDoubleSum())
}
}

func TestSumErrUnknownValueType(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderInstrumentKind, number.Kind(-1))
labels := label.NewSet()
Expand Down Expand Up @@ -357,11 +409,11 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) {
require.Nil(t, mpb)
require.True(t, errors.Is(err, ErrIncompatibleAgg))

mpb, err = makeMpb(aggregation.ExactKind, &array.New(1)[0])
mpb, err = makeMpb(aggregation.ExactKind, &lastvalue.New(1)[0])

require.Error(t, err)
require.Nil(t, mpb)
require.True(t, errors.Is(err, ErrUnimplementedAgg))
require.True(t, errors.Is(err, ErrIncompatibleAgg))
}

func TestRecordAggregatorUnexpectedErrors(t *testing.T) {
Expand Down

0 comments on commit 0f98e6a

Please sign in to comment.