Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Allow creating additional View universes. (#1196)
Browse files Browse the repository at this point in the history
* Allow creating additional View universes.

* Add methods to extract stats.Option

* Update with comments from @rghetia

* Change record interface to include WithMeter option, per @rghetia

* Update with feedback from @rghetia

Signed-off-by: Evan Anderson <evan.k.anderson@gmail.com>

* Add a benchmark for stats.WithMeter (but with no views registered)

Signed-off-by: Evan Anderson <evan.k.anderson@gmail.com>

* Stop the custom meter in test to prevent leaking goroutines.
  • Loading branch information
evankanderson authored Feb 18, 2020
1 parent a7631f6 commit 84d38db
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 46 deletions.
17 changes: 17 additions & 0 deletions stats/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
_ "go.opencensus.io/stats/view" // enable collection
"go.opencensus.io/tag"
)
Expand Down Expand Up @@ -52,6 +53,22 @@ func BenchmarkRecord8(b *testing.B) {
}
}

func BenchmarkRecord8_WithRecorder(b *testing.B) {
ctx := context.Background()
meter := view.NewMeter()
meter.Start()
defer meter.Stop()
b.ResetTimer()

for i := 0; i < b.N; i++ {
// Note that this benchmark has one extra allocation for stats.WithRecorder.
// If you cache the recorder option, this benchmark should be equally fast as BenchmarkRecord8
stats.RecordWithOptions(ctx, stats.WithRecorder(meter), stats.WithMeasurements(m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1)))
}

b.StopTimer()
}

func BenchmarkRecord8_Parallel(b *testing.B) {
ctx := context.Background()
b.ResetTimer()
Expand Down
20 changes: 20 additions & 0 deletions stats/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,19 @@ func init() {
}
}

// Recorder provides an interface for exporting measurement information from
// the static Record method by using the WithRecorder option.
type Recorder interface {
// Record records a set of measurements associated with the given tags and attachments.
// The second argument is a `[]Measurement`.
Record(*tag.Map, interface{}, map[string]interface{})
}

type recordOptions struct {
attachments metricdata.Attachments
mutators []tag.Mutator
measurements []Measurement
recorder Recorder
}

// WithAttachments applies provided exemplar attachments.
Expand All @@ -58,6 +67,14 @@ func WithMeasurements(measurements ...Measurement) Options {
}
}

// WithRecorder records the measurements to the specified `Recorder`, rather
// than to the global metrics recorder.
func WithRecorder(meter Recorder) Options {
return func(ro *recordOptions) {
ro.recorder = meter
}
}

// Options apply changes to recordOptions.
type Options func(*recordOptions)

Expand Down Expand Up @@ -93,6 +110,9 @@ func RecordWithOptions(ctx context.Context, ros ...Options) error {
return nil
}
recorder := internal.DefaultRecorder
if o.recorder != nil {
recorder = o.recorder.Record
}
if recorder == nil {
return nil
}
Expand Down
106 changes: 106 additions & 0 deletions stats/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestRecordWithAttachments(t *testing.T) {
if err := view.Register(v); err != nil {
log.Fatalf("Failed to register views: %v", err)
}
defer view.Unregister(v)

attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
stats.RecordWithOptions(context.Background(), stats.WithAttachments(attachments), stats.WithMeasurements(m.M(12)))
Expand Down Expand Up @@ -93,3 +94,108 @@ func TestRecordWithAttachments(t *testing.T) {
func cmpExemplar(got, want *metricdata.Exemplar) string {
return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{}))
}

