From f546dd20c7576b3982aa95862eff89172b15fd8d Mon Sep 17 00:00:00 2001 From: crowu Date: Mon, 23 Nov 2020 18:49:35 -0800 Subject: [PATCH 1/6] support statsd for vitess Signed-off-by: crowu --- go.mod | 1 + go/cmd/vtgate/plugin_statsd.go | 7 + go/cmd/vttablet/plugin_statsd.go | 7 + go/stats/statsd/statsd.go | 220 ++++++++++++++ go/stats/statsd/statsd_test.go | 477 +++++++++++++++++++++++++++++++ go/stats/timings.go | 43 +++ 6 files changed, 755 insertions(+) create mode 100644 go/cmd/vtgate/plugin_statsd.go create mode 100644 go/cmd/vttablet/plugin_statsd.go create mode 100644 go/stats/statsd/statsd.go create mode 100644 go/stats/statsd/statsd_test.go diff --git a/go.mod b/go.mod index 19d3687ecd3..2428cc21e96 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/Azure/azure-storage-blob-go v0.10.0 github.com/Azure/go-autorest/autorest v0.10.0 github.com/Azure/go-autorest/autorest/validation v0.3.0 // indirect + github.com/DataDog/datadog-go v2.2.0+incompatible github.com/GeertJohan/go.rice v1.0.0 github.com/PuerkitoBio/goquery v1.5.1 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6 diff --git a/go/cmd/vtgate/plugin_statsd.go b/go/cmd/vtgate/plugin_statsd.go new file mode 100644 index 00000000000..ae2ecb5b2e0 --- /dev/null +++ b/go/cmd/vtgate/plugin_statsd.go @@ -0,0 +1,7 @@ +package main + +import "vitess.io/vitess/go/stats/statsd" + +func init() { + statsd.Init("vtgate") +} diff --git a/go/cmd/vttablet/plugin_statsd.go b/go/cmd/vttablet/plugin_statsd.go new file mode 100644 index 00000000000..51761e6c406 --- /dev/null +++ b/go/cmd/vttablet/plugin_statsd.go @@ -0,0 +1,7 @@ +package main + +import "vitess.io/vitess/go/stats/statsd" + +func init() { + statsd.Init("vttablet") +} diff --git a/go/stats/statsd/statsd.go b/go/stats/statsd/statsd.go new file mode 100644 index 00000000000..599802ddd8d --- /dev/null +++ b/go/stats/statsd/statsd.go @@ -0,0 +1,220 @@ +package statsd + +import ( + "encoding/json" + "expvar" + "flag" + "fmt" + "strings" + + "github.com/DataDog/datadog-go/statsd" + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/servenv" +) + +var ( + statsdAddress = flag.String("statsd_address", "", "Address for statsd client") + statsdSampleRate = flag.Float64("statsd_sample_rate", 1.0, "") +) + +// StatsBackend implements PullBackend using statsd +type StatsBackend struct { + namespace string + statsdClient *statsd.Client + sampleRate float64 +} + +var ( + sb StatsBackend +) + +// makeLabel builds a tag list with a single label + value. +func makeLabel(labelName string, labelVal string) []string { + return []string{fmt.Sprintf("%s:%s", labelName, labelVal)} +} + +// makeLabels takes the vitess stat representation of label values ("."-separated list) and breaks it +// apart into a map of label name -> label value. +func makeLabels(labelNames []string, labelValsCombined string) []string { + tags := make([]string, len(labelNames)) + labelVals := strings.Split(labelValsCombined, ".") + for i, v := range labelVals { + tags[i] = fmt.Sprintf("%s:%s", labelNames[i], v) + } + return tags +} + +// Init initializes the statsd with the given namespace. +func Init(namespace string) { + servenv.OnRun(func() { + if *statsdAddress == "" { + return + } + statsdC, err := statsd.NewBuffered(*statsdAddress, 100) + if err != nil { + log.Errorf("Failed to create statsd client %v", err) + return + } + statsdC.Namespace = namespace + "." 
+ sb.namespace = namespace + sb.statsdClient = statsdC + sb.sampleRate = *statsdSampleRate + stats.RegisterPushBackend("statsd", sb) + }) +} + +func (sb StatsBackend) addExpVar(kv expvar.KeyValue) { + k := kv.Key + switch v := kv.Value.(type) { + case *stats.String: + if err := sb.statsdClient.Set(k, v.Get(), nil, sb.sampleRate); err != nil { + log.Errorf("Failed to add String %v for key %v", v, k) + } + case *stats.Counter: + if err := sb.statsdClient.Count(k, v.Get(), nil, sb.sampleRate); err != nil { + log.Errorf("Failed to add Counter %v for key %v", v, k) + } + case *stats.Gauge: + if err := sb.statsdClient.Gauge(k, float64(v.Get()), nil, sb.sampleRate); err != nil { + log.Errorf("Failed to add Gauge %v for key %v", v, k) + } + case *stats.GaugeFunc: + if err := sb.statsdClient.Gauge(k, float64(v.F()), nil, sb.sampleRate); err != nil { + log.Errorf("Failed to add GaugeFunc %v for key %v", v, k) + } + case *stats.CounterFunc: + if err := sb.statsdClient.Gauge(k, float64(v.F()), nil, sb.sampleRate); err != nil { + log.Errorf("Failed to add CounterFunc %v for key %v", v, k) + } + case *stats.CounterDuration: + if err := sb.statsdClient.TimeInMilliseconds(k, float64(v.Get().Milliseconds()), nil, sb.sampleRate); err != nil { + log.Errorf("Failed to add CounterDuration %v for key %v", v, k) + } + case *stats.CounterDurationFunc: + if err := sb.statsdClient.TimeInMilliseconds(k, float64(v.F().Milliseconds()), nil, sb.sampleRate); err != nil { + log.Errorf("Failed to add CounterDuration %v for key %v", v, k) + } + case *stats.GaugeDuration: + if err := sb.statsdClient.TimeInMilliseconds(k, float64(v.Get().Milliseconds()), nil, sb.sampleRate); err != nil { + log.Errorf("Failed to add GaugeDuration %v for key %v", v, k) + } + case *stats.GaugeDurationFunc: + if err := sb.statsdClient.TimeInMilliseconds(k, float64(v.F().Milliseconds()), nil, sb.sampleRate); err != nil { + log.Errorf("Failed to add GaugeDuration %v for key %v", v, k) + } + case *stats.CountersWithSingleLabel: + for labelVal, val := range v.Counts() { + if err := sb.statsdClient.Count(k, val, makeLabel(v.Label(), labelVal), sb.sampleRate); err != nil { + log.Errorf("Failed to add CountersWithSingleLabel %v for key %v", v, k) + } + } + case *stats.CountersWithMultiLabels: + for labelVals, val := range v.Counts() { + if err := sb.statsdClient.Count(k, val, makeLabels(v.Labels(), labelVals), sb.sampleRate); err != nil { + log.Errorf("Failed to add CountersFuncWithMultiLabels %v for key %v", v, k) + } + } + case *stats.CountersFuncWithMultiLabels: + for labelVals, val := range v.Counts() { + if err := sb.statsdClient.Count(k, val, makeLabels(v.Labels(), labelVals), sb.sampleRate); err != nil { + log.Errorf("Failed to add CountersFuncWithMultiLabels %v for key %v", v, k) + } + } + case *stats.GaugesWithMultiLabels: + for labelVals, val := range v.Counts() { + if err := sb.statsdClient.Gauge(k, float64(val), makeLabels(v.Labels(), labelVals), sb.sampleRate); err != nil { + log.Errorf("Failed to add GaugesWithMultiLabels %v for key %v", v, k) + } + } + case *stats.GaugesFuncWithMultiLabels: + for labelVals, val := range v.Counts() { + if err := sb.statsdClient.Gauge(k, float64(val), makeLabels(v.Labels(), labelVals), sb.sampleRate); err != nil { + log.Errorf("Failed to add GaugesFuncWithMultiLabels %v for key %v", v, k) + } + } + case *stats.GaugesWithSingleLabel: + for labelVal, val := range v.Counts() { + if err := sb.statsdClient.Gauge(k, float64(val), makeLabel(v.Label(), labelVal), sb.sampleRate); err != nil { + log.Errorf("Failed 
to add GaugesWithSingleLabel %v for key %v", v, k) + } + } + case *stats.MultiTimings: + labels := v.Labels() + buffers := v.Buffers() + for labelValsCombined, buffer := range buffers { + tags := makeLabels(labels, labelValsCombined) + for _, elapsedNs := range buffer.Values() { + if err := sb.statsdClient.TimeInMilliseconds(k, float64(elapsedNs)/1000.0/1000.0, tags, sb.sampleRate); err != nil { + log.Errorf("Failed to add TimeInMilliseconds %v for key %v", buffer.Values(), k) + } + } + } + case *stats.Timings: + label := v.Label() + buffers := v.Buffers() + for labelValsCombined, buffer := range buffers { + tags := makeLabel(label, labelValsCombined) + for _, elapsedNs := range buffer.Values() { + if err := sb.statsdClient.TimeInMilliseconds(k, float64(elapsedNs)/1000.0/1000.0, tags, sb.sampleRate); err != nil { + log.Errorf("Failed to add TimeInMilliseconds %v for key %v", buffer.Values(), k) + } + } + } + case *stats.Histogram: + labels := v.Labels() + buckets := v.Buckets() + for i := range labels { + name := fmt.Sprintf("%s.%s", k, labels[i]) + if err := sb.statsdClient.Count(name, buckets[i], []string{}, sb.sampleRate); err != nil { + log.Errorf("Failed to add Histogram %v for key %v", buckets[i], name) + } + } + case expvar.Func: + // Export memstats as gauge so that we don't need to call extra ReadMemStats + if k == "memstats" { + var obj map[string]interface{} + if err := json.Unmarshal([]byte(v.String()), &obj); err != nil { + return + } + for k, v := range obj { + if k == "NumGC" { + if err := sb.statsdClient.Gauge("NumGC", v.(float64), []string{}, sb.sampleRate); err != nil { + log.Errorf("Failed to export NumGC %v", v) + } + } else if k == "Frees" { + if err := sb.statsdClient.Gauge("Frees", v.(float64), []string{}, sb.sampleRate); err != nil { + log.Errorf("Failed to export Frees %v", v) + } + } else if k == "GCCPUFraction" { + if err := sb.statsdClient.Gauge("GCCPUFraction", v.(float64), []string{}, sb.sampleRate); err != nil { + log.Errorf("Failed to export GCCPUFraction %v", v) + } + } else if k == "PauseTotalNs" { + if err := sb.statsdClient.Gauge("PauseTotalNs", v.(float64), []string{}, sb.sampleRate); err != nil { + log.Errorf("Failed to export PauseTotalNs %v", v) + } + } else if k == "HeapAlloc" { + if err := sb.statsdClient.Gauge("HeapAlloc", v.(float64), []string{}, sb.sampleRate); err != nil { + log.Errorf("Failed to export HeapAlloc %v", v) + } + } + } + } else { + log.Warningf("Silently ignore metrics with key %v [%T]", k, kv.Value) + } + default: + log.Warningf("Silently ignore metrics with key %v [%T]", k, kv.Value) + } +} + +// PushAll flush out the pending metrics +func (sb StatsBackend) PushAll() error { + expvar.Do(func(kv expvar.KeyValue) { + sb.addExpVar(kv) + }) + if err := sb.statsdClient.Flush(); err != nil { + return err + } + return nil +} diff --git a/go/stats/statsd/statsd_test.go b/go/stats/statsd/statsd_test.go new file mode 100644 index 00000000000..0f9852c6f4e --- /dev/null +++ b/go/stats/statsd/statsd_test.go @@ -0,0 +1,477 @@ +package statsd + +import ( + "expvar" + "net" + "strings" + "testing" + "time" + + "github.com/DataDog/datadog-go/statsd" + "gotest.tools/assert" + "vitess.io/vitess/go/stats" +) + +func getBackend(t *testing.T) (StatsBackend, *net.UDPConn) { + addr := "localhost:1201" + udpAddr, _ := net.ResolveUDPAddr("udp", addr) + server, _ := net.ListenUDP("udp", udpAddr) + bufferLength := 9 + client, _ := statsd.NewBuffered(addr, bufferLength) + client.Namespace = "test." 
+ var sb StatsBackend + sb.namespace = "foo" + sb.sampleRate = 1 + sb.statsdClient = client + return sb, server +} + +func TestStatsdString(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "string_name" + stats.NewString(name).Set("foo") + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := "test.string_name:foo|s" + assert.Equal(t, result, expected) + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdCounter(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "counter_name" + c := stats.NewCounter(name, "counter description") + c.Add(1) + found := false + expvar.Do(func(kv expvar.KeyValue) { + found = true + if kv.Key == name { + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := "test.counter_name:1|c" + assert.Equal(t, result, expected) + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdGauge(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "gauge_name" + s := stats.NewGauge(name, "help") + s.Set(10) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := "test.gauge_name:10.000000|g" + assert.Equal(t, result, expected) + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdGaugeFunc(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "gauge_func_name" + stats.NewGaugeFunc(name, "help", func() int64 { + return 2 + }) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := "test.gauge_func_name:2.000000|g" + assert.Equal(t, result, expected) + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdCounterDuration(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "counter_duration_name" + s := stats.NewCounterDuration(name, "help") + s.Add(1 * time.Millisecond) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := "test.counter_duration_name:1.000000|ms" + assert.Equal(t, result, expected) + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdCountersWithSingleLabel(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + 
name := "counter_with_single_label_name" + s := stats.NewCountersWithSingleLabel(name, "help", "label", "tag1", "tag2") + s.Add("tag1", 2) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := []string{ + "test.counter_with_single_label_name:2|c|#label:tag1", + "test.counter_with_single_label_name:0|c|#label:tag2", + } + for i, res := range strings.Split(result, "\n") { + assert.Equal(t, res, expected[i]) + } + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdCountersWithMultiLabels(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "counter_with_multiple_label_name" + s := stats.NewCountersWithMultiLabels(name, "help", []string{"label1", "label2"}) + s.Add([]string{"foo", "bar"}, 1) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := "test.counter_with_multiple_label_name:1|c|#label1:foo,label2:bar" + assert.Equal(t, result, expected) + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdCountersFuncWithMultiLabels(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "counter_func_with_multiple_labels_name" + stats.NewCountersFuncWithMultiLabels(name, "help", []string{"label1", "label2"}, func() map[string]int64 { + m := make(map[string]int64) + m["foo.bar"] = 1 + m["bar.baz"] = 2 + return m + }) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := []string{ + "test.counter_func_with_multiple_labels_name:1|c|#label1:foo,label2:bar", + "test.counter_func_with_multiple_labels_name:2|c|#label1:bar,label2:baz", + } + for i, res := range strings.Split(result, "\n") { + assert.Equal(t, res, expected[i]) + } + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdGaugesWithMultiLabels(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "gauges_with_multiple_label_name" + s := stats.NewGaugesWithMultiLabels(name, "help", []string{"label1", "label2"}) + s.Add([]string{"foo", "bar"}, 3) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := "test.gauges_with_multiple_label_name:3.000000|g|#label1:foo,label2:bar" + assert.Equal(t, result, expected) + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdGaugesFuncWithMultiLabels(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := 
"gauges_func_with_multiple_labels_name" + stats.NewGaugesFuncWithMultiLabels(name, "help", []string{"label1", "label2"}, func() map[string]int64 { + m := make(map[string]int64) + m["foo.bar"] = 1 + m["bar.baz"] = 2 + return m + }) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := []string{ + "test.gauges_func_with_multiple_labels_name:1.000000|g|#label1:foo,label2:bar", + "test.gauges_func_with_multiple_labels_name:2.000000|g|#label1:bar,label2:baz", + } + for i, res := range strings.Split(result, "\n") { + assert.Equal(t, res, expected[i]) + } + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdGaugesWithSingleLabel(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "gauges_with_single_label_name" + s := stats.NewGaugesWithSingleLabel(name, "help", "label1") + s.Add("bar", 1) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := "test.gauges_with_single_label_name:1.000000|g|#label1:bar" + assert.Equal(t, result, expected) + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdMultiTimings(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "multi_timings_name" + s := stats.NewMultiTimings(name, "help", []string{"label1", "label2"}) + s.Add([]string{"foo", "bar"}, 10*time.Millisecond) + s.Add([]string{"foo", "bar"}, 2*time.Millisecond) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := []string{ + "test.multi_timings_name:10.000000|ms|#label1:foo,label2:bar", + "test.multi_timings_name:2.000000|ms|#label1:foo,label2:bar", + } + for i, res := range strings.Split(result, "\n") { + assert.Equal(t, res, expected[i]) + } + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdTimings(t *testing.T) { + sb, server := getBackend(t) + defer server.Close() + name := "timings_name" + s := stats.NewTimings(name, "help", "label1") + s.Add("foo", 2*time.Millisecond) + s.Add("bar", 10*time.Millisecond) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 12288) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := []string{ + "test.timings_name:2.000000|ms|#label1:foo", + "test.timings_name:10.000000|ms|#label1:bar", + } + for i, res := range strings.Split(result, "\n") { + assert.Equal(t, res, expected[i]) + } + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} + +func TestStatsdHistogram(t *testing.T) { + sb, server := getBackend(t) 
+ defer server.Close() + name := "histogram_name" + s := stats.NewHistogram(name, "help", []int64{1, 5, 10}) + s.Add(2) + s.Add(3) + s.Add(6) + found := false + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == name { + found = true + sb.addExpVar(kv) + if err := sb.statsdClient.Flush(); err != nil { + t.Errorf("Error flushing: %s", err) + } + bytes := make([]byte, 4096) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + result := string(bytes[:n]) + expected := []string{ + "test.histogram_name.1:0|c", + "test.histogram_name.5:2|c", + "test.histogram_name.10:1|c", + "test.histogram_name.inf:0|c", + } + for i, res := range strings.Split(result, "\n") { + assert.Equal(t, res, expected[i]) + } + } + }) + if !found { + t.Errorf("Stat %s not found...", name) + } +} diff --git a/go/stats/timings.go b/go/stats/timings.go index 38d3c77f749..ea04f6d4670 100644 --- a/go/stats/timings.go +++ b/go/stats/timings.go @@ -18,6 +18,7 @@ package stats import ( "encoding/json" + "flag" "fmt" "sync" "time" @@ -25,6 +26,8 @@ import ( "vitess.io/vitess/go/sync2" ) +var timingsBufferSize = flag.Int("timings_buffer_size", 5, "RingFloat64 capacity used by Timings stats") + // Timings is meant to tracks timing data // by named categories as well as histograms. type Timings struct { @@ -33,6 +36,7 @@ type Timings struct { mu sync.RWMutex histograms map[string]*Histogram + buffers map[string]*RingInt64 help string label string @@ -44,14 +48,17 @@ type Timings struct { // Categories that aren't initialized will be missing from the map until the // first time they are updated. func NewTimings(name, help, label string, categories ...string) *Timings { + bufferSize := *timingsBufferSize t := &Timings{ histograms: make(map[string]*Histogram), + buffers: make(map[string]*RingInt64), help: help, label: label, labelCombined: IsDimensionCombined(label), } for _, cat := range categories { t.histograms[cat] = NewGenericHistogram("", "", bucketCutoffs, bucketLabels, "Count", "Time") + t.buffers[cat] = NewRingInt64(bufferSize) } if name != "" { publish(name, t) @@ -64,6 +71,7 @@ func NewTimings(name, help, label string, categories ...string) *Timings { func (t *Timings) Reset() { t.mu.RLock() t.histograms = make(map[string]*Histogram) + t.buffers = make(map[string]*RingInt64) t.mu.RUnlock() } @@ -92,6 +100,27 @@ func (t *Timings) Add(name string, elapsed time.Duration) { hist.Add(elapsedNs) t.totalCount.Add(1) t.totalTime.Add(elapsedNs) + + if *timingsBufferSize > 0 { + bufferSize := *timingsBufferSize + // Get existing buffer + t.mu.RLock() + buffer, ok := t.buffers[name] + t.mu.RUnlock() + + // Create buffer if it does not exist. + if !ok { + t.mu.Lock() + buffer, ok = t.buffers[name] + if !ok { + buffer = NewRingInt64(bufferSize) + t.buffers[name] = buffer + } + t.mu.Unlock() + } + + buffer.Add(elapsedNs) + } } // Record is a convenience function that records completion @@ -112,10 +141,12 @@ func (t *Timings) String() string { TotalCount int64 TotalTime int64 Histograms map[string]*Histogram + Buffers map[string]*RingInt64 }{ t.totalCount.Get(), t.totalTime.Get(), t.histograms, + t.buffers, } data, err := json.Marshal(tm) @@ -125,6 +156,17 @@ func (t *Timings) String() string { return string(data) } +// Buffers returns a map pointing at the buffers. +func (t *Timings) Buffers() (b map[string]*RingInt64) { + t.mu.RLock() + defer t.mu.RUnlock() + b = make(map[string]*RingInt64, len(t.buffers)) + for k, v := range t.buffers { + b[k] = v + } + return +} + // Histograms returns a map pointing at the histograms. 
func (t *Timings) Histograms() (h map[string]*Histogram) { t.mu.RLock() @@ -201,6 +243,7 @@ func NewMultiTimings(name string, help string, labels []string) *MultiTimings { t := &MultiTimings{ Timings: Timings{ histograms: make(map[string]*Histogram), + buffers: make(map[string]*RingInt64), help: help, }, labels: labels, From b5506f7497e900ebc903369e7369283b80581e65 Mon Sep 17 00:00:00 2001 From: crowu Date: Mon, 23 Nov 2020 19:11:54 -0800 Subject: [PATCH 2/6] ignore metrics that does not make sense to statsd Signed-off-by: crowu --- go/stats/statsd/statsd.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/stats/statsd/statsd.go b/go/stats/statsd/statsd.go index 599802ddd8d..c8c05ed6c14 100644 --- a/go/stats/statsd/statsd.go +++ b/go/stats/statsd/statsd.go @@ -200,9 +200,9 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) { } } } - } else { - log.Warningf("Silently ignore metrics with key %v [%T]", k, kv.Value) } + case *stats.StringMapFunc, *stats.Rates, *stats.RatesFunc: + // Silently ignore metrics that does not make sense to be exported to statsd default: log.Warningf("Silently ignore metrics with key %v [%T]", k, kv.Value) } From 308772c9684189230efc4e7224d7a336ffac9e6f Mon Sep 17 00:00:00 2001 From: crowu Date: Mon, 23 Nov 2020 20:39:37 -0800 Subject: [PATCH 3/6] fix test Signed-off-by: crowu --- go/stats/ring.go | 14 ++++++++++++++ go/stats/timings_test.go | 12 ++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/go/stats/ring.go b/go/stats/ring.go index 269f28675b1..e2a82bf1f07 100644 --- a/go/stats/ring.go +++ b/go/stats/ring.go @@ -16,6 +16,12 @@ limitations under the License. package stats +import ( + "bytes" + "encoding/json" + "fmt" +) + // Ring of int64 values // Not thread safe type RingInt64 struct { @@ -43,3 +49,11 @@ func (ri *RingInt64) Values() (values []int64) { } return values } + +// MarshalJSON returns a JSON representation of the RingInt64. 
+func (ri *RingInt64) MarshalJSON() ([]byte, error) { + b := bytes.NewBuffer(make([]byte, 0, 4096)) + s, _ := json.Marshal(ri.values) + fmt.Fprintf(b, "%v", string(s)) + return b.Bytes(), nil +} diff --git a/go/stats/timings_test.go b/go/stats/timings_test.go index c43e6a61683..b4772002efc 100644 --- a/go/stats/timings_test.go +++ b/go/stats/timings_test.go @@ -31,7 +31,7 @@ func TestTimings(t *testing.T) { tm.Add("tag1", 500*time.Microsecond) tm.Add("tag1", 1*time.Millisecond) tm.Add("tag2", 1*time.Millisecond) - want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}` + want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}},"Buffers":{"tag1":[500000,1000000],"tag2":[1000000]}}` if got := tm.String(); got != want { t.Errorf("got %s, want %s", got, want) } @@ -43,7 +43,7 @@ func TestMultiTimings(t *testing.T) { mtm.Add([]string{"tag1a", "tag1b"}, 500*time.Microsecond) mtm.Add([]string{"tag1a", "tag1b"}, 1*time.Millisecond) mtm.Add([]string{"tag2a", "tag2b"}, 1*time.Millisecond) - want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}` + want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}},"Buffers":{"tag1a.tag1b":[500000,1000000],"tag2a.tag2b":[1000000]}}` if got := mtm.String(); got != want { t.Errorf("got %s, want %s", got, want) } @@ -55,7 +55,7 @@ func TestMultiTimingsDot(t *testing.T) { mtm.Add([]string{"value.dot"}, 500*time.Microsecond) safe := safeLabel("value.dot") safeJSON := strings.Replace(safe, "\\", "\\\\", -1) - want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":500000}}}` + want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":500000}},"Buffers":{"value_dot":[500000]}}` if got := mtm.String(); got != want { t.Errorf("got %s, want %s", got, want) } @@ -86,16 +86,16 @@ func 
TestTimingsCombineDimension(t *testing.T) { t1 := NewTimings("timing_combine_dim1", "help", "label") t1.Add("t1", 1*time.Nanosecond) - want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}` + want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}},"Buffers":{"t1":[1]}}` assert.Equal(t, want, t1.String()) t2 := NewTimings("timing_combine_dim2", "help", "a") t2.Add("t1", 1) - want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}` + want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}},"Buffers":{"all":[1]}}` assert.Equal(t, want, t2.String()) t3 := NewMultiTimings("timing_combine_dim3", "help", []string{"a", "b", "c"}) t3.Add([]string{"c1", "c2", "c3"}, 1) - want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}` + want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}},"Buffers":{"all.c2.all":[1]}}` assert.Equal(t, want, t3.String()) } From 2c9b9592691db44eeb14eb44e9352c5d4c50dd94 Mon Sep 17 00:00:00 2001 From: crowu Date: Mon, 23 Nov 2020 22:09:20 -0800 Subject: [PATCH 4/6] race condition Signed-off-by: crowu --- go/stats/ring.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/go/stats/ring.go b/go/stats/ring.go index e2a82bf1f07..a0bd10ad23c 100644 --- a/go/stats/ring.go +++ b/go/stats/ring.go @@ -20,13 +20,16 @@ import ( "bytes" "encoding/json" "fmt" + "sync" + "sync/atomic" ) // Ring of int64 values // Not thread safe type RingInt64 struct { - position int + position int64 values []int64 + mu sync.RWMutex } func NewRingInt64(capacity int) *RingInt64 { @@ -34,18 +37,23 @@ func NewRingInt64(capacity int) *RingInt64 { } func (ri *RingInt64) Add(val int64) { - if len(ri.values) == cap(ri.values) { + if int(ri.position) == cap(ri.values)-1 { + ri.mu.Lock() ri.values[ri.position] = val - ri.position = (ri.position + 1) % cap(ri.values) + ri.position = (ri.position + 1) % int64(cap(ri.values)) + ri.mu.Unlock() } else { - ri.values = append(ri.values, val) + // add 1 atomically so that next call will see the most up to update position + pos := int(atomic.AddInt64(&ri.position, 1)) + ri.values[pos-1] = val } } func (ri *RingInt64) Values() (values []int64) { + pos := int(ri.position) values = make([]int64, len(ri.values)) for i := 0; i < len(ri.values); i++ { - values[i] = ri.values[(ri.position+i)%cap(ri.values)] + values[i] = ri.values[(pos+i)%cap(ri.values)] } return values } From 03740a68080b94d1a8945e29141d5babf666b6a9 Mon Sep 17 00:00:00 2001 From: crowu Date: Tue, 24 Nov 2020 00:57:31 -0800 Subject: [PATCH 5/6] export timing as histogram 
Signed-off-by: crowu --- go/stats/ring.go | 32 +++---------------- go/stats/statsd/statsd.go | 57 +++++++++++++++++++--------------- go/stats/statsd/statsd_test.go | 49 ++++++++++++++++++++--------- go/stats/timings.go | 43 ------------------------- go/stats/timings_test.go | 12 +++---- 5 files changed, 77 insertions(+), 116 deletions(-) diff --git a/go/stats/ring.go b/go/stats/ring.go index a0bd10ad23c..269f28675b1 100644 --- a/go/stats/ring.go +++ b/go/stats/ring.go @@ -16,20 +16,11 @@ limitations under the License. package stats -import ( - "bytes" - "encoding/json" - "fmt" - "sync" - "sync/atomic" -) - // Ring of int64 values // Not thread safe type RingInt64 struct { - position int64 + position int values []int64 - mu sync.RWMutex } func NewRingInt64(capacity int) *RingInt64 { @@ -37,31 +28,18 @@ func NewRingInt64(capacity int) *RingInt64 { } func (ri *RingInt64) Add(val int64) { - if int(ri.position) == cap(ri.values)-1 { - ri.mu.Lock() + if len(ri.values) == cap(ri.values) { ri.values[ri.position] = val - ri.position = (ri.position + 1) % int64(cap(ri.values)) - ri.mu.Unlock() + ri.position = (ri.position + 1) % cap(ri.values) } else { - // add 1 atomically so that next call will see the most up to update position - pos := int(atomic.AddInt64(&ri.position, 1)) - ri.values[pos-1] = val + ri.values = append(ri.values, val) } } func (ri *RingInt64) Values() (values []int64) { - pos := int(ri.position) values = make([]int64, len(ri.values)) for i := 0; i < len(ri.values); i++ { - values[i] = ri.values[(pos+i)%cap(ri.values)] + values[i] = ri.values[(ri.position+i)%cap(ri.values)] } return values } - -// MarshalJSON returns a JSON representation of the RingInt64. -func (ri *RingInt64) MarshalJSON() ([]byte, error) { - b := bytes.NewBuffer(make([]byte, 0, 4096)) - s, _ := json.Marshal(ri.values) - fmt.Fprintf(b, "%v", string(s)) - return b.Bytes(), nil -} diff --git a/go/stats/statsd/statsd.go b/go/stats/statsd/statsd.go index c8c05ed6c14..61f33f9acca 100644 --- a/go/stats/statsd/statsd.go +++ b/go/stats/statsd/statsd.go @@ -45,6 +45,25 @@ func makeLabels(labelNames []string, labelValsCombined string) []string { return tags } +func (sb StatsBackend) addHistogram(name string, h *stats.Histogram, tags []string) { + labels := h.Labels() + buckets := h.Buckets() + for i := range labels { + name := fmt.Sprintf("%s.%s", name, labels[i]) + sb.statsdClient.Gauge(name, float64(buckets[i]), tags, sb.sampleRate) + } + sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.CountLabel()), + (float64)(h.Count()), + tags, + sb.sampleRate, + ) + sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.TotalLabel()), + (float64)(h.Total()), + tags, + sb.sampleRate, + ) +} + // Init initializes the statsd with the given namespace. 
func Init(namespace string) { servenv.OnRun(func() { @@ -141,35 +160,23 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) { } case *stats.MultiTimings: labels := v.Labels() - buffers := v.Buffers() - for labelValsCombined, buffer := range buffers { - tags := makeLabels(labels, labelValsCombined) - for _, elapsedNs := range buffer.Values() { - if err := sb.statsdClient.TimeInMilliseconds(k, float64(elapsedNs)/1000.0/1000.0, tags, sb.sampleRate); err != nil { - log.Errorf("Failed to add TimeInMilliseconds %v for key %v", buffer.Values(), k) - } - } + hists := v.Histograms() + for labelValsCombined, histogram := range hists { + sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined)) } case *stats.Timings: - label := v.Label() - buffers := v.Buffers() - for labelValsCombined, buffer := range buffers { - tags := makeLabel(label, labelValsCombined) - for _, elapsedNs := range buffer.Values() { - if err := sb.statsdClient.TimeInMilliseconds(k, float64(elapsedNs)/1000.0/1000.0, tags, sb.sampleRate); err != nil { - log.Errorf("Failed to add TimeInMilliseconds %v for key %v", buffer.Values(), k) - } - } + // TODO: for statsd.timing metrics, there is no good way to transfer the histogram to it + // If we store a in memory buffer for stats.Timings and flush it here it's hard to make the stats + // thread safe. + // Instead, we export the timings stats as histogram here. We won't have the percentile breakdown + // for the metrics, but we can still get the average from total and count + labels := []string{v.Label()} + hists := v.Histograms() + for labelValsCombined, histogram := range hists { + sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined)) } case *stats.Histogram: - labels := v.Labels() - buckets := v.Buckets() - for i := range labels { - name := fmt.Sprintf("%s.%s", k, labels[i]) - if err := sb.statsdClient.Count(name, buckets[i], []string{}, sb.sampleRate); err != nil { - log.Errorf("Failed to add Histogram %v for key %v", buckets[i], name) - } - } + sb.addHistogram(k, v, []string{}) case expvar.Func: // Export memstats as gauge so that we don't need to call extra ReadMemStats if k == "memstats" { diff --git a/go/stats/statsd/statsd_test.go b/go/stats/statsd/statsd_test.go index 0f9852c6f4e..18f41da8cc3 100644 --- a/go/stats/statsd/statsd_test.go +++ b/go/stats/statsd/statsd_test.go @@ -3,6 +3,7 @@ package statsd import ( "expvar" "net" + "sort" "strings" "testing" "time" @@ -192,10 +193,12 @@ func TestStatsdCountersWithSingleLabel(t *testing.T) { } result := string(bytes[:n]) expected := []string{ - "test.counter_with_single_label_name:2|c|#label:tag1", "test.counter_with_single_label_name:0|c|#label:tag2", + "test.counter_with_single_label_name:2|c|#label:tag1", } - for i, res := range strings.Split(result, "\n") { + res := strings.Split(result, "\n") + sort.Strings(res) + for i, res := range res { assert.Equal(t, res, expected[i]) } } @@ -329,7 +332,9 @@ func TestStatsdGaugesFuncWithMultiLabels(t *testing.T) { "test.gauges_func_with_multiple_labels_name:1.000000|g|#label1:foo,label2:bar", "test.gauges_func_with_multiple_labels_name:2.000000|g|#label1:bar,label2:baz", } - for i, res := range strings.Split(result, "\n") { + res := strings.Split(result, "\n") + sort.Strings(res) + for i, res := range res { assert.Equal(t, res, expected[i]) } } @@ -374,7 +379,6 @@ func TestStatsdMultiTimings(t *testing.T) { name := "multi_timings_name" s := stats.NewMultiTimings(name, "help", []string{"label1", "label2"}) s.Add([]string{"foo", "bar"}, 10*time.Millisecond) - 
s.Add([]string{"foo", "bar"}, 2*time.Millisecond) found := false expvar.Do(func(kv expvar.KeyValue) { if kv.Key == name { @@ -383,15 +387,22 @@ func TestStatsdMultiTimings(t *testing.T) { if err := sb.statsdClient.Flush(); err != nil { t.Errorf("Error flushing: %s", err) } - bytes := make([]byte, 4096) + bytes := make([]byte, 49152) n, err := server.Read(bytes) if err != nil { t.Fatal(err) } result := string(bytes[:n]) expected := []string{ - "test.multi_timings_name:10.000000|ms|#label1:foo,label2:bar", - "test.multi_timings_name:2.000000|ms|#label1:foo,label2:bar", + "test.multi_timings_name.500000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.1000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.5000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.10000000:1.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.50000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.100000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.500000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.1000000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.5000000000:0.000000|g|#label1:foo,label2:bar", } for i, res := range strings.Split(result, "\n") { assert.Equal(t, res, expected[i]) @@ -409,7 +420,6 @@ func TestStatsdTimings(t *testing.T) { name := "timings_name" s := stats.NewTimings(name, "help", "label1") s.Add("foo", 2*time.Millisecond) - s.Add("bar", 10*time.Millisecond) found := false expvar.Do(func(kv expvar.KeyValue) { if kv.Key == name { @@ -418,15 +428,22 @@ func TestStatsdTimings(t *testing.T) { if err := sb.statsdClient.Flush(); err != nil { t.Errorf("Error flushing: %s", err) } - bytes := make([]byte, 12288) + bytes := make([]byte, 49152) n, err := server.Read(bytes) if err != nil { t.Fatal(err) } result := string(bytes[:n]) expected := []string{ - "test.timings_name:2.000000|ms|#label1:foo", - "test.timings_name:10.000000|ms|#label1:bar", + "test.timings_name.500000:0.000000|g|#label1:foo", + "test.timings_name.1000000:0.000000|g|#label1:foo", + "test.timings_name.5000000:1.000000|g|#label1:foo", + "test.timings_name.10000000:0.000000|g|#label1:foo", + "test.timings_name.50000000:0.000000|g|#label1:foo", + "test.timings_name.100000000:0.000000|g|#label1:foo", + "test.timings_name.500000000:0.000000|g|#label1:foo", + "test.timings_name.1000000000:0.000000|g|#label1:foo", + "test.timings_name.5000000000:0.000000|g|#label1:foo", } for i, res := range strings.Split(result, "\n") { assert.Equal(t, res, expected[i]) @@ -461,10 +478,12 @@ func TestStatsdHistogram(t *testing.T) { } result := string(bytes[:n]) expected := []string{ - "test.histogram_name.1:0|c", - "test.histogram_name.5:2|c", - "test.histogram_name.10:1|c", - "test.histogram_name.inf:0|c", + "test.histogram_name.1:0.000000|g", + "test.histogram_name.5:2.000000|g", + "test.histogram_name.10:1.000000|g", + "test.histogram_name.inf:0.000000|g", + "test.histogram_name.Count:3.000000|g", + "test.histogram_name.Total:11.000000|g", } for i, res := range strings.Split(result, "\n") { assert.Equal(t, res, expected[i]) diff --git a/go/stats/timings.go b/go/stats/timings.go index ea04f6d4670..38d3c77f749 100644 --- a/go/stats/timings.go +++ b/go/stats/timings.go @@ -18,7 +18,6 @@ package stats import ( "encoding/json" - "flag" "fmt" "sync" "time" @@ -26,8 +25,6 @@ import ( "vitess.io/vitess/go/sync2" ) -var timingsBufferSize = flag.Int("timings_buffer_size", 5, "RingFloat64 capacity used by Timings stats") - // Timings is 
meant to tracks timing data // by named categories as well as histograms. type Timings struct { @@ -36,7 +33,6 @@ type Timings struct { mu sync.RWMutex histograms map[string]*Histogram - buffers map[string]*RingInt64 help string label string @@ -48,17 +44,14 @@ type Timings struct { // Categories that aren't initialized will be missing from the map until the // first time they are updated. func NewTimings(name, help, label string, categories ...string) *Timings { - bufferSize := *timingsBufferSize t := &Timings{ histograms: make(map[string]*Histogram), - buffers: make(map[string]*RingInt64), help: help, label: label, labelCombined: IsDimensionCombined(label), } for _, cat := range categories { t.histograms[cat] = NewGenericHistogram("", "", bucketCutoffs, bucketLabels, "Count", "Time") - t.buffers[cat] = NewRingInt64(bufferSize) } if name != "" { publish(name, t) @@ -71,7 +64,6 @@ func NewTimings(name, help, label string, categories ...string) *Timings { func (t *Timings) Reset() { t.mu.RLock() t.histograms = make(map[string]*Histogram) - t.buffers = make(map[string]*RingInt64) t.mu.RUnlock() } @@ -100,27 +92,6 @@ func (t *Timings) Add(name string, elapsed time.Duration) { hist.Add(elapsedNs) t.totalCount.Add(1) t.totalTime.Add(elapsedNs) - - if *timingsBufferSize > 0 { - bufferSize := *timingsBufferSize - // Get existing buffer - t.mu.RLock() - buffer, ok := t.buffers[name] - t.mu.RUnlock() - - // Create buffer if it does not exist. - if !ok { - t.mu.Lock() - buffer, ok = t.buffers[name] - if !ok { - buffer = NewRingInt64(bufferSize) - t.buffers[name] = buffer - } - t.mu.Unlock() - } - - buffer.Add(elapsedNs) - } } // Record is a convenience function that records completion @@ -141,12 +112,10 @@ func (t *Timings) String() string { TotalCount int64 TotalTime int64 Histograms map[string]*Histogram - Buffers map[string]*RingInt64 }{ t.totalCount.Get(), t.totalTime.Get(), t.histograms, - t.buffers, } data, err := json.Marshal(tm) @@ -156,17 +125,6 @@ func (t *Timings) String() string { return string(data) } -// Buffers returns a map pointing at the buffers. -func (t *Timings) Buffers() (b map[string]*RingInt64) { - t.mu.RLock() - defer t.mu.RUnlock() - b = make(map[string]*RingInt64, len(t.buffers)) - for k, v := range t.buffers { - b[k] = v - } - return -} - // Histograms returns a map pointing at the histograms. 
func (t *Timings) Histograms() (h map[string]*Histogram) { t.mu.RLock() @@ -243,7 +201,6 @@ func NewMultiTimings(name string, help string, labels []string) *MultiTimings { t := &MultiTimings{ Timings: Timings{ histograms: make(map[string]*Histogram), - buffers: make(map[string]*RingInt64), help: help, }, labels: labels, diff --git a/go/stats/timings_test.go b/go/stats/timings_test.go index b4772002efc..c43e6a61683 100644 --- a/go/stats/timings_test.go +++ b/go/stats/timings_test.go @@ -31,7 +31,7 @@ func TestTimings(t *testing.T) { tm.Add("tag1", 500*time.Microsecond) tm.Add("tag1", 1*time.Millisecond) tm.Add("tag2", 1*time.Millisecond) - want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}},"Buffers":{"tag1":[500000,1000000],"tag2":[1000000]}}` + want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}` if got := tm.String(); got != want { t.Errorf("got %s, want %s", got, want) } @@ -43,7 +43,7 @@ func TestMultiTimings(t *testing.T) { mtm.Add([]string{"tag1a", "tag1b"}, 500*time.Microsecond) mtm.Add([]string{"tag1a", "tag1b"}, 1*time.Millisecond) mtm.Add([]string{"tag2a", "tag2b"}, 1*time.Millisecond) - want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}},"Buffers":{"tag1a.tag1b":[500000,1000000],"tag2a.tag2b":[1000000]}}` + want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}` if got := mtm.String(); got != want { t.Errorf("got %s, want %s", got, want) } @@ -55,7 +55,7 @@ func TestMultiTimingsDot(t *testing.T) { mtm.Add([]string{"value.dot"}, 500*time.Microsecond) safe := safeLabel("value.dot") safeJSON := strings.Replace(safe, "\\", "\\\\", -1) - want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":500000}},"Buffers":{"value_dot":[500000]}}` + want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + 
`":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":500000}}}` if got := mtm.String(); got != want { t.Errorf("got %s, want %s", got, want) } @@ -86,16 +86,16 @@ func TestTimingsCombineDimension(t *testing.T) { t1 := NewTimings("timing_combine_dim1", "help", "label") t1.Add("t1", 1*time.Nanosecond) - want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}},"Buffers":{"t1":[1]}}` + want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}` assert.Equal(t, want, t1.String()) t2 := NewTimings("timing_combine_dim2", "help", "a") t2.Add("t1", 1) - want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}},"Buffers":{"all":[1]}}` + want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}` assert.Equal(t, want, t2.String()) t3 := NewMultiTimings("timing_combine_dim3", "help", []string{"a", "b", "c"}) t3.Add([]string{"c1", "c2", "c3"}, 1) - want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}},"Buffers":{"all.c2.all":[1]}}` + want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}` assert.Equal(t, want, t3.String()) } From 239b8688d47440fe4c128ed9d3647a741ded0f8a Mon Sep 17 00:00:00 2001 From: crowu Date: Tue, 24 Nov 2020 09:07:14 -0800 Subject: [PATCH 6/6] sort to avoid flacky test Signed-off-by: crowu --- go/stats/statsd/statsd_test.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/go/stats/statsd/statsd_test.go b/go/stats/statsd/statsd_test.go index 18f41da8cc3..9d115bd44a4 100644 --- a/go/stats/statsd/statsd_test.go +++ b/go/stats/statsd/statsd_test.go @@ -191,14 +191,13 @@ func TestStatsdCountersWithSingleLabel(t *testing.T) { if err != nil { t.Fatal(err) } - result := string(bytes[:n]) + result := strings.Split(string(bytes[:n]), "\n") + sort.Strings(result) expected := []string{ "test.counter_with_single_label_name:0|c|#label:tag2", "test.counter_with_single_label_name:2|c|#label:tag1", } - res := strings.Split(result, "\n") - sort.Strings(res) - for i, res := range res { + for i, res := range result { assert.Equal(t, res, expected[i]) } } @@ -260,12 +259,13 @@ func TestStatsdCountersFuncWithMultiLabels(t *testing.T) { if err != nil { t.Fatal(err) } - result := string(bytes[:n]) + result := strings.Split(string(bytes[:n]), "\n") + sort.Strings(result) expected := []string{ "test.counter_func_with_multiple_labels_name:1|c|#label1:foo,label2:bar", "test.counter_func_with_multiple_labels_name:2|c|#label1:bar,label2:baz", } - for i, res := range 
strings.Split(result, "\n") { + for i, res := range result { assert.Equal(t, res, expected[i]) } } @@ -327,14 +327,13 @@ func TestStatsdGaugesFuncWithMultiLabels(t *testing.T) { if err != nil { t.Fatal(err) } - result := string(bytes[:n]) + result := strings.Split(string(bytes[:n]), "\n") + sort.Strings(result) expected := []string{ "test.gauges_func_with_multiple_labels_name:1.000000|g|#label1:foo,label2:bar", "test.gauges_func_with_multiple_labels_name:2.000000|g|#label1:bar,label2:baz", } - res := strings.Split(result, "\n") - sort.Strings(res) - for i, res := range res { + for i, res := range result { assert.Equal(t, res, expected[i]) } }
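
For context, a minimal sketch of how another Vitess binary could opt into this backend — the target binary below is only an example and is not part of this series; the pattern simply mirrors go/cmd/vtgate/plugin_statsd.go and go/cmd/vttablet/plugin_statsd.go added in patch 1:

    // go/cmd/vtcombo/plugin_statsd.go (hypothetical; not included in this series)
    package main

    import "vitess.io/vitess/go/stats/statsd"

    func init() {
        // Registers the statsd push backend under the "vtcombo" namespace via
        // servenv.OnRun. The backend stays inactive unless -statsd_address is
        // set; -statsd_sample_rate (default 1.0) controls client-side sampling.
        statsd.Init("vtcombo")
    }

Once registered through stats.RegisterPushBackend("statsd", sb), each push calls PushAll, which walks expvar via expvar.Do, translates the supported stats types into statsd metrics, and flushes the buffered client.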