Skip to content

Commit

Permalink
Rename SynchronizedCopy to SynchronizedMove and update comment (#858)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd authored Jun 23, 2020
1 parent d343f98 commit ea53fb4
Show file tree
Hide file tree
Showing 21 changed files with 67 additions and 66 deletions.
16 changes: 8 additions & 8 deletions exporters/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestStdoutTimestamp(t *testing.T) {
lvagg, ckpt := test.Unslice2(lastvalue.New(2))

aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt)

Expand Down Expand Up @@ -151,7 +151,7 @@ func TestStdoutCounterFormat(t *testing.T) {
cagg, ckpt := test.Unslice2(sum.New(2))

aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
require.NoError(t, cagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, cagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

Expand All @@ -169,7 +169,7 @@ func TestStdoutLastValueFormat(t *testing.T) {
lvagg, ckpt := test.Unslice2(lastvalue.New(2))

aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

Expand All @@ -189,7 +189,7 @@ func TestStdoutMinMaxSumCount(t *testing.T) {

aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc)
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc)
require.NoError(t, magg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, magg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

Expand All @@ -212,7 +212,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
aggtest.CheckedUpdate(fix.t, aagg, metric.NewFloat64Number(float64(i)+0.5), &desc)
}

require.NoError(t, aagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, aagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))

Expand Down Expand Up @@ -256,7 +256,7 @@ func TestStdoutNoData(t *testing.T) {

checkpointSet := test.NewCheckpointSet(testResource)

require.NoError(t, agg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, agg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt)

Expand All @@ -278,7 +278,7 @@ func TestStdoutLastValueNotSet(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)

lvagg, ckpt := test.Unslice2(lastvalue.New(2))
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))

Expand Down Expand Up @@ -330,7 +330,7 @@ func TestStdoutResource(t *testing.T) {
lvagg, ckpt := test.Unslice2(lastvalue.New(2))

aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))

checkpointSet.Add(&desc, ckpt, tc.attrs...)

Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (NoopAggregator) Update(context.Context, metric.Number, *metric.Descriptor)
return nil
}

// SynchronizedCopy implements export.Aggregator.
func (NoopAggregator) SynchronizedCopy(export.Aggregator, *metric.Descriptor) error {
// SynchronizedMove implements export.Aggregator.
func (NoopAggregator) SynchronizedMove(export.Aggregator, *metric.Descriptor) error {
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestMinMaxSumCountValue(t *testing.T) {
assert.EqualError(t, err, aggregation.ErrNoData.Error())

// Checkpoint to set non-zero values
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &metric.Descriptor{}))
require.NoError(t, mmsc.SynchronizedMove(ckpt, &metric.Descriptor{}))
min, max, sum, count, err := minMaxSumCountValues(ckpt.(aggregation.MinMaxSumCount))
if assert.NoError(t, err) {
assert.Equal(t, min, metric.NewInt64Number(1))
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
if !assert.NoError(t, mmsc.Update(ctx, 1, &metric.Descriptor{})) {
return
}
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &metric.Descriptor{}))
require.NoError(t, mmsc.SynchronizedMove(ckpt, &metric.Descriptor{}))
for _, test := range tests {
desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
metric.WithDescription(test.description),
Expand All @@ -179,7 +179,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {

assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &desc))
require.NoError(t, mmsc.SynchronizedMove(ckpt, &desc))
expected := []*metricpb.SummaryDataPoint{
{
Count: 2,
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestSumInt64DataPoints(t *testing.T) {
labels := label.NewSet()
s, ckpt := test.Unslice2(sumAgg.New(2))
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{
Expand All @@ -299,7 +299,7 @@ func TestSumFloat64DataPoints(t *testing.T) {
labels := label.NewSet()
s, ckpt := test.Unslice2(sumAgg.New(2))
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
default:
t.Fatalf("invalid number kind: %v", r.nKind)
}
require.NoError(t, agg.SynchronizedCopy(ckpt, &desc))
require.NoError(t, agg.SynchronizedMove(ckpt, &desc))

equiv := r.resource.Equivalent()
resources[equiv] = r.resource
Expand Down
11 changes: 6 additions & 5 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,12 @@ type Aggregator interface {
// inspected for a `correlation.Map` or `trace.SpanContext`.
Update(context.Context, metric.Number, *metric.Descriptor) error

// SynchronizedCopy is called during collection to finish one
// SynchronizedMove is called during collection to finish one
// period of aggregation by atomically saving the
// currently-updating state into the argument Aggregator.
// currently-updating state into the argument Aggregator AND
// resetting the current value to the zero state.
//
// SynchronizedCopy() is called concurrently with Update(). These
// SynchronizedMove() is called concurrently with Update(). These
// two methods must be synchronized with respect to each
// other, for correctness.
//
Expand All @@ -145,11 +146,11 @@ type Aggregator interface {
//
// This call has no Context argument because it is expected to
// perform only computation.
SynchronizedCopy(destination Aggregator, descriptor *metric.Descriptor) error
SynchronizedMove(destination Aggregator, descriptor *metric.Descriptor) error

// Merge combines the checkpointed state from the argument
// Aggregator into this Aggregator. Merge is not synchronized
// with respect to Update or SynchronizedCopy.
// with respect to Update or SynchronizedMove.
//
// The owner of an Aggregator being merged is responsible for
// synchronization of both Aggregator states.
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/aggregator/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ aggregation.Points = &Aggregator{}

// New returns a new array aggregator, which aggregates recorded
// measurements by storing them in an array. This type uses a mutex
// for Update() and SynchronizedCopy() concurrency.
// for Update() and SynchronizedMove() concurrency.
func New(cnt int) []Aggregator {
return make([]Aggregator, cnt)
}
Expand Down Expand Up @@ -92,9 +92,9 @@ func (c *Aggregator) Points() ([]metric.Number, error) {
return c.points, nil
}

// SynchronizedCopy saves the current state to oa and resets the current state to
// SynchronizedMove saves the current state to oa and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand All @@ -114,7 +114,7 @@ func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descrip
}

// Update adds the recorded measurement to the current data set.
// Update takes a lock to prevent concurrent Update() and SynchronizedCopy()
// Update takes a lock to prevent concurrent Update() and SynchronizedMove()
// calls.
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
c.lock.Lock()
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/aggregator/array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}

err := agg.SynchronizedCopy(ckpt, descriptor)
err := agg.SynchronizedMove(ckpt, descriptor)
require.NoError(t, err)

checkZero(t, agg, descriptor)
Expand Down Expand Up @@ -154,8 +154,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}

require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))

checkZero(t, agg1, descriptor)
checkZero(t, agg2, descriptor)
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestArrayErrors(t *testing.T) {
if profile.NumberKind == metric.Float64NumberKind {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(math.NaN()), descriptor)
}
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

count, err := ckpt.Count()
require.Equal(t, int64(1), count, "NaN value was not counted")
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestArrayFloat64(t *testing.T) {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(f), descriptor)
}

