Skip to content

Commit

Permalink
Add MetricAggregator.Merge() implementations (#253)
Browse files Browse the repository at this point in the history
* Add MetricAggregator.Merge() implementations

* Update from feedback

* Type
  • Loading branch information
jmacd authored Oct 31, 2019
1 parent 563985f commit 915775e
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 28 deletions.
3 changes: 3 additions & 0 deletions sdk/export/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type MetricAggregator interface {
// called in a single-threaded context. Update()
// calls may arrive concurrently.
Collect(context.Context, MetricRecord, MetricBatcher)

// Merge combines state from two aggregators into one.
Merge(MetricAggregator, *Descriptor)
}

// MetricRecord is the unit of export, pairing a metric
Expand Down
9 changes: 9 additions & 0 deletions sdk/metric/aggregator/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,12 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, rec export.Me

c.current.AddNumberAtomic(kind, number)
}

// Merge folds the checkpointed sum of another counter Aggregator
// into this one. An aggregator of a different type is ignored.
func (c *Aggregator) Merge(oa export.MetricAggregator, desc *export.Descriptor) {
	if o, ok := oa.(*Aggregator); ok {
		c.checkpoint.AddNumber(desc.NumberKind(), o.checkpoint)
		return
	}
	// TODO warn
}
28 changes: 28 additions & 0 deletions sdk/metric/aggregator/counter/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,31 @@ func TestCounterNonMonotonic(t *testing.T) {
require.Equal(t, sum, agg.AsNumber(), "Same sum - monotonic")
})
}

// TestCounterMerge verifies that merging two counters that observed
// identical updates yields twice the single-counter sum.
func TestCounterMerge(t *testing.T) {
	ctx := context.Background()

	test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
		agg1 := New()
		agg2 := New()

		batcher, record := test.NewAggregatorTest(export.CounterMetricKind, profile.NumberKind, false)
		descriptor := record.Descriptor()

		sum := core.Number(0)
		for i := 0; i < count; i++ {
			x := profile.Random(+1)
			sum.AddNumber(profile.NumberKind, x)
			agg1.Update(ctx, x, record)
			agg2.Update(ctx, x, record)
		}

		agg1.Collect(ctx, record, batcher)
		agg2.Collect(ctx, record, batcher)

		agg1.Merge(agg2, descriptor)

		// Both aggregators saw the same updates, so the merged
		// total is double the accumulated sum.
		sum.AddNumber(descriptor.NumberKind(), sum)

		require.Equal(t, sum, agg1.AsNumber(), "Same sum - monotonic")
	})
}
10 changes: 10 additions & 0 deletions sdk/metric/aggregator/ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,13 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, rec export.Me
defer c.lock.Unlock()
c.current.Add(number.CoerceToFloat64(kind))
}

// Merge folds the checkpointed sketch of another Aggregator into
// this one. An aggregator of a different type is ignored. The
// Descriptor argument is unused by this implementation.
func (c *Aggregator) Merge(oa export.MetricAggregator, d *export.Descriptor) {
	if o, ok := oa.(*Aggregator); ok {
		c.checkpoint.Merge(o.checkpoint)
		return
	}
	// TODO warn
}
50 changes: 48 additions & 2 deletions sdk/metric/aggregator/ddsketch/ddsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,57 @@ func TestDDSketchAbsolute(t *testing.T) {
all[len(all)-1].CoerceToFloat64(profile.NumberKind),
agg.Max(),
"Same max - absolute")
// Median
require.InEpsilon(t,
all[len(all)/2].CoerceToFloat64(profile.NumberKind),
all.Median(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
agg.Quantile(0.5),
0.1,
"Same median - absolute")
})
}

// TestDDSketchMerge checks that merging two sketches produces the
// same sum, count, max, and (approximate) median as observing every
// value in one aggregator.
func TestDDSketchMerge(t *testing.T) {
	ctx := context.Background()

	test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
		batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false)

		agg1 := New(NewDefaultConfig(), record.Descriptor())
		agg2 := New(NewDefaultConfig(), record.Descriptor())

		var all test.Numbers

		// fill records `count` random values into one aggregator
		// while accumulating them into `all` for the reference stats.
		fill := func(agg *Aggregator) {
			for i := 0; i < count; i++ {
				x := profile.Random(+1)
				all = append(all, x)
				agg.Update(ctx, x, record)
			}
		}
		fill(agg1)
		fill(agg2)

		agg1.Collect(ctx, record, batcher)
		agg2.Collect(ctx, record, batcher)

		agg1.Merge(agg2, record.Descriptor())

		all.Sort()

		require.InEpsilon(t,
			all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
			agg1.Sum(),
			0.0000001,
			"Same sum - absolute")
		require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute")
		require.Equal(t,
			all[len(all)-1].CoerceToFloat64(profile.NumberKind),
			agg1.Max(),
			"Same max - absolute")
		require.InEpsilon(t,
			all.Median(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
			agg1.Quantile(0.5),
			0.1,
			"Same median - absolute")
	})
}
37 changes: 33 additions & 4 deletions sdk/metric/aggregator/gauge/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ type (

// Aggregator aggregates gauge events.
Aggregator struct {
// data is an atomic pointer to *gaugeData. It is set
// to `nil` if the gauge has not been set since the
// last collection.
// current is an atomic pointer to *gaugeData. It is never nil.
current unsafe.Pointer

// N.B. Export is not called when checkpoint is nil
// checkpoint is a copy of the current value taken in Collect()
checkpoint unsafe.Pointer
}

Expand Down Expand Up @@ -125,3 +123,34 @@ func (g *Aggregator) updateMonotonic(number core.Number, desc *export.Descriptor
}
}
}

