Skip to content

Commit

Permalink
Move Resource into the metric export Record (#739)
Browse files Browse the repository at this point in the history
* Checkpoint

* Tests pass
  • Loading branch information
jmacd authored May 19, 2020
1 parent 5a534a0 commit 69da305
Show file tree
Hide file tree
Showing 18 changed files with 172 additions and 154 deletions.
2 changes: 1 addition & 1 deletion exporters/metric/prometheus/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func ExampleNewExportPipeline() {

// Simulate a push
meterImpl.Collect(ctx)
err = exporter.Export(ctx, nil, integrator.CheckpointSet())
err = exporter.Export(ctx, integrator.CheckpointSet())
if err != nil {
panic(err)
}
Expand Down
5 changes: 2 additions & 3 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
)

// Exporter is an implementation of metric.Exporter that sends metrics to
Expand Down Expand Up @@ -169,8 +168,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
}

// Export exports the provide metric record to prometheus.
func (e *Exporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error {
// TODO: Use the resource value in this exporter.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.snapshot = checkpointSet
return nil
}
Expand Down Expand Up @@ -211,6 +209,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
err := c.exp.snapshot.ForEach(func(record export.Record) error {
agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
// TODO: Use the resource value in this record.
labels := labelValues(record.Labels())
desc := c.toDesc(&record)

Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPrometheusExporter(t *testing.T) {
}

var expected []string
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(nil)

counter := metric.NewDescriptor(
"counter", metric.CounterKind, metric.Float64NumberKind)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestPrometheusExporter(t *testing.T) {
}

func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) {
err := exporter.Export(context.Background(), nil, checkpointSet)
err := exporter.Export(context.Background(), checkpointSet)
require.Nil(t, err)

rec := httptest.NewRecorder()
Expand Down
5 changes: 2 additions & 3 deletions exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/sdk/resource"

export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
Expand Down Expand Up @@ -145,18 +144,18 @@ func NewExportPipeline(config Config, period time.Duration, opts ...push.Option)
return pusher, nil
}

func (e *Exporter) Export(_ context.Context, resource *resource.Resource, checkpointSet export.CheckpointSet) error {
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
var aggError error
var batch expoBatch
if !e.config.DoNotPrintTime {
ts := time.Now()
batch.Timestamp = &ts
}
encodedResource := resource.Encoded(e.config.LabelEncoder)
aggError = checkpointSet.ForEach(func(record export.Record) error {
desc := record.Descriptor()
agg := record.Aggregator()
kind := desc.NumberKind()
encodedResource := record.Resource().Encoded(e.config.LabelEncoder)

var expose expoLine

Expand Down
48 changes: 24 additions & 24 deletions exporters/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ type testFixture struct {
ctx context.Context
exporter *stdout.Exporter
output *bytes.Buffer
resource *resource.Resource
}

func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config) testFixture {
var testResource = resource.New(kv.String("R", "V"))

func newFixture(t *testing.T, config stdout.Config) testFixture {
buf := &bytes.Buffer{}
config.Writer = buf
config.DoNotPrintTime = true
Expand All @@ -60,7 +61,6 @@ func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config)
ctx: context.Background(),
exporter: exp,
output: buf,
resource: resource,
}
}

Expand All @@ -69,7 +69,7 @@ func (fix testFixture) Output() string {
}

func (fix testFixture) Export(checkpointSet export.CheckpointSet) {
err := fix.exporter.Export(fix.ctx, fix.resource, checkpointSet)
err := fix.exporter.Export(fix.ctx, checkpointSet)
if err != nil {
fix.t.Error("export failed: ", err)
}
Expand All @@ -95,7 +95,7 @@ func TestStdoutTimestamp(t *testing.T) {

before := time.Now()

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

ctx := context.Background()
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind)
Expand All @@ -105,7 +105,7 @@ func TestStdoutTimestamp(t *testing.T) {

checkpointSet.Add(&desc, lvagg)

if err := exporter.Export(ctx, nil, checkpointSet); err != nil {
if err := exporter.Export(ctx, checkpointSet); err != nil {
t.Fatal("Unexpected export error: ", err)
}

Expand Down Expand Up @@ -139,9 +139,9 @@ func TestStdoutTimestamp(t *testing.T) {
}

func TestStdoutCounterFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind)
cagg := sum.New()
Expand All @@ -152,13 +152,13 @@ func TestStdoutCounterFormat(t *testing.T) {

fix.Export(checkpointSet)

require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","sum":123}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","sum":123}]}`, fix.Output())
}

func TestStdoutLastValueFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
Expand All @@ -169,13 +169,13 @@ func TestStdoutLastValueFormat(t *testing.T) {

fix.Export(checkpointSet)

require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","last":123.456}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","last":123.456}]}`, fix.Output())
}

func TestStdoutMinMaxSumCount(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := minmaxsumcount.New(&desc)
Expand All @@ -187,15 +187,15 @@ func TestStdoutMinMaxSumCount(t *testing.T) {

fix.Export(checkpointSet)

require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output())
}

func TestStdoutValueRecorderFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{
fix := newFixture(t, stdout.Config{
PrettyPrint: true,
})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := array.New()
Expand All @@ -213,7 +213,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
require.Equal(t, `{
"updates": [
{
"name": "test.name{A=B,C=D}",
"name": "test.name{R=V,A=B,C=D}",
"min": 0.5,
"max": 999.5,
"sum": 500000,
Expand Down Expand Up @@ -247,9 +247,9 @@ func TestStdoutNoData(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

magg := tc
magg.Checkpoint(fix.ctx, &desc)
Expand All @@ -264,9 +264,9 @@ func TestStdoutNoData(t *testing.T) {
}

func TestStdoutLastValueNotSet(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
Expand Down Expand Up @@ -314,9 +314,9 @@ func TestStdoutResource(t *testing.T) {
}

for _, tc := range testCases {
fix := newFixture(t, tc.res, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(tc.res)

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
Expand Down
13 changes: 8 additions & 5 deletions exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
)

type mapkey struct {
Expand All @@ -35,15 +36,17 @@ type mapkey struct {
}

type CheckpointSet struct {
records map[mapkey]export.Record
updates []export.Record
records map[mapkey]export.Record
resource *resource.Resource
updates []export.Record
}

// NewCheckpointSet returns a test CheckpointSet that new records could be added.
// Records are grouped by their encoded labels.
func NewCheckpointSet() *CheckpointSet {
func NewCheckpointSet(resource *resource.Resource) *CheckpointSet {
return &CheckpointSet{
records: make(map[mapkey]export.Record),
records: make(map[mapkey]export.Record),
resource: resource,
}
}

Expand All @@ -67,7 +70,7 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
return record.Aggregator(), false
}

rec := export.NewRecord(desc, &elabels, newAgg)
rec := export.NewRecord(desc, &elabels, p.resource, newAgg)
p.updates = append(p.updates, rec)
p.records[key] = rec
return newAgg, true
Expand Down
8 changes: 4 additions & 4 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type result struct {

// CheckpointSet transforms all records contained in a checkpoint into
// batched OTLP ResourceMetrics.
func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
records, errc := source(ctx, cps)

// Start a fixed number of goroutines to transform records.
Expand All @@ -71,7 +71,7 @@ func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.
for i := uint(0); i < numWorkers; i++ {
go func() {
defer wg.Done()
transformer(ctx, resource, records, transformed)
transformer(ctx, records, transformed)
}()
}
go func() {
Expand Down Expand Up @@ -116,15 +116,15 @@ func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record

// transformer transforms records read from the passed in chan into
// OTLP Metrics which are sent on the out chan.
func transformer(ctx context.Context, resource *resource.Resource, in <-chan export.Record, out chan<- result) {
func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) {
for r := range in {
m, err := Record(r)
// Propagate errors, but do not send empty results.
if err == nil && m == nil {
continue
}
res := result{
Resource: resource,
Resource: r.Resource(),
Library: r.Descriptor().LibraryName(),
Metric: m,
Err: err,
Expand Down
5 changes: 2 additions & 3 deletions exporters/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/resource"
)

type Exporter struct {
Expand Down Expand Up @@ -212,7 +211,7 @@ func (e *Exporter) Stop() error {
// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter
// interface. It transforms and batches metric Records into OTLP Metrics and
// transmits them to the configured collector.
func (e *Exporter) Export(parent context.Context, resource *resource.Resource, cps metricsdk.CheckpointSet) error {
func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
// Unify the parent context Done signal with the exporter stopCh.
ctx, cancel := context.WithCancel(parent)
defer cancel()
Expand All @@ -224,7 +223,7 @@ func (e *Exporter) Export(parent context.Context, resource *resource.Resource, c
}
}(ctx, cancel)

rms, err := transform.CheckpointSet(ctx, resource, cps, e.c.numWorkers)
rms, err := transform.CheckpointSet(ctx, cps, e.c.numWorkers)
if err != nil {
return err
}
Expand Down
11 changes: 4 additions & 7 deletions exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,11 +659,10 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me

equiv := r.resource.Equivalent()
resources[equiv] = r.resource
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, agg))
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg))
}
for equiv, records := range recs {
resource := resources[equiv]
assert.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: records}))
for _, records := range recs {
assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: records}))
}

// assert.ElementsMatch does not equate nested slices of different order,
Expand Down Expand Up @@ -713,8 +712,6 @@ func TestEmptyMetricExport(t *testing.T) {
exp.metricExporter = msc
exp.started = true

resource := resource.New(kv.String("R", "S"))

for _, test := range []struct {
records []metricsdk.Record
want []metricpb.ResourceMetrics
Expand All @@ -729,7 +726,7 @@ func TestEmptyMetricExport(t *testing.T) {
},
} {
msc.Reset()
require.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: test.records}))
require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records}))
assert.Equal(t, test.want, msc.ResourceMetrics())
}
}
Loading

0 comments on commit 69da305

Please sign in to comment.