Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename SynchronizedCopy to SynchronizedMove and update comment #858

Merged
merged 2 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions exporters/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestStdoutTimestamp(t *testing.T) {
lvagg, ckpt := test.Unslice2(lastvalue.New(2))

aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt)

Expand Down Expand Up @@ -151,7 +151,7 @@ func TestStdoutCounterFormat(t *testing.T) {
cagg, ckpt := test.Unslice2(sum.New(2))

aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
require.NoError(t, cagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, cagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

Expand All @@ -169,7 +169,7 @@ func TestStdoutLastValueFormat(t *testing.T) {
lvagg, ckpt := test.Unslice2(lastvalue.New(2))

aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

Expand All @@ -189,7 +189,7 @@ func TestStdoutMinMaxSumCount(t *testing.T) {

aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc)
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc)
require.NoError(t, magg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, magg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

Expand All @@ -212,7 +212,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
aggtest.CheckedUpdate(fix.t, aagg, metric.NewFloat64Number(float64(i)+0.5), &desc)
}

require.NoError(t, aagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, aagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

Expand Down Expand Up @@ -256,7 +256,7 @@ func TestStdoutNoData(t *testing.T) {

checkpointSet := test.NewCheckpointSet(testResource)

require.NoError(t, agg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, agg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt)

Expand All @@ -278,7 +278,7 @@ func TestStdoutLastValueNotSet(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)

lvagg, ckpt := test.Unslice2(lastvalue.New(2))
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))

Expand Down Expand Up @@ -330,7 +330,7 @@ func TestStdoutResource(t *testing.T) {
lvagg, ckpt := test.Unslice2(lastvalue.New(2))

aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, tc.attrs...)

Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (NoopAggregator) Update(context.Context, metric.Number, *metric.Descriptor)
return nil
}

// SynchronizedCopy implements export.Aggregator.
func (NoopAggregator) SynchronizedCopy(export.Aggregator, *metric.Descriptor) error {
// SynchronizedMove implements export.Aggregator.
func (NoopAggregator) SynchronizedMove(export.Aggregator, *metric.Descriptor) error {
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestMinMaxSumCountValue(t *testing.T) {
assert.EqualError(t, err, aggregation.ErrNoData.Error())

// Checkpoint to set non-zero values
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &metric.Descriptor{}))
require.NoError(t, mmsc.SynchronizedMove(ckpt, &metric.Descriptor{}))
min, max, sum, count, err := minMaxSumCountValues(ckpt.(aggregation.MinMaxSumCount))
if assert.NoError(t, err) {
assert.Equal(t, min, metric.NewInt64Number(1))
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
if !assert.NoError(t, mmsc.Update(ctx, 1, &metric.Descriptor{})) {
return
}
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &metric.Descriptor{}))
require.NoError(t, mmsc.SynchronizedMove(ckpt, &metric.Descriptor{}))
for _, test := range tests {
desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
metric.WithDescription(test.description),
Expand All @@ -179,7 +179,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {

assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &desc))
require.NoError(t, mmsc.SynchronizedMove(ckpt, &desc))
expected := []*metricpb.SummaryDataPoint{
{
Count: 2,
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestSumInt64DataPoints(t *testing.T) {
labels := label.NewSet()
s, ckpt := test.Unslice2(sumAgg.New(2))
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{
Expand All @@ -299,7 +299,7 @@ func TestSumFloat64DataPoints(t *testing.T) {
labels := label.NewSet()
s, ckpt := test.Unslice2(sumAgg.New(2))
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
default:
t.Fatalf("invalid number kind: %v", r.nKind)
}
require.NoError(t, agg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, agg.SynchronizedMove(ckpt, &desc))

equiv := r.resource.Equivalent()
resources[equiv] = r.resource
Expand Down
11 changes: 6 additions & 5 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,12 @@ type Aggregator interface {
// inspected for a `correlation.Map` or `trace.SpanContext`.
Update(context.Context, metric.Number, *metric.Descriptor) error

// SynchronizedCopy is called during collection to finish one
// SynchronizedMove is called during collection to finish one
// period of aggregation by atomically saving the
// currently-updating state into the argument Aggregator.
// currently-updating state into the argument Aggregator AND
// resetting the current value to the zero state.
//
// SynchronizedCopy() is called concurrently with Update(). These
// SynchronizedMove() is called concurrently with Update(). These
// two methods must be synchronized with respect to each
// other, for correctness.
//
Expand All @@ -145,11 +146,11 @@ type Aggregator interface {
//
// This call has no Context argument because it is expected to
// perform only computation.
SynchronizedCopy(destination Aggregator, descriptor *metric.Descriptor) error
SynchronizedMove(destination Aggregator, descriptor *metric.Descriptor) error

// Merge combines the checkpointed state from the argument
// Aggregator into this Aggregator. Merge is not synchronized
// with respect to Update or SynchronizedCopy.
// with respect to Update or SynchronizedMove.
//
// The owner of an Aggregator being merged is responsible for
// synchronization of both Aggregator states.
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/aggregator/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ aggregation.Points = &Aggregator{}

// New returns a new array aggregator, which aggregates recorded
// measurements by storing them in an array. This type uses a mutex
// for Update() and SynchronizedCopy() concurrency.
// for Update() and SynchronizedMove() concurrency.
func New(cnt int) []Aggregator {
return make([]Aggregator, cnt)
}
Expand Down Expand Up @@ -92,9 +92,9 @@ func (c *Aggregator) Points() ([]metric.Number, error) {
return c.points, nil
}

// SynchronizedCopy saves the current state to oa and resets the current state to
// SynchronizedMove saves the current state to oa and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand All @@ -114,7 +114,7 @@ func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descrip
}

// Update adds the recorded measurement to the current data set.
// Update takes a lock to prevent concurrent Update() and SynchronizedCopy()
// Update takes a lock to prevent concurrent Update() and SynchronizedMove()
// calls.
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
c.lock.Lock()
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/aggregator/array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}

err := agg.SynchronizedCopy(ckpt, descriptor)
err := agg.SynchronizedMove(ckpt, descriptor)
require.NoError(t, err)

checkZero(t, agg, descriptor)
Expand Down Expand Up @@ -154,8 +154,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}

require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))

checkZero(t, agg1, descriptor)
checkZero(t, agg2, descriptor)
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestArrayErrors(t *testing.T) {
if profile.NumberKind == metric.Float64NumberKind {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(math.NaN()), descriptor)
}
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

count, err := ckpt.Count()
require.Equal(t, int64(1), count, "NaN value was not counted")
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestArrayFloat64(t *testing.T) {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(f), descriptor)
}

require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

all.Sort()

Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/aggregator/ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func (c *Aggregator) toNumber(f float64) metric.Number {
return metric.NewInt64Number(int64(f))
}

// SynchronizedCopy saves the current state into oa and resets the current state to
// SynchronizedMove saves the current state into oa and resets the current state to
// a new sketch, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand All @@ -129,7 +129,7 @@ func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor
}