// Merge combines the checkpointed state of another gauge Aggregator
// into this one. For monotonic (Alternate) gauges the greater value
// wins; for non-monotonic gauges, or when the values are equal, the
// value with the later timestamp wins. An aggregator of a different
// type is ignored.
func (g *Aggregator) Merge(oa export.MetricAggregator, desc *export.Descriptor) {
	o, _ := oa.(*Aggregator)
	if o == nil {
		// TODO warn
		return
	}

	ggd := (*gaugeData)(atomic.LoadPointer(&g.checkpoint))
	ogd := (*gaugeData)(atomic.LoadPointer(&o.checkpoint))

	if desc.Alternate() {
		// Monotonic: use the greater value
		cmp := ggd.value.CompareNumber(desc.NumberKind(), ogd.value)

		if cmp > 0 {
			return
		}

		if cmp < 0 {
			// Use an atomic store to match the atomic loads above:
			// the checkpoint word must not mix atomic and plain access.
			atomic.StorePointer(&g.checkpoint, unsafe.Pointer(ogd))
			return
		}
	}
	// Non-monotonic gauge or equal values: later timestamp wins.
	if ggd.timestamp.After(ogd.timestamp) {
		return
	}

	atomic.StorePointer(&g.checkpoint, unsafe.Pointer(ogd))
}
56 changes: 56 additions & 0 deletions sdk/metric/aggregator/gauge/gauge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,59 @@ func TestGaugeMonotonicDescending(t *testing.T) {
require.Equal(t, first, agg.AsNumber(), "Same last value - monotonic")
})
}

// TestGaugeNormalMerge verifies that merging two non-monotonic
// gauges keeps the value with the later timestamp.
func TestGaugeNormalMerge(t *testing.T) {
	ctx := context.Background()

	test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
		agg1 := New()
		agg2 := New()

		batcher, record := test.NewAggregatorTest(export.GaugeMetricKind, profile.NumberKind, false)

		// Make agg1's value strictly greater so the test proves the
		// winner is chosen by timestamp, not by magnitude.
		v1 := profile.Random(+1)
		v2 := profile.Random(+1)
		v1.AddNumber(profile.NumberKind, v2)

		agg1.Update(ctx, v1, record)
		agg2.Update(ctx, v2, record)

		agg1.Collect(ctx, record, batcher)
		agg2.Collect(ctx, record, batcher)

		t1, t2 := agg1.Timestamp(), agg2.Timestamp()
		require.True(t, t1.Before(t2))

		agg1.Merge(agg2, record.Descriptor())

		require.Equal(t, t2, agg1.Timestamp(), "Merged timestamp - non-monotonic")
		require.Equal(t, v2, agg1.AsNumber(), "Merged value - non-monotonic")
	})
}

