diff --git a/go/stats/ring.go b/go/stats/ring.go index a0bd10ad23c..269f28675b1 100644 --- a/go/stats/ring.go +++ b/go/stats/ring.go @@ -16,20 +16,11 @@ limitations under the License. package stats -import ( - "bytes" - "encoding/json" - "fmt" - "sync" - "sync/atomic" -) - // Ring of int64 values // Not thread safe type RingInt64 struct { - position int64 + position int values []int64 - mu sync.RWMutex } func NewRingInt64(capacity int) *RingInt64 { @@ -37,31 +28,18 @@ func NewRingInt64(capacity int) *RingInt64 { } func (ri *RingInt64) Add(val int64) { - if int(ri.position) == cap(ri.values)-1 { - ri.mu.Lock() + if len(ri.values) == cap(ri.values) { ri.values[ri.position] = val - ri.position = (ri.position + 1) % int64(cap(ri.values)) - ri.mu.Unlock() + ri.position = (ri.position + 1) % cap(ri.values) } else { - // add 1 atomically so that next call will see the most up to update position - pos := int(atomic.AddInt64(&ri.position, 1)) - ri.values[pos-1] = val + ri.values = append(ri.values, val) } } func (ri *RingInt64) Values() (values []int64) { - pos := int(ri.position) values = make([]int64, len(ri.values)) for i := 0; i < len(ri.values); i++ { - values[i] = ri.values[(pos+i)%cap(ri.values)] + values[i] = ri.values[(ri.position+i)%cap(ri.values)] } return values } - -// MarshalJSON returns a JSON representation of the RingInt64. -func (ri *RingInt64) MarshalJSON() ([]byte, error) { - b := bytes.NewBuffer(make([]byte, 0, 4096)) - s, _ := json.Marshal(ri.values) - fmt.Fprintf(b, "%v", string(s)) - return b.Bytes(), nil -} diff --git a/go/stats/statsd/statsd.go b/go/stats/statsd/statsd.go index c8c05ed6c14..61f33f9acca 100644 --- a/go/stats/statsd/statsd.go +++ b/go/stats/statsd/statsd.go @@ -45,6 +45,25 @@ func makeLabels(labelNames []string, labelValsCombined string) []string { return tags } +func (sb StatsBackend) addHistogram(name string, h *stats.Histogram, tags []string) { + labels := h.Labels() + buckets := h.Buckets() + for i := range labels { + name := fmt.Sprintf("%s.%s", name, labels[i]) + sb.statsdClient.Gauge(name, float64(buckets[i]), tags, sb.sampleRate) + } + sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.CountLabel()), + (float64)(h.Count()), + tags, + sb.sampleRate, + ) + sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.TotalLabel()), + (float64)(h.Total()), + tags, + sb.sampleRate, + ) +} + // Init initializes the statsd with the given namespace. func Init(namespace string) { servenv.OnRun(func() { @@ -141,35 +160,23 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) { } case *stats.MultiTimings: labels := v.Labels() - buffers := v.Buffers() - for labelValsCombined, buffer := range buffers { - tags := makeLabels(labels, labelValsCombined) - for _, elapsedNs := range buffer.Values() { - if err := sb.statsdClient.TimeInMilliseconds(k, float64(elapsedNs)/1000.0/1000.0, tags, sb.sampleRate); err != nil { - log.Errorf("Failed to add TimeInMilliseconds %v for key %v", buffer.Values(), k) - } - } + hists := v.Histograms() + for labelValsCombined, histogram := range hists { + sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined)) } case *stats.Timings: - label := v.Label() - buffers := v.Buffers() - for labelValsCombined, buffer := range buffers { - tags := makeLabel(label, labelValsCombined) - for _, elapsedNs := range buffer.Values() { - if err := sb.statsdClient.TimeInMilliseconds(k, float64(elapsedNs)/1000.0/1000.0, tags, sb.sampleRate); err != nil { - log.Errorf("Failed to add TimeInMilliseconds %v for key %v", buffer.Values(), k) - } - } + // TODO: for statsd.timing metrics, there is no good way to transfer the histogram to it + // If we store a in memory buffer for stats.Timings and flush it here it's hard to make the stats + // thread safe. + // Instead, we export the timings stats as histogram here. We won't have the percentile breakdown + // for the metrics, but we can still get the average from total and count + labels := []string{v.Label()} + hists := v.Histograms() + for labelValsCombined, histogram := range hists { + sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined)) } case *stats.Histogram: - labels := v.Labels() - buckets := v.Buckets() - for i := range labels { - name := fmt.Sprintf("%s.%s", k, labels[i]) - if err := sb.statsdClient.Count(name, buckets[i], []string{}, sb.sampleRate); err != nil { - log.Errorf("Failed to add Histogram %v for key %v", buckets[i], name) - } - } + sb.addHistogram(k, v, []string{}) case expvar.Func: // Export memstats as gauge so that we don't need to call extra ReadMemStats if k == "memstats" { diff --git a/go/stats/statsd/statsd_test.go b/go/stats/statsd/statsd_test.go index 0f9852c6f4e..18f41da8cc3 100644 --- a/go/stats/statsd/statsd_test.go +++ b/go/stats/statsd/statsd_test.go @@ -3,6 +3,7 @@ package statsd import ( "expvar" "net" + "sort" "strings" "testing" "time" @@ -192,10 +193,12 @@ func TestStatsdCountersWithSingleLabel(t *testing.T) { } result := string(bytes[:n]) expected := []string{ - "test.counter_with_single_label_name:2|c|#label:tag1", "test.counter_with_single_label_name:0|c|#label:tag2", + "test.counter_with_single_label_name:2|c|#label:tag1", } - for i, res := range strings.Split(result, "\n") { + res := strings.Split(result, "\n") + sort.Strings(res) + for i, res := range res { assert.Equal(t, res, expected[i]) } } @@ -329,7 +332,9 @@ func TestStatsdGaugesFuncWithMultiLabels(t *testing.T) { "test.gauges_func_with_multiple_labels_name:1.000000|g|#label1:foo,label2:bar", "test.gauges_func_with_multiple_labels_name:2.000000|g|#label1:bar,label2:baz", } - for i, res := range strings.Split(result, "\n") { + res := strings.Split(result, "\n") + sort.Strings(res) + for i, res := range res { assert.Equal(t, res, expected[i]) } } @@ -374,7 +379,6 @@ func TestStatsdMultiTimings(t *testing.T) { name := "multi_timings_name" s := stats.NewMultiTimings(name, "help", []string{"label1", "label2"}) s.Add([]string{"foo", "bar"}, 10*time.Millisecond) - s.Add([]string{"foo", "bar"}, 2*time.Millisecond) found := false expvar.Do(func(kv expvar.KeyValue) { if kv.Key == name { @@ -383,15 +387,22 @@ func TestStatsdMultiTimings(t *testing.T) { if err := sb.statsdClient.Flush(); err != nil { t.Errorf("Error flushing: %s", err) } - bytes := make([]byte, 4096) + bytes := make([]byte, 49152) n, err := server.Read(bytes) if err != nil { t.Fatal(err) } result := string(bytes[:n]) expected := []string{ - "test.multi_timings_name:10.000000|ms|#label1:foo,label2:bar", - "test.multi_timings_name:2.000000|ms|#label1:foo,label2:bar", + "test.multi_timings_name.500000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.1000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.5000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.10000000:1.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.50000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.100000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.500000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.1000000000:0.000000|g|#label1:foo,label2:bar", + "test.multi_timings_name.5000000000:0.000000|g|#label1:foo,label2:bar", } for i, res := range strings.Split(result, "\n") { assert.Equal(t, res, expected[i]) @@ -409,7 +420,6 @@ func TestStatsdTimings(t *testing.T) { name := "timings_name" s := stats.NewTimings(name, "help", "label1") s.Add("foo", 2*time.Millisecond) - s.Add("bar", 10*time.Millisecond) found := false expvar.Do(func(kv expvar.KeyValue) { if kv.Key == name { @@ -418,15 +428,22 @@ func TestStatsdTimings(t *testing.T) { if err := sb.statsdClient.Flush(); err != nil { t.Errorf("Error flushing: %s", err) } - bytes := make([]byte, 12288) + bytes := make([]byte, 49152) n, err := server.Read(bytes) if err != nil { t.Fatal(err) } result := string(bytes[:n]) expected := []string{ - "test.timings_name:2.000000|ms|#label1:foo", - "test.timings_name:10.000000|ms|#label1:bar", + "test.timings_name.500000:0.000000|g|#label1:foo", + "test.timings_name.1000000:0.000000|g|#label1:foo", + "test.timings_name.5000000:1.000000|g|#label1:foo", + "test.timings_name.10000000:0.000000|g|#label1:foo", + "test.timings_name.50000000:0.000000|g|#label1:foo", + "test.timings_name.100000000:0.000000|g|#label1:foo", + "test.timings_name.500000000:0.000000|g|#label1:foo", + "test.timings_name.1000000000:0.000000|g|#label1:foo", + "test.timings_name.5000000000:0.000000|g|#label1:foo", } for i, res := range strings.Split(result, "\n") { assert.Equal(t, res, expected[i]) @@ -461,10 +478,12 @@ func TestStatsdHistogram(t *testing.T) { } result := string(bytes[:n]) expected := []string{ - "test.histogram_name.1:0|c", - "test.histogram_name.5:2|c", - "test.histogram_name.10:1|c", - "test.histogram_name.inf:0|c", + "test.histogram_name.1:0.000000|g", + "test.histogram_name.5:2.000000|g", + "test.histogram_name.10:1.000000|g", + "test.histogram_name.inf:0.000000|g", + "test.histogram_name.Count:3.000000|g", + "test.histogram_name.Total:11.000000|g", } for i, res := range strings.Split(result, "\n") { assert.Equal(t, res, expected[i]) diff --git a/go/stats/timings.go b/go/stats/timings.go index ea04f6d4670..38d3c77f749 100644 --- a/go/stats/timings.go +++ b/go/stats/timings.go @@ -18,7 +18,6 @@ package stats import ( "encoding/json" - "flag" "fmt" "sync" "time" @@ -26,8 +25,6 @@ import ( "vitess.io/vitess/go/sync2" ) -var timingsBufferSize = flag.Int("timings_buffer_size", 5, "RingFloat64 capacity used by Timings stats") - // Timings is meant to tracks timing data // by named categories as well as histograms. type Timings struct { @@ -36,7 +33,6 @@ type Timings struct { mu sync.RWMutex histograms map[string]*Histogram - buffers map[string]*RingInt64 help string label string @@ -48,17 +44,14 @@ type Timings struct { // Categories that aren't initialized will be missing from the map until the // first time they are updated. func NewTimings(name, help, label string, categories ...string) *Timings { - bufferSize := *timingsBufferSize t := &Timings{ histograms: make(map[string]*Histogram), - buffers: make(map[string]*RingInt64), help: help, label: label, labelCombined: IsDimensionCombined(label), } for _, cat := range categories { t.histograms[cat] = NewGenericHistogram("", "", bucketCutoffs, bucketLabels, "Count", "Time") - t.buffers[cat] = NewRingInt64(bufferSize) } if name != "" { publish(name, t) @@ -71,7 +64,6 @@ func NewTimings(name, help, label string, categories ...string) *Timings { func (t *Timings) Reset() { t.mu.RLock() t.histograms = make(map[string]*Histogram) - t.buffers = make(map[string]*RingInt64) t.mu.RUnlock() } @@ -100,27 +92,6 @@ func (t *Timings) Add(name string, elapsed time.Duration) { hist.Add(elapsedNs) t.totalCount.Add(1) t.totalTime.Add(elapsedNs) - - if *timingsBufferSize > 0 { - bufferSize := *timingsBufferSize - // Get existing buffer - t.mu.RLock() - buffer, ok := t.buffers[name] - t.mu.RUnlock() - - // Create buffer if it does not exist. - if !ok { - t.mu.Lock() - buffer, ok = t.buffers[name] - if !ok { - buffer = NewRingInt64(bufferSize) - t.buffers[name] = buffer - } - t.mu.Unlock() - } - - buffer.Add(elapsedNs) - } } // Record is a convenience function that records completion @@ -141,12 +112,10 @@ func (t *Timings) String() string { TotalCount int64 TotalTime int64 Histograms map[string]*Histogram - Buffers map[string]*RingInt64 }{ t.totalCount.Get(), t.totalTime.Get(), t.histograms, - t.buffers, } data, err := json.Marshal(tm) @@ -156,17 +125,6 @@ func (t *Timings) String() string { return string(data) } -// Buffers returns a map pointing at the buffers. -func (t *Timings) Buffers() (b map[string]*RingInt64) { - t.mu.RLock() - defer t.mu.RUnlock() - b = make(map[string]*RingInt64, len(t.buffers)) - for k, v := range t.buffers { - b[k] = v - } - return -} - // Histograms returns a map pointing at the histograms. func (t *Timings) Histograms() (h map[string]*Histogram) { t.mu.RLock() @@ -243,7 +201,6 @@ func NewMultiTimings(name string, help string, labels []string) *MultiTimings { t := &MultiTimings{ Timings: Timings{ histograms: make(map[string]*Histogram), - buffers: make(map[string]*RingInt64), help: help, }, labels: labels, diff --git a/go/stats/timings_test.go b/go/stats/timings_test.go index b4772002efc..c43e6a61683 100644 --- a/go/stats/timings_test.go +++ b/go/stats/timings_test.go @@ -31,7 +31,7 @@ func TestTimings(t *testing.T) { tm.Add("tag1", 500*time.Microsecond) tm.Add("tag1", 1*time.Millisecond) tm.Add("tag2", 1*time.Millisecond) - want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}},"Buffers":{"tag1":[500000,1000000],"tag2":[1000000]}}` + want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}` if got := tm.String(); got != want { t.Errorf("got %s, want %s", got, want) } @@ -43,7 +43,7 @@ func TestMultiTimings(t *testing.T) { mtm.Add([]string{"tag1a", "tag1b"}, 500*time.Microsecond) mtm.Add([]string{"tag1a", "tag1b"}, 1*time.Millisecond) mtm.Add([]string{"tag2a", "tag2b"}, 1*time.Millisecond) - want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}},"Buffers":{"tag1a.tag1b":[500000,1000000],"tag2a.tag2b":[1000000]}}` + want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}` if got := mtm.String(); got != want { t.Errorf("got %s, want %s", got, want) } @@ -55,7 +55,7 @@ func TestMultiTimingsDot(t *testing.T) { mtm.Add([]string{"value.dot"}, 500*time.Microsecond) safe := safeLabel("value.dot") safeJSON := strings.Replace(safe, "\\", "\\\\", -1) - want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":500000}},"Buffers":{"value_dot":[500000]}}` + want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":500000}}}` if got := mtm.String(); got != want { t.Errorf("got %s, want %s", got, want) } @@ -86,16 +86,16 @@ func TestTimingsCombineDimension(t *testing.T) { t1 := NewTimings("timing_combine_dim1", "help", "label") t1.Add("t1", 1*time.Nanosecond) - want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}},"Buffers":{"t1":[1]}}` + want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}` assert.Equal(t, want, t1.String()) t2 := NewTimings("timing_combine_dim2", "help", "a") t2.Add("t1", 1) - want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}},"Buffers":{"all":[1]}}` + want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}` assert.Equal(t, want, t2.String()) t3 := NewMultiTimings("timing_combine_dim3", "help", []string{"a", "b", "c"}) t3.Add([]string{"c1", "c2", "c3"}, 1) - want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}},"Buffers":{"all.c2.all":[1]}}` + want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}` assert.Equal(t, want, t3.String()) }