// Update adds the recorded measurement to the current data set.
// Update takes a lock to prevent concurrent Update() and SynchronizedCopy()
// Update takes a lock to prevent concurrent Update() and SynchronizedMove()
// calls.
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
c.lock.Lock()
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/aggregator/ddsketch/ddsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}

err := agg.SynchronizedCopy(ckpt, descriptor)
err := agg.SynchronizedMove(ckpt, descriptor)
require.NoError(t, err)

checkZero(t, agg, descriptor)
Expand Down Expand Up @@ -156,8 +156,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}

require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))

checkZero(t, agg1, descriptor)
checkZero(t, agg1, descriptor)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
}, nil
}

// SynchronizedCopy saves the current state into oa and resets the current state to
// SynchronizedMove saves the current state into oa and resets the current state to
// the empty set. Since no locks are taken, there is a chance that
// the independent Sum, Count and Bucket Count are not consistent with each
// other.
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func testHistogram(t *testing.T, profile test.Profile, policy policy) {
test.CheckedUpdate(t, agg, x, descriptor)
}

require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

checkZero(t, agg, descriptor)

Expand Down Expand Up @@ -184,8 +184,8 @@ func TestHistogramMerge(t *testing.T) {
test.CheckedUpdate(t, agg2, x, descriptor)
}

require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))

test.CheckedMerge(t, ckpt1, ckpt2, descriptor)

Expand Down Expand Up @@ -223,7 +223,7 @@ func TestHistogramNotSet(t *testing.T) {

agg, ckpt := new2(descriptor)

err := agg.SynchronizedCopy(ckpt, descriptor)
err := agg.SynchronizedMove(ckpt, descriptor)
require.NoError(t, err)

checkZero(t, agg, descriptor)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/lastvalue/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (g *Aggregator) LastValue() (metric.Number, time.Time, error) {
return gd.value.AsNumber(), gd.timestamp, nil
}

// SynchronizedCopy atomically saves the current value.
func (g *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
// SynchronizedMove atomically saves the current value.
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa)
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/aggregator/lastvalue/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestLastValueUpdate(t *testing.T) {
test.CheckedUpdate(t, agg, x, record)
}

err := agg.SynchronizedCopy(ckpt, record)
err := agg.SynchronizedMove(ckpt, record)
require.NoError(t, err)

lv, _, err := ckpt.LastValue()
Expand All @@ -102,8 +102,8 @@ func TestLastValueMerge(t *testing.T) {
test.CheckedUpdate(t, agg1, first1, descriptor)
test.CheckedUpdate(t, agg2, first2, descriptor)

require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))

checkZero(t, agg1)
checkZero(t, agg2)
Expand All @@ -127,7 +127,7 @@ func TestLastValueNotSet(t *testing.T) {
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, metric.Int64NumberKind)

g, ckpt := new2()
require.NoError(t, g.SynchronizedCopy(ckpt, descriptor))
require.NoError(t, g.SynchronizedMove(ckpt, descriptor))

checkZero(t, g)
}
6 changes: 3 additions & 3 deletions sdk/metric/aggregator/minmaxsumcount/mmsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ aggregation.MinMaxSumCount = &Aggregator{}
// count. It does not compute quantile information other than Min and
// Max.
//
// This type uses a mutex for Update() and SynchronizedCopy() concurrency.
// This type uses a mutex for Update() and SynchronizedMove() concurrency.
func New(cnt int, desc *metric.Descriptor) []Aggregator {
kind := desc.NumberKind()
aggs := make([]Aggregator, cnt)
Expand Down Expand Up @@ -101,9 +101,9 @@ func (c *Aggregator) Max() (metric.Number, error) {
return c.max, nil
}

// SynchronizedCopy saves the current state into oa and resets the current state to
// SynchronizedMove saves the current state into oa and resets the current state to
// the empty set.
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
Loading