From 047e38f50b5c5067c773d64a5ca5c393b871cfc6 Mon Sep 17 00:00:00 2001 From: rghetia Date: Mon, 15 Apr 2019 10:19:48 -0700 Subject: [PATCH] Add support for metrics in prometheus exporter (#1105) * Add prometheus support. * remove view related code and refactor metric specific code. * fix review comments. * remove dead code and fix comments. * fix error message. --- exporter/prometheus/example_test.go | 2 - exporter/prometheus/prometheus.go | 281 ++++++++++++------------- exporter/prometheus/prometheus_test.go | 151 +------------ 3 files changed, 136 insertions(+), 298 deletions(-) diff --git a/exporter/prometheus/example_test.go b/exporter/prometheus/example_test.go index 073a8bdd5..182ad2003 100644 --- a/exporter/prometheus/example_test.go +++ b/exporter/prometheus/example_test.go @@ -19,7 +19,6 @@ import ( "net/http" "go.opencensus.io/exporter/prometheus" - "go.opencensus.io/stats/view" ) func Example() { @@ -27,7 +26,6 @@ func Example() { if err != nil { log.Fatal(err) } - view.RegisterExporter(exporter) // Serve the scrape endpoint on port 9999. http.Handle("/metrics", exporter) diff --git a/exporter/prometheus/prometheus.go b/exporter/prometheus/prometheus.go index 203bd38ad..53ff6ba6e 100644 --- a/exporter/prometheus/prometheus.go +++ b/exporter/prometheus/prometheus.go @@ -17,18 +17,18 @@ package prometheus // import "go.opencensus.io/exporter/prometheus" import ( - "bytes" "fmt" "log" "net/http" "sync" - "go.opencensus.io/internal" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" - + "context" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opencensus.io/internal" + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricexport" + "go.opencensus.io/stats/view" ) // Exporter exports stats to Prometheus, users need @@ -61,39 +61,12 @@ func NewExporter(o Options) (*Exporter, error) { c: collector, handler: promhttp.HandlerFor(o.Registry, promhttp.HandlerOpts{}), } + collector.ensureRegisteredOnce() + return e, nil } var _ http.Handler = (*Exporter)(nil) -var _ view.Exporter = (*Exporter)(nil) - -func (c *collector) registerViews(views ...*view.View) { - count := 0 - for _, view := range views { - sig := viewSignature(c.opts.Namespace, view) - c.registeredViewsMu.Lock() - _, ok := c.registeredViews[sig] - c.registeredViewsMu.Unlock() - - if !ok { - desc := prometheus.NewDesc( - viewName(c.opts.Namespace, view), - view.Description, - tagKeysToLabels(view.TagKeys), - c.opts.ConstLabels, - ) - c.registeredViewsMu.Lock() - c.registeredViews[sig] = desc - c.registeredViewsMu.Unlock() - count++ - } - } - if count == 0 { - return - } - - c.ensureRegisteredOnce() -} // ensureRegisteredOnce invokes reg.Register on the collector itself // exactly once to ensure that we don't get errors such as @@ -123,11 +96,8 @@ func (o *Options) onError(err error) { // corresponding Prometheus Metric: SumData will be converted // to Untyped Metric, CountData will be a Counter Metric, // DistributionData will be a Histogram Metric. +// Deprecated in lieu of metricexport.Reader interface. func (e *Exporter) ExportView(vd *view.Data) { - if len(vd.Rows) == 0 { - return - } - e.c.addViewData(vd) } // ServeHTTP serves the Prometheus endpoint. @@ -145,151 +115,164 @@ type collector struct { // reg helps collector register views dynamically. reg *prometheus.Registry - // viewData are accumulated and atomically - // appended to on every Export invocation, from - // stats. These views are cleared out when - // Collect is invoked and the cycle is repeated. - viewData map[string]*view.Data - - registeredViewsMu sync.Mutex - // registeredViews maps a view to a prometheus desc. - registeredViews map[string]*prometheus.Desc -} - -func (c *collector) addViewData(vd *view.Data) { - c.registerViews(vd.View) - sig := viewSignature(c.opts.Namespace, vd.View) - - c.mu.Lock() - c.viewData[sig] = vd - c.mu.Unlock() + // reader reads metrics from all registered producers. + reader *metricexport.Reader } func (c *collector) Describe(ch chan<- *prometheus.Desc) { - c.registeredViewsMu.Lock() - registered := make(map[string]*prometheus.Desc) - for k, desc := range c.registeredViews { - registered[k] = desc - } - c.registeredViewsMu.Unlock() - - for _, desc := range registered { - ch <- desc - } + de := &descExporter{c: c, descCh: ch} + c.reader.ReadAndExport(de) } // Collect fetches the statistics from OpenCensus // and delivers them as Prometheus Metrics. -// Collect is invoked everytime a prometheus.Gatherer is run +// Collect is invoked every time a prometheus.Gatherer is run // for example when the HTTP endpoint is invoked by Prometheus. func (c *collector) Collect(ch chan<- prometheus.Metric) { - // We need a copy of all the view data up until this point. - viewData := c.cloneViewData() - - for _, vd := range viewData { - sig := viewSignature(c.opts.Namespace, vd.View) - c.registeredViewsMu.Lock() - desc := c.registeredViews[sig] - c.registeredViewsMu.Unlock() - - for _, row := range vd.Rows { - metric, err := c.toMetric(desc, vd.View, row) - if err != nil { - c.opts.onError(err) - } else { - ch <- metric - } - } - } - + me := &metricExporter{c: c, metricCh: ch} + c.reader.ReadAndExport(me) } -func (c *collector) toMetric(desc *prometheus.Desc, v *view.View, row *view.Row) (prometheus.Metric, error) { - switch data := row.Data.(type) { - case *view.CountData: - return prometheus.NewConstMetric(desc, prometheus.CounterValue, float64(data.Value), tagValues(row.Tags, v.TagKeys)...) - - case *view.DistributionData: - points := make(map[float64]uint64) - // Histograms are cumulative in Prometheus. - // Get cumulative bucket counts. - cumCount := uint64(0) - for i, b := range v.Aggregation.Buckets { - cumCount += uint64(data.CountPerBucket[i]) - points[b] = cumCount - } - return prometheus.NewConstHistogram(desc, uint64(data.Count), data.Sum(), points, tagValues(row.Tags, v.TagKeys)...) +func newCollector(opts Options, registrar *prometheus.Registry) *collector { + return &collector{ + reg: registrar, + opts: opts, + reader: metricexport.NewReader()} +} - case *view.SumData: - return prometheus.NewConstMetric(desc, prometheus.UntypedValue, data.Value, tagValues(row.Tags, v.TagKeys)...) +func (c *collector) toDesc(metric *metricdata.Metric) *prometheus.Desc { + return prometheus.NewDesc( + metricName(c.opts.Namespace, metric), + metric.Descriptor.Description, + toPromLabels(metric.Descriptor.LabelKeys), + c.opts.ConstLabels) +} - case *view.LastValueData: - return prometheus.NewConstMetric(desc, prometheus.GaugeValue, data.Value, tagValues(row.Tags, v.TagKeys)...) +type metricExporter struct { + c *collector + metricCh chan<- prometheus.Metric +} - default: - return nil, fmt.Errorf("aggregation %T is not yet supported", v.Aggregation) +// ExportMetrics exports to the Prometheus. +// Each OpenCensus Metric will be converted to +// corresponding Prometheus Metric: +// TypeCumulativeInt64 and TypeCumulativeFloat64 will be a Counter Metric, +// TypeCumulativeDistribution will be a Histogram Metric. +// TypeGaugeFloat64 and TypeGaugeInt64 will be a Gauge Metric +func (me *metricExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { + for _, metric := range metrics { + desc := me.c.toDesc(metric) + for _, ts := range metric.TimeSeries { + tvs := toLabelValues(ts.LabelValues) + for _, point := range ts.Points { + metric, err := toPromMetric(desc, metric, point, tvs) + if err != nil { + me.c.opts.onError(err) + } else if metric != nil { + me.metricCh <- metric + } + } + } } + return nil } -func tagKeysToLabels(keys []tag.Key) (labels []string) { - for _, key := range keys { - labels = append(labels, internal.Sanitize(key.Name())) - } - return labels +type descExporter struct { + c *collector + descCh chan<- *prometheus.Desc } -func newCollector(opts Options, registrar *prometheus.Registry) *collector { - return &collector{ - reg: registrar, - opts: opts, - registeredViews: make(map[string]*prometheus.Desc), - viewData: make(map[string]*view.Data), +// ExportMetrics exports descriptor to the Prometheus. +// It is invoked when request to scrape descriptors is received. +func (me *descExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { + for _, metric := range metrics { + desc := me.c.toDesc(metric) + me.descCh <- desc } + return nil } -func tagValues(t []tag.Tag, expectedKeys []tag.Key) []string { - var values []string - // Add empty string for all missing keys in the tags map. - idx := 0 - for _, t := range t { - for t.Key != expectedKeys[idx] { - idx++ - values = append(values, "") - } - values = append(values, t.Value) - idx++ - } - for idx < len(expectedKeys) { - idx++ - values = append(values, "") +func toPromLabels(mls []string) (labels []string) { + for _, ml := range mls { + labels = append(labels, internal.Sanitize(ml)) } - return values + return labels } -func viewName(namespace string, v *view.View) string { +func metricName(namespace string, m *metricdata.Metric) string { var name string if namespace != "" { name = namespace + "_" } - return name + internal.Sanitize(v.Name) + return name + internal.Sanitize(m.Descriptor.Name) +} + +func toPromMetric( + desc *prometheus.Desc, + metric *metricdata.Metric, + point metricdata.Point, + labelValues []string) (prometheus.Metric, error) { + switch metric.Descriptor.Type { + case metricdata.TypeCumulativeFloat64, metricdata.TypeCumulativeInt64: + pv, err := toPromValue(point) + if err != nil { + return nil, err + } + return prometheus.NewConstMetric(desc, prometheus.CounterValue, pv, labelValues...) + + case metricdata.TypeGaugeFloat64, metricdata.TypeGaugeInt64: + pv, err := toPromValue(point) + if err != nil { + return nil, err + } + return prometheus.NewConstMetric(desc, prometheus.GaugeValue, pv, labelValues...) + + case metricdata.TypeCumulativeDistribution: + switch v := point.Value.(type) { + case *metricdata.Distribution: + points := make(map[float64]uint64) + // Histograms are cumulative in Prometheus. + // Get cumulative bucket counts. + cumCount := uint64(0) + for i, b := range v.BucketOptions.Bounds { + cumCount += uint64(v.Buckets[i].Count) + points[b] = cumCount + } + return prometheus.NewConstHistogram(desc, uint64(v.Count), v.Sum, points, labelValues...) + default: + return nil, typeMismatchError(point) + } + case metricdata.TypeSummary: + // TODO: [rghetia] add support for TypeSummary. + return nil, nil + default: + return nil, fmt.Errorf("aggregation %T is not yet supported", metric.Descriptor.Type) + } } -func viewSignature(namespace string, v *view.View) string { - var buf bytes.Buffer - buf.WriteString(viewName(namespace, v)) - for _, k := range v.TagKeys { - buf.WriteString("-" + k.Name()) +func toLabelValues(labelValues []metricdata.LabelValue) (values []string) { + for _, lv := range labelValues { + if lv.Present { + values = append(values, lv.Value) + } else { + values = append(values, "") + } } - return buf.String() + return values } -func (c *collector) cloneViewData() map[string]*view.Data { - c.mu.Lock() - defer c.mu.Unlock() +func typeMismatchError(point metricdata.Point) error { + return fmt.Errorf("point type %T does not match metric type", point) - viewDataCopy := make(map[string]*view.Data) - for sig, viewData := range c.viewData { - viewDataCopy[sig] = viewData +} + +func toPromValue(point metricdata.Point) (float64, error) { + switch v := point.Value.(type) { + case float64: + return v, nil + case int64: + return float64(v), nil + default: + return 0.0, typeMismatchError(point) } - return viewDataCopy } diff --git a/exporter/prometheus/prometheus_test.go b/exporter/prometheus/prometheus_test.go index 4042e68b3..83fc90abb 100644 --- a/exporter/prometheus/prometheus_test.go +++ b/exporter/prometheus/prometheus_test.go @@ -16,12 +16,10 @@ package prometheus import ( "context" - "fmt" "io/ioutil" "net/http" "net/http/httptest" "strings" - "sync" "testing" "time" @@ -32,136 +30,6 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -func newView(measureName string, agg *view.Aggregation) *view.View { - m := stats.Int64(measureName, "bytes", stats.UnitBytes) - return &view.View{ - Name: "foo", - Description: "bar", - Measure: m, - Aggregation: agg, - } -} - -func TestOnlyCumulativeWindowSupported(t *testing.T) { - // See Issue https://github.com/census-instrumentation/opencensus-go/issues/214. - count1 := &view.CountData{Value: 1} - lastValue1 := &view.LastValueData{Value: 56.7} - tests := []struct { - vds *view.Data - want int - }{ - 0: { - vds: &view.Data{ - View: newView("TestOnlyCumulativeWindowSupported/m1", view.Count()), - }, - want: 0, // no rows present - }, - 1: { - vds: &view.Data{ - View: newView("TestOnlyCumulativeWindowSupported/m2", view.Count()), - Rows: []*view.Row{ - {Data: count1}, - }, - }, - want: 1, - }, - 2: { - vds: &view.Data{ - View: newView("TestOnlyCumulativeWindowSupported/m3", view.LastValue()), - Rows: []*view.Row{ - {Data: lastValue1}, - }, - }, - want: 1, - }, - } - - for i, tt := range tests { - reg := prometheus.NewRegistry() - collector := newCollector(Options{}, reg) - collector.addViewData(tt.vds) - mm, err := reg.Gather() - if err != nil { - t.Errorf("#%d: Gather err: %v", i, err) - } - reg.Unregister(collector) - if got, want := len(mm), tt.want; got != want { - t.Errorf("#%d: got nil %v want nil %v", i, got, want) - } - } -} - -func TestCollectNonRacy(t *testing.T) { - // Despite enforcing the singleton, for this case we - // need an exporter hence won't be using NewExporter. - exp, err := NewExporter(Options{}) - if err != nil { - t.Fatalf("NewExporter: %v", err) - } - collector := exp.c - - // Synchronize and make sure every goroutine has terminated before we exit - var waiter sync.WaitGroup - waiter.Add(3) - defer waiter.Wait() - - doneCh := make(chan bool) - // 1. Viewdata write routine at 700ns - go func() { - defer waiter.Done() - tick := time.NewTicker(700 * time.Nanosecond) - defer tick.Stop() - - defer func() { - close(doneCh) - }() - - for i := 0; i < 1e3; i++ { - count1 := &view.CountData{Value: 1} - vds := []*view.Data{ - {View: newView(fmt.Sprintf("TestCollectNonRacy/m2-%d", i), view.Count()), Rows: []*view.Row{{Data: count1}}}, - } - for _, v := range vds { - exp.ExportView(v) - } - <-tick.C - } - }() - - inMetricsChan := make(chan prometheus.Metric, 1000) - // 2. Simulating the Prometheus metrics consumption routine running at 900ns - go func() { - defer waiter.Done() - tick := time.NewTicker(900 * time.Nanosecond) - defer tick.Stop() - - for { - select { - case <-doneCh: - return - case <-inMetricsChan: - } - } - }() - - // 3. Collect/Read routine at 800ns - go func() { - defer waiter.Done() - tick := time.NewTicker(800 * time.Nanosecond) - defer tick.Stop() - - for { - select { - case <-doneCh: - return - case <-tick.C: - // Perform some collection here - collector.Collect(inMetricsChan) - } - } - }() -} - type mSlice []*stats.Int64Measure func (measures *mSlice) createAndAppend(name, desc, unit string) { @@ -187,7 +55,6 @@ func TestMetricsEndpointOutput(t *testing.T) { if err != nil { t.Fatalf("failed to create prometheus exporter: %v", err) } - view.RegisterExporter(exporter) names := []string{"foo", "bar", "baz"} @@ -261,9 +128,6 @@ func TestCumulativenessFromHistograms(t *testing.T) { if err != nil { t.Fatalf("failed to create prometheus exporter: %v", err) } - view.RegisterExporter(exporter) - reportPeriod := time.Millisecond - view.SetReportingPeriod(reportPeriod) m := stats.Float64("tests/bills", "payments by denomination", stats.UnitDimensionless) v := &view.View{ @@ -282,7 +146,7 @@ func TestCumulativenessFromHistograms(t *testing.T) { defer view.Unregister(v) // Give the reporter ample time to process registration - <-time.After(10 * reportPeriod) + //<-time.After(10 * reportPeriod) values := []float64{0.25, 245.67, 12, 1.45, 199.9, 7.69, 187.12} // We want the results that look like this: @@ -315,7 +179,7 @@ func TestCumulativenessFromHistograms(t *testing.T) { stats.Record(ctx, ms...) // Give the recorder ample time to process recording - <-time.After(10 * reportPeriod) + //<-time.After(10 * reportPeriod) cst := httptest.NewServer(exporter) defer cst.Close() @@ -348,9 +212,6 @@ func TestHistogramUnorderedBucketBounds(t *testing.T) { if err != nil { t.Fatalf("failed to create prometheus exporter: %v", err) } - view.RegisterExporter(exporter) - reportPeriod := time.Millisecond - view.SetReportingPeriod(reportPeriod) m := stats.Float64("tests/bills", "payments by denomination", stats.UnitDimensionless) v := &view.View{ @@ -369,7 +230,7 @@ func TestHistogramUnorderedBucketBounds(t *testing.T) { defer view.Unregister(v) // Give the reporter ample time to process registration - <-time.After(10 * reportPeriod) + //<-time.After(10 * reportPeriod) values := []float64{0.25, 245.67, 12, 1.45, 199.9, 7.69, 187.12} // We want the results that look like this: @@ -402,7 +263,7 @@ func TestHistogramUnorderedBucketBounds(t *testing.T) { stats.Record(ctx, ms...) // Give the recorder ample time to process recording - <-time.After(10 * reportPeriod) + //<-time.After(10 * reportPeriod) cst := httptest.NewServer(exporter) defer cst.Close() @@ -442,8 +303,6 @@ func TestConstLabelsIncluded(t *testing.T) { if err != nil { t.Fatalf("failed to create prometheus exporter: %v", err) } - view.RegisterExporter(exporter) - defer view.UnregisterExporter(exporter) names := []string{"foo", "bar", "baz"} @@ -526,8 +385,6 @@ func TestViewMeasureWithoutTag(t *testing.T) { if err != nil { t.Fatalf("failed to create prometheus exporter: %v", err) } - view.RegisterExporter(exporter) - defer view.UnregisterExporter(exporter) m := stats.Int64("tests/foo", "foo", stats.UnitDimensionless) k1, _ := tag.NewKey("key/1") k2, _ := tag.NewKey("key/2")