func TestRecordWithMeter(t *testing.T) {
meter := view.NewMeter()
meter.Start()
defer meter.Stop()
k1 := tag.MustNewKey("k1")
k2 := tag.MustNewKey("k2")
m1 := stats.Int64("TestResolveOptions/m1", "", stats.UnitDimensionless)
m2 := stats.Int64("TestResolveOptions/m2", "", stats.UnitDimensionless)
v := []*view.View{{
Name: "test_view",
TagKeys: []tag.Key{k1, k2},
Measure: m1,
Aggregation: view.Distribution(5, 10),
}, {
Name: "second_view",
TagKeys: []tag.Key{k1},
Measure: m2,
Aggregation: view.Count(),
}}
meter.SetReportingPeriod(100 * time.Millisecond)
if err := meter.Register(v...); err != nil {
t.Fatalf("Failed to register view: %v", err)
}
defer meter.Unregister(v...)

attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
ctx, err := tag.New(context.Background(), tag.Insert(k1, "foo"), tag.Insert(k2, "foo"))
if err != nil {
t.Fatalf("Failed to set context: %v", err)
}
err = stats.RecordWithOptions(ctx,
stats.WithTags(tag.Upsert(k1, "bar"), tag.Insert(k2, "bar")),
stats.WithAttachments(attachments),
stats.WithMeasurements(m1.M(12), m1.M(6), m2.M(5)),
stats.WithRecorder(meter))
if err != nil {
t.Fatalf("Failed to resolve data point: %v", err)
}

rows, err := meter.RetrieveData("test_view")
if err != nil {
t.Fatalf("Unable to retrieve data for test_view: %v", err)
}
if len(rows) != 1 {
t.Fatalf("Expected one row, got %d rows: %+v", len(rows), rows)
}
if len(rows[0].Tags) != 2 {
t.Errorf("Wrong number of tags %d: %v", len(rows[0].Tags), rows[0].Tags)
}
// k2 was Insert() ed, and shouldn't update the value that was in the supplied context.
wantTags := []tag.Tag{{Key: k1, Value: "bar"}, {Key: k2, Value: "foo"}}
for i, tag := range rows[0].Tags {
if tag.Key != wantTags[i].Key {
t.Errorf("Incorrect tag %d, want: %q, got: %q", i, wantTags[i].Key, tag.Key)
}
if tag.Value != wantTags[i].Value {
t.Errorf("Incorrect tag for %s, want: %q, got: %v", tag.Key, wantTags[i].Value, tag.Value)
}

}
wantBuckets := []int64{0, 1, 1}
gotBuckets := rows[0].Data.(*view.DistributionData)
if !reflect.DeepEqual(gotBuckets.CountPerBucket, wantBuckets) {
t.Fatalf("want buckets %v, got %v", wantBuckets, gotBuckets)
}
for i, e := range gotBuckets.ExemplarsPerBucket {
if gotBuckets.CountPerBucket[i] == 0 {
if e != nil {
t.Errorf("Unexpected exemplar for bucket")
}
continue
}
// values from the metrics above
exemplarValues := []float64{0, 6, 12}
wantExemplar := &metricdata.Exemplar{Value: exemplarValues[i], Attachments: attachments}
if diff := cmpExemplar(e, wantExemplar); diff != "" {
t.Errorf("Bad exemplar for %d: %+v", i, diff)
}
}

rows2, err := meter.RetrieveData("second_view")
if err != nil {
t.Fatalf("Failed to read second_view: %v", err)
}
if len(rows2) != 1 {
t.Fatalf("Expected one row, got %d rows: %v", len(rows2), rows2)
}
if len(rows2[0].Tags) != 1 {
t.Errorf("Expected one tag, got %d tags: %v", len(rows2[0].Tags), rows2[0].Tags)
}
wantTags = []tag.Tag{{Key: k1, Value: "bar"}}
for i, tag := range rows2[0].Tags {
if wantTags[i].Key != tag.Key {
t.Errorf("Wrong key for %d, want %q, got %q", i, wantTags[i].Key, tag.Key)
}
if wantTags[i].Value != tag.Value {
t.Errorf("Wrong value for tag %s, want %q got %q", tag.Key, wantTags[i].Value, tag.Value)
}
}
gotCount := rows2[0].Data.(*view.CountData)
if gotCount.Value != 1 {
t.Errorf("Wrong count for second_view, want %d, got %d", 1, gotCount.Value)
}
}
56 changes: 40 additions & 16 deletions stats/view/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,15 @@ var (
// BenchmarkRecordReqCommand benchmarks calling the internal recording machinery
// directly.
func BenchmarkRecordReqCommand(b *testing.B) {
w := newWorker()
w := NewMeter().(*worker)

register := &registerViewReq{views: []*View{view}, err: make(chan error, 1)}
register.handleCommand(w)
if err := <-register.err; err != nil {
b.Fatal(err)
}

const tagCount = 10
ctxs := make([]context.Context, 0, tagCount)
for i := 0; i < tagCount; i++ {
ctx, _ := tag.New(context.Background(),
tag.Upsert(k1, fmt.Sprintf("v%d", i)),
tag.Upsert(k2, fmt.Sprintf("v%d", i)),
tag.Upsert(k3, fmt.Sprintf("v%d", i)),
tag.Upsert(k4, fmt.Sprintf("v%d", i)),
tag.Upsert(k5, fmt.Sprintf("v%d", i)),
tag.Upsert(k6, fmt.Sprintf("v%d", i)),
tag.Upsert(k7, fmt.Sprintf("v%d", i)),
tag.Upsert(k8, fmt.Sprintf("v%d", i)),
)
ctxs = append(ctxs, ctx)
}
ctxs := prepareContexts(10)

b.ReportAllocs()
b.ResetTimer()
Expand All @@ -91,3 +77,41 @@ func BenchmarkRecordReqCommand(b *testing.B) {
record.handleCommand(w)
}
}

func BenchmarkRecordViaStats(b *testing.B) {

meter := NewMeter()
meter.Start()
defer meter.Stop()
meter.Register(view)
defer meter.Unregister(view)

ctxs := prepareContexts(10)
rec := stats.WithRecorder(meter)
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
stats.RecordWithOptions(ctxs[i%len(ctxs)], rec, stats.WithMeasurements(m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1)))
}

}

