Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support statsd for vitess #7072

Merged
merged 6 commits into from
Nov 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.10.0
github.com/Azure/go-autorest/autorest v0.10.0
github.com/Azure/go-autorest/autorest/validation v0.3.0 // indirect
github.com/DataDog/datadog-go v2.2.0+incompatible
github.com/GeertJohan/go.rice v1.0.0
github.com/PuerkitoBio/goquery v1.5.1
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6
Expand Down
7 changes: 7 additions & 0 deletions go/cmd/vtgate/plugin_statsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "vitess.io/vitess/go/stats/statsd"

func init() {
statsd.Init("vtgate")
}
7 changes: 7 additions & 0 deletions go/cmd/vttablet/plugin_statsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "vitess.io/vitess/go/stats/statsd"

func init() {
statsd.Init("vttablet")
}
227 changes: 227 additions & 0 deletions go/stats/statsd/statsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package statsd

import (
"encoding/json"
"expvar"
"flag"
"fmt"
"strings"

"github.com/DataDog/datadog-go/statsd"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
)

var (
statsdAddress = flag.String("statsd_address", "", "Address for statsd client")
statsdSampleRate = flag.Float64("statsd_sample_rate", 1.0, "")
)

// StatsBackend implements PullBackend using statsd
type StatsBackend struct {
namespace string
statsdClient *statsd.Client
sampleRate float64
}

var (
sb StatsBackend
)

// makeLabel builds a tag list with a single label + value.
func makeLabel(labelName string, labelVal string) []string {
return []string{fmt.Sprintf("%s:%s", labelName, labelVal)}
}

// makeLabels takes the vitess stat representation of label values ("."-separated list) and breaks it
// apart into a map of label name -> label value.
func makeLabels(labelNames []string, labelValsCombined string) []string {
tags := make([]string, len(labelNames))
labelVals := strings.Split(labelValsCombined, ".")
for i, v := range labelVals {
tags[i] = fmt.Sprintf("%s:%s", labelNames[i], v)
}
return tags
}

func (sb StatsBackend) addHistogram(name string, h *stats.Histogram, tags []string) {
labels := h.Labels()
buckets := h.Buckets()
for i := range labels {
name := fmt.Sprintf("%s.%s", name, labels[i])
sb.statsdClient.Gauge(name, float64(buckets[i]), tags, sb.sampleRate)
}
sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.CountLabel()),
(float64)(h.Count()),
tags,
sb.sampleRate,
)
sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.TotalLabel()),
(float64)(h.Total()),
tags,
sb.sampleRate,
)
}

// Init initializes the statsd with the given namespace.
func Init(namespace string) {
servenv.OnRun(func() {
if *statsdAddress == "" {
return
}
statsdC, err := statsd.NewBuffered(*statsdAddress, 100)
if err != nil {
log.Errorf("Failed to create statsd client %v", err)
return
}
statsdC.Namespace = namespace + "."
sb.namespace = namespace
sb.statsdClient = statsdC
sb.sampleRate = *statsdSampleRate
stats.RegisterPushBackend("statsd", sb)
})
}

