Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Allow creating additional View universes. #1196

Merged
merged 7 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions stats/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ func init() {
}
}

// ResolvedOptions can be used to extract the current tags and measurements
// from context and stats arguments when using custom workers to export stats
// to a separate exporter.
type ResolvedOptions struct {
Attachments metricdata.Attachments
Tags *tag.Map
Measures []Measurement
}

type recordOptions struct {
attachments metricdata.Attachments
mutators []tag.Mutator
Expand Down Expand Up @@ -84,6 +93,23 @@ func RecordWithTags(ctx context.Context, mutators []tag.Mutator, ms ...Measureme
return RecordWithOptions(ctx, WithTags(mutators...), WithMeasurements(ms...))
}

// ResolveOptions determines the full set of Tags, Measurements, etc from the
// provided Options and context.Context. This is mostly useful when using
// multiple exporters.
func ResolveOptions(ctx context.Context, ros ...Options) (*ResolvedOptions, error) {
evankanderson marked this conversation as resolved.
Show resolved Hide resolved
o := createRecordOption(ros...)

if len(o.mutators) > 0 {
var err error
if ctx, err = tag.New(ctx, o.mutators...); err != nil {
return nil, err
}
}
return &ResolvedOptions{Tags: tag.FromContext(ctx),
Measures: o.measurements,
Attachments: o.attachments}, nil
}

// RecordWithOptions records measurements from the given options (if any) against context
// and tags and attachments in the options (if any).
// If there are any tags in the context, measurements will be tagged with them.
Expand All @@ -92,6 +118,8 @@ func RecordWithOptions(ctx context.Context, ros ...Options) error {
if len(o.measurements) == 0 {
return nil
}
// This could use ResolveOptions, but it does additional work to
// short-circuit if there are no metrics that need to be exported.
recorder := internal.DefaultRecorder
if recorder == nil {
return nil
Expand Down
63 changes: 63 additions & 0 deletions stats/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestRecordWithAttachments(t *testing.T) {
if err := view.Register(v); err != nil {
log.Fatalf("Failed to register views: %v", err)
}
defer view.Unregister(v)

attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
stats.RecordWithOptions(context.Background(), stats.WithAttachments(attachments), stats.WithMeasurements(m.M(12)))
Expand Down Expand Up @@ -93,3 +94,65 @@ func TestRecordWithAttachments(t *testing.T) {
func cmpExemplar(got, want *metricdata.Exemplar) string {
return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{}))
}

func TestResolveOptions(t *testing.T) {
evankanderson marked this conversation as resolved.
Show resolved Hide resolved
k1 := tag.MustNewKey("k1")
k2 := tag.MustNewKey("k2")
m1 := stats.Int64("TestResolveOptions/m1", "", stats.UnitDimensionless)
m2 := stats.Int64("TestResolveOptions/m2", "", stats.UnitDimensionless)
v := []*view.View{{
Name: "test_view",
TagKeys: []tag.Key{k1, k2},
Measure: m1,
Aggregation: view.Distribution(5, 10),
}, {
Name: "second_view",
TagKeys: []tag.Key{k1},
Measure: m2,
Aggregation: view.Count(),
}}
view.SetReportingPeriod(100 * time.Millisecond)
if err := view.Register(v...); err != nil {
t.Fatalf("Failed to register view: %v", err)
}
defer view.Unregister(v...)

attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
ctx, err := tag.New(context.Background(), tag.Insert(k1, "foo"), tag.Insert(k2, "foo"))
if err != nil {
t.Fatalf("Failed to set context: %v", err)
}
ro, err := stats.ResolveOptions(ctx,
stats.WithTags(tag.Upsert(k1, "bar"), tag.Insert(k2, "bar")),
stats.WithAttachments(attachments),
stats.WithMeasurements(m1.M(12), m2.M(5)))
if err != nil {
t.Fatalf("Failed to resolve data point: %v", err)
}

s, ok := ro.Attachments[metricdata.AttachmentKeySpanContext]
if !ok || s != spanCtx {
t.Errorf("Unexpected SpanContext: want %v, got %v", spanCtx, s)
}
if len(ro.Attachments) != 1 {
t.Errorf("Expected only one attachment (SpanContext), got %v", ro.Attachments)
}

if len(ro.Measures) != 2 {
t.Errorf("Expected two measurements, got %v", ro.Measures)
}
mWant := []stats.Measurement{m1.M(12), m2.M(5)}
if ro.Measures[0] != mWant[0] || ro.Measures[1] != mWant[1] {
t.Errorf("Unexpected measurements: want %v, got %v", mWant, ro.Measures)
}

// k2 was Insert() ed, and shouldn't update the value that was in the supplied context.
tCtx, err := tag.New(context.Background(), tag.Insert(k1, "bar"), tag.Insert(k2, "foo"))
if err != nil {
t.Fatalf("Failed to construct tWant: %v", err)
}
tWant := tag.FromContext(tCtx)
if ro.Tags.String() != tWant.String() {
t.Errorf("Unexpected tags: want %v, got %v", tWant, ro.Tags)
}
}
2 changes: 1 addition & 1 deletion stats/view/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
// BenchmarkRecordReqCommand benchmarks calling the internal recording machinery
// directly.
func BenchmarkRecordReqCommand(b *testing.B) {
w := newWorker()
w := NewWorker().(*worker)

register := &registerViewReq{views: []*View{view}, err: make(chan error, 1)}
register.handleCommand(w)
Expand Down
17 changes: 2 additions & 15 deletions stats/view/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@

package view

import "sync"

var (
exportersMu sync.RWMutex // guards exporters
exporters = make(map[Exporter]struct{})
)

// Exporter exports the collected records as view data.
//
// The ExportView method should return quickly; if an
Expand All @@ -43,16 +36,10 @@ type Exporter interface {
//
// Binaries can register exporters, libraries shouldn't register exporters.
func RegisterExporter(e Exporter) {
exportersMu.Lock()
defer exportersMu.Unlock()

exporters[e] = struct{}{}
defaultWorker.RegisterExporter(e)
}

// UnregisterExporter unregisters an exporter.
func UnregisterExporter(e Exporter) {
exportersMu.Lock()
defer exportersMu.Unlock()

delete(exporters, e)
defaultWorker.UnregisterExporter(e)
}
116 changes: 103 additions & 13 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

func init() {
defaultWorker = newWorker()
defaultWorker = NewWorker().(*worker)
go defaultWorker.start()
internal.DefaultRecorder = record
}
Expand All @@ -47,32 +47,71 @@ type worker struct {
c chan command
quit, done chan bool
mu sync.RWMutex

exportersMu sync.RWMutex
exporters map[Exporter]struct{}
}

// Worker defines an interface which allows a single process to maintain
// multiple sets of metrics exports (intended for the advanced case where a
// single process wants to report metrics about multiple objects, such as
// multiple databases or HTTP services).
//
// Note that this is an advanced use case, and the static functions in this
// module should cover the common use cases.
type Worker interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open-telemetry has concept of Meter, where you can create multiple meters each having their exporter. The worker is similar to that concept. I would probably rename Worker to Meter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, happy to do so.

Record(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add doc for each method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Find(name string) *View
RetrieveData(viewName string) ([]*Row, error)
Register(views ...*View) error
Unregister(views ...*View)
SetReportingPeriod(time.Duration)

RegisterExporter(Exporter)
UnregisterExporter(Exporter)

Start()
Stop()
}

var _ Worker = (*worker)(nil)

var defaultWorker *worker

var defaultReportingDuration = 10 * time.Second

// Find returns a registered view associated with this name.
// If no registered view is found, nil is returned.
func Find(name string) (v *View) {
return defaultWorker.Find(name)
}

// Find returns a registered view associated with this name.
// If no registered view is found, nil is returned.
func (w *worker) Find(name string) (v *View) {
req := &getViewByNameReq{
name: name,
c: make(chan *getViewByNameResp),
}
defaultWorker.c <- req
w.c <- req
resp := <-req.c
return resp.v
}

// Register begins collecting data for the given views.
// Once a view is registered, it reports data to the registered exporters.
func Register(views ...*View) error {
return defaultWorker.Register(views...)
}

// Register begins collecting data for the given views.
// Once a view is registered, it reports data to the registered exporters.
func (w *worker) Register(views ...*View) error {
req := &registerViewReq{
views: views,
err: make(chan error),
}
defaultWorker.c <- req
w.c <- req
return <-req.err
}

Expand All @@ -81,6 +120,14 @@ func Register(views ...*View) error {
// It is not necessary to unregister from views you expect to collect for the
// duration of your program execution.
func Unregister(views ...*View) {
defaultWorker.Unregister(views...)
}

// Unregister the given views. Data will not longer be exported for these views
// after Unregister returns.
// It is not necessary to unregister from views you expect to collect for the
// duration of your program execution.
func (w *worker) Unregister(views ...*View) {
names := make([]string, len(views))
for i := range views {
names[i] = views[i].Name
Expand All @@ -89,31 +136,42 @@ func Unregister(views ...*View) {
views: names,
done: make(chan struct{}),
}
defaultWorker.c <- req
w.c <- req
<-req.done
}

// RetrieveData gets a snapshot of the data collected for the the view registered
// with the given name. It is intended for testing only.
func RetrieveData(viewName string) ([]*Row, error) {
return defaultWorker.RetrieveData(viewName)
}

// RetrieveData gets a snapshot of the data collected for the the view registered
// with the given name. It is intended for testing only.
func (w *worker) RetrieveData(viewName string) ([]*Row, error) {
evankanderson marked this conversation as resolved.
Show resolved Hide resolved
req := &retrieveDataReq{
now: time.Now(),
v: viewName,
c: make(chan *retrieveDataResp),
}
defaultWorker.c <- req
w.c <- req
resp := <-req.c
return resp.rows, resp.err
}

func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
defaultWorker.Record(tags, ms.([]stats.Measurement), attachments)
}

// Record records a set of measurements ms associated with the given tags and attachments.
func (w *worker) Record(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
req := &recordReq{
tm: tags,
ms: ms.([]stats.Measurement),
ms: ms,
attachments: attachments,
t: time.Now(),
}
defaultWorker.c <- req
w.c <- req
}

// SetReportingPeriod sets the interval between reporting aggregated views in
Expand All @@ -124,17 +182,29 @@ func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
// duration is. For example, the Stackdriver exporter recommends a value no
// lower than 1 minute. Consult each exporter per your needs.
func SetReportingPeriod(d time.Duration) {
defaultWorker.SetReportingPeriod(d)
}

// SetReportingPeriod sets the interval between reporting aggregated views in
// the program. If duration is less than or equal to zero, it enables the
// default behavior.
//
// Note: each exporter makes different promises about what the lowest supported
// duration is. For example, the Stackdriver exporter recommends a value no
// lower than 1 minute. Consult each exporter per your needs.
func (w *worker) SetReportingPeriod(d time.Duration) {
// TODO(acetechnologist): ensure that the duration d is more than a certain
// value. e.g. 1s
req := &setReportingPeriodReq{
d: d,
c: make(chan bool),
}
defaultWorker.c <- req
w.c <- req
<-req.c // don't return until the timer is set to the new duration.
}

func newWorker() *worker {
// NewWorker constructs a
func NewWorker() Worker {
return &worker{
measures: make(map[string]*measureRef),
views: make(map[string]*viewInternal),
Expand All @@ -143,9 +213,15 @@ func newWorker() *worker {
c: make(chan command, 1024),
quit: make(chan bool),
done: make(chan bool),

exporters: make(map[Exporter]struct{}),
}
}

func (w *worker) Start() {
go w.start()
evankanderson marked this conversation as resolved.
Show resolved Hide resolved
}

func (w *worker) start() {
prodMgr := metricproducer.GlobalManager()
prodMgr.AddProducer(w)
Expand All @@ -165,7 +241,7 @@ func (w *worker) start() {
}
}

func (w *worker) stop() {
func (w *worker) Stop() {
prodMgr := metricproducer.GlobalManager()
prodMgr.DeleteProducer(w)

Expand Down Expand Up @@ -228,11 +304,11 @@ func (w *worker) reportView(v *viewInternal, now time.Time) {
End: time.Now(),
Rows: rows,
}
exportersMu.Lock()
for e := range exporters {
w.exportersMu.Lock()
defer w.exportersMu.Unlock()
for e := range w.exporters {
e.ExportView(viewData)
}
exportersMu.Unlock()
}

func (w *worker) reportUsage(now time.Time) {
Expand Down Expand Up @@ -279,3 +355,17 @@ func (w *worker) Read() []*metricdata.Metric {
}
return metrics
}

func (w *worker) RegisterExporter(e Exporter) {
w.exportersMu.Lock()
defer w.exportersMu.Unlock()

w.exporters[e] = struct{}{}
}

func (w *worker) UnregisterExporter(e Exporter) {
w.exportersMu.Lock()
defer w.exportersMu.Unlock()

delete(w.exporters, e)
}
Loading