Skip to content

Commit

Permalink
export timing as histogram
Browse files Browse the repository at this point in the history
Signed-off-by: crowu <y.wu4515@gmail.com>
  • Loading branch information
5antelope committed Nov 24, 2020
1 parent 2c9b959 commit 03740a6
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 116 deletions.
32 changes: 5 additions & 27 deletions go/stats/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,30 @@ limitations under the License.

package stats

import (
"bytes"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
)

// Ring of int64 values
// Not thread safe
type RingInt64 struct {
position int64
position int
values []int64
mu sync.RWMutex
}

func NewRingInt64(capacity int) *RingInt64 {
return &RingInt64{values: make([]int64, 0, capacity)}
}

func (ri *RingInt64) Add(val int64) {
if int(ri.position) == cap(ri.values)-1 {
ri.mu.Lock()
if len(ri.values) == cap(ri.values) {
ri.values[ri.position] = val
ri.position = (ri.position + 1) % int64(cap(ri.values))
ri.mu.Unlock()
ri.position = (ri.position + 1) % cap(ri.values)
} else {
// add 1 atomically so that next call will see the most up to update position
pos := int(atomic.AddInt64(&ri.position, 1))
ri.values[pos-1] = val
ri.values = append(ri.values, val)
}
}

func (ri *RingInt64) Values() (values []int64) {
pos := int(ri.position)
values = make([]int64, len(ri.values))
for i := 0; i < len(ri.values); i++ {
values[i] = ri.values[(pos+i)%cap(ri.values)]
values[i] = ri.values[(ri.position+i)%cap(ri.values)]
}
return values
}

// MarshalJSON returns a JSON representation of the RingInt64.
func (ri *RingInt64) MarshalJSON() ([]byte, error) {
b := bytes.NewBuffer(make([]byte, 0, 4096))
s, _ := json.Marshal(ri.values)
fmt.Fprintf(b, "%v", string(s))
return b.Bytes(), nil
}
57 changes: 32 additions & 25 deletions go/stats/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,25 @@ func makeLabels(labelNames []string, labelValsCombined string) []string {
return tags
}

func (sb StatsBackend) addHistogram(name string, h *stats.Histogram, tags []string) {
labels := h.Labels()
buckets := h.Buckets()
for i := range labels {
name := fmt.Sprintf("%s.%s", name, labels[i])
sb.statsdClient.Gauge(name, float64(buckets[i]), tags, sb.sampleRate)
}
sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.CountLabel()),
(float64)(h.Count()),
tags,
sb.sampleRate,
)
sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.TotalLabel()),
(float64)(h.Total()),
tags,
sb.sampleRate,
)
}

