diff --git a/stats/benchmark_test.go b/stats/benchmark_test.go index 1c467ec54..e784b8c4c 100644 --- a/stats/benchmark_test.go +++ b/stats/benchmark_test.go @@ -19,6 +19,7 @@ import ( "testing" "go.opencensus.io/stats" + "go.opencensus.io/stats/view" _ "go.opencensus.io/stats/view" // enable collection "go.opencensus.io/tag" ) @@ -52,6 +53,22 @@ func BenchmarkRecord8(b *testing.B) { } } +func BenchmarkRecord8_WithRecorder(b *testing.B) { + ctx := context.Background() + meter := view.NewMeter() + meter.Start() + defer meter.Stop() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Note that this benchmark has one extra allocation for stats.WithRecorder. + // If you cache the recorder option, this benchmark should be equally fast as BenchmarkRecord8 + stats.RecordWithOptions(ctx, stats.WithRecorder(meter), stats.WithMeasurements(m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1))) + } + + b.StopTimer() +} + func BenchmarkRecord8_Parallel(b *testing.B) { ctx := context.Background() b.ResetTimer() diff --git a/stats/record.go b/stats/record.go index ad4691184..2b9728346 100644 --- a/stats/record.go +++ b/stats/record.go @@ -31,10 +31,19 @@ func init() { } } +// Recorder provides an interface for exporting measurement information from +// the static Record method by using the WithRecorder option. +type Recorder interface { + // Record records a set of measurements associated with the given tags and attachments. + // The second argument is a `[]Measurement`. + Record(*tag.Map, interface{}, map[string]interface{}) +} + type recordOptions struct { attachments metricdata.Attachments mutators []tag.Mutator measurements []Measurement + recorder Recorder } // WithAttachments applies provided exemplar attachments. @@ -58,6 +67,14 @@ func WithMeasurements(measurements ...Measurement) Options { } } +// WithRecorder records the measurements to the specified `Recorder`, rather +// than to the global metrics recorder. +func WithRecorder(meter Recorder) Options { + return func(ro *recordOptions) { + ro.recorder = meter + } +} + // Options apply changes to recordOptions. type Options func(*recordOptions) @@ -93,6 +110,9 @@ func RecordWithOptions(ctx context.Context, ros ...Options) error { return nil } recorder := internal.DefaultRecorder + if o.recorder != nil { + recorder = o.recorder.Record + } if recorder == nil { return nil } diff --git a/stats/record_test.go b/stats/record_test.go index 93a652200..65e106c7d 100644 --- a/stats/record_test.go +++ b/stats/record_test.go @@ -56,6 +56,7 @@ func TestRecordWithAttachments(t *testing.T) { if err := view.Register(v); err != nil { log.Fatalf("Failed to register views: %v", err) } + defer view.Unregister(v) attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx} stats.RecordWithOptions(context.Background(), stats.WithAttachments(attachments), stats.WithMeasurements(m.M(12))) @@ -93,3 +94,108 @@ func TestRecordWithAttachments(t *testing.T) { func cmpExemplar(got, want *metricdata.Exemplar) string { return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{})) } + +func TestRecordWithMeter(t *testing.T) { + meter := view.NewMeter() + meter.Start() + defer meter.Stop() + k1 := tag.MustNewKey("k1") + k2 := tag.MustNewKey("k2") + m1 := stats.Int64("TestResolveOptions/m1", "", stats.UnitDimensionless) + m2 := stats.Int64("TestResolveOptions/m2", "", stats.UnitDimensionless) + v := []*view.View{{ + Name: "test_view", + TagKeys: []tag.Key{k1, k2}, + Measure: m1, + Aggregation: view.Distribution(5, 10), + }, { + Name: "second_view", + TagKeys: []tag.Key{k1}, + Measure: m2, + Aggregation: view.Count(), + }} + meter.SetReportingPeriod(100 * time.Millisecond) + if err := meter.Register(v...); err != nil { + t.Fatalf("Failed to register view: %v", err) + } + defer meter.Unregister(v...) + + attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx} + ctx, err := tag.New(context.Background(), tag.Insert(k1, "foo"), tag.Insert(k2, "foo")) + if err != nil { + t.Fatalf("Failed to set context: %v", err) + } + err = stats.RecordWithOptions(ctx, + stats.WithTags(tag.Upsert(k1, "bar"), tag.Insert(k2, "bar")), + stats.WithAttachments(attachments), + stats.WithMeasurements(m1.M(12), m1.M(6), m2.M(5)), + stats.WithRecorder(meter)) + if err != nil { + t.Fatalf("Failed to resolve data point: %v", err) + } + + rows, err := meter.RetrieveData("test_view") + if err != nil { + t.Fatalf("Unable to retrieve data for test_view: %v", err) + } + if len(rows) != 1 { + t.Fatalf("Expected one row, got %d rows: %+v", len(rows), rows) + } + if len(rows[0].Tags) != 2 { + t.Errorf("Wrong number of tags %d: %v", len(rows[0].Tags), rows[0].Tags) + } + // k2 was Insert() ed, and shouldn't update the value that was in the supplied context. + wantTags := []tag.Tag{{Key: k1, Value: "bar"}, {Key: k2, Value: "foo"}} + for i, tag := range rows[0].Tags { + if tag.Key != wantTags[i].Key { + t.Errorf("Incorrect tag %d, want: %q, got: %q", i, wantTags[i].Key, tag.Key) + } + if tag.Value != wantTags[i].Value { + t.Errorf("Incorrect tag for %s, want: %q, got: %v", tag.Key, wantTags[i].Value, tag.Value) + } + + } + wantBuckets := []int64{0, 1, 1} + gotBuckets := rows[0].Data.(*view.DistributionData) + if !reflect.DeepEqual(gotBuckets.CountPerBucket, wantBuckets) { + t.Fatalf("want buckets %v, got %v", wantBuckets, gotBuckets) + } + for i, e := range gotBuckets.ExemplarsPerBucket { + if gotBuckets.CountPerBucket[i] == 0 { + if e != nil { + t.Errorf("Unexpected exemplar for bucket") + } + continue + } + // values from the metrics above + exemplarValues := []float64{0, 6, 12} + wantExemplar := &metricdata.Exemplar{Value: exemplarValues[i], Attachments: attachments} + if diff := cmpExemplar(e, wantExemplar); diff != "" { + t.Errorf("Bad exemplar for %d: %+v", i, diff) + } + } + + rows2, err := meter.RetrieveData("second_view") + if err != nil { + t.Fatalf("Failed to read second_view: %v", err) + } + if len(rows2) != 1 { + t.Fatalf("Expected one row, got %d rows: %v", len(rows2), rows2) + } + if len(rows2[0].Tags) != 1 { + t.Errorf("Expected one tag, got %d tags: %v", len(rows2[0].Tags), rows2[0].Tags) + } + wantTags = []tag.Tag{{Key: k1, Value: "bar"}} + for i, tag := range rows2[0].Tags { + if wantTags[i].Key != tag.Key { + t.Errorf("Wrong key for %d, want %q, got %q", i, wantTags[i].Key, tag.Key) + } + if wantTags[i].Value != tag.Value { + t.Errorf("Wrong value for tag %s, want %q got %q", tag.Key, wantTags[i].Value, tag.Value) + } + } + gotCount := rows2[0].Data.(*view.CountData) + if gotCount.Value != 1 { + t.Errorf("Wrong count for second_view, want %d, got %d", 1, gotCount.Value) + } +} diff --git a/stats/view/benchmark_test.go b/stats/view/benchmark_test.go index 5937b57e6..6944f22a1 100644 --- a/stats/view/benchmark_test.go +++ b/stats/view/benchmark_test.go @@ -46,7 +46,7 @@ var ( // BenchmarkRecordReqCommand benchmarks calling the internal recording machinery // directly. func BenchmarkRecordReqCommand(b *testing.B) { - w := newWorker() + w := NewMeter().(*worker) register := ®isterViewReq{views: []*View{view}, err: make(chan error, 1)} register.handleCommand(w) @@ -54,21 +54,7 @@ func BenchmarkRecordReqCommand(b *testing.B) { b.Fatal(err) } - const tagCount = 10 - ctxs := make([]context.Context, 0, tagCount) - for i := 0; i < tagCount; i++ { - ctx, _ := tag.New(context.Background(), - tag.Upsert(k1, fmt.Sprintf("v%d", i)), - tag.Upsert(k2, fmt.Sprintf("v%d", i)), - tag.Upsert(k3, fmt.Sprintf("v%d", i)), - tag.Upsert(k4, fmt.Sprintf("v%d", i)), - tag.Upsert(k5, fmt.Sprintf("v%d", i)), - tag.Upsert(k6, fmt.Sprintf("v%d", i)), - tag.Upsert(k7, fmt.Sprintf("v%d", i)), - tag.Upsert(k8, fmt.Sprintf("v%d", i)), - ) - ctxs = append(ctxs, ctx) - } + ctxs := prepareContexts(10) b.ReportAllocs() b.ResetTimer() @@ -91,3 +77,41 @@ func BenchmarkRecordReqCommand(b *testing.B) { record.handleCommand(w) } } + +func BenchmarkRecordViaStats(b *testing.B) { + + meter := NewMeter() + meter.Start() + defer meter.Stop() + meter.Register(view) + defer meter.Unregister(view) + + ctxs := prepareContexts(10) + rec := stats.WithRecorder(meter) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + stats.RecordWithOptions(ctxs[i%len(ctxs)], rec, stats.WithMeasurements(m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1))) + } + +} + +func prepareContexts(tagCount int) []context.Context { + ctxs := make([]context.Context, 0, tagCount) + for i := 0; i < tagCount; i++ { + ctx, _ := tag.New(context.Background(), + tag.Upsert(k1, fmt.Sprintf("v%d", i)), + tag.Upsert(k2, fmt.Sprintf("v%d", i)), + tag.Upsert(k3, fmt.Sprintf("v%d", i)), + tag.Upsert(k4, fmt.Sprintf("v%d", i)), + tag.Upsert(k5, fmt.Sprintf("v%d", i)), + tag.Upsert(k6, fmt.Sprintf("v%d", i)), + tag.Upsert(k7, fmt.Sprintf("v%d", i)), + tag.Upsert(k8, fmt.Sprintf("v%d", i)), + ) + ctxs = append(ctxs, ctx) + } + + return ctxs +} diff --git a/stats/view/export.go b/stats/view/export.go index 7cb59718f..73ba11f5b 100644 --- a/stats/view/export.go +++ b/stats/view/export.go @@ -14,13 +14,6 @@ package view -import "sync" - -var ( - exportersMu sync.RWMutex // guards exporters - exporters = make(map[Exporter]struct{}) -) - // Exporter exports the collected records as view data. // // The ExportView method should return quickly; if an @@ -43,16 +36,10 @@ type Exporter interface { // // Binaries can register exporters, libraries shouldn't register exporters. func RegisterExporter(e Exporter) { - exportersMu.Lock() - defer exportersMu.Unlock() - - exporters[e] = struct{}{} + defaultWorker.RegisterExporter(e) } // UnregisterExporter unregisters an exporter. func UnregisterExporter(e Exporter) { - exportersMu.Lock() - defer exportersMu.Unlock() - - delete(exporters, e) + defaultWorker.UnregisterExporter(e) } diff --git a/stats/view/worker.go b/stats/view/worker.go index 2f3c018af..74e4a90fa 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -28,7 +28,7 @@ import ( ) func init() { - defaultWorker = newWorker() + defaultWorker = NewMeter().(*worker) go defaultWorker.start() internal.DefaultRecorder = record } @@ -47,8 +47,64 @@ type worker struct { c chan command quit, done chan bool mu sync.RWMutex + + exportersMu sync.RWMutex + exporters map[Exporter]struct{} +} + +// Meter defines an interface which allows a single process to maintain +// multiple sets of metrics exports (intended for the advanced case where a +// single process wants to report metrics about multiple objects, such as +// multiple databases or HTTP services). +// +// Note that this is an advanced use case, and the static functions in this +// module should cover the common use cases. +type Meter interface { + stats.Recorder + // Find returns a registered view associated with this name. + // If no registered view is found, nil is returned. + Find(name string) *View + // Register begins collecting data for the given views. + // Once a view is registered, it reports data to the registered exporters. + Register(views ...*View) error + // Unregister the given views. Data will not longer be exported for these views + // after Unregister returns. + // It is not necessary to unregister from views you expect to collect for the + // duration of your program execution. + Unregister(views ...*View) + // SetReportingPeriod sets the interval between reporting aggregated views in + // the program. If duration is less than or equal to zero, it enables the + // default behavior. + // + // Note: each exporter makes different promises about what the lowest supported + // duration is. For example, the Stackdriver exporter recommends a value no + // lower than 1 minute. Consult each exporter per your needs. + SetReportingPeriod(time.Duration) + + // RegisterExporter registers an exporter. + // Collected data will be reported via all the + // registered exporters. Once you no longer + // want data to be exported, invoke UnregisterExporter + // with the previously registered exporter. + // + // Binaries can register exporters, libraries shouldn't register exporters. + RegisterExporter(Exporter) + // UnregisterExporter unregisters an exporter. + UnregisterExporter(Exporter) + + // Start causes the Meter to start processing Record calls and aggregating + // statistics as well as exporting data. + Start() + // Stop causes the Meter to stop processing calls and terminate data export. + Stop() + + // RetrieveData gets a snapshot of the data collected for the the view registered + // with the given name. It is intended for testing only. + RetrieveData(viewName string) ([]*Row, error) } +var _ Meter = (*worker)(nil) + var defaultWorker *worker var defaultReportingDuration = 10 * time.Second @@ -56,11 +112,17 @@ var defaultReportingDuration = 10 * time.Second // Find returns a registered view associated with this name. // If no registered view is found, nil is returned. func Find(name string) (v *View) { + return defaultWorker.Find(name) +} + +// Find returns a registered view associated with this name. +// If no registered view is found, nil is returned. +func (w *worker) Find(name string) (v *View) { req := &getViewByNameReq{ name: name, c: make(chan *getViewByNameResp), } - defaultWorker.c <- req + w.c <- req resp := <-req.c return resp.v } @@ -68,11 +130,17 @@ func Find(name string) (v *View) { // Register begins collecting data for the given views. // Once a view is registered, it reports data to the registered exporters. func Register(views ...*View) error { + return defaultWorker.Register(views...) +} + +// Register begins collecting data for the given views. +// Once a view is registered, it reports data to the registered exporters. +func (w *worker) Register(views ...*View) error { req := ®isterViewReq{ views: views, err: make(chan error), } - defaultWorker.c <- req + w.c <- req return <-req.err } @@ -81,6 +149,14 @@ func Register(views ...*View) error { // It is not necessary to unregister from views you expect to collect for the // duration of your program execution. func Unregister(views ...*View) { + defaultWorker.Unregister(views...) +} + +// Unregister the given views. Data will not longer be exported for these views +// after Unregister returns. +// It is not necessary to unregister from views you expect to collect for the +// duration of your program execution. +func (w *worker) Unregister(views ...*View) { names := make([]string, len(views)) for i := range views { names[i] = views[i].Name @@ -89,31 +165,42 @@ func Unregister(views ...*View) { views: names, done: make(chan struct{}), } - defaultWorker.c <- req + w.c <- req <-req.done } // RetrieveData gets a snapshot of the data collected for the the view registered // with the given name. It is intended for testing only. func RetrieveData(viewName string) ([]*Row, error) { + return defaultWorker.RetrieveData(viewName) +} + +// RetrieveData gets a snapshot of the data collected for the the view registered +// with the given name. It is intended for testing only. +func (w *worker) RetrieveData(viewName string) ([]*Row, error) { req := &retrieveDataReq{ now: time.Now(), v: viewName, c: make(chan *retrieveDataResp), } - defaultWorker.c <- req + w.c <- req resp := <-req.c return resp.rows, resp.err } func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { + defaultWorker.Record(tags, ms, attachments) +} + +// Record records a set of measurements ms associated with the given tags and attachments. +func (w *worker) Record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { req := &recordReq{ tm: tags, ms: ms.([]stats.Measurement), attachments: attachments, t: time.Now(), } - defaultWorker.c <- req + w.c <- req } // SetReportingPeriod sets the interval between reporting aggregated views in @@ -124,17 +211,31 @@ func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { // duration is. For example, the Stackdriver exporter recommends a value no // lower than 1 minute. Consult each exporter per your needs. func SetReportingPeriod(d time.Duration) { + defaultWorker.SetReportingPeriod(d) +} + +// SetReportingPeriod sets the interval between reporting aggregated views in +// the program. If duration is less than or equal to zero, it enables the +// default behavior. +// +// Note: each exporter makes different promises about what the lowest supported +// duration is. For example, the Stackdriver exporter recommends a value no +// lower than 1 minute. Consult each exporter per your needs. +func (w *worker) SetReportingPeriod(d time.Duration) { // TODO(acetechnologist): ensure that the duration d is more than a certain // value. e.g. 1s req := &setReportingPeriodReq{ d: d, c: make(chan bool), } - defaultWorker.c <- req + w.c <- req <-req.c // don't return until the timer is set to the new duration. } -func newWorker() *worker { +// NewMeter constructs a Meter instance. You should only need to use this if +// you need to separate out Measurement recordings and View aggregations within +// a single process. +func NewMeter() Meter { return &worker{ measures: make(map[string]*measureRef), views: make(map[string]*viewInternal), @@ -143,9 +244,15 @@ func newWorker() *worker { c: make(chan command, 1024), quit: make(chan bool), done: make(chan bool), + + exporters: make(map[Exporter]struct{}), } } +func (w *worker) Start() { + go w.start() +} + func (w *worker) start() { prodMgr := metricproducer.GlobalManager() prodMgr.AddProducer(w) @@ -165,7 +272,7 @@ func (w *worker) start() { } } -func (w *worker) stop() { +func (w *worker) Stop() { prodMgr := metricproducer.GlobalManager() prodMgr.DeleteProducer(w) @@ -228,11 +335,11 @@ func (w *worker) reportView(v *viewInternal, now time.Time) { End: time.Now(), Rows: rows, } - exportersMu.Lock() - for e := range exporters { + w.exportersMu.Lock() + defer w.exportersMu.Unlock() + for e := range w.exporters { e.ExportView(viewData) } - exportersMu.Unlock() } func (w *worker) reportUsage(now time.Time) { @@ -279,3 +386,17 @@ func (w *worker) Read() []*metricdata.Metric { } return metrics } + +func (w *worker) RegisterExporter(e Exporter) { + w.exportersMu.Lock() + defer w.exportersMu.Unlock() + + w.exporters[e] = struct{}{} +} + +func (w *worker) UnregisterExporter(e Exporter) { + w.exportersMu.Lock() + defer w.exportersMu.Unlock() + + delete(w.exporters, e) +} diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index 6c15d37ef..ee7f149aa 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -118,6 +118,105 @@ func Test_Worker_ViewRegistration(t *testing.T) { } } +func Test_Worker_MultiExport(t *testing.T) { + restart() + + // This test reports the same data for the default worker and a secondary + // worker, and ensures that the stats are kept independently. + worker2 := NewMeter().(*worker) + worker2.Start() + + m := stats.Float64("Test_Worker_MultiExport/MF1", "desc MF1", "unit") + key := tag.MustNewKey(("key")) + count := &View{"VF1", "description", []tag.Key{key}, m, Count()} + sum := &View{"VF2", "description", []tag.Key{}, m, Sum()} + + Register(count, sum) + worker2.Register(count) // Don't compute the sum for worker2, to verify independence of computation. + data := []struct { + w Meter + tags string // Tag values + value float64 + }{{ + tags: "a", + value: 2.0, + }, { + tags: "b", + value: 3.0, + }, { + tags: "a", value: 2.5, + }, { + w: worker2, tags: "b", value: 1.0, + }, + } + + for _, d := range data { + ctx, err := tag.New(context.Background(), tag.Upsert(key, d.tags)) + if err != nil { + t.Fatalf("%s: failed to add tag %q: %v", d.w, key.Name(), err) + } + if d.w != nil { + d.w.Record(tag.FromContext(ctx), []stats.Measurement{m.M(d.value)}, nil) + } else { + stats.Record(ctx, m.M(d.value)) + } + } + + wantRows := []struct { + w Meter + view string + rows []*Row + }{{ + view: count.Name, + rows: []*Row{ + {[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}}, + {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, + }, + }, { + view: sum.Name, + rows: []*Row{ + {nil, &SumData{Value: 7.5}}}, + }, { + w: worker2, + view: count.Name, + rows: []*Row{ + {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, + }, + }} + + for _, wantRow := range wantRows { + retrieve := RetrieveData + if wantRow.w != nil { + retrieve = wantRow.w.(*worker).RetrieveData + } + gotRows, err := retrieve(wantRow.view) + if err != nil { + t.Fatalf("RetrieveData(%v), got error %v", wantRow.view, err) + } + for _, got := range gotRows { + if !containsRow(wantRow.rows, got) { + t.Errorf("%s: got row %#v; want none", wantRow.view, got) + break + } + } + for _, want := range wantRow.rows { + if !containsRow(gotRows, want) { + t.Errorf("%s: got none, want %#v", wantRow.view, want) + break + } + } + } + // Verify that worker has not been computing sum: + got, err := worker2.RetrieveData(sum.Name) + if err == nil { + t.Errorf("%s: expected no data because it was not registered, got %#v", sum.Name, got) + } + + Unregister(count, sum) + worker2.Unregister(count) + worker2.Stop() +} + func Test_Worker_RecordFloat64(t *testing.T) { restart() @@ -298,7 +397,7 @@ func TestReportUsage(t *testing.T) { func Test_SetReportingPeriodReqNeverBlocks(t *testing.T) { t.Parallel() - worker := newWorker() + worker := NewMeter().(*worker) durations := []time.Duration{-1, 0, 10, 100 * time.Millisecond} for i, duration := range durations { ackChan := make(chan bool, 1) @@ -516,7 +615,7 @@ func (e *vdExporter) ExportView(vd *Data) { // restart stops the current processors and creates a new one. func restart() { - defaultWorker.stop() - defaultWorker = newWorker() + defaultWorker.Stop() + defaultWorker = NewMeter().(*worker) go defaultWorker.start() }