Skip to content

Commit

Permalink
Batch Metrics Exported From the OTLP Exporter (#626)
Browse files Browse the repository at this point in the history
* Add check to sum transform for unknown NumberKind

* Initial batching

* Move CheckpointSet transform to internal package

* Add tests for the Exporter Export method

Check batching and general output exporter ResourceMetrics are correct.

* Check errors in tests

* Apply suggestions from code review

Co-Authored-By: Krzesimir Nowak <qdlacz@gmail.com>

* Use var instead of multiple calls for group IDs

* Fix otlp metric test reporting

Co-authored-by: Krzesimir Nowak <qdlacz@gmail.com>
  • Loading branch information
MrAlias and krnowak committed Apr 15, 2020
1 parent a8f7b32 commit ebc245b
Show file tree
Hide file tree
Showing 4 changed files with 951 additions and 116 deletions.
203 changes: 199 additions & 4 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,212 @@
package transform

import (
"context"
"errors"
"fmt"
"strings"
"sync"

commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
resourcepb "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/resource"
)

// ErrUnimplementedAgg is returned when a transformation of an unimplemented
// aggregator is attempted.
var ErrUnimplementedAgg = errors.New("unimplemented aggregator")
var (
// ErrUnimplementedAgg is returned when a transformation of an unimplemented
// aggregator is attempted.
ErrUnimplementedAgg = errors.New("unimplemented aggregator")

// ErrUnknownValueType is returned when a transformation of an unknown value
// is attempted.
ErrUnknownValueType = errors.New("invalid value type")

// ErrContextCanceled is returned when a context cancellation halts a
// transformation.
ErrContextCanceled = errors.New("context canceled")

// ErrTransforming is returned when an unexected error is encoutered transforming.
ErrTransforming = errors.New("transforming failed")
)

// result is the product of transforming Records into OTLP Metrics.
type result struct {
Resource resource.Resource
Library string
Metric *metricpb.Metric
Err error
}

// CheckpointSet transforms all records contained in a checkpoint into
// batched OTLP ResourceMetrics.
func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
records, errc := source(ctx, cps)

// Start a fixed number of goroutines to transform records.
transformed := make(chan result)
var wg sync.WaitGroup
wg.Add(int(numWorkers))
for i := uint(0); i < numWorkers; i++ {
go func() {
defer wg.Done()
transformer(ctx, records, transformed)
}()
}
go func() {
wg.Wait()
close(transformed)
}()

// Synchronously collect the transformed records and transmit.
rms, err := sink(ctx, transformed)
if err != nil {
return nil, err
}

// source is complete, check for any errors.
if err := <-errc; err != nil {
return nil, err
}
return rms, nil
}

// source starts a goroutine that sends each one of the Records yielded by
// the CheckpointSet on the returned chan. Any error encoutered will be sent
// on the returned error chan after seeding is complete.
func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record, <-chan error) {
errc := make(chan error, 1)
out := make(chan export.Record)
// Seed records into process.
go func() {
defer close(out)
// No select is needed since errc is buffered.
errc <- cps.ForEach(func(r export.Record) error {
select {
case <-ctx.Done():
return ErrContextCanceled
case out <- r:
}
return nil
})
}()
return out, errc
}

// transformer transforms records read from the passed in chan into
// OTLP Metrics which are sent on the out chan.
func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) {
for r := range in {
m, err := Record(r)
// Propagate errors, but do not send empty results.
if err == nil && m == nil {
continue
}
res := result{
Resource: r.Descriptor().Resource(),
Library: r.Descriptor().LibraryName(),
Metric: m,
Err: err,
}
select {
case <-ctx.Done():
return
case out <- res:
}
}
}