// Init initializes the statsd with the given namespace.
func Init(namespace string) {
servenv.OnRun(func() {
Expand Down Expand Up @@ -141,35 +160,23 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) {
}
case *stats.MultiTimings:
labels := v.Labels()
buffers := v.Buffers()
for labelValsCombined, buffer := range buffers {
tags := makeLabels(labels, labelValsCombined)
for _, elapsedNs := range buffer.Values() {
if err := sb.statsdClient.TimeInMilliseconds(k, float64(elapsedNs)/1000.0/1000.0, tags, sb.sampleRate); err != nil {
log.Errorf("Failed to add TimeInMilliseconds %v for key %v", buffer.Values(), k)
}
}
hists := v.Histograms()
for labelValsCombined, histogram := range hists {
sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined))
}
case *stats.Timings:
label := v.Label()
buffers := v.Buffers()
for labelValsCombined, buffer := range buffers {
tags := makeLabel(label, labelValsCombined)
for _, elapsedNs := range buffer.Values() {
if err := sb.statsdClient.TimeInMilliseconds(k, float64(elapsedNs)/1000.0/1000.0, tags, sb.sampleRate); err != nil {
log.Errorf("Failed to add TimeInMilliseconds %v for key %v", buffer.Values(), k)
}
}
// TODO: for statsd.timing metrics, there is no good way to transfer the histogram to it
// If we store a in memory buffer for stats.Timings and flush it here it's hard to make the stats
// thread safe.
// Instead, we export the timings stats as histogram here. We won't have the percentile breakdown
// for the metrics, but we can still get the average from total and count
labels := []string{v.Label()}
hists := v.Histograms()
for labelValsCombined, histogram := range hists {
sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined))
}
case *stats.Histogram:
labels := v.Labels()
buckets := v.Buckets()
for i := range labels {
name := fmt.Sprintf("%s.%s", k, labels[i])
if err := sb.statsdClient.Count(name, buckets[i], []string{}, sb.sampleRate); err != nil {
log.Errorf("Failed to add Histogram %v for key %v", buckets[i], name)
}
}
sb.addHistogram(k, v, []string{})
case expvar.Func:
// Export memstats as gauge so that we don't need to call extra ReadMemStats
if k == "memstats" {
Expand Down
49 changes: 34 additions & 15 deletions go/stats/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package statsd
import (
"expvar"
"net"
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -192,10 +193,12 @@ func TestStatsdCountersWithSingleLabel(t *testing.T) {
}
result := string(bytes[:n])
expected := []string{
"test.counter_with_single_label_name:2|c|#label:tag1",
"test.counter_with_single_label_name:0|c|#label:tag2",
"test.counter_with_single_label_name:2|c|#label:tag1",
}
for i, res := range strings.Split(result, "\n") {
res := strings.Split(result, "\n")
sort.Strings(res)
for i, res := range res {
assert.Equal(t, res, expected[i])
}
}
Expand Down Expand Up @@ -329,7 +332,9 @@ func TestStatsdGaugesFuncWithMultiLabels(t *testing.T) {
"test.gauges_func_with_multiple_labels_name:1.000000|g|#label1:foo,label2:bar",
"test.gauges_func_with_multiple_labels_name:2.000000|g|#label1:bar,label2:baz",
}
for i, res := range strings.Split(result, "\n") {
res := strings.Split(result, "\n")
sort.Strings(res)
for i, res := range res {
assert.Equal(t, res, expected[i])
}
}
Expand Down Expand Up @@ -374,7 +379,6 @@ func TestStatsdMultiTimings(t *testing.T) {
name := "multi_timings_name"
s := stats.NewMultiTimings(name, "help", []string{"label1", "label2"})
s.Add([]string{"foo", "bar"}, 10*time.Millisecond)
s.Add([]string{"foo", "bar"}, 2*time.Millisecond)
found := false
expvar.Do(func(kv expvar.KeyValue) {
if kv.Key == name {
Expand All @@ -383,15 +387,22 @@ func TestStatsdMultiTimings(t *testing.T) {
if err := sb.statsdClient.Flush(); err != nil {
t.Errorf("Error flushing: %s", err)
}
bytes := make([]byte, 4096)
bytes := make([]byte, 49152)
n, err := server.Read(bytes)
if err != nil {
t.Fatal(err)
}
result := string(bytes[:n])
expected := []string{
"test.multi_timings_name:10.000000|ms|#label1:foo,label2:bar",
"test.multi_timings_name:2.000000|ms|#label1:foo,label2:bar",
"test.multi_timings_name.500000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.1000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.5000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.10000000:1.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.50000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.100000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.500000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.1000000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.5000000000:0.000000|g|#label1:foo,label2:bar",
}
for i, res := range strings.Split(result, "\n") {
assert.Equal(t, res, expected[i])
Expand All @@ -409,7 +420,6 @@ func TestStatsdTimings(t *testing.T) {
name := "timings_name"
s := stats.NewTimings(name, "help", "label1")
s.Add("foo", 2*time.Millisecond)
s.Add("bar", 10*time.Millisecond)
found := false
expvar.Do(func(kv expvar.KeyValue) {
if kv.Key == name {
Expand All @@ -418,15 +428,22 @@ func TestStatsdTimings(t *testing.T) {
if err := sb.statsdClient.Flush(); err != nil {
t.Errorf("Error flushing: %s", err)
}
bytes := make([]byte, 12288)
bytes := make([]byte, 49152)
n, err := server.Read(bytes)
if err != nil {
t.Fatal(err)
}
result := string(bytes[:n])
expected := []string{
"test.timings_name:2.000000|ms|#label1:foo",
"test.timings_name:10.000000|ms|#label1:bar",
"test.timings_name.500000:0.000000|g|#label1:foo",
"test.timings_name.1000000:0.000000|g|#label1:foo",
"test.timings_name.5000000:1.000000|g|#label1:foo",
"test.timings_name.10000000:0.000000|g|#label1:foo",
"test.timings_name.50000000:0.000000|g|#label1:foo",
"test.timings_name.100000000:0.000000|g|#label1:foo",
"test.timings_name.500000000:0.000000|g|#label1:foo",
"test.timings_name.1000000000:0.000000|g|#label1:foo",
"test.timings_name.5000000000:0.000000|g|#label1:foo",
}
for i, res := range strings.Split(result, "\n") {
assert.Equal(t, res, expected[i])
Expand Down Expand Up @@ -461,10 +478,12 @@ func TestStatsdHistogram(t *testing.T) {
}
result := string(bytes[:n])
expected := []string{
"test.histogram_name.1:0|c",
"test.histogram_name.5:2|c",
"test.histogram_name.10:1|c",
"test.histogram_name.inf:0|c",
"test.histogram_name.1:0.000000|g",
"test.histogram_name.5:2.000000|g",
"test.histogram_name.10:1.000000|g",
"test.histogram_name.inf:0.000000|g",
"test.histogram_name.Count:3.000000|g",
"test.histogram_name.Total:11.000000|g",
}
for i, res := range strings.Split(result, "\n") {
assert.Equal(t, res, expected[i])
Expand Down
43 changes: 0 additions & 43 deletions go/stats/timings.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@ package stats

import (
"encoding/json"
"flag"
"fmt"
"sync"
"time"

"vitess.io/vitess/go/sync2"
)

var timingsBufferSize = flag.Int("timings_buffer_size", 5, "RingFloat64 capacity used by Timings stats")

// Timings is meant to tracks timing data
// by named categories as well as histograms.
type Timings struct {
Expand All @@ -36,7 +33,6 @@ type Timings struct {

mu sync.RWMutex
histograms map[string]*Histogram
buffers map[string]*RingInt64

help string
label string
Expand All @@ -48,17 +44,14 @@ type Timings struct {
// Categories that aren't initialized will be missing from the map until the
// first time they are updated.
func NewTimings(name, help, label string, categories ...string) *Timings {
bufferSize := *timingsBufferSize
t := &Timings{
histograms: make(map[string]*Histogram),
buffers: make(map[string]*RingInt64),
help: help,
label: label,
labelCombined: IsDimensionCombined(label),
}
for _, cat := range categories {
t.histograms[cat] = NewGenericHistogram("", "", bucketCutoffs, bucketLabels, "Count", "Time")
t.buffers[cat] = NewRingInt64(bufferSize)
}
if name != "" {
publish(name, t)
Expand All @@ -71,7 +64,6 @@ func NewTimings(name, help, label string, categories ...string) *Timings {
func (t *Timings) Reset() {
t.mu.RLock()
t.histograms = make(map[string]*Histogram)
t.buffers = make(map[string]*RingInt64)
t.mu.RUnlock()
}

Expand Down Expand Up @@ -100,27 +92,6 @@ func (t *Timings) Add(name string, elapsed time.Duration) {
hist.Add(elapsedNs)
t.totalCount.Add(1)
t.totalTime.Add(elapsedNs)

if *timingsBufferSize > 0 {
bufferSize := *timingsBufferSize
// Get existing buffer
t.mu.RLock()
buffer, ok := t.buffers[name]
t.mu.RUnlock()

// Create buffer if it does not exist.
if !ok {
t.mu.Lock()
buffer, ok = t.buffers[name]
if !ok {
buffer = NewRingInt64(bufferSize)
t.buffers[name] = buffer
}
t.mu.Unlock()
}

buffer.Add(elapsedNs)
}
}

// Record is a convenience function that records completion
Expand All @@ -141,12 +112,10 @@ func (t *Timings) String() string {
TotalCount int64
TotalTime int64
Histograms map[string]*Histogram
Buffers map[string]*RingInt64
}{
t.totalCount.Get(),
t.totalTime.Get(),
t.histograms,
t.buffers,
}

data, err := json.Marshal(tm)
Expand All @@ -156,17 +125,6 @@ func (t *Timings) String() string {
return string(data)
}

// Buffers returns a map pointing at the buffers.
func (t *Timings) Buffers() (b map[string]*RingInt64) {
t.mu.RLock()
defer t.mu.RUnlock()
b = make(map[string]*RingInt64, len(t.buffers))
for k, v := range t.buffers {
b[k] = v
}
return
}

// Histograms returns a map pointing at the histograms.
func (t *Timings) Histograms() (h map[string]*Histogram) {
t.mu.RLock()
Expand Down Expand Up @@ -243,7 +201,6 @@ func NewMultiTimings(name string, help string, labels []string) *MultiTimings {
t := &MultiTimings{
Timings: Timings{
histograms: make(map[string]*Histogram),
buffers: make(map[string]*RingInt64),
help: help,
},
labels: labels,
Expand Down
Loading

0 comments on commit 03740a6

Please sign in to comment.