Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Allow creating additional View universes. #1196

Merged
merged 7 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions stats/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@ func init() {
}
}

// Recorder is a subset of the view.Meter interface which only includes
// the Record method, to avoid circular imports between stats and view.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep this definition and simply include this in the Meter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I updated the comment, too.

What do you think of changing the second argument to Record to be []stats.Measurement rather than the current interface{}?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done.

Copy link
Contributor

@rghetia rghetia Feb 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of changing the second argument to Record to be []stats.Measurement rather than the current interface{}?

that is better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out that this introduces an import cycle:

import cycle not allowed
 package go.opencensus.io/stats/view
	imports go.opencensus.io/stats
	imports go.opencensus.io/stats/internal
	imports go.opencensus.io/stats

Would you prefer that I move internal.DefaultRecorder out of internal, or do something like create an InternalMeasurement struct or interface and embed it in stats.Measurement?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say leave it as interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out that stats.Measurement.Measure() returns a stats.Measure, which seemed like a bridge too far in terms of putting things into Internal.

Take a look at 6003f86 for what that change would look like. I'm slightly concerned about moving DefaultRecorder from the internal package (and making it more public/obvious).

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted this to stick to interfaces.

type Recorder interface {
Record(*tag.Map, interface{}, map[string]interface{})
}

type recordOptions struct {
attachments metricdata.Attachments
mutators []tag.Mutator
measurements []Measurement
recorder Recorder
}

// WithAttachments applies provided exemplar attachments.
Expand All @@ -58,6 +65,14 @@ func WithMeasurements(measurements ...Measurement) Options {
}
}

// WithMeter records the measurements to the specified `view.Meter`, rather
// than to the global metrics recorder.
func WithMeter(meter Recorder) Options {
evankanderson marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance of RecordWithOptions WithMeter is 2/3rd of RecordWithOptions without meter

BenchmarkRecord8WithMeter-8      	  241356	       270 ns/op	     400 B/op	       4 allocs/op
BenchmarkRecord8WithoutMeter-8   	  335601	       197 ns/op	     368 B/op	       3 allocs/op

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the following, I get 231 ns/op, rather than 270 ns/op:

func BenchmarkRecord8_WithRecorder(b *testing.B) {
	ctx := context.Background()
	meter := view.NewMeter()
	meter.Start()
	defer meter.Stop()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		stats.RecordWithOptions(ctx, stats.WithRecorder(meter), stats.WithMeasurements(m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1)))
	}

	b.StopTimer()
}

For the _Parallel case, I get 117 ns/op vs 108 ns/op:

goos: windows
goarch: amd64
pkg: go.opencensus.io/stats
BenchmarkRecord0-16                             12499986                98.0 ns/op
BenchmarkRecord1-16                              8571868               139 ns/op
BenchmarkRecord8-16                              6417102               188 ns/op
BenchmarkRecord8_WithRecorder-16                 5194611               231 ns/op
BenchmarkRecord8_WithRecorder_Parallel-16       10085448               117 ns/op
BenchmarkRecord8_Parallel-16                    11215130               108 ns/op
BenchmarkRecord8_8Tags-16                        5940622               198 ns/op

I can avoid the extra allocation and make the time between BenchmarkRecord8_WithRecorder and BenchmarkRecord8 match by pre-creating the stats.WithRecorder(meter) object like so:

rec := stats.WithRecorder(meter)

