Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Add support for metrics in prometheus exporter #1105

Merged
merged 5 commits into from
Apr 15, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions exporter/prometheus/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ import (
"net/http"

"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats/view"
)

func Example() {
exporter, err := prometheus.NewExporter(prometheus.Options{})
if err != nil {
log.Fatal(err)
}
view.RegisterExporter(exporter)

// Serve the scrape endpoint on port 9999.
http.Handle("/metrics", exporter)
Expand Down
281 changes: 132 additions & 149 deletions exporter/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
package prometheus // import "go.opencensus.io/exporter/prometheus"

import (
"bytes"
"fmt"
"log"
"net/http"
"sync"

"go.opencensus.io/internal"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opencensus.io/internal"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricexport"
"go.opencensus.io/stats/view"
)

// Exporter exports stats to Prometheus, users need
Expand Down Expand Up @@ -61,39 +61,12 @@ func NewExporter(o Options) (*Exporter, error) {
c: collector,
handler: promhttp.HandlerFor(o.Registry, promhttp.HandlerOpts{}),
}
collector.ensureRegisteredOnce()

return e, nil
}

var _ http.Handler = (*Exporter)(nil)
var _ view.Exporter = (*Exporter)(nil)

func (c *collector) registerViews(views ...*view.View) {
count := 0
for _, view := range views {
sig := viewSignature(c.opts.Namespace, view)
c.registeredViewsMu.Lock()
_, ok := c.registeredViews[sig]
c.registeredViewsMu.Unlock()

if !ok {
desc := prometheus.NewDesc(
viewName(c.opts.Namespace, view),
view.Description,
tagKeysToLabels(view.TagKeys),
c.opts.ConstLabels,
)
c.registeredViewsMu.Lock()
c.registeredViews[sig] = desc
c.registeredViewsMu.Unlock()
count++
}
}
if count == 0 {
return
}

c.ensureRegisteredOnce()
}

// ensureRegisteredOnce invokes reg.Register on the collector itself
// exactly once to ensure that we don't get errors such as
Expand Down Expand Up @@ -123,11 +96,8 @@ func (o *Options) onError(err error) {
// corresponding Prometheus Metric: SumData will be converted
// to Untyped Metric, CountData will be a Counter Metric,
// DistributionData will be a Histogram Metric.
// Deprecated in lieu of metricexport.Reader interface.
func (e *Exporter) ExportView(vd *view.Data) {
songy23 marked this conversation as resolved.
Show resolved Hide resolved
if len(vd.Rows) == 0 {
return
}
e.c.addViewData(vd)
}

// ServeHTTP serves the Prometheus endpoint.
Expand All @@ -145,151 +115,164 @@ type collector struct {
// reg helps collector register views dynamically.
reg *prometheus.Registry

// viewData are accumulated and atomically
// appended to on every Export invocation, from
// stats. These views are cleared out when
// Collect is invoked and the cycle is repeated.
viewData map[string]*view.Data

registeredViewsMu sync.Mutex
// registeredViews maps a view to a prometheus desc.
registeredViews map[string]*prometheus.Desc
}

func (c *collector) addViewData(vd *view.Data) {
c.registerViews(vd.View)
sig := viewSignature(c.opts.Namespace, vd.View)

c.mu.Lock()
c.viewData[sig] = vd
c.mu.Unlock()
// reader reads metrics from all registered producers.
reader *metricexport.Reader
}

func (c *collector) Describe(ch chan<- *prometheus.Desc) {
c.registeredViewsMu.Lock()
registered := make(map[string]*prometheus.Desc)
for k, desc := range c.registeredViews {
registered[k] = desc
}
c.registeredViewsMu.Unlock()

for _, desc := range registered {
ch <- desc
}
de := &descExporter{c: c, descCh: ch}
c.reader.ReadAndExport(de)
}

// Collect fetches the statistics from OpenCensus
// and delivers them as Prometheus Metrics.
// Collect is invoked everytime a prometheus.Gatherer is run
// Collect is invoked every time a prometheus.Gatherer is run
// for example when the HTTP endpoint is invoked by Prometheus.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
// We need a copy of all the view data up until this point.
viewData := c.cloneViewData()

for _, vd := range viewData {
sig := viewSignature(c.opts.Namespace, vd.View)
c.registeredViewsMu.Lock()
desc := c.registeredViews[sig]
c.registeredViewsMu.Unlock()

for _, row := range vd.Rows {
metric, err := c.toMetric(desc, vd.View, row)
if err != nil {
c.opts.onError(err)
} else {
ch <- metric
}
}
}

me := &metricExporter{c: c, metricCh: ch}
c.reader.ReadAndExport(me)
}