func prepareContexts(tagCount int) []context.Context {
ctxs := make([]context.Context, 0, tagCount)
for i := 0; i < tagCount; i++ {
ctx, _ := tag.New(context.Background(),
tag.Upsert(k1, fmt.Sprintf("v%d", i)),
tag.Upsert(k2, fmt.Sprintf("v%d", i)),
tag.Upsert(k3, fmt.Sprintf("v%d", i)),
tag.Upsert(k4, fmt.Sprintf("v%d", i)),
tag.Upsert(k5, fmt.Sprintf("v%d", i)),
tag.Upsert(k6, fmt.Sprintf("v%d", i)),
tag.Upsert(k7, fmt.Sprintf("v%d", i)),
tag.Upsert(k8, fmt.Sprintf("v%d", i)),
)
ctxs = append(ctxs, ctx)
}

return ctxs
}
17 changes: 2 additions & 15 deletions stats/view/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@

package view

import "sync"

var (
exportersMu sync.RWMutex // guards exporters
exporters = make(map[Exporter]struct{})
)

// Exporter exports the collected records as view data.
//
// The ExportView method should return quickly; if an
Expand All @@ -43,16 +36,10 @@ type Exporter interface {
//
// Binaries can register exporters, libraries shouldn't register exporters.
func RegisterExporter(e Exporter) {
exportersMu.Lock()
defer exportersMu.Unlock()

exporters[e] = struct{}{}
defaultWorker.RegisterExporter(e)
}

// UnregisterExporter unregisters an exporter.
func UnregisterExporter(e Exporter) {
exportersMu.Lock()
defer exportersMu.Unlock()

delete(exporters, e)
defaultWorker.UnregisterExporter(e)
}
Loading

0 comments on commit 84d38db

Please sign in to comment.