Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Resource into the metric export Record #739

Merged
merged 3 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporters/metric/prometheus/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func ExampleNewExportPipeline() {

// Simulate a push
meterImpl.Collect(ctx)
err = exporter.Export(ctx, nil, integrator.CheckpointSet())
err = exporter.Export(ctx, integrator.CheckpointSet())
if err != nil {
panic(err)
}
Expand Down
5 changes: 2 additions & 3 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
)

// Exporter is an implementation of metric.Exporter that sends metrics to
Expand Down Expand Up @@ -169,8 +168,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
}

// Export exports the provide metric record to prometheus.
func (e *Exporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error {
// TODO: Use the resource value in this exporter.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.snapshot = checkpointSet
return nil
}
Expand Down Expand Up @@ -211,6 +209,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
err := c.exp.snapshot.ForEach(func(record export.Record) error {
agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
// TODO: Use the resource value in this record.
labels := labelValues(record.Labels())
desc := c.toDesc(&record)

Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPrometheusExporter(t *testing.T) {
}

var expected []string
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(nil)

counter := metric.NewDescriptor(
"counter", metric.CounterKind, metric.Float64NumberKind)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestPrometheusExporter(t *testing.T) {
}

func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) {
err := exporter.Export(context.Background(), nil, checkpointSet)
err := exporter.Export(context.Background(), checkpointSet)
require.Nil(t, err)

rec := httptest.NewRecorder()
Expand Down
5 changes: 2 additions & 3 deletions exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/sdk/resource"

export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
Expand Down Expand Up @@ -145,18 +144,18 @@ func NewExportPipeline(config Config, period time.Duration, opts ...push.Option)
return pusher, nil
}

func (e *Exporter) Export(_ context.Context, resource *resource.Resource, checkpointSet export.CheckpointSet) error {
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
var aggError error
var batch expoBatch
if !e.config.DoNotPrintTime {
ts := time.Now()
batch.Timestamp = &ts
}
encodedResource := resource.Encoded(e.config.LabelEncoder)
aggError = checkpointSet.ForEach(func(record export.Record) error {
desc := record.Descriptor()
agg := record.Aggregator()
kind := desc.NumberKind()
encodedResource := record.Resource().Encoded(e.config.LabelEncoder)

var expose expoLine

Expand Down
48 changes: 24 additions & 24 deletions exporters/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ type testFixture struct {
ctx context.Context
exporter *stdout.Exporter
output *bytes.Buffer
resource *resource.Resource
}

func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config) testFixture {
var testResource = resource.New(kv.String("R", "V"))

func newFixture(t *testing.T, config stdout.Config) testFixture {
buf := &bytes.Buffer{}
config.Writer = buf
config.DoNotPrintTime = true
Expand All @@ -60,7 +61,6 @@ func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config)
ctx: context.Background(),
exporter: exp,
output: buf,
resource: resource,
}
}

Expand All @@ -69,7 +69,7 @@ func (fix testFixture) Output() string {
}

func (fix testFixture) Export(checkpointSet export.CheckpointSet) {
err := fix.exporter.Export(fix.ctx, fix.resource, checkpointSet)
err := fix.exporter.Export(fix.ctx, checkpointSet)
if err != nil {
fix.t.Error("export failed: ", err)
}
Expand All @@ -95,7 +95,7 @@ func TestStdoutTimestamp(t *testing.T) {

before := time.Now()

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

ctx := context.Background()
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind)
Expand All @@ -105,7 +105,7 @@ func TestStdoutTimestamp(t *testing.T) {

checkpointSet.Add(&desc, lvagg)

if err := exporter.Export(ctx, nil, checkpointSet); err != nil {
if err := exporter.Export(ctx, checkpointSet); err != nil {
t.Fatal("Unexpected export error: ", err)
}

Expand Down Expand Up @@ -139,9 +139,9 @@ func TestStdoutTimestamp(t *testing.T) {
}

func TestStdoutCounterFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind)
cagg := sum.New()
Expand All @@ -152,13 +152,13 @@ func TestStdoutCounterFormat(t *testing.T) {

fix.Export(checkpointSet)

require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","sum":123}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","sum":123}]}`, fix.Output())
}

func TestStdoutLastValueFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
Expand All @@ -169,13 +169,13 @@ func TestStdoutLastValueFormat(t *testing.T) {

fix.Export(checkpointSet)

require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","last":123.456}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","last":123.456}]}`, fix.Output())
}

func TestStdoutMinMaxSumCount(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := minmaxsumcount.New(&desc)
Expand All @@ -187,15 +187,15 @@ func TestStdoutMinMaxSumCount(t *testing.T) {

fix.Export(checkpointSet)

require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output())
}

func TestStdoutValueRecorderFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{
fix := newFixture(t, stdout.Config{
PrettyPrint: true,
})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := array.New()
Expand All @@ -213,7 +213,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
require.Equal(t, `{
"updates": [
{
"name": "test.name{A=B,C=D}",
"name": "test.name{R=V,A=B,C=D}",
"min": 0.5,
"max": 999.5,
"sum": 500000,
Expand Down Expand Up @@ -247,9 +247,9 @@ func TestStdoutNoData(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

magg := tc
magg.Checkpoint(fix.ctx, &desc)
Expand All @@ -264,9 +264,9 @@ func TestStdoutNoData(t *testing.T) {
}

func TestStdoutLastValueNotSet(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
Expand Down Expand Up @@ -314,9 +314,9 @@ func TestStdoutResource(t *testing.T) {
}

for _, tc := range testCases {
fix := newFixture(t, tc.res, stdout.Config{})
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(tc.res)

desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
Expand Down
13 changes: 8 additions & 5 deletions exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
)

type mapkey struct {
Expand All @@ -35,15 +36,17 @@ type mapkey struct {
}

type CheckpointSet struct {
records map[mapkey]export.Record
updates []export.Record
records map[mapkey]export.Record
resource *resource.Resource
updates []export.Record
}

// NewCheckpointSet returns a test CheckpointSet that new records could be added.
// Records are grouped by their encoded labels.
func NewCheckpointSet() *CheckpointSet {
func NewCheckpointSet(resource *resource.Resource) *CheckpointSet {
return &CheckpointSet{
records: make(map[mapkey]export.Record),
records: make(map[mapkey]export.Record),
resource: resource,
}
}

Expand All @@ -67,7 +70,7 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
return record.Aggregator(), false
}

rec := export.NewRecord(desc, &elabels, newAgg)
rec := export.NewRecord(desc, &elabels, p.resource, newAgg)
p.updates = append(p.updates, rec)
p.records[key] = rec
return newAgg, true
Expand Down
8 changes: 4 additions & 4 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type result struct {

// CheckpointSet transforms all records contained in a checkpoint into
// batched OTLP ResourceMetrics.
func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
records, errc := source(ctx, cps)

// Start a fixed number of goroutines to transform records.
Expand All @@ -71,7 +71,7 @@ func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.
for i := uint(0); i < numWorkers; i++ {
go func() {
defer wg.Done()
transformer(ctx, resource, records, transformed)
transformer(ctx, records, transformed)
}()
}
go func() {
Expand Down Expand Up @@ -116,15 +116,15 @@ func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record

// transformer transforms records read from the passed in chan into
// OTLP Metrics which are sent on the out chan.
func transformer(ctx context.Context, resource *resource.Resource, in <-chan export.Record, out chan<- result) {
func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) {
for r := range in {
m, err := Record(r)
// Propagate errors, but do not send empty results.
if err == nil && m == nil {
continue
}
res := result{
Resource: resource,
Resource: r.Resource(),
Library: r.Descriptor().LibraryName(),
Metric: m,
Err: err,
Expand Down
5 changes: 2 additions & 3 deletions exporters/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/resource"
)

type Exporter struct {
Expand Down Expand Up @@ -212,7 +211,7 @@ func (e *Exporter) Stop() error {
// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter
// interface. It transforms and batches metric Records into OTLP Metrics and
// transmits them to the configured collector.
func (e *Exporter) Export(parent context.Context, resource *resource.Resource, cps metricsdk.CheckpointSet) error {
func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
// Unify the parent context Done signal with the exporter stopCh.
ctx, cancel := context.WithCancel(parent)
defer cancel()
Expand All @@ -224,7 +223,7 @@ func (e *Exporter) Export(parent context.Context, resource *resource.Resource, c
}
}(ctx, cancel)

rms, err := transform.CheckpointSet(ctx, resource, cps, e.c.numWorkers)
rms, err := transform.CheckpointSet(ctx, cps, e.c.numWorkers)
if err != nil {
return err
}
Expand Down
11 changes: 4 additions & 7 deletions exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,11 +659,10 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me

equiv := r.resource.Equivalent()
resources[equiv] = r.resource
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, agg))
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg))
}
for equiv, records := range recs {
resource := resources[equiv]
assert.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: records}))
for _, records := range recs {
assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: records}))
}

// assert.ElementsMatch does not equate nested slices of different order,
Expand Down Expand Up @@ -713,8 +712,6 @@ func TestEmptyMetricExport(t *testing.T) {
exp.metricExporter = msc
exp.started = true

resource := resource.New(kv.String("R", "S"))

for _, test := range []struct {
records []metricsdk.Record
want []metricpb.ResourceMetrics
Expand All @@ -729,7 +726,7 @@ func TestEmptyMetricExport(t *testing.T) {
},
} {
msc.Reset()
require.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: test.records}))
require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records}))
assert.Equal(t, test.want, msc.ResourceMetrics())
}
}
Loading