for (....) {
  stats.RecordWithOptions(ctx, rec, stats.WithMeasurements(....)
}

I note that the current benchmark doesn't perform any aggregation, so it's actually testing the shortcut path AFAICT. Adding a view which is tracking a simple Sum (no tags) increases the benchmark time like so:

BenchmarkRecord8_Aggregated-16                   1000000              1017 ns/op

Note that using a view here also affects any subsequent benchmarks for the same measurement, because the m.desc.subscribed() check around line 120 of record.go will always return true once it has been registered once.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BenchmarkRecord8_WithRecorder and BenchmarkRecord8 match by pre-creating the stats.WithRecorder(meter) object like so:

rec := stats.WithRecorder(meter)

for (....) {
  stats.RecordWithOptions(ctx, rec, stats.WithMeasurements(....)
}

+1.

return func(ro *recordOptions) {
ro.recorder = meter
}
}

// Options apply changes to recordOptions.
type Options func(*recordOptions)

Expand Down Expand Up @@ -93,6 +108,9 @@ func RecordWithOptions(ctx context.Context, ros ...Options) error {
return nil
}
recorder := internal.DefaultRecorder
if o.recorder != nil {
recorder = o.recorder.Record
}
if recorder == nil {
return nil
}
Expand Down
105 changes: 105 additions & 0 deletions stats/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestRecordWithAttachments(t *testing.T) {
if err := view.Register(v); err != nil {
log.Fatalf("Failed to register views: %v", err)
}
defer view.Unregister(v)

attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
stats.RecordWithOptions(context.Background(), stats.WithAttachments(attachments), stats.WithMeasurements(m.M(12)))
Expand Down Expand Up @@ -93,3 +94,107 @@ func TestRecordWithAttachments(t *testing.T) {
func cmpExemplar(got, want *metricdata.Exemplar) string {
return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{}))
}

func TestResolveOptions(t *testing.T) {
evankanderson marked this conversation as resolved.
Show resolved Hide resolved
meter := view.NewWorker()
meter.Start()
k1 := tag.MustNewKey("k1")
k2 := tag.MustNewKey("k2")
m1 := stats.Int64("TestResolveOptions/m1", "", stats.UnitDimensionless)
m2 := stats.Int64("TestResolveOptions/m2", "", stats.UnitDimensionless)
v := []*view.View{{
Name: "test_view",
TagKeys: []tag.Key{k1, k2},
Measure: m1,
Aggregation: view.Distribution(5, 10),
}, {
Name: "second_view",
TagKeys: []tag.Key{k1},
Measure: m2,
Aggregation: view.Count(),
}}
meter.SetReportingPeriod(100 * time.Millisecond)
if err := meter.Register(v...); err != nil {
t.Fatalf("Failed to register view: %v", err)
}
defer meter.Unregister(v...)

attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
ctx, err := tag.New(context.Background(), tag.Insert(k1, "foo"), tag.Insert(k2, "foo"))
if err != nil {
t.Fatalf("Failed to set context: %v", err)
}
err = stats.RecordWithOptions(ctx,
stats.WithTags(tag.Upsert(k1, "bar"), tag.Insert(k2, "bar")),
stats.WithAttachments(attachments),
stats.WithMeasurements(m1.M(12), m1.M(6), m2.M(5)),
stats.WithMeter(meter))
if err != nil {
t.Fatalf("Failed to resolve data point: %v", err)
}

rows, err := meter.RetrieveData("test_view")
if err != nil {
t.Fatalf("Unable to retrieve data for test_view: %v", err)
}
if len(rows) != 1 {
t.Fatalf("Expected one row, got %d rows: %+v", len(rows), rows)
}
if len(rows[0].Tags) != 2 {
t.Errorf("Wrong number of tags %d: %v", len(rows[0].Tags), rows[0].Tags)
}
// k2 was Insert() ed, and shouldn't update the value that was in the supplied context.
wantTags := []tag.Tag{{Key: k1, Value: "bar"}, {Key: k2, Value: "foo"}}
for i, tag := range rows[0].Tags {
if tag.Key != wantTags[i].Key {
t.Errorf("Incorrect tag %d, want: %q, got: %q", i, wantTags[i].Key, tag.Key)
}
if tag.Value != wantTags[i].Value {
t.Errorf("Incorrect tag for %s, want: %q, got: %v", tag.Key, wantTags[i].Value, tag.Value)
}

}
wantBuckets := []int64{0, 1, 1}
gotBuckets := rows[0].Data.(*view.DistributionData)
if !reflect.DeepEqual(gotBuckets.CountPerBucket, wantBuckets) {
t.Fatalf("want buckets %v, got %v", wantBuckets, gotBuckets)
}
for i, e := range gotBuckets.ExemplarsPerBucket {
if gotBuckets.CountPerBucket[i] == 0 {
if e != nil {
t.Errorf("Unexpected exemplar for bucket")
}
continue
}
// values from the metrics above
exemplarValues := []float64{0, 6, 12}
wantExemplar := &metricdata.Exemplar{Value: exemplarValues[i], Attachments: attachments}
if diff := cmpExemplar(e, wantExemplar); diff != "" {
t.Errorf("Bad exemplar for %d: %+v", i, diff)
}
}

rows2, err := meter.RetrieveData("second_view")
if err != nil {
t.Fatalf("Failed to read second_view: %v", err)
}
if len(rows2) != 1 {
t.Fatalf("Expected one row, got %d rows: %v", len(rows2), rows2)
}
if len(rows2[0].Tags) != 1 {
t.Errorf("Expected one tag, got %d tags: %v", len(rows2[0].Tags), rows2[0].Tags)
}
wantTags = []tag.Tag{{Key: k1, Value: "bar"}}
for i, tag := range rows2[0].Tags {
if wantTags[i].Key != tag.Key {
t.Errorf("Wrong key for %d, want %q, got %q", i, wantTags[i].Key, tag.Key)
}
if wantTags[i].Value != tag.Value {
t.Errorf("Wrong value for tag %s, want %q got %q", tag.Key, wantTags[i].Value, tag.Value)
}
}
gotCount := rows2[0].Data.(*view.CountData)
if gotCount.Value != 1 {
t.Errorf("Wrong count for second_view, want %d, got %d", 1, gotCount.Value)
}
}
2 changes: 1 addition & 1 deletion stats/view/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
// BenchmarkRecordReqCommand benchmarks calling the internal recording machinery
// directly.
func BenchmarkRecordReqCommand(b *testing.B) {
w := newWorker()
w := NewWorker().(*worker)

register := &registerViewReq{views: []*View{view}, err: make(chan error, 1)}
register.handleCommand(w)
Expand Down
17 changes: 2 additions & 15 deletions stats/view/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@

package view

import "sync"

var (
exportersMu sync.RWMutex // guards exporters
exporters = make(map[Exporter]struct{})
)

// Exporter exports the collected records as view data.
//
// The ExportView method should return quickly; if an
Expand All @@ -43,16 +36,10 @@ type Exporter interface {
//
// Binaries can register exporters, libraries shouldn't register exporters.
func RegisterExporter(e Exporter) {
exportersMu.Lock()
defer exportersMu.Unlock()

exporters[e] = struct{}{}
defaultWorker.RegisterExporter(e)
}

// UnregisterExporter unregisters an exporter.
func UnregisterExporter(e Exporter) {
exportersMu.Lock()
defer exportersMu.Unlock()

delete(exporters, e)
defaultWorker.UnregisterExporter(e)
}
Loading