require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

all.Sort()

Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/aggregator/ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func (c *Aggregator) toNumber(f float64) metric.Number {
return metric.NewInt64Number(int64(f))
}

// SynchronizedCopy saves the current state into oa and resets the current state to
// SynchronizedMove saves the current state into oa and resets the current state to
// a new sketch, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand All @@ -129,7 +129,7 @@ func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor
}

// Update adds the recorded measurement to the current data set.
// Update takes a lock to prevent concurrent Update() and SynchronizedCopy()
// Update takes a lock to prevent concurrent Update() and SynchronizedMove()
// calls.
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
c.lock.Lock()
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/aggregator/ddsketch/ddsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}

err := agg.SynchronizedCopy(ckpt, descriptor)
err := agg.SynchronizedMove(ckpt, descriptor)
require.NoError(t, err)

checkZero(t, agg, descriptor)
Expand Down Expand Up @@ -156,8 +156,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}

require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))

checkZero(t, agg1, descriptor)
checkZero(t, agg1, descriptor)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
}, nil
}

// SynchronizedCopy saves the current state into oa and resets the current state to
// SynchronizedMove saves the current state into oa and resets the current state to
// the empty set. Since no locks are taken, there is a chance that
// the independent Sum, Count and Bucket Count are not consistent with each
// other.
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func testHistogram(t *testing.T, profile test.Profile, policy policy) {
test.CheckedUpdate(t, agg, x, descriptor)
}

require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

checkZero(t, agg, descriptor)

Expand Down Expand Up @@ -184,8 +184,8 @@ func TestHistogramMerge(t *testing.T) {
test.CheckedUpdate(t, agg2, x, descriptor)
}

require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))

test.CheckedMerge(t, ckpt1, ckpt2, descriptor)

Expand Down Expand Up @@ -223,7 +223,7 @@ func TestHistogramNotSet(t *testing.T) {

agg, ckpt := new2(descriptor)

err := agg.SynchronizedCopy(ckpt, descriptor)
err := agg.SynchronizedMove(ckpt, descriptor)
require.NoError(t, err)

checkZero(t, agg, descriptor)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/lastvalue/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (g *Aggregator) LastValue() (metric.Number, time.Time, error) {
return gd.value.AsNumber(), gd.timestamp, nil
}

// SynchronizedCopy atomically saves the current value.
func (g *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
// SynchronizedMove atomically saves the current value.
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa)
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/aggregator/lastvalue/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestLastValueUpdate(t *testing.T) {
test.CheckedUpdate(t, agg, x, record)
}

err := agg.SynchronizedCopy(ckpt, record)
err := agg.SynchronizedMove(ckpt, record)
require.NoError(t, err)

lv, _, err := ckpt.LastValue()
Expand All @@ -102,8 +102,8 @@ func TestLastValueMerge(t *testing.T) {
test.CheckedUpdate(t, agg1, first1, descriptor)
test.CheckedUpdate(t, agg2, first2, descriptor)

require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))

checkZero(t, agg1)
checkZero(t, agg2)
Expand All @@ -127,7 +127,7 @@ func TestLastValueNotSet(t *testing.T) {
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, metric.Int64NumberKind)

g, ckpt := new2()
require.NoError(t, g.SynchronizedCopy(ckpt, descriptor))
require.NoError(t, g.SynchronizedMove(ckpt, descriptor))

checkZero(t, g)
}
6 changes: 3 additions & 3 deletions sdk/metric/aggregator/minmaxsumcount/mmsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ aggregation.MinMaxSumCount = &Aggregator{}
// count. It does not compute quantile information other than Min and
// Max.
//
// This type uses a mutex for Update() and SynchronizedCopy() concurrency.
// This type uses a mutex for Update() and SynchronizedMove() concurrency.
func New(cnt int, desc *metric.Descriptor) []Aggregator {
kind := desc.NumberKind()
aggs := make([]Aggregator, cnt)
Expand Down Expand Up @@ -101,9 +101,9 @@ func (c *Aggregator) Max() (metric.Number, error) {
return c.max, nil
}

// SynchronizedCopy saves the current state into oa and resets the current state to
// SynchronizedMove saves the current state into oa and resets the current state to
// the empty set.
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
Loading

0 comments on commit ea53fb4

Please sign in to comment.