Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpoint only after Update; Keep records in the sync.Map longer #647

Merged
merged 7 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/metric/atomicfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import "unsafe"
func AtomicFieldOffsets() map[string]uintptr {
return map[string]uintptr{
"record.refMapped.value": unsafe.Offsetof(record{}.refMapped.value),
"record.modified": unsafe.Offsetof(record{}.modified),
"record.updateCount": unsafe.Offsetof(record{}.updateCount),
"record.labels.cachedEncoderID": unsafe.Offsetof(record{}.labels.cachedEncoded),
}
}
23 changes: 23 additions & 0 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,30 @@ func BenchmarkBatchRecord_8Labels_8Instruments(b *testing.B) {
benchmarkBatchRecord8Labels(b, 8)
}

// Record creation

func BenchmarkRepeatedDirectCalls(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
encoder := export.NewDefaultLabelEncoder()
fix.pcb = func(_ context.Context, rec export.Record) error {
_ = rec.Labels().Encoded(encoder)
return nil
}

c := fix.meter.NewInt64Counter("int64.counter")
k := key.String("bench", "true")

b.ResetTimer()

for i := 0; i < b.N; i++ {
c.Add(ctx, 1, k)
fix.sdk.Collect(ctx)
}
}

// LabelIterator

func BenchmarkLabelIterator(b *testing.B) {
const labelCount = 1024
ctx := context.Background()
Expand Down
55 changes: 41 additions & 14 deletions sdk/metric/correct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"strings"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -37,21 +38,28 @@ import (
var Must = metric.Must

type correctnessBatcher struct {
newAggCount int64
jmacd marked this conversation as resolved.
Show resolved Hide resolved

t *testing.T

records []export.Record
}

func (cb *correctnessBatcher) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
func (cb *correctnessBatcher) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
name := descriptor.Name()

switch {
case strings.HasSuffix(name, ".counter"):
return sum.New()
agg = sum.New()
case strings.HasSuffix(name, ".disabled"):
return nil
agg = nil
default:
return array.New()
agg = array.New()
}
if agg != nil {
atomic.AddInt64(&cb.newAggCount, 1)
}
return
}

func (cb *correctnessBatcher) CheckpointSet() export.CheckpointSet {
Expand Down Expand Up @@ -87,15 +95,12 @@ func TestInputRangeTestCounter(t *testing.T) {
sdkErr = nil

checkpointed := sdk.Collect(ctx)
sum, err := batcher.records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(0), sum.AsInt64())
require.Equal(t, 1, checkpointed)
require.Nil(t, err)
require.Equal(t, 0, checkpointed)

batcher.records = nil
counter.Add(ctx, 1)
checkpointed = sdk.Collect(ctx)
sum, err = batcher.records[0].Aggregator().(aggregator.Sum).Sum()
sum, err := batcher.records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(1), sum.AsInt64())
require.Equal(t, 1, checkpointed)
require.Nil(t, err)
Expand All @@ -122,18 +127,15 @@ func TestInputRangeTestMeasure(t *testing.T) {
sdkErr = nil

checkpointed := sdk.Collect(ctx)
count, err := batcher.records[0].Aggregator().(aggregator.Distribution).Count()
require.Equal(t, int64(0), count)
require.Equal(t, 1, checkpointed)
require.Nil(t, err)
require.Equal(t, 0, checkpointed)

measure.Record(ctx, 1)
measure.Record(ctx, 2)

batcher.records = nil
checkpointed = sdk.Collect(ctx)

count, err = batcher.records[0].Aggregator().(aggregator.Distribution).Count()
count, err := batcher.records[0].Aggregator().(aggregator.Distribution).Count()
require.Equal(t, int64(2), count)
require.Equal(t, 1, checkpointed)
require.Nil(t, sdkErr)
Expand Down Expand Up @@ -356,3 +358,28 @@ func TestRecordBatch(t *testing.T) {
"float64.measure/A=B,C=D": 4,
}, out.Map)
}

// TestRecordPersistence ensures that a direct-called instrument that
// is repeatedly used each interval results in a persistent record, so
// that its encoded labels will be cached across collection intervals.
func TestRecordPersistence(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
t: t,
}

sdk := metricsdk.New(batcher)
meter := metric.WrapMeterImpl(sdk, "test")

c := Must(meter).NewFloat64Counter("sum.name")
b := c.Bind(key.String("bound", "true"))
uk := key.String("bound", "false")

for i := 0; i < 100; i++ {
c.Add(ctx, 1, uk)
b.Add(ctx, 1)
sdk.Collect(ctx)
}

require.Equal(t, int64(2), batcher.newAggCount)
}
6 changes: 0 additions & 6 deletions sdk/metric/refcount_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ func (rm *refcountMapped) unref() {
atomic.AddInt64(&rm.value, -2)
}