// sink collects transformed Records and batches them.
//
// Any errors encoutered transforming input will be reported with an
// ErrTransforming as well as the completed ResourceMetrics. It is up to the
// caller to handle any incorrect data in these ResourceMetrics.
func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, error) {
var errStrings []string

type resourceBatch struct {
Resource *resourcepb.Resource
// Group by instrumentation library name and then the MetricDescriptor.
InstrumentationLibraryBatches map[string]map[string]*metricpb.Metric
}

// group by unique Resource string.
grouped := make(map[string]resourceBatch)
for res := range in {
if res.Err != nil {
errStrings = append(errStrings, res.Err.Error())
continue
}

rID := res.Resource.String()
rb, ok := grouped[rID]
if !ok {
rb = resourceBatch{
Resource: Resource(&res.Resource),
InstrumentationLibraryBatches: make(map[string]map[string]*metricpb.Metric),
}
grouped[rID] = rb
}

mb, ok := rb.InstrumentationLibraryBatches[res.Library]
if !ok {
mb = make(map[string]*metricpb.Metric)
rb.InstrumentationLibraryBatches[res.Library] = mb
}

mID := res.Metric.GetMetricDescriptor().String()
m, ok := mb[mID]
if !ok {
mb[mID] = res.Metric
continue
}
if len(res.Metric.Int64DataPoints) > 0 {
m.Int64DataPoints = append(m.Int64DataPoints, res.Metric.Int64DataPoints...)
}
if len(res.Metric.DoubleDataPoints) > 0 {
m.DoubleDataPoints = append(m.DoubleDataPoints, res.Metric.DoubleDataPoints...)
}
if len(res.Metric.HistogramDataPoints) > 0 {
m.HistogramDataPoints = append(m.HistogramDataPoints, res.Metric.HistogramDataPoints...)
}
if len(res.Metric.SummaryDataPoints) > 0 {
m.SummaryDataPoints = append(m.SummaryDataPoints, res.Metric.SummaryDataPoints...)
}
}

if len(grouped) == 0 {
return nil, nil
}

var rms []*metricpb.ResourceMetrics
for _, rb := range grouped {
rm := &metricpb.ResourceMetrics{Resource: rb.Resource}
for ilName, mb := range rb.InstrumentationLibraryBatches {
ilm := &metricpb.InstrumentationLibraryMetrics{
Metrics: make([]*metricpb.Metric, 0, len(mb)),
}
if ilName != "" {
ilm.InstrumentationLibrary = &commonpb.InstrumentationLibrary{Name: ilName}
}
for _, m := range mb {
ilm.Metrics = append(ilm.Metrics, m)
}
rm.InstrumentationLibraryMetrics = append(rm.InstrumentationLibraryMetrics, ilm)
}
rms = append(rms, rm)
}

// Report any transform errors.
if len(errStrings) > 0 {
return rms, fmt.Errorf("%w:\n -%s", ErrTransforming, strings.Join(errStrings, "\n -"))
}
return rms, nil
}

// Record transforms a Record into an OTLP Metric. An ErrUnimplementedAgg
// error is returned if the Record Aggregator is not supported.
Expand All @@ -42,8 +234,9 @@ func Record(r export.Record) (*metricpb.Metric, error) {
return minMaxSumCount(d, l, a)
case aggregator.Sum:
return sum(d, l, a)
default:
return nil, fmt.Errorf("%w: %v", ErrUnimplementedAgg, a)
}
return nil, ErrUnimplementedAgg
}

// sum transforms a Sum Aggregator into an OTLP Metric.
Expand Down Expand Up @@ -73,6 +266,8 @@ func sum(desc *metric.Descriptor, labels export.Labels, a aggregator.Sum) (*metr
m.DoubleDataPoints = []*metricpb.DoubleDataPoint{
{Value: sum.CoerceToFloat64(n)},
}
default:
return nil, fmt.Errorf("%w: %v", ErrUnknownValueType, n)
}

return m, nil
Expand Down
12 changes: 12 additions & 0 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package transform

import (
"context"
"errors"
"testing"

commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
Expand Down Expand Up @@ -282,3 +283,14 @@ func TestSumFloat64DataPoints(t *testing.T) {
assert.Equal(t, []*metricpb.SummaryDataPoint(nil), m.SummaryDataPoints)
}
}

func TestSumErrUnknownValueType(t *testing.T) {
desc := metric.NewDescriptor("", metric.MeasureKind, core.NumberKind(-1))
labels := export.NewSimpleLabels(export.NoopLabelEncoder{})
s := sumAgg.New()
_, err := sum(&desc, labels, s)
assert.Error(t, err)
if !errors.Is(err, ErrUnknownValueType) {
t.Errorf("expected ErrUnknownValueType, got %v", err)
}
}
Loading

0 comments on commit ebc245b

Please sign in to comment.