func (sb StatsBackend) addExpVar(kv expvar.KeyValue) {
k := kv.Key
switch v := kv.Value.(type) {
case *stats.String:
if err := sb.statsdClient.Set(k, v.Get(), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add String %v for key %v", v, k)
}
case *stats.Counter:
if err := sb.statsdClient.Count(k, v.Get(), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add Counter %v for key %v", v, k)
}
case *stats.Gauge:
if err := sb.statsdClient.Gauge(k, float64(v.Get()), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add Gauge %v for key %v", v, k)
}
case *stats.GaugeFunc:
if err := sb.statsdClient.Gauge(k, float64(v.F()), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add GaugeFunc %v for key %v", v, k)
}
case *stats.CounterFunc:
if err := sb.statsdClient.Gauge(k, float64(v.F()), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add CounterFunc %v for key %v", v, k)
}
case *stats.CounterDuration:
if err := sb.statsdClient.TimeInMilliseconds(k, float64(v.Get().Milliseconds()), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add CounterDuration %v for key %v", v, k)
}
case *stats.CounterDurationFunc:
if err := sb.statsdClient.TimeInMilliseconds(k, float64(v.F().Milliseconds()), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add CounterDuration %v for key %v", v, k)
}
case *stats.GaugeDuration:
if err := sb.statsdClient.TimeInMilliseconds(k, float64(v.Get().Milliseconds()), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add GaugeDuration %v for key %v", v, k)
}
case *stats.GaugeDurationFunc:
if err := sb.statsdClient.TimeInMilliseconds(k, float64(v.F().Milliseconds()), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add GaugeDuration %v for key %v", v, k)
}
case *stats.CountersWithSingleLabel:
for labelVal, val := range v.Counts() {
if err := sb.statsdClient.Count(k, val, makeLabel(v.Label(), labelVal), sb.sampleRate); err != nil {
log.Errorf("Failed to add CountersWithSingleLabel %v for key %v", v, k)
}
}
case *stats.CountersWithMultiLabels:
for labelVals, val := range v.Counts() {
if err := sb.statsdClient.Count(k, val, makeLabels(v.Labels(), labelVals), sb.sampleRate); err != nil {
log.Errorf("Failed to add CountersFuncWithMultiLabels %v for key %v", v, k)
}
}
case *stats.CountersFuncWithMultiLabels:
for labelVals, val := range v.Counts() {
if err := sb.statsdClient.Count(k, val, makeLabels(v.Labels(), labelVals), sb.sampleRate); err != nil {
log.Errorf("Failed to add CountersFuncWithMultiLabels %v for key %v", v, k)
}
}
case *stats.GaugesWithMultiLabels:
for labelVals, val := range v.Counts() {
if err := sb.statsdClient.Gauge(k, float64(val), makeLabels(v.Labels(), labelVals), sb.sampleRate); err != nil {
log.Errorf("Failed to add GaugesWithMultiLabels %v for key %v", v, k)
}
}
case *stats.GaugesFuncWithMultiLabels:
for labelVals, val := range v.Counts() {
if err := sb.statsdClient.Gauge(k, float64(val), makeLabels(v.Labels(), labelVals), sb.sampleRate); err != nil {
log.Errorf("Failed to add GaugesFuncWithMultiLabels %v for key %v", v, k)
}
}
case *stats.GaugesWithSingleLabel:
for labelVal, val := range v.Counts() {
if err := sb.statsdClient.Gauge(k, float64(val), makeLabel(v.Label(), labelVal), sb.sampleRate); err != nil {
log.Errorf("Failed to add GaugesWithSingleLabel %v for key %v", v, k)
}
}
case *stats.MultiTimings:
labels := v.Labels()
hists := v.Histograms()
for labelValsCombined, histogram := range hists {
sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined))
}
case *stats.Timings:
// TODO: for statsd.timing metrics, there is no good way to transfer the histogram to it
// If we store a in memory buffer for stats.Timings and flush it here it's hard to make the stats
// thread safe.
// Instead, we export the timings stats as histogram here. We won't have the percentile breakdown
// for the metrics, but we can still get the average from total and count
labels := []string{v.Label()}
hists := v.Histograms()
for labelValsCombined, histogram := range hists {
sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined))
}
case *stats.Histogram:
sb.addHistogram(k, v, []string{})
case expvar.Func:
// Export memstats as gauge so that we don't need to call extra ReadMemStats
if k == "memstats" {
var obj map[string]interface{}
if err := json.Unmarshal([]byte(v.String()), &obj); err != nil {
return
}
for k, v := range obj {
if k == "NumGC" {
if err := sb.statsdClient.Gauge("NumGC", v.(float64), []string{}, sb.sampleRate); err != nil {
log.Errorf("Failed to export NumGC %v", v)
}
} else if k == "Frees" {
if err := sb.statsdClient.Gauge("Frees", v.(float64), []string{}, sb.sampleRate); err != nil {
log.Errorf("Failed to export Frees %v", v)
}
} else if k == "GCCPUFraction" {
if err := sb.statsdClient.Gauge("GCCPUFraction", v.(float64), []string{}, sb.sampleRate); err != nil {
log.Errorf("Failed to export GCCPUFraction %v", v)
}
} else if k == "PauseTotalNs" {
if err := sb.statsdClient.Gauge("PauseTotalNs", v.(float64), []string{}, sb.sampleRate); err != nil {
log.Errorf("Failed to export PauseTotalNs %v", v)
}
} else if k == "HeapAlloc" {
if err := sb.statsdClient.Gauge("HeapAlloc", v.(float64), []string{}, sb.sampleRate); err != nil {
log.Errorf("Failed to export HeapAlloc %v", v)
}
}
}
}
case *stats.StringMapFunc, *stats.Rates, *stats.RatesFunc:
// Silently ignore metrics that does not make sense to be exported to statsd
default:
log.Warningf("Silently ignore metrics with key %v [%T]", k, kv.Value)
}
}

// PushAll flush out the pending metrics
func (sb StatsBackend) PushAll() error {
expvar.Do(func(kv expvar.KeyValue) {
sb.addExpVar(kv)
})
if err := sb.statsdClient.Flush(); err != nil {
return err
}
return nil
}
Loading