// inUse returns true if there is a reference to the entry and it is mapped.
func (rm *refcountMapped) inUse() bool {
val := atomic.LoadInt64(&rm.value)
return val >= 2 && val&1 == 0
}

// tryUnmap flips the mapped bit to "unmapped" state and returns true if both of the
// following conditions are true upon entry to this function:
// * There are no active references;
Expand Down
70 changes: 42 additions & 28 deletions sdk/metric/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ type (
// SDK.current map.
refMapped refcountMapped

// modified is an atomic boolean that tracks if the current record
// was modified since the last Collect().
//
// modified has to be aligned for 64-bit atomic operations.
modified int64
// updateCount is incremented on every Update.
updateCount int64

// collectedCount is set to updateCount on collection,
// supports checking for no updates during a round.
collectedCount int64

// labels is the processed label set for this record.
//
Expand Down Expand Up @@ -150,7 +151,7 @@ type (
}

labeledRecorder struct {
modifiedEpoch int64
observedEpoch int64
labels labels
recorder export.Aggregator
}
Expand Down Expand Up @@ -211,12 +212,12 @@ func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator {

lrec, ok := a.recorders[labels.ordered]
if ok {
if lrec.modifiedEpoch == a.meter.currentEpoch {
if lrec.observedEpoch == a.meter.currentEpoch {
// last value wins for Observers, so if we see the same labels
// in the current epoch, we replace the old recorder
lrec.recorder = a.meter.batcher.AggregatorFor(&a.descriptor)
} else {
lrec.modifiedEpoch = a.meter.currentEpoch
lrec.observedEpoch = a.meter.currentEpoch
}
a.recorders[labels.ordered] = lrec
return lrec.recorder
Expand All @@ -231,7 +232,7 @@ func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator {
a.recorders[labels.ordered] = labeledRecorder{
recorder: rec,
labels: labels,
modifiedEpoch: a.meter.currentEpoch,
observedEpoch: a.meter.currentEpoch,
}
return rec
}
Expand Down Expand Up @@ -557,25 +558,39 @@ func (m *SDK) collectRecords(ctx context.Context) int {
checkpointed := 0

m.current.Range(func(key interface{}, value interface{}) bool {
// Note: always continue to iterate over the entire
// map by returning `true` in this function.
inuse := value.(*record)
unmapped := inuse.refMapped.tryUnmap()
// If able to unmap then remove the record from the current Map.
if unmapped {
// TODO: Consider leaving the record in the map for one
// collection interval? Since creating records is relatively
// expensive, this would optimize common cases of ongoing use.
m.current.Delete(inuse.mapkey())
}

// Always report the values if a reference to the Record is active,
// this is to keep the previous behavior.
// TODO: Reconsider this logic.
if inuse.refMapped.inUse() || atomic.LoadInt64(&inuse.modified) != 0 {
atomic.StoreInt64(&inuse.modified, 0)
mods := atomic.LoadInt64(&inuse.updateCount)
coll := inuse.collectedCount

if mods != coll {
// Updates happened in this interval,
// checkpoint and continue.
checkpointed += m.checkpointRecord(ctx, inuse)
inuse.collectedCount = mods
return true
}

// Always continue to iterate over the entire map.
// Having no updates since last collection, try to unmap:
if unmapped := inuse.refMapped.tryUnmap(); !unmapped {
// The record is referenced by a binding, continue.
return true
}

// If any other goroutines are now trying to re-insert this
// entry in the map, they are busy calling Gosched() awaiting
// this deletion:
m.current.Delete(inuse.mapkey())

// There's a potential race between `LoadInt64` and
// `tryUnmap` in this function. Since this is the
// last we'll see of this record, checkpoint
mods = atomic.LoadInt64(&inuse.updateCount)
if mods != coll {
checkpointed += m.checkpointRecord(ctx, inuse)
}
return true
})

Expand Down Expand Up @@ -606,7 +621,7 @@ func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
checkpointed := 0
for encodedLabels, lrec := range a.recorders {
lrec := lrec
epochDiff := m.currentEpoch - lrec.modifiedEpoch
epochDiff := m.currentEpoch - lrec.observedEpoch
if epochDiff == 0 {
checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, &lrec.labels)
} else if epochDiff > 1 {
Expand Down Expand Up @@ -681,13 +696,12 @@ func (r *record) RecordOne(ctx context.Context, number core.Number) {
r.inst.meter.errorHandler(err)
return
}
// Record was modified, inform the Collect() that things need
// to be collected while the record is still mapped.
atomic.AddInt64(&r.updateCount, 1)
}

func (r *record) Unbind() {
// Record was modified, inform the Collect() that things need to be collected.
// TODO: Reconsider if we should marked as modified when an Update happens and
// collect only when updates happened even for Bounds.
atomic.StoreInt64(&r.modified, 1)
r.refMapped.unref()
}

Expand Down