diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index 12355db63dd..bca21769743 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -34,7 +34,7 @@ type reader struct { producer producer temporalityFunc func(InstrumentKind) Temporality aggregationFunc AggregationSelector - collectFunc func(context.Context) (export.Metrics, error) + collectFunc func(context.Context) (export.ResourceMetrics, error) forceFlushFunc func(context.Context) error shutdownFunc func(context.Context) error } @@ -45,11 +45,13 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n return r.aggregationFunc(kind) } -func (r *reader) register(p producer) { r.producer = p } -func (r *reader) temporality(kind InstrumentKind) Temporality { return r.temporalityFunc(kind) } -func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) } -func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) } -func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) } +func (r *reader) register(p producer) { r.producer = p } +func (r *reader) temporality(kind InstrumentKind) Temporality { return r.temporalityFunc(kind) } +func (r *reader) Collect(ctx context.Context) (export.ResourceMetrics, error) { + return r.collectFunc(ctx) +} +func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) } +func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) } func TestConfigReaderSignalsEmpty(t *testing.T) { f, s := config{}.readerSignals() diff --git a/sdk/metric/export/data.go b/sdk/metric/export/data.go index 9ac934d5390..d130da3786a 100644 --- a/sdk/metric/export/data.go +++ b/sdk/metric/export/data.go @@ -20,6 +20,131 @@ package export // import "go.opentelemetry.io/otel/sdk/metric/export" -// Metrics is the result of a single collection. 
-type Metrics struct { /* TODO: implement #2889 */ +import ( + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/resource" +) + +// ResourceMetrics is a collection of ScopeMetrics and the associated Resource +// that created them. +type ResourceMetrics struct { + // Resource represents the entity that collected the metrics. + Resource *resource.Resource + // ScopeMetrics are the collection of metrics with unique Scopes. + ScopeMetrics []ScopeMetrics +} + +// ScopeMetrics is a collection of Metrics produced by a Meter. +type ScopeMetrics struct { + // Scope is the Scope that the Meter was created with. + Scope instrumentation.Scope + // Metrics are a list of aggregations created by the Meter. + Metrics []Metrics +} + +// Metrics is a collection of one or more aggregated timeseries from an Instrument. +type Metrics struct { + // Name is the name of the Instrument that created this data. + Name string + // Description is the description of the Instrument, which can be used in documentation. + Description string + // Unit is the unit in which the Instrument reports. + Unit unit.Unit + // Data is the aggregated data from an Instrument. + Data Aggregation +} + +// Aggregation is the store of data reported by an Instrument. +// It will be one of: Gauge, Sum, Histogram. +type Aggregation interface { + privateAggregation() +} + +// Gauge represents a measurement of the current value of an instrument. +type Gauge struct { + // DataPoints represents individual aggregated measurements with unique Attributes. + DataPoints []DataPoint +} + +func (Gauge) privateAggregation() {} + +// Sum represents the sum of all measurements of values from an instrument. +type Sum struct { + // DataPoints represents individual aggregated measurements with unique Attributes. 
+ DataPoints []DataPoint + // Temporality describes if the aggregation is reported as the change from the + // last report time, or the cumulative changes since a fixed start time. + Temporality Temporality + // IsMonotonic represents if this aggregation only increases or decreases. + IsMonotonic bool +} + +func (Sum) privateAggregation() {} + +// DataPoint is a single data point in a timeseries. +type DataPoint struct { + // Attributes is the set of key value pairs that uniquely identify the timeseries. + Attributes []attribute.KeyValue + // StartTime is when the timeseries was started. (optional) + StartTime time.Time + // Time is the time when the timeseries was recorded. (optional) + Time time.Time + // Value is the value of this data point. + Value Value +} + +// Value is an int64 or float64. All Values created by the sdk will be either +// Int64 or Float64. +type Value interface { + privateValue() +} + +// Int64 is a container for an int64 value. +type Int64 int64 + +func (Int64) privateValue() {} + +// Float64 is a container for a float64 value. +type Float64 float64 + +func (Float64) privateValue() {} + +// Histogram represents the histogram of all measurements of values from an instrument. +type Histogram struct { + // DataPoints represents individual aggregated measurements with unique Attributes. + DataPoints []HistogramDataPoint + // Temporality describes if the aggregation is reported as the change from the + // last report time, or the cumulative changes since a fixed start time. + Temporality Temporality +} + +func (Histogram) privateAggregation() {} + +// HistogramDataPoint is a single histogram data point in a timeseries. +type HistogramDataPoint struct { + // Attributes is the set of key value pairs that uniquely identify the timeseries. + Attributes []attribute.KeyValue + // StartTime is when the timeseries was started. + StartTime time.Time + // Time is the time when the timeseries was recorded. 
+ Time time.Time + + // Count is the number of updates this histogram has been calculated with. + Count uint64 + // Bounds are the upper bounds of the buckets of the histogram. Because the + // last boundary is +infinity this one is implied. + Bounds []float64 + // BucketCounts is the count of each of the buckets. + BucketCounts []uint64 + + // Min is the minimum value recorded. (optional) + Min *float64 + // Max is the maximum value recorded. (optional) + Max *float64 + // Sum is the sum of the values recorded. + Sum float64 } diff --git a/sdk/metric/export/temporality.go b/sdk/metric/export/temporality.go new file mode 100644 index 00000000000..163bec389e8 --- /dev/null +++ b/sdk/metric/export/temporality.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package export // import "go.opentelemetry.io/otel/sdk/metric/export" + +// Temporality defines the window that an aggregation was calculated over. +type Temporality uint8 + +const ( + // undefinedTemporality represents an unset Temporality. + //nolint:deadcode,unused,varcheck + undefinedTemporality Temporality = iota + + // CumulativeTemporality defines a measurement interval that continues to + // expand forward in time from a starting point. New measurements are + // added to all previous measurements since a start time. 
+ CumulativeTemporality + + // DeltaTemporality defines a measurement interval that resets each cycle. + // Measurements from one cycle are recorded independently, measurements + // from other cycles do not affect them. + DeltaTemporality +) diff --git a/sdk/metric/exporter.go b/sdk/metric/exporter.go index d62838c9b15..cce68657e54 100644 --- a/sdk/metric/exporter.go +++ b/sdk/metric/exporter.go @@ -41,7 +41,7 @@ type Exporter interface { // implement any retry logic. All errors returned by this function are // considered unrecoverable and will be reported to a configured error // Handler. - Export(context.Context, export.Metrics) error + Export(context.Context, export.ResourceMetrics) error // ForceFlush flushes any metric data held by an exporter. // diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 7c483050040..1ded52c00ca 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -90,10 +90,10 @@ func (mr *manualReader) Shutdown(context.Context) error { // Collect gathers all metrics from the SDK, calling any callbacks necessary. // Collect will return an error if called after shutdown. -func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) { +func (mr *manualReader) Collect(ctx context.Context) (export.ResourceMetrics, error) { p := mr.producer.Load() if p == nil { - return export.Metrics{}, ErrReaderNotRegistered + return export.ResourceMetrics{}, ErrReaderNotRegistered } ph, ok := p.(produceHolder) @@ -103,8 +103,9 @@ func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) { // happen, return an error instead of panicking so a users code does // not halt in the processes. 
err := fmt.Errorf("manual reader: invalid producer: %T", p) - return export.Metrics{}, err + return export.ResourceMetrics{}, err } + return ph.produce(ctx) } diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 200d5989664..8f600f1fbc1 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -199,10 +199,10 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio // exporter, it is left to the caller to handle that if desired. // // An error is returned if this is called after Shutdown. -func (r *periodicReader) Collect(ctx context.Context) (export.Metrics, error) { +func (r *periodicReader) Collect(ctx context.Context) (export.ResourceMetrics, error) { p := r.producer.Load() if p == nil { - return export.Metrics{}, ErrReaderNotRegistered + return export.ResourceMetrics{}, ErrReaderNotRegistered } ph, ok := p.(produceHolder) @@ -212,7 +212,7 @@ func (r *periodicReader) Collect(ctx context.Context) (export.Metrics, error) { // happen, return an error instead of panicking so a users code does // not halt in the processes. 
err := fmt.Errorf("periodic reader: invalid producer: %T", p) - return export.Metrics{}, err + return export.ResourceMetrics{}, err } return ph.produce(ctx) } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 02a9261b4f6..27cbda7e3ca 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -56,14 +56,14 @@ func TestWithInterval(t *testing.T) { } type fnExporter struct { - exportFunc func(context.Context, export.Metrics) error + exportFunc func(context.Context, export.ResourceMetrics) error flushFunc func(context.Context) error shutdownFunc func(context.Context) error } var _ Exporter = (*fnExporter)(nil) -func (e *fnExporter) Export(ctx context.Context, m export.Metrics) error { +func (e *fnExporter) Export(ctx context.Context, m export.ResourceMetrics) error { if e.exportFunc != nil { return e.exportFunc(ctx, m) } @@ -94,7 +94,7 @@ func (ts *periodicReaderTestSuite) SetupTest() { ts.readerTestSuite.SetupTest() e := &fnExporter{ - exportFunc: func(context.Context, export.Metrics) error { return assert.AnError }, + exportFunc: func(context.Context, export.ResourceMetrics) error { return assert.AnError }, flushFunc: func(context.Context) error { return assert.AnError }, shutdownFunc: func(context.Context) error { return assert.AnError }, } @@ -163,7 +163,7 @@ func TestPeriodicReaderRun(t *testing.T) { otel.SetErrorHandler(eh) exp := &fnExporter{ - exportFunc: func(_ context.Context, m export.Metrics) error { + exportFunc: func(_ context.Context, m export.ResourceMetrics) error { // The testProducer produces testMetrics. assert.Equal(t, testMetrics, m) return assert.AnError diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index c365c41b829..3b71cac089c 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -65,7 +65,7 @@ type Reader interface { // Collect gathers and returns all metric data related to the Reader from // the SDK. 
An error is returned if this is called after Shutdown. - Collect(context.Context) (export.Metrics, error) + Collect(context.Context) (export.ResourceMetrics, error) // ForceFlush flushes all metric measurements held in an export pipeline. // @@ -93,21 +93,21 @@ type producer interface { // produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. - produce(context.Context) (export.Metrics, error) + produce(context.Context) (export.ResourceMetrics, error) } // produceHolder is used as an atomic.Value to wrap the non-concrete producer // type. type produceHolder struct { - produce func(context.Context) (export.Metrics, error) + produce func(context.Context) (export.ResourceMetrics, error) } // shutdownProducer produces an ErrReaderShutdown error always. type shutdownProducer struct{} // produce returns an ErrReaderShutdown error. -func (p shutdownProducer) produce(context.Context) (export.Metrics, error) { - return export.Metrics{}, ErrReaderShutdown +func (p shutdownProducer) produce(context.Context) (export.ResourceMetrics, error) { + return export.ResourceMetrics{}, ErrReaderShutdown } // ReaderOption applies a configuration option value to either a ManualReader or diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index c5ef1c83119..4a3c3429b44 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -69,7 +69,7 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { m, err := ts.Reader.Collect(ctx) ts.ErrorIs(err, ErrReaderShutdown) - ts.Equal(export.Metrics{}, m) + ts.Equal(export.ResourceMetrics{}, m) } func (ts *readerTestSuite) TestShutdownTwice() { @@ -88,7 +88,7 @@ func (ts *readerTestSuite) TestMultipleForceFlush() { func (ts *readerTestSuite) TestMultipleRegister() { p0 := testProducer{ - produceFunc: func(ctx context.Context) (export.Metrics, error) { + produceFunc: func(ctx context.Context) (export.ResourceMetrics, error) { // Differentiate this producer from the 
second by returning an // error. return testMetrics, assert.AnError @@ -143,18 +143,18 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { m, err := ts.Reader.Collect(ctx) ts.ErrorIs(err, ErrReaderShutdown) - ts.Equal(export.Metrics{}, m) + ts.Equal(export.ResourceMetrics{}, m) } -var testMetrics = export.Metrics{ +var testMetrics = export.ResourceMetrics{ // TODO: test with actual data. } type testProducer struct { - produceFunc func(context.Context) (export.Metrics, error) + produceFunc func(context.Context) (export.ResourceMetrics, error) } -func (p testProducer) produce(ctx context.Context) (export.Metrics, error) { +func (p testProducer) produce(ctx context.Context) (export.ResourceMetrics, error) { if p.produceFunc != nil { return p.produceFunc(ctx) } @@ -168,7 +168,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) { // Store bechmark results in a closure to prevent the compiler from // inlining and skipping the function. var ( - collectedMetrics export.Metrics + collectedMetrics export.ResourceMetrics err error ) @@ -178,9 +178,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) { for n := 0; n < b.N; n++ { collectedMetrics, err = r.Collect(ctx) - if collectedMetrics != testMetrics || err != nil { - b.Errorf("unexpected Collect response: (%#v, %v)", collectedMetrics, err) - } + assert.Equalf(b, testMetrics, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) } } }