func (c *collector) toMetric(desc *prometheus.Desc, v *view.View, row *view.Row) (prometheus.Metric, error) {
switch data := row.Data.(type) {
case *view.CountData:
return prometheus.NewConstMetric(desc, prometheus.CounterValue, float64(data.Value), tagValues(row.Tags, v.TagKeys)...)

case *view.DistributionData:
points := make(map[float64]uint64)
// Histograms are cumulative in Prometheus.
// Get cumulative bucket counts.
cumCount := uint64(0)
for i, b := range v.Aggregation.Buckets {
cumCount += uint64(data.CountPerBucket[i])
points[b] = cumCount
}
return prometheus.NewConstHistogram(desc, uint64(data.Count), data.Sum(), points, tagValues(row.Tags, v.TagKeys)...)
func newCollector(opts Options, registrar *prometheus.Registry) *collector {
return &collector{
reg: registrar,
opts: opts,
reader: metricexport.NewReader()}
}

case *view.SumData:
return prometheus.NewConstMetric(desc, prometheus.UntypedValue, data.Value, tagValues(row.Tags, v.TagKeys)...)
func (c *collector) toDesc(metric *metricdata.Metric) *prometheus.Desc {
return prometheus.NewDesc(
metricName(c.opts.Namespace, metric),
metric.Descriptor.Description,
toPromLabels(metric.Descriptor.LabelKeys),
c.opts.ConstLabels)
}

case *view.LastValueData:
return prometheus.NewConstMetric(desc, prometheus.GaugeValue, data.Value, tagValues(row.Tags, v.TagKeys)...)
type metricExporter struct {
c *collector
metricCh chan<- prometheus.Metric
}

default:
return nil, fmt.Errorf("aggregation %T is not yet supported", v.Aggregation)
// ExportMetrics exports to the Prometheus.
// Each OpenCensus Metric will be converted to
// corresponding Prometheus Metric:
// TypeCumulativeInt64 and TypeCumulativeFloat64 will be a Counter Metric,
// TypeCumulativeDistribution will be a Histogram Metric.
// TypeGaugeFloat64 and TypeGaugeInt64 will be a Gauge Metric
func (me *metricExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
for _, metric := range metrics {
desc := me.c.toDesc(metric)
for _, ts := range metric.TimeSeries {
tvs := toLabelValues(ts.LabelValues)
for _, point := range ts.Points {
metric, err := toPromMetric(desc, metric, point, tvs)
if err != nil {
me.c.opts.onError(err)
} else if metric != nil {
me.metricCh <- metric
}
}
}
}
return nil
}

func tagKeysToLabels(keys []tag.Key) (labels []string) {
for _, key := range keys {
labels = append(labels, internal.Sanitize(key.Name()))
}
return labels
type descExporter struct {
c *collector
descCh chan<- *prometheus.Desc
}

func newCollector(opts Options, registrar *prometheus.Registry) *collector {
return &collector{
reg: registrar,
opts: opts,
registeredViews: make(map[string]*prometheus.Desc),
viewData: make(map[string]*view.Data),
// ExportMetrics exports descriptor to the Prometheus.
// It is invoked when request to scrape descriptors is received.
func (me *descExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
for _, metric := range metrics {
desc := me.c.toDesc(metric)
me.descCh <- desc
}
return nil
}

func tagValues(t []tag.Tag, expectedKeys []tag.Key) []string {
var values []string
// Add empty string for all missing keys in the tags map.
idx := 0
for _, t := range t {
for t.Key != expectedKeys[idx] {
idx++
values = append(values, "")
}
values = append(values, t.Value)
idx++
}
for idx < len(expectedKeys) {
idx++
values = append(values, "")
func toPromLabels(mls []string) (labels []string) {
for _, ml := range mls {
labels = append(labels, internal.Sanitize(ml))
}
return values
return labels
}

func viewName(namespace string, v *view.View) string {
func metricName(namespace string, m *metricdata.Metric) string {
var name string
if namespace != "" {
name = namespace + "_"
}
return name + internal.Sanitize(v.Name)
return name + internal.Sanitize(m.Descriptor.Name)
}

func toPromMetric(
desc *prometheus.Desc,
metric *metricdata.Metric,
point metricdata.Point,
labelValues []string) (prometheus.Metric, error) {
switch metric.Descriptor.Type {
case metricdata.TypeCumulativeFloat64, metricdata.TypeCumulativeInt64:
pv, err := toPromValue(point)
if err != nil {
return nil, err
}
return prometheus.NewConstMetric(desc, prometheus.CounterValue, pv, labelValues...)

case metricdata.TypeGaugeFloat64, metricdata.TypeGaugeInt64:
pv, err := toPromValue(point)
if err != nil {
return nil, err
}
return prometheus.NewConstMetric(desc, prometheus.GaugeValue, pv, labelValues...)

case metricdata.TypeCumulativeDistribution:
switch v := point.Value.(type) {
case *metricdata.Distribution:
points := make(map[float64]uint64)
// Histograms are cumulative in Prometheus.
// Get cumulative bucket counts.
cumCount := uint64(0)
for i, b := range v.BucketOptions.Bounds {
cumCount += uint64(v.Buckets[i].Count)
points[b] = cumCount
}
return prometheus.NewConstHistogram(desc, uint64(v.Count), v.Sum, points, labelValues...)
default:
return nil, pointTypeError(point)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: leave a TODO to support Summary metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}
case metricdata.TypeSummary:
// TODO: [rghetia] add support for TypeSummary.
return nil, nil
default:
return nil, fmt.Errorf("aggregation %T is not yet supported", metric.Descriptor.Type)
}
}

func viewSignature(namespace string, v *view.View) string {
var buf bytes.Buffer
buf.WriteString(viewName(namespace, v))
for _, k := range v.TagKeys {
buf.WriteString("-" + k.Name())
func toLabelValues(labelValues []metricdata.LabelValue) (values []string) {
for _, lv := range labelValues {
if lv.Present {
values = append(values, lv.Value)
} else {
values = append(values, "")
}
}
return buf.String()
return values
}

func (c *collector) cloneViewData() map[string]*view.Data {
c.mu.Lock()
defer c.mu.Unlock()
func pointTypeError(point metricdata.Point) error {
return fmt.Errorf("point type %T is not yet supported", point)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the error message could be a bit confusing, consider "point type %T doesn't match with metric type".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


viewDataCopy := make(map[string]*view.Data)
for sig, viewData := range c.viewData {
viewDataCopy[sig] = viewData
}

func toPromValue(point metricdata.Point) (float64, error) {
switch v := point.Value.(type) {
case float64:
return v, nil
case int64:
return float64(v), nil
default:
return 0.0, pointTypeError(point)
}
return viewDataCopy
}
Loading