Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add structure to the export data. #2961

Merged
merged 9 commits into from
Jul 11, 2022
14 changes: 8 additions & 6 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type reader struct {
producer producer
temporalityFunc func(InstrumentKind) Temporality
aggregationFunc AggregationSelector
collectFunc func(context.Context) (export.Metrics, error)
collectFunc func(context.Context) (export.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
Expand All @@ -45,11 +45,13 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n
return r.aggregationFunc(kind)
}

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) temporality(kind InstrumentKind) Temporality { return r.temporalityFunc(kind) }
func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) }
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }
func (r *reader) register(p producer) { r.producer = p }
func (r *reader) temporality(kind InstrumentKind) Temporality { return r.temporalityFunc(kind) }
func (r *reader) Collect(ctx context.Context) (export.ResourceMetrics, error) {
return r.collectFunc(ctx)
}
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }

func TestConfigReaderSignalsEmpty(t *testing.T) {
f, s := config{}.readerSignals()
Expand Down
129 changes: 127 additions & 2 deletions sdk/metric/export/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,131 @@

package export // import "go.opentelemetry.io/otel/sdk/metric/export"

// Metrics is the result of a single collection.
type Metrics struct { /* TODO: implement #2889 */
import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
)

// ResourceMetrics is a collection of ScopeMetrics and the associated Resource
// that created them.
type ResourceMetrics struct {
// Resource represents the entity that collected the metrics.
Resource *resource.Resource
// ScopeMetrics are the collection of metrics with unique Scopes.
ScopeMetrics []ScopeMetrics
}

// ScopeMetrics is a collection of Metrics Produces by a Meter.
type ScopeMetrics struct {
// Scope is the Scope that the Meter was created with.
Scope instrumentation.Scope
// Metrics are a list of aggregations created by the Meter.
Metrics []Metrics
}

// Metrics is a collection of one or more aggregated timeseries from an Instrument.
type Metrics struct {
// Name is the name of the Instrument that created this data.
Name string
// Description is the description of the Instrument, which can be used in documentation.
Description string
// Unit is the unit in which the Instrument reports.
Unit unit.Unit
// Data is the aggregated data from an Instrument.
Data Aggregation
}

// Aggregation is the store of data reported by an Instrument.
// It will be one of: Gauge, Sum, Histogram.
type Aggregation interface {
privateAggregation()
}

// Gauge represents a measurement of the current value of an instrument.
type Gauge struct {
// DataPoints reprents individual aggregated measurements with unique Attributes.
DataPoints []DataPoint
}

func (Gauge) privateAggregation() {}

// Sum represents the sum of all measurements of values from an instrument.
type Sum struct {
// DataPoints reprents individual aggregated measurements with unique Attributes.
DataPoints []DataPoint
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
// IsMonotonic represents if this aggregation only increases or decreases.
IsMonotonic bool
}

func (Sum) privateAggregation() {}

// DataPoint is a single data point in a timeseries.
type DataPoint struct {
// Attributes is the set of key value pairs that uniquely identify the timeseries.
Attributes []attribute.KeyValue
// StartTime is when the timeseries was started. (optional)
StartTime time.Time
// Time is the time when the timeseries was recorded. (optional)
Time time.Time
// Value is the value of this data point.
Value Value
}

// Value is a int64 or float64. All Values created by the sdk will be either
// Int64 or Float64.
type Value interface {
privateValue()
}

// Int64 is a container for an int64 value.
type Int64 int64

func (Int64) privateValue() {}

// Float64 is a container for a float64 value.
type Float64 float64

func (Float64) privateValue() {}

// Histogram represents the histogram of all measurements of values from an instrument.
type Histogram struct {
// DataPoints reprents individual aggregated measurements with unique Attributes.
DataPoints []HistogramDataPoint
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
}

func (Histogram) privateAggregation() {}

// HistogramDataPoint is a single histogram data point in a timeseries.
type HistogramDataPoint struct {
// Attributes is the set of key value pairs that uniquely identify the timeseries.
Attributes []attribute.KeyValue
// StartTime is when the timeseries was started.
StartTime time.Time
// Time is the time when the timeseries was recorded.
Time time.Time

// Count is the number of updates this histogram has been calculated with.
Count uint64
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
// Bounds are the upper bounds of the buckets of the histogram. Because the
// last boundary is +infinity this one is implied.
Bounds []float64
// BucketCounts is the count of each of the buckets.
BucketCounts []uint64

// Min is the minimum value recorded. (optional)
Min *float64
// Max is the maximum value recorded. (optional)
Max *float64
// Sum is the sum of the values recorded.
Sum float64
}
37 changes: 37 additions & 0 deletions sdk/metric/export/temporality.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package export // import "go.opentelemetry.io/otel/sdk/metric/export"

// Temporality defines the window that an aggregation was calculated over.
type Temporality uint8

const (
// undefinedTemporality represents an unset Temporality.
//nolint:deadcode,unused,varcheck
undefinedTemporality Temporality = iota

// CumulativeTemporality defines a measurement interval that continues to
// expand forward in time from a starting point. New measurements are
// added to all previous measurements since a start time.
CumulativeTemporality

// DeltaTemporality defines a measurement interval that resets each cycle.
// Measurements from one cycle are recorded independently, measurements
// from other cycles do not affect them.
DeltaTemporality
)
2 changes: 1 addition & 1 deletion sdk/metric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Exporter interface {
// implement any retry logic. All errors returned by this function are
// considered unrecoverable and will be reported to a configured error
// Handler.
Export(context.Context, export.Metrics) error
Export(context.Context, export.ResourceMetrics) error

// ForceFlush flushes any metric data held by an exporter.
//
Expand Down
7 changes: 4 additions & 3 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ func (mr *manualReader) Shutdown(context.Context) error {

// Collect gathers all metrics from the SDK, calling any callbacks necessary.
// Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) {
func (mr *manualReader) Collect(ctx context.Context) (export.ResourceMetrics, error) {
p := mr.producer.Load()
if p == nil {
return export.Metrics{}, ErrReaderNotRegistered
return export.ResourceMetrics{}, ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
Expand All @@ -103,8 +103,9 @@ func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) {
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return export.Metrics{}, err
return export.ResourceMetrics{}, err
}

return ph.produce(ctx)
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio
// exporter, it is left to the caller to handle that if desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (export.Metrics, error) {
func (r *periodicReader) Collect(ctx context.Context) (export.ResourceMetrics, error) {
p := r.producer.Load()
if p == nil {
return export.Metrics{}, ErrReaderNotRegistered
return export.ResourceMetrics{}, ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
Expand All @@ -212,7 +212,7 @@ func (r *periodicReader) Collect(ctx context.Context) (export.Metrics, error) {
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return export.Metrics{}, err
return export.ResourceMetrics{}, err
}
return ph.produce(ctx)
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func TestWithInterval(t *testing.T) {
}

type fnExporter struct {
exportFunc func(context.Context, export.Metrics) error
exportFunc func(context.Context, export.ResourceMetrics) error
flushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Exporter = (*fnExporter)(nil)

func (e *fnExporter) Export(ctx context.Context, m export.Metrics) error {
func (e *fnExporter) Export(ctx context.Context, m export.ResourceMetrics) error {
if e.exportFunc != nil {
return e.exportFunc(ctx, m)
}
Expand Down Expand Up @@ -94,7 +94,7 @@ func (ts *periodicReaderTestSuite) SetupTest() {
ts.readerTestSuite.SetupTest()

e := &fnExporter{
exportFunc: func(context.Context, export.Metrics) error { return assert.AnError },
exportFunc: func(context.Context, export.ResourceMetrics) error { return assert.AnError },
flushFunc: func(context.Context) error { return assert.AnError },
shutdownFunc: func(context.Context) error { return assert.AnError },
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestPeriodicReaderRun(t *testing.T) {
otel.SetErrorHandler(eh)

exp := &fnExporter{
exportFunc: func(_ context.Context, m export.Metrics) error {
exportFunc: func(_ context.Context, m export.ResourceMetrics) error {
// The testProducer produces testMetrics.
assert.Equal(t, testMetrics, m)
return assert.AnError
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Reader interface {

// Collect gathers and returns all metric data related to the Reader from
// the SDK. An error is returned if this is called after Shutdown.
Collect(context.Context) (export.Metrics, error)
Collect(context.Context) (export.ResourceMetrics, error)

// ForceFlush flushes all metric measurements held in an export pipeline.
//
Expand Down Expand Up @@ -93,21 +93,21 @@ type producer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
produce(context.Context) (export.Metrics, error)
produce(context.Context) (export.ResourceMetrics, error)
}

// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
produce func(context.Context) (export.Metrics, error)
produce func(context.Context) (export.ResourceMetrics, error)
}

// shutdownProducer produces an ErrReaderShutdown error always.
type shutdownProducer struct{}

// produce returns an ErrReaderShutdown error.
func (p shutdownProducer) produce(context.Context) (export.Metrics, error) {
return export.Metrics{}, ErrReaderShutdown
func (p shutdownProducer) produce(context.Context) (export.ResourceMetrics, error) {
return export.ResourceMetrics{}, ErrReaderShutdown
}

// ReaderOption applies a configuration option value to either a ManualReader or
Expand Down
18 changes: 8 additions & 10 deletions sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() {

m, err := ts.Reader.Collect(ctx)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(export.Metrics{}, m)
ts.Equal(export.ResourceMetrics{}, m)
}

func (ts *readerTestSuite) TestShutdownTwice() {
Expand All @@ -88,7 +88,7 @@ func (ts *readerTestSuite) TestMultipleForceFlush() {

func (ts *readerTestSuite) TestMultipleRegister() {
p0 := testProducer{
produceFunc: func(ctx context.Context) (export.Metrics, error) {
produceFunc: func(ctx context.Context) (export.ResourceMetrics, error) {
// Differentiate this producer from the second by returning an
// error.
return testMetrics, assert.AnError
Expand Down Expand Up @@ -143,18 +143,18 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() {

m, err := ts.Reader.Collect(ctx)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(export.Metrics{}, m)
ts.Equal(export.ResourceMetrics{}, m)
}

var testMetrics = export.Metrics{
var testMetrics = export.ResourceMetrics{
// TODO: test with actual data.
}

type testProducer struct {
produceFunc func(context.Context) (export.Metrics, error)
produceFunc func(context.Context) (export.ResourceMetrics, error)
}

func (p testProducer) produce(ctx context.Context) (export.Metrics, error) {
func (p testProducer) produce(ctx context.Context) (export.ResourceMetrics, error) {
if p.produceFunc != nil {
return p.produceFunc(ctx)
}
Expand All @@ -168,7 +168,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) {
// Store bechmark results in a closure to prevent the compiler from
// inlining and skipping the function.
var (
collectedMetrics export.Metrics
collectedMetrics export.ResourceMetrics
err error
)

Expand All @@ -178,9 +178,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) {

for n := 0; n < b.N; n++ {
collectedMetrics, err = r.Collect(ctx)
if collectedMetrics != testMetrics || err != nil {
b.Errorf("unexpected Collect response: (%#v, %v)", collectedMetrics, err)
}
assert.Equalf(b, testMetrics, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err)
}
}
}
Expand Down