// TestGaugeMonotonicMerge verifies that merging two monotonic gauges
// keeps the greater value along with its timestamp.
func TestGaugeMonotonicMerge(t *testing.T) {
	ctx := context.Background()

	test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
		agg1 := New()
		agg2 := New()

		batcher, record := test.NewAggregatorTest(export.GaugeMetricKind, profile.NumberKind, true)

		v1 := profile.Random(+1)
		agg1.Update(ctx, v1, record)

		// Ensure agg2 holds the strictly greater value.
		v2 := profile.Random(+1)
		v2.AddNumber(profile.NumberKind, v1)
		agg2.Update(ctx, v2, record)

		agg1.Collect(ctx, record, batcher)
		agg2.Collect(ctx, record, batcher)

		agg1.Merge(agg2, record.Descriptor())

		require.Equal(t, v2, agg1.AsNumber(), "Merged value - monotonic")
		require.Equal(t, agg2.Timestamp(), agg1.Timestamp(), "Merged timestamp - monotonic")
	})
}
55 changes: 37 additions & 18 deletions sdk/metric/aggregator/maxsumcount/msc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type (
// Aggregator aggregates measure events, keeping only the max,
// sum, and count.
Aggregator struct {
live state
save state
current state
checkpoint state
}

state struct {
Expand All @@ -45,30 +45,34 @@ func New() *Aggregator {

// Sum returns the accumulated sum as a Number.
func (c *Aggregator) Sum() core.Number {
return c.save.sum
return c.checkpoint.sum
}

// Count returns the accumulated count.
func (c *Aggregator) Count() int64 {
return int64(c.save.count.AsUint64())
return int64(c.checkpoint.count.AsUint64())
}

// Max returns the accumulated max as a Number.
func (c *Aggregator) Max() core.Number {
return c.save.max
return c.checkpoint.max
}

// Collect saves the current value (atomically) and exports it.
// Collect checkpoints the current value (atomically) and exports it.
func (c *Aggregator) Collect(ctx context.Context, rec export.MetricRecord, exp export.MetricBatcher) {
// N.B. There is no atomic operation that can update all three
// values at once, so there are races between Update() and
// Collect(). Therefore, atomically swap fields independently,
// knowing that individually the three parts of this aggregation
// could be spread across multiple collections in rare cases.

c.save.count.SetUint64(c.live.count.SwapUint64Atomic(0))
c.save.sum = c.live.sum.SwapNumberAtomic(core.Number(0))
c.save.max = c.live.max.SwapNumberAtomic(core.Number(0))
// values at once without a memory allocation.
//
// This aggregator is intended to trade this correctness for
// speed.
//
// Therefore, atomically swap fields independently, knowing
// that individually the three parts of this aggregation could
// be spread across multiple collections in rare cases.

c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
c.checkpoint.max = c.current.max.SwapNumberAtomic(core.Number(0))

exp.Export(ctx, rec, c)
}
Expand All @@ -83,17 +87,32 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, rec export.Me
return
}

c.live.count.AddUint64Atomic(1)
c.live.sum.AddNumberAtomic(kind, number)
c.current.count.AddUint64Atomic(1)
c.current.sum.AddNumberAtomic(kind, number)

for {
current := c.live.max.AsNumberAtomic()
current := c.current.max.AsNumberAtomic()

if number.CompareNumber(kind, current) <= 0 {
break
}
if c.live.max.CompareAndSwapNumber(current, number) {
if c.current.max.CompareAndSwapNumber(current, number) {
break
}
}
}

// Merge folds the checkpointed sum, count, and max of another
// Aggregator into this one. An aggregator of a different type is
// ignored.
func (c *Aggregator) Merge(oa export.MetricAggregator, desc *export.Descriptor) {
	o, ok := oa.(*Aggregator)
	if !ok {
		// TODO warn
		return
	}

	kind := desc.NumberKind()

	c.checkpoint.sum.AddNumber(kind, o.checkpoint.sum)
	c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)

	// Keep whichever checkpointed max is greater.
	if o.checkpoint.max.CompareNumber(kind, c.checkpoint.max) > 0 {
		c.checkpoint.max.SetNumber(o.checkpoint.max)
	}
}
44 changes: 43 additions & 1 deletion sdk/metric/aggregator/maxsumcount/msc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,52 @@ func TestMaxSumCountAbsolute(t *testing.T) {
agg.Sum().CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - absolute")
require.Equal(t, all.Count(), agg.Count(), "Same sum - absolute")
require.Equal(t, all.Count(), agg.Count(), "Same count - absolute")
require.Equal(t,
all[len(all)-1],
agg.Max(),
"Same max - absolute")
})
}

// TestMaxSumCountMerge checks that merging two max-sum-count
// aggregators matches the sum, count, and max of all observed values.
func TestMaxSumCountMerge(t *testing.T) {
	ctx := context.Background()

	test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
		batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false)

		agg1 := New()
		agg2 := New()

		var all test.Numbers

		// fill records `count` random values into one aggregator
		// while accumulating them into `all` for the reference stats.
		fill := func(agg *Aggregator) {
			for i := 0; i < count; i++ {
				x := profile.Random(+1)
				all = append(all, x)
				agg.Update(ctx, x, record)
			}
		}
		fill(agg1)
		fill(agg2)

		agg1.Collect(ctx, record, batcher)
		agg2.Collect(ctx, record, batcher)

		agg1.Merge(agg2, record.Descriptor())

		all.Sort()

		require.InEpsilon(t,
			all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
			agg1.Sum().CoerceToFloat64(profile.NumberKind),
			0.000000001,
			"Same sum - absolute")
		require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute")
		require.Equal(t,
			all[len(all)-1],
			agg1.Max(),
			"Same max - absolute")
	})
}
Loading

0 comments on commit 915775e

Please sign in to comment.