From 1fce711f51ed8fcf3606f9387baf0f63b988bed1 Mon Sep 17 00:00:00 2001 From: jmacd Date: Sat, 30 May 2020 14:47:22 -0700 Subject: [PATCH 1/5] Typo --- sdk/metric/integrator/simple/simple.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/integrator/simple/simple.go b/sdk/metric/integrator/simple/simple.go index 28c35ac5084..2165f40ef87 100644 --- a/sdk/metric/integrator/simple/simple.go +++ b/sdk/metric/integrator/simple/simple.go @@ -46,7 +46,7 @@ type ( } batch struct { - // RWMutex implements locking for the `CheckpoingSet` interface. + // RWMutex implements locking for the `CheckpointSet` interface. sync.RWMutex values map[batchKey]batchValue } From 8185a367d43bca564ff98fea5c25b4b0bb02407a Mon Sep 17 00:00:00 2001 From: jmacd Date: Sat, 30 May 2020 14:52:26 -0700 Subject: [PATCH 2/5] Swap order of ddsketch.New for consistency w/ histogram.New --- api/global/internal/benchmark_test.go | 4 ++-- exporters/metric/stdout/stdout_test.go | 2 +- sdk/metric/aggregator/ddsketch/ddsketch.go | 2 +- sdk/metric/aggregator/ddsketch/ddsketch_test.go | 6 +++--- sdk/metric/benchmark_test.go | 4 ++-- sdk/metric/selector/simple/simple.go | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/api/global/internal/benchmark_test.go b/api/global/internal/benchmark_test.go index 017afbd95bd..51d5ceae157 100644 --- a/api/global/internal/benchmark_test.go +++ b/api/global/internal/benchmark_test.go @@ -63,9 +63,9 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega if strings.HasSuffix(descriptor.Name(), "minmaxsumcount") { return minmaxsumcount.New(descriptor) } else if strings.HasSuffix(descriptor.Name(), "ddsketch") { - return ddsketch.New(ddsketch.NewDefaultConfig(), descriptor) + return ddsketch.New(descriptor, ddsketch.NewDefaultConfig()) } else if strings.HasSuffix(descriptor.Name(), "array") { - return ddsketch.New(ddsketch.NewDefaultConfig(), descriptor) + return ddsketch.New(descriptor, ddsketch.NewDefaultConfig()) } } return nil diff --git a/exporters/metric/stdout/stdout_test.go b/exporters/metric/stdout/stdout_test.go index 1d5805ba2eb..b401ac881c1 100644 --- a/exporters/metric/stdout/stdout_test.go +++ b/exporters/metric/stdout/stdout_test.go @@ -240,7 +240,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) { func TestStdoutNoData(t *testing.T) { desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind) for name, tc := range map[string]export.Aggregator{ - "ddsketch": ddsketch.New(ddsketch.NewDefaultConfig(), &desc), + "ddsketch": ddsketch.New(&desc, ddsketch.NewDefaultConfig()), "minmaxsumcount": minmaxsumcount.New(&desc), } { tc := tc diff --git a/sdk/metric/aggregator/ddsketch/ddsketch.go b/sdk/metric/aggregator/ddsketch/ddsketch.go index 197d95e4d37..7a2e23524e6 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch.go @@ -43,7 +43,7 @@ var _ aggregator.MinMaxSumCount = &Aggregator{} var _ aggregator.Distribution = &Aggregator{} // New returns a new DDSketch aggregator. -func New(cfg *Config, desc *metric.Descriptor) *Aggregator { +func New(desc *metric.Descriptor, cfg *Config) *Aggregator { return &Aggregator{ cfg: cfg, kind: desc.NumberKind(), diff --git a/sdk/metric/aggregator/ddsketch/ddsketch_test.go b/sdk/metric/aggregator/ddsketch/ddsketch_test.go index cc68359de11..2b4a3b0c094 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch_test.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch_test.go @@ -34,7 +34,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) { ctx := context.Background() descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) - agg := New(NewDefaultConfig(), descriptor) + agg := New(descriptor, NewDefaultConfig()) all := test.NewNumbers(profile.NumberKind) for i := 0; i < count; i++ { @@ -94,8 +94,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) { ctx := context.Background() descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) - agg1 := New(NewDefaultConfig(), descriptor) - agg2 := New(NewDefaultConfig(), descriptor) + agg1 := New(descriptor, NewDefaultConfig()) + agg2 := New(descriptor, NewDefaultConfig()) all := test.NewNumbers(profile.NumberKind) for i := 0; i < count; i++ { diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index d2864931982..f7a00fd548e 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -67,9 +67,9 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega if strings.HasSuffix(descriptor.Name(), "minmaxsumcount") { return minmaxsumcount.New(descriptor) } else if strings.HasSuffix(descriptor.Name(), "ddsketch") { - return ddsketch.New(ddsketch.NewDefaultConfig(), descriptor) + return ddsketch.New(descriptor, ddsketch.NewDefaultConfig()) } else if strings.HasSuffix(descriptor.Name(), "array") { - return ddsketch.New(ddsketch.NewDefaultConfig(), descriptor) + return ddsketch.New(descriptor, ddsketch.NewDefaultConfig()) } } return nil diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index a5956e4f3b0..1cd783e0ef1 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -91,7 +91,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.A func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { switch descriptor.MetricKind() { case metric.ValueObserverKind, metric.ValueRecorderKind: - return ddsketch.New(s.config, descriptor) + return ddsketch.New(descriptor, s.config) default: return sum.New() } From dca905488df7f7873f62cc2cc65ffb6a1773140b Mon Sep 17 00:00:00 2001 From: jmacd Date: Mon, 8 Jun 2020 22:41:55 -0700 Subject: [PATCH 3/5] Remove Integrator.Process ctx argument --- api/global/internal/benchmark_test.go | 2 +- sdk/export/metric/metric.go | 11 +-- sdk/metric/benchmark_test.go | 83 +++------------------ sdk/metric/correct_test.go | 2 +- sdk/metric/integrator/simple/simple.go | 3 +- sdk/metric/integrator/simple/simple_test.go | 41 +++++----- sdk/metric/sdk.go | 2 +- sdk/metric/stress_test.go | 2 +- 8 files changed, 40 insertions(+), 106 deletions(-) diff --git a/api/global/internal/benchmark_test.go b/api/global/internal/benchmark_test.go index 51d5ceae157..0f5d5437daf 100644 --- a/api/global/internal/benchmark_test.go +++ b/api/global/internal/benchmark_test.go @@ -71,7 +71,7 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega return nil } -func (*benchFixture) Process(context.Context, export.Record) error { +func (*benchFixture) Process(export.Record) error { return nil } diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index f22cdec0f3b..4317d21ba16 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -62,11 +62,12 @@ type Integrator interface { // Process is called by the SDK once per internal record, // passing the export Record (a Descriptor, the corresponding - // Labels, and the checkpointed Aggregator). - // - // The Context argument originates from the controller that - // orchestrates collection. - Process(ctx context.Context, record Record) error + // Labels, and the checkpointed Aggregator). This call has no + // Context argument because it is expected to perform only + // computation. An SDK is not expected to call exporters from + // with Process, use a controller for that (see + // ./controllers/{pull,push}. + Process(record Record) error } // AggregationSelector supports selecting the kind of Aggregator to diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index f7a00fd548e..2f880705f0a 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -32,13 +32,10 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" ) -type processFunc func(context.Context, export.Record) error - type benchFixture struct { meter metric.MeterMust accumulator *sdk.Accumulator B *testing.B - pcb processFunc } func newFixture(b *testing.B) *benchFixture { @@ -52,10 +49,6 @@ func newFixture(b *testing.B) *benchFixture { return bf } -func (f *benchFixture) setProcessCallback(cb processFunc) { - f.pcb = cb -} - func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { name := descriptor.Name() switch { @@ -75,11 +68,8 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega return nil } -func (f *benchFixture) Process(ctx context.Context, rec export.Record) error { - if f.pcb == nil { - return nil - } - return f.pcb(ctx, rec) +func (f *benchFixture) Process(rec export.Record) error { + return nil } func (*benchFixture) CheckpointSet() export.CheckpointSet { @@ -201,28 +191,14 @@ func BenchmarkAcquireReleaseExistingHandle(b *testing.B) { var benchmarkIteratorVar kv.KeyValue func benchmarkIterator(b *testing.B, n int) { - fix := newFixture(b) - fix.setProcessCallback(func(ctx context.Context, rec export.Record) error { - var kv kv.KeyValue - li := rec.Labels().Iter() - fix.B.StartTimer() - for i := 0; i < fix.B.N; i++ { - iter := li - // test getting only the first element - if iter.Next() { - kv = iter.Label() - } - } - fix.B.StopTimer() - benchmarkIteratorVar = kv - return nil - }) - cnt := fix.meter.NewInt64Counter("int64.counter") - ctx := context.Background() - cnt.Add(ctx, 1, makeLabels(n)...) - + labels := label.NewSet(makeLabels(n)...) b.ResetTimer() - fix.accumulator.Collect(ctx) + for i := 0; i < b.N; i++ { + iter := labels.Iter() + for iter.Next() { + benchmarkIteratorVar = iter.Label() + } + } } func BenchmarkIterator_0(b *testing.B) { @@ -560,11 +536,6 @@ func BenchmarkBatchRecord_8Labels_8Instruments(b *testing.B) { func BenchmarkRepeatedDirectCalls(b *testing.B) { ctx := context.Background() fix := newFixture(b) - encoder := label.DefaultEncoder() - fix.pcb = func(_ context.Context, rec export.Record) error { - _ = rec.Labels().Encoded(encoder) - return nil - } c := fix.meter.NewInt64Counter("int64.counter") k := kv.String("bench", "true") @@ -576,39 +547,3 @@ func BenchmarkRepeatedDirectCalls(b *testing.B) { fix.accumulator.Collect(ctx) } } - -// LabelIterator - -func BenchmarkLabelIterator(b *testing.B) { - const labelCount = 1024 - ctx := context.Background() - fix := newFixture(b) - - var rec export.Record - fix.pcb = func(_ context.Context, processRec export.Record) error { - rec = processRec - return nil - } - - keyValues := makeLabels(labelCount) - counter := fix.meter.NewInt64Counter("test.counter") - counter.Add(ctx, 1, keyValues...) - - fix.accumulator.Collect(ctx) - - b.ResetTimer() - - labels := rec.Labels() - iter := labels.Iter() - var val kv.KeyValue - for i := 0; i < b.N; i++ { - if !iter.Next() { - iter = labels.Iter() - iter.Next() - } - val = iter.Label() - } - if false { - fmt.Println(val) - } -} diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index b4eb8daddd5..ac5b35c4a0d 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -119,7 +119,7 @@ func (ci *correctnessIntegrator) CheckpointSet() export.CheckpointSet { func (*correctnessIntegrator) FinishedCollection() { } -func (ci *correctnessIntegrator) Process(_ context.Context, record export.Record) error { +func (ci *correctnessIntegrator) Process(record export.Record) error { ci.records = append(ci.records, record) return nil } diff --git a/sdk/metric/integrator/simple/simple.go b/sdk/metric/integrator/simple/simple.go index 2165f40ef87..178c21fe043 100644 --- a/sdk/metric/integrator/simple/simple.go +++ b/sdk/metric/integrator/simple/simple.go @@ -15,7 +15,6 @@ package simple // import "go.opentelemetry.io/otel/sdk/metric/integrator/simple" import ( - "context" "errors" "sync" @@ -65,7 +64,7 @@ func New(selector export.AggregationSelector, stateful bool) *Integrator { } } -func (b *Integrator) Process(_ context.Context, record export.Record) error { +func (b *Integrator) Process(record export.Record) error { desc := record.Descriptor() key := batchKey{ descriptor: desc, diff --git a/sdk/metric/integrator/simple/simple_test.go b/sdk/metric/integrator/simple/simple_test.go index 54fecdd7f7b..bac89ee9efe 100644 --- a/sdk/metric/integrator/simple/simple_test.go +++ b/sdk/metric/integrator/simple/simple_test.go @@ -30,34 +30,33 @@ import ( // These tests use the ../test label encoding. func TestSimpleStateless(t *testing.T) { - ctx := context.Background() b := simple.New(test.NewAggregationSelector(), false) // Set initial lastValue values - _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10)) - _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels2, 20)) - _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels3, 30)) + _ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10)) + _ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels2, 20)) + _ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels3, 30)) - _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 10)) - _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels2, 20)) - _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels3, 30)) + _ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 10)) + _ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels2, 20)) + _ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels3, 30)) // Another lastValue Set for Labels1 - _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 50)) - _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 50)) + _ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 50)) + _ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 50)) // Set initial counter values - _ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10)) - _ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels2, 20)) - _ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels3, 40)) + _ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10)) + _ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels2, 20)) + _ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels3, 40)) - _ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10)) - _ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels2, 20)) - _ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels3, 40)) + _ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10)) + _ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels2, 20)) + _ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels3, 40)) // Another counter Add for Labels1 - _ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels1, 50)) - _ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50)) + _ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels1, 50)) + _ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50)) checkpointSet := b.CheckpointSet() @@ -97,11 +96,11 @@ func TestSimpleStateful(t *testing.T) { counterA := test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10) caggA := counterA.Aggregator() - _ = b.Process(ctx, counterA) + _ = b.Process(counterA) counterB := test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10) caggB := counterB.Aggregator() - _ = b.Process(ctx, counterB) + _ = b.Process(counterB) checkpointSet := b.CheckpointSet() b.FinishedCollection() @@ -140,8 +139,8 @@ func TestSimpleStateful(t *testing.T) { b.FinishedCollection() // Now process the second update - _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA)) - _ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB)) + _ = b.Process(export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA)) + _ = b.Process(export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB)) checkpointSet = b.CheckpointSet() diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 3f6d11204b3..db0d2274142 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -467,7 +467,7 @@ func (m *Accumulator) checkpoint(ctx context.Context, descriptor *metric.Descrip recorder.Checkpoint(ctx, descriptor) exportRecord := export.NewRecord(descriptor, labels, m.resource, recorder) - err := m.integrator.Process(ctx, exportRecord) + err := m.integrator.Process(exportRecord) if err != nil { global.Handle(err) } diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index 7bdd72432b2..92094de340a 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -263,7 +263,7 @@ func (*testFixture) CheckpointSet() export.CheckpointSet { func (*testFixture) FinishedCollection() { } -func (f *testFixture) Process(_ context.Context, record export.Record) error { +func (f *testFixture) Process(record export.Record) error { labels := record.Labels().ToSlice() key := testKey{ labels: canonicalizeLabels(labels), From ffc46b3f289e15b1d7b3fa0ad18339af394a15c9 Mon Sep 17 00:00:00 2001 From: jmacd Date: Mon, 8 Jun 2020 22:51:08 -0700 Subject: [PATCH 4/5] Remove Aggregator.Checkpoint ctx argument --- exporters/metric/stdout/stdout_test.go | 16 ++++++------- exporters/metric/test/test.go | 2 +- .../otlp/internal/transform/metric_test.go | 10 ++++---- exporters/otlp/otlp_metric_test.go | 2 +- sdk/export/metric/metric.go | 6 ++--- sdk/metric/aggregator/array/array.go | 2 +- sdk/metric/aggregator/array/array_test.go | 17 ++++---------- sdk/metric/aggregator/ddsketch/ddsketch.go | 2 +- .../aggregator/ddsketch/ddsketch_test.go | 10 +++----- sdk/metric/aggregator/histogram/histogram.go | 2 +- .../aggregator/histogram/histogram_test.go | 14 ++++------- sdk/metric/aggregator/lastvalue/lastvalue.go | 2 +- .../aggregator/lastvalue/lastvalue_test.go | 13 ++++------- sdk/metric/aggregator/minmaxsumcount/mmsc.go | 2 +- .../aggregator/minmaxsumcount/mmsc_test.go | 14 ++++------- sdk/metric/aggregator/sum/sum.go | 2 +- sdk/metric/aggregator/sum/sum_test.go | 15 ++++-------- sdk/metric/histogram_stress_test.go | 2 +- sdk/metric/integrator/simple/simple_test.go | 4 ++-- sdk/metric/integrator/test/test.go | 4 ++-- sdk/metric/minmaxsumcount_stress_test.go | 2 +- sdk/metric/sdk.go | 23 ++++++++----------- 22 files changed, 64 insertions(+), 102 deletions(-) diff --git a/exporters/metric/stdout/stdout_test.go b/exporters/metric/stdout/stdout_test.go index b401ac881c1..9f7e2a62495 100644 --- a/exporters/metric/stdout/stdout_test.go +++ b/exporters/metric/stdout/stdout_test.go @@ -101,7 +101,7 @@ func TestStdoutTimestamp(t *testing.T) { desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind) lvagg := lastvalue.New() aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc) - lvagg.Checkpoint(ctx, &desc) + lvagg.Checkpoint(&desc) checkpointSet.Add(&desc, lvagg) @@ -146,7 +146,7 @@ func TestStdoutCounterFormat(t *testing.T) { desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind) cagg := sum.New() aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc) - cagg.Checkpoint(fix.ctx, &desc) + cagg.Checkpoint(&desc) checkpointSet.Add(&desc, cagg, kv.String("A", "B"), kv.String("C", "D")) @@ -163,7 +163,7 @@ func TestStdoutLastValueFormat(t *testing.T) { desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind) lvagg := lastvalue.New() aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc) - lvagg.Checkpoint(fix.ctx, &desc) + lvagg.Checkpoint(&desc) checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D")) @@ -181,7 +181,7 @@ func TestStdoutMinMaxSumCount(t *testing.T) { magg := minmaxsumcount.New(&desc) aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc) aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc) - magg.Checkpoint(fix.ctx, &desc) + magg.Checkpoint(&desc) checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D")) @@ -204,7 +204,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) { aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(float64(i)+0.5), &desc) } - magg.Checkpoint(fix.ctx, &desc) + magg.Checkpoint(&desc) checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D")) @@ -252,7 +252,7 @@ func TestStdoutNoData(t *testing.T) { checkpointSet := test.NewCheckpointSet(testResource) magg := tc - magg.Checkpoint(fix.ctx, &desc) + magg.Checkpoint(&desc) checkpointSet.Add(&desc, magg) @@ -270,7 +270,7 @@ func TestStdoutLastValueNotSet(t *testing.T) { desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind) lvagg := lastvalue.New() - lvagg.Checkpoint(fix.ctx, &desc) + lvagg.Checkpoint(&desc) checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D")) @@ -321,7 +321,7 @@ func TestStdoutResource(t *testing.T) { desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind) lvagg := lastvalue.New() aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc) - lvagg.Checkpoint(fix.ctx, &desc) + lvagg.Checkpoint(&desc) checkpointSet.Add(&desc, lvagg, tc.attrs...) diff --git a/exporters/metric/test/test.go b/exporters/metric/test/test.go index 7a788efd58f..396c8284c12 100644 --- a/exporters/metric/test/test.go +++ b/exporters/metric/test/test.go @@ -105,7 +105,7 @@ func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export. ctx := context.Background() // Updates and checkpoint the new aggregator _ = newAgg.Update(ctx, createNumber(desc, v), desc) - newAgg.Checkpoint(ctx, desc) + newAgg.Checkpoint(desc) // Try to add this aggregator to the CheckpointSet agg, added := p.Add(desc, newAgg, labels...) diff --git a/exporters/otlp/internal/transform/metric_test.go b/exporters/otlp/internal/transform/metric_test.go index 79f71e187f3..930915d2cb4 100644 --- a/exporters/otlp/internal/transform/metric_test.go +++ b/exporters/otlp/internal/transform/metric_test.go @@ -89,7 +89,7 @@ func TestMinMaxSumCountValue(t *testing.T) { assert.EqualError(t, err, aggregator.ErrNoData.Error()) // Checkpoint to set non-zero values - mmsc.Checkpoint(context.Background(), &metric.Descriptor{}) + mmsc.Checkpoint(&metric.Descriptor{}) min, max, sum, count, err := minMaxSumCountValues(mmsc) if assert.NoError(t, err) { assert.Equal(t, min, metric.NewInt64Number(1)) @@ -146,7 +146,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) { if !assert.NoError(t, mmsc.Update(ctx, 1, &metric.Descriptor{})) { return } - mmsc.Checkpoint(ctx, &metric.Descriptor{}) + mmsc.Checkpoint(&metric.Descriptor{}) for _, test := range tests { desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind, metric.WithDescription(test.description), @@ -165,7 +165,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) { mmsc := minmaxsumcount.New(&desc) assert.NoError(t, mmsc.Update(context.Background(), 1, &desc)) assert.NoError(t, mmsc.Update(context.Background(), 10, &desc)) - mmsc.Checkpoint(context.Background(), &desc) + mmsc.Checkpoint(&desc) expected := []*metricpb.SummaryDataPoint{ { Count: 2, @@ -261,7 +261,7 @@ func TestSumInt64DataPoints(t *testing.T) { labels := label.NewSet() s := sumAgg.New() assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc)) - s.Checkpoint(context.Background(), &desc) + s.Checkpoint(&desc) if m, err := sum(&desc, &labels, s); assert.NoError(t, err) { assert.Equal(t, []*metricpb.Int64DataPoint{{Value: 1}}, m.Int64DataPoints) assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints) @@ -275,7 +275,7 @@ func TestSumFloat64DataPoints(t *testing.T) { labels := label.NewSet() s := sumAgg.New() assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc)) - s.Checkpoint(context.Background(), &desc) + s.Checkpoint(&desc) if m, err := sum(&desc, &labels, s); assert.NoError(t, err) { assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints) assert.Equal(t, []*metricpb.DoubleDataPoint{{Value: 1}}, m.DoubleDataPoints) diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index 390a59995c5..4d90c5f024b 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -657,7 +657,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me default: t.Fatalf("invalid number kind: %v", r.nKind) } - agg.Checkpoint(ctx, &desc) + agg.Checkpoint(&desc) equiv := r.resource.Equivalent() resources[equiv] = r.resource diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 4317d21ba16..6c3d49a5814 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -120,9 +120,9 @@ type Aggregator interface { // accessed using by converting to one a suitable interface // types in the `aggregator` sub-package. // - // The Context argument originates from the controller that - // orchestrates collection. - Checkpoint(context.Context, *metric.Descriptor) + // This call has no Context argument because it is expected to + // perform only computation. + Checkpoint(*metric.Descriptor) // Merge combines the checkpointed state from the argument // aggregator into this aggregator's checkpointed state. diff --git a/sdk/metric/aggregator/array/array.go b/sdk/metric/aggregator/array/array.go index 2d92a54fee8..f3b15c564bc 100644 --- a/sdk/metric/aggregator/array/array.go +++ b/sdk/metric/aggregator/array/array.go @@ -85,7 +85,7 @@ func (c *Aggregator) Points() ([]metric.Number, error) { // Checkpoint saves the current state and resets the current state to // the empty set, taking a lock to prevent concurrent Update() calls. -func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) { +func (c *Aggregator) Checkpoint(desc *metric.Descriptor) { c.lock.Lock() c.checkpoint, c.current = c.current, nil c.lock.Unlock() diff --git a/sdk/metric/aggregator/array/array_test.go b/sdk/metric/aggregator/array/array_test.go index 2c3efb70684..e6ce683bf25 100644 --- a/sdk/metric/aggregator/array/array_test.go +++ b/sdk/metric/aggregator/array/array_test.go @@ -15,7 +15,6 @@ package array import ( - "context" "fmt" "math" "os" @@ -66,8 +65,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) { test.CheckedUpdate(t, agg, y, descriptor) } - ctx := context.Background() - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) all.Sort() @@ -116,8 +114,6 @@ type mergeTest struct { } func (mt *mergeTest) run(t *testing.T, profile test.Profile) { - ctx := context.Background() - descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) agg1 := New() @@ -145,8 +141,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) { } } - agg1.Checkpoint(ctx, descriptor) - agg2.Checkpoint(ctx, descriptor) + agg1.Checkpoint(descriptor) + agg2.Checkpoint(descriptor) test.CheckedMerge(t, agg1, agg2, descriptor) @@ -213,8 +209,6 @@ func TestArrayErrors(t *testing.T) { require.Error(t, err) require.Equal(t, err, aggregator.ErrNoData) - ctx := context.Background() - descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) test.CheckedUpdate(t, agg, metric.Number(0), descriptor) @@ -222,7 +216,7 @@ func TestArrayErrors(t *testing.T) { if profile.NumberKind == metric.Float64NumberKind { test.CheckedUpdate(t, agg, metric.NewFloat64Number(math.NaN()), descriptor) } - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) count, err := agg.Count() require.Equal(t, int64(1), count, "NaN value was not counted") @@ -275,7 +269,6 @@ func TestArrayFloat64(t *testing.T) { all := test.NewNumbers(metric.Float64NumberKind) - ctx := context.Background() agg := New() for _, f := range fpsf(1) { @@ -288,7 +281,7 @@ func TestArrayFloat64(t *testing.T) { test.CheckedUpdate(t, agg, metric.NewFloat64Number(f), descriptor) } - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) all.Sort() diff --git a/sdk/metric/aggregator/ddsketch/ddsketch.go b/sdk/metric/aggregator/ddsketch/ddsketch.go index 7a2e23524e6..32069c03136 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch.go @@ -103,7 +103,7 @@ func (c *Aggregator) toNumber(f float64) metric.Number { // Checkpoint saves the current state and resets the current state to // the empty set, taking a lock to prevent concurrent Update() calls. -func (c *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) { +func (c *Aggregator) Checkpoint(*metric.Descriptor) { replace := sdk.NewDDSketch(c.cfg) c.lock.Lock() diff --git a/sdk/metric/aggregator/ddsketch/ddsketch_test.go b/sdk/metric/aggregator/ddsketch/ddsketch_test.go index 2b4a3b0c094..76fdcdd3d4f 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch_test.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch_test.go @@ -15,7 +15,6 @@ package ddsketch import ( - "context" "fmt" "testing" @@ -31,8 +30,6 @@ type updateTest struct { } func (ut *updateTest) run(t *testing.T, profile test.Profile) { - ctx := context.Background() - descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) agg := New(descriptor, NewDefaultConfig()) @@ -47,7 +44,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) { test.CheckedUpdate(t, agg, y, descriptor) } - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) all.Sort() @@ -91,7 +88,6 @@ type mergeTest struct { } func (mt *mergeTest) run(t *testing.T, profile test.Profile) { - ctx := context.Background() descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) agg1 := New(descriptor, NewDefaultConfig()) @@ -122,8 +118,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) { } } - agg1.Checkpoint(ctx, descriptor) - agg2.Checkpoint(ctx, descriptor) + agg1.Checkpoint(descriptor) + agg2.Checkpoint(descriptor) test.CheckedMerge(t, agg1, agg2, descriptor) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index a8e3a76ded5..11aaaf229a0 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -107,7 +107,7 @@ func (c *Aggregator) Histogram() (aggregator.Buckets, error) { // the empty set. Since no locks are taken, there is a chance that // the independent Sum, Count and Bucket Count are not consistent with each // other. -func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) { +func (c *Aggregator) Checkpoint(desc *metric.Descriptor) { c.lock.Lock() c.checkpoint, c.current = c.current, emptyState(c.boundaries) c.lock.Unlock() diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 13a1fcb5210..c9e32ed43bc 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -15,7 +15,6 @@ package histogram_test import ( - "context" "math" "math/rand" "sort" @@ -81,7 +80,6 @@ func TestHistogramPositiveAndNegative(t *testing.T) { // Validates count, sum and buckets for a given profile and policy func testHistogram(t *testing.T, profile test.Profile, policy policy) { - ctx := context.Background() descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) agg := histogram.New(descriptor, boundaries) @@ -94,7 +92,7 @@ func testHistogram(t *testing.T, profile test.Profile, policy policy) { test.CheckedUpdate(t, agg, x, descriptor) } - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) all.Sort() @@ -137,8 +135,6 @@ func TestHistogramInitial(t *testing.T) { } func TestHistogramMerge(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) @@ -158,8 +154,8 @@ func TestHistogramMerge(t *testing.T) { test.CheckedUpdate(t, agg2, x, descriptor) } - agg1.Checkpoint(ctx, descriptor) - agg2.Checkpoint(ctx, descriptor) + agg1.Checkpoint(descriptor) + agg2.Checkpoint(descriptor) test.CheckedMerge(t, agg1, agg2, descriptor) @@ -192,13 +188,11 @@ func TestHistogramMerge(t *testing.T) { } func TestHistogramNotSet(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) agg := histogram.New(descriptor, boundaries) - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) asum, err := agg.Sum() require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0") diff --git a/sdk/metric/aggregator/lastvalue/lastvalue.go b/sdk/metric/aggregator/lastvalue/lastvalue.go index 338a05cf12a..8c2f74cb31c 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue.go @@ -80,7 +80,7 @@ func (g *Aggregator) LastValue() (metric.Number, time.Time, error) { } // Checkpoint atomically saves the current value. -func (g *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) { +func (g *Aggregator) Checkpoint(*metric.Descriptor) { g.checkpoint = atomic.LoadPointer(&g.current) } diff --git a/sdk/metric/aggregator/lastvalue/lastvalue_test.go b/sdk/metric/aggregator/lastvalue/lastvalue_test.go index 1b4da094fb6..d706927dddc 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue_test.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue_test.go @@ -15,7 +15,6 @@ package lastvalue import ( - "context" "math/rand" "os" "testing" @@ -50,8 +49,6 @@ func TestMain(m *testing.M) { } func TestLastValueUpdate(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { agg := New() @@ -64,7 +61,7 @@ func TestLastValueUpdate(t *testing.T) { test.CheckedUpdate(t, agg, x, record) } - agg.Checkpoint(ctx, record) + agg.Checkpoint(record) lv, _, err := agg.LastValue() require.Equal(t, last, lv, "Same last value - non-monotonic") @@ -73,8 +70,6 @@ func TestLastValueUpdate(t *testing.T) { } func TestLastValueMerge(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { agg1 := New() agg2 := New() @@ -88,8 +83,8 @@ func TestLastValueMerge(t *testing.T) { test.CheckedUpdate(t, agg1, first1, descriptor) test.CheckedUpdate(t, agg2, first2, descriptor) - agg1.Checkpoint(ctx, descriptor) - agg2.Checkpoint(ctx, descriptor) + agg1.Checkpoint(descriptor) + agg2.Checkpoint(descriptor) _, t1, err := agg1.LastValue() require.Nil(t, err) @@ -110,7 +105,7 @@ func TestLastValueNotSet(t *testing.T) { descriptor := test.NewAggregatorTest(metric.ValueObserverKind, metric.Int64NumberKind) g := New() - g.Checkpoint(context.Background(), descriptor) + g.Checkpoint(descriptor) value, timestamp, err := g.LastValue() require.Equal(t, aggregator.ErrNoData, err) diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc.go b/sdk/metric/aggregator/minmaxsumcount/mmsc.go index a66bec496c8..f72e8e3ae36 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc.go @@ -102,7 +102,7 @@ func (c *Aggregator) Max() (metric.Number, error) { // Checkpoint saves the current state and resets the current state to // the empty set. -func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) { +func (c *Aggregator) Checkpoint(desc *metric.Descriptor) { c.lock.Lock() c.checkpoint, c.current = c.current, c.emptyState() c.lock.Unlock() diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go index d01916b8e11..7e9325b8c75 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go @@ -15,7 +15,6 @@ package minmaxsumcount import ( - "context" "math" "math/rand" "testing" @@ -78,7 +77,6 @@ func TestMinMaxSumCountPositiveAndNegative(t *testing.T) { // Validates min, max, sum and count for a given profile and policy func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) { - ctx := context.Background() descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) agg := New(descriptor) @@ -91,7 +89,7 @@ func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) { test.CheckedUpdate(t, agg, x, descriptor) } - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) all.Sort() @@ -124,8 +122,6 @@ func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) { } func TestMinMaxSumCountMerge(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) @@ -145,8 +141,8 @@ func TestMinMaxSumCountMerge(t *testing.T) { test.CheckedUpdate(t, agg2, x, descriptor) } - agg1.Checkpoint(ctx, descriptor) - agg2.Checkpoint(ctx, descriptor) + agg1.Checkpoint(descriptor) + agg2.Checkpoint(descriptor) test.CheckedMerge(t, agg1, agg2, descriptor) @@ -182,13 +178,11 @@ func TestMinMaxSumCountMerge(t *testing.T) { } func TestMaxSumCountNotSet(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) agg := New(descriptor) - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) asum, err := agg.Sum() require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0") diff --git a/sdk/metric/aggregator/sum/sum.go b/sdk/metric/aggregator/sum/sum.go index 1bf6505e1d9..832dce33f7c 100644 --- a/sdk/metric/aggregator/sum/sum.go +++ b/sdk/metric/aggregator/sum/sum.go @@ -51,7 +51,7 @@ func (c *Aggregator) Sum() (metric.Number, error) { // Checkpoint atomically saves the current value and resets the // current sum to zero. -func (c *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) { +func (c *Aggregator) Checkpoint(*metric.Descriptor) { c.checkpoint = c.current.SwapNumberAtomic(metric.Number(0)) } diff --git a/sdk/metric/aggregator/sum/sum_test.go b/sdk/metric/aggregator/sum/sum_test.go index 617254d2ca4..6df2de3f3ae 100644 --- a/sdk/metric/aggregator/sum/sum_test.go +++ b/sdk/metric/aggregator/sum/sum_test.go @@ -15,7 +15,6 @@ package sum import ( - "context" "os" "testing" "unsafe" @@ -49,8 +48,6 @@ func TestMain(m *testing.M) { } func TestCounterSum(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { agg := New() @@ -63,7 +60,7 @@ func TestCounterSum(t *testing.T) { test.CheckedUpdate(t, agg, x, descriptor) } - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) asum, err := agg.Sum() require.Equal(t, sum, asum, "Same sum - monotonic") @@ -72,8 +69,6 @@ func TestCounterSum(t *testing.T) { } func TestValueRecorderSum(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { agg := New() @@ -90,7 +85,7 @@ func TestValueRecorderSum(t *testing.T) { sum.AddNumber(profile.NumberKind, r2) } - agg.Checkpoint(ctx, descriptor) + agg.Checkpoint(descriptor) asum, err := agg.Sum() require.Equal(t, sum, asum, "Same sum - monotonic") @@ -99,8 +94,6 @@ func TestValueRecorderSum(t *testing.T) { } func TestCounterMerge(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { agg1 := New() agg2 := New() @@ -115,8 +108,8 @@ func TestCounterMerge(t *testing.T) { test.CheckedUpdate(t, agg2, x, descriptor) } - agg1.Checkpoint(ctx, descriptor) - agg2.Checkpoint(ctx, descriptor) + agg1.Checkpoint(descriptor) + agg2.Checkpoint(descriptor) test.CheckedMerge(t, agg1, agg2, descriptor) diff --git a/sdk/metric/histogram_stress_test.go b/sdk/metric/histogram_stress_test.go index f4a1ca687f1..ebce935eb8d 100644 --- a/sdk/metric/histogram_stress_test.go +++ b/sdk/metric/histogram_stress_test.go @@ -44,7 +44,7 @@ func TestStressInt64Histogram(t *testing.T) { startTime := time.Now() for time.Since(startTime) < time.Second { - h.Checkpoint(context.Background(), &desc) + h.Checkpoint(&desc) b, _ := h.Histogram() c, _ := h.Count() diff --git a/sdk/metric/integrator/simple/simple_test.go b/sdk/metric/integrator/simple/simple_test.go index bac89ee9efe..eecbf7cc7ed 100644 --- a/sdk/metric/integrator/simple/simple_test.go +++ b/sdk/metric/integrator/simple/simple_test.go @@ -125,8 +125,8 @@ func TestSimpleStateful(t *testing.T) { // Update and re-checkpoint the original record. _ = caggA.Update(ctx, metric.NewInt64Number(20), &test.CounterADesc) _ = caggB.Update(ctx, metric.NewInt64Number(20), &test.CounterBDesc) - caggA.Checkpoint(ctx, &test.CounterADesc) - caggB.Checkpoint(ctx, &test.CounterBDesc) + caggA.Checkpoint(&test.CounterADesc) + caggB.Checkpoint(&test.CounterBDesc) // As yet cagg has not been passed to Integrator.Process. Should // not see an update. diff --git a/sdk/metric/integrator/test/test.go b/sdk/metric/integrator/test/test.go index 7cc6288ab28..008cfeb1dfc 100644 --- a/sdk/metric/integrator/test/test.go +++ b/sdk/metric/integrator/test/test.go @@ -131,7 +131,7 @@ func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator { ctx := context.Background() gagg := lastvalue.New() _ = gagg.Update(ctx, metric.NewInt64Number(v), desc) - gagg.Checkpoint(ctx, desc) + gagg.Checkpoint(desc) return gagg } @@ -150,7 +150,7 @@ func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator { ctx := context.Background() cagg := sum.New() _ = cagg.Update(ctx, metric.NewInt64Number(v), desc) - cagg.Checkpoint(ctx, desc) + cagg.Checkpoint(desc) return cagg } diff --git a/sdk/metric/minmaxsumcount_stress_test.go b/sdk/metric/minmaxsumcount_stress_test.go index 0b51f66a64d..916f20efbe6 100644 --- a/sdk/metric/minmaxsumcount_stress_test.go +++ b/sdk/metric/minmaxsumcount_stress_test.go @@ -46,7 +46,7 @@ func TestStressInt64MinMaxSumCount(t *testing.T) { startTime := time.Now() for time.Since(startTime) < time.Second { - mmsc.Checkpoint(context.Background(), &desc) + mmsc.Checkpoint(&desc) s, _ := mmsc.Sum() c, _ := mmsc.Count() diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index db0d2274142..c134d9fb647 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -50,7 +50,6 @@ type ( // `*asyncInstrument` instances asyncLock sync.Mutex asyncInstruments *internal.AsyncInstrumentState - asyncContext context.Context // currentEpoch is the current epoch number. It is // incremented in `Collect()`. @@ -354,13 +353,13 @@ func (m *Accumulator) Collect(ctx context.Context) int { defer m.collectLock.Unlock() checkpointed := m.observeAsyncInstruments(ctx) - checkpointed += m.collectSyncInstruments(ctx) + checkpointed += m.collectSyncInstruments() m.currentEpoch++ return checkpointed } -func (m *Accumulator) collectSyncInstruments(ctx context.Context) int { +func (m *Accumulator) collectSyncInstruments() int { checkpointed := 0 m.current.Range(func(key interface{}, value interface{}) bool { @@ -374,7 +373,7 @@ func (m *Accumulator) collectSyncInstruments(ctx context.Context) int { if mods != coll { // Updates happened in this interval, // checkpoint and continue. - checkpointed += m.checkpointRecord(ctx, inuse) + checkpointed += m.checkpointRecord(inuse) inuse.collectedCount = mods return true } @@ -395,7 +394,7 @@ func (m *Accumulator) collectSyncInstruments(ctx context.Context) int { // last we'll see of this record, checkpoint mods = atomic.LoadInt64(&inuse.updateCount) if mods != coll { - checkpointed += m.checkpointRecord(ctx, inuse) + checkpointed += m.checkpointRecord(inuse) } return true }) @@ -419,10 +418,8 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int { defer m.asyncLock.Unlock() asyncCollected := 0 - m.asyncContext = ctx - m.asyncInstruments.Run(context.Background(), m) - m.asyncContext = nil + m.asyncInstruments.Run(ctx, m) for _, inst := range m.asyncInstruments.Instruments() { if a := m.fromAsync(inst); a != nil { @@ -433,8 +430,8 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int { return asyncCollected } -func (m *Accumulator) checkpointRecord(ctx context.Context, r *record) int { - return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, r.labels) +func (m *Accumulator) checkpointRecord(r *record) int { + return m.checkpoint(&r.inst.descriptor, r.recorder, r.labels) } func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { @@ -446,7 +443,7 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { lrec := lrec epochDiff := m.currentEpoch - lrec.observedEpoch if epochDiff == 0 { - checkpointed += m.checkpoint(m.asyncContext, &a.descriptor, lrec.recorder, lrec.labels) + checkpointed += m.checkpoint(&a.descriptor, lrec.recorder, lrec.labels) } else if epochDiff > 1 { // This is second collection cycle with no // observations for this labelset. Remove the @@ -460,11 +457,11 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { return checkpointed } -func (m *Accumulator) checkpoint(ctx context.Context, descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int { +func (m *Accumulator) checkpoint(descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int { if recorder == nil { return 0 } - recorder.Checkpoint(ctx, descriptor) + recorder.Checkpoint(descriptor) exportRecord := export.NewRecord(descriptor, labels, m.resource, recorder) err := m.integrator.Process(exportRecord) From 4608ebc8246d93c75097f194a6585cd5b7a0a1d7 Mon Sep 17 00:00:00 2001 From: jmacd Date: Tue, 9 Jun 2020 00:13:24 -0700 Subject: [PATCH 5/5] Revert bugfix --- sdk/metric/sdk.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index c134d9fb647..8d4acc78a68 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -419,7 +419,8 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int { asyncCollected := 0 - m.asyncInstruments.Run(ctx, m) + // TODO: change this to `ctx` (in a separate PR, with tests) + m.asyncInstruments.Run(context.Background(), m) for _, inst := range m.asyncInstruments.Instruments() { if a := m.fromAsync(inst); a != nil {