Skip to content

Commit

Permalink
Merge pull request #6634 from planetscale/ds-lag-histogram
Browse files Browse the repository at this point in the history
heartbeat: lag histogram
  • Loading branch information
sougou authored Sep 9, 2020
2 parents faf184f + e54822a commit 6a25895
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 88 deletions.
5 changes: 3 additions & 2 deletions go/stats/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func (h *Histogram) MarshalJSON() ([]byte, error) {
fmt.Fprintf(b, "{")
totalCount := int64(0)
for i, label := range h.labels {
totalCount += h.buckets[i].Get()
fmt.Fprintf(b, "\"%v\": %v, ", label, totalCount)
count := h.buckets[i].Get()
totalCount += count
fmt.Fprintf(b, "\"%v\": %v, ", label, count)
}
fmt.Fprintf(b, "\"%s\": %v, ", h.countLabel, totalCount)
fmt.Fprintf(b, "\"%s\": %v", h.totalLabel, h.total.Get())
Expand Down
2 changes: 1 addition & 1 deletion go/stats/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestHistogram(t *testing.T) {
for i := 0; i < 10; i++ {
h.Add(int64(i))
}
want := `{"1": 2, "5": 6, "inf": 10, "Count": 10, "Total": 45}`
want := `{"1": 2, "5": 4, "inf": 4, "Count": 10, "Total": 45}`
if h.String() != want {
t.Errorf("got %v, want %v", h.String(), want)
}
Expand Down
12 changes: 6 additions & 6 deletions go/stats/timings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestTimings(t *testing.T) {
tm.Add("tag1", 500*time.Microsecond)
tm.Add("tag1", 1*time.Millisecond)
tm.Add("tag2", 1*time.Millisecond)
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":2,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}`
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}`
if got := tm.String(); got != want {
t.Errorf("got %s, want %s", got, want)
}
Expand All @@ -43,7 +43,7 @@ func TestMultiTimings(t *testing.T) {
mtm.Add([]string{"tag1a", "tag1b"}, 500*time.Microsecond)
mtm.Add([]string{"tag1a", "tag1b"}, 1*time.Millisecond)
mtm.Add([]string{"tag2a", "tag2b"}, 1*time.Millisecond)
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":2,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}`
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}`
if got := mtm.String(); got != want {
t.Errorf("got %s, want %s", got, want)
}
Expand All @@ -55,7 +55,7 @@ func TestMultiTimingsDot(t *testing.T) {
mtm.Add([]string{"value.dot"}, 500*time.Microsecond)
safe := safeLabel("value.dot")
safeJSON := strings.Replace(safe, "\\", "\\\\", -1)
want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":500000}}}`
want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":500000}}}`
if got := mtm.String(); got != want {
t.Errorf("got %s, want %s", got, want)
}
Expand Down Expand Up @@ -86,16 +86,16 @@ func TestTimingsCombineDimension(t *testing.T) {

t1 := NewTimings("timing_combine_dim1", "help", "label")
t1.Add("t1", 1*time.Nanosecond)
want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1}}}`
want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}`
assert.Equal(t, want, t1.String())

t2 := NewTimings("timing_combine_dim2", "help", "a")
t2.Add("t1", 1)
want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1}}}`
want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}`
assert.Equal(t, want, t2.String())

t3 := NewMultiTimings("timing_combine_dim3", "help", []string{"a", "b", "c"})
t3.Add([]string{"c1", "c2", "c3"}, 1)
want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1}}}`
want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}`
assert.Equal(t, want, t3.String())
}
6 changes: 3 additions & 3 deletions go/vt/servenv/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func TestHistogram(t *testing.T) {
ebd := NewExporter("", "")
g := ebd.NewHistogram("ghistogram", "", []int64{10})
g.Add(1)
assert.Contains(t, expvar.Get("ghistogram").String(), `{"10": 1, "inf": 1, "Count": 1, "Total": 1}`)
assert.Contains(t, expvar.Get("ghistogram").String(), `{"10": 1, "inf": 0, "Count": 1, "Total": 1}`)

ebd = NewExporter("i1", "label")

Expand All @@ -607,13 +607,13 @@ func TestHistogram(t *testing.T) {
g.Add(1)
g.Add(1)
assert.Contains(t, expvar.Get("lmtimings").String(), `i1`)
assert.Contains(t, expvar.Get("lhistogram").String(), `{"10": 2, "inf": 2, "Count": 2, "Total": 2}`)
assert.Contains(t, expvar.Get("lhistogram").String(), `{"10": 2, "inf": 0, "Count": 2, "Total": 2}`)

// Ensure var gets replaced.
g = ebd.NewHistogram("lhistogram", "", []int64{10})
g.Add(1)
assert.Contains(t, expvar.Get("lmtimings").String(), `i1`)
assert.Contains(t, expvar.Get("lhistogram").String(), `{"10": 1, "inf": 1, "Count": 1, "Total": 1}`)
assert.Contains(t, expvar.Get("lhistogram").String(), `{"10": 1, "inf": 0, "Count": 1, "Total": 1}`)
}

func TestPublish(t *testing.T) {
Expand Down
71 changes: 21 additions & 50 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package endtoend

import (
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -71,13 +72,11 @@ func TestPoolSize(t *testing.T) {
tag := "ConnPoolWaitCount"
got := framework.FetchInt(framework.DebugVars(), tag)
want := framework.FetchInt(vstart, tag)
if got <= want {
t.Errorf("%s: %d, must be greater than %d", tag, got, want)
}
assert.LessOrEqual(t, want, got)
}

func TestDisableConsolidator(t *testing.T) {
totalConsolidationsTag := "Waits/Histograms/Consolidations/inf"
totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -91,9 +90,8 @@ func TestDisableConsolidator(t *testing.T) {
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if initial+1 != afterOne {
t.Errorf("expected one consolidation, but got: before consolidation count: %v; after consolidation count: %v", initial, afterOne)
}
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

framework.Server.SetConsolidatorMode(tabletenv.Disable)
defer framework.Server.SetConsolidatorMode(tabletenv.Enable)
var wg2 sync.WaitGroup
Expand All @@ -108,13 +106,11 @@ func TestDisableConsolidator(t *testing.T) {
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if afterOne != noNewConsolidations {
t.Errorf("expected no new consolidations, but got: before consolidation count: %v; after consolidation count: %v", afterOne, noNewConsolidations)
}
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")
}

func TestConsolidatorReplicasOnly(t *testing.T) {
totalConsolidationsTag := "Waits/Histograms/Consolidations/inf"
totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -128,9 +124,7 @@ func TestConsolidatorReplicasOnly(t *testing.T) {
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if initial+1 != afterOne {
t.Errorf("expected one consolidation, but got: before consolidation count: %v; after consolidation count: %v", initial, afterOne)
}
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

framework.Server.SetConsolidatorMode(tabletenv.NotOnMaster)
defer framework.Server.SetConsolidatorMode(tabletenv.Enable)
Expand All @@ -148,22 +142,16 @@ func TestConsolidatorReplicasOnly(t *testing.T) {
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if afterOne != noNewConsolidations {
t.Errorf("expected no new consolidations, but got: before consolidation count: %v; after consolidation count: %v", afterOne, noNewConsolidations)
}
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")

// become a replica, where query consolidation should happen
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)

err := client.SetServingType(topodatapb.TabletType_REPLICA)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer func() {
err = client.SetServingType(topodatapb.TabletType_MASTER)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
}()

initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
Expand All @@ -179,9 +167,7 @@ func TestConsolidatorReplicasOnly(t *testing.T) {
}()
wg3.Wait()
afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if initial+1 != afterOne {
t.Errorf("expected another consolidation, but got: before consolidation count: %v; after consolidation count: %v", initial, afterOne)
}
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
}

func TestQueryPlanCache(t *testing.T) {
Expand Down Expand Up @@ -221,17 +207,13 @@ func TestMaxResultSize(t *testing.T) {
client := framework.NewClient()
query := "select * from vitess_test"
_, err := client.Execute(query, nil)
assert.Error(t, err)
want := "Row count exceeded"
if err == nil || !strings.HasPrefix(err.Error(), want) {
t.Errorf("Error: %v, must start with %s", err, want)
}
assert.Contains(t, err.Error(), want, "Error: %v, must start with %s", err, want)
verifyIntValue(t, framework.DebugVars(), "MaxResultSize", 2)
framework.Server.SetMaxResultSize(10)
_, err = client.Execute(query, nil)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
}

func TestWarnResultSize(t *testing.T) {
Expand All @@ -244,18 +226,14 @@ func TestWarnResultSize(t *testing.T) {
_, _ = client.Execute(query, nil)
newWarningsResultsExceededCount := framework.FetchInt(framework.DebugVars(), "Warnings/ResultsExceeded")
exceededCountDiff := newWarningsResultsExceededCount - originalWarningsResultsExceededCount
if exceededCountDiff != 1 {
t.Errorf("Warnings.ResultsExceeded counter should have increased by 1, instead got %v", exceededCountDiff)
}
assert.Equal(t, 1, exceededCountDiff, "Warnings.ResultsExceeded counter should have increased by 1")

verifyIntValue(t, framework.DebugVars(), "WarnResultSize", 2)
framework.Server.SetWarnResultSize(10)
_, _ = client.Execute(query, nil)
newerWarningsResultsExceededCount := framework.FetchInt(framework.DebugVars(), "Warnings/ResultsExceeded")
exceededCountDiff = newerWarningsResultsExceededCount - newWarningsResultsExceededCount
if exceededCountDiff != 0 {
t.Errorf("Warnings.ResultsExceeded counter should not have increased, instead got %v", exceededCountDiff)
}
assert.Equal(t, 0, exceededCountDiff, "Warnings.ResultsExceeded counter should not have increased")
}

func TestQueryTimeout(t *testing.T) {
Expand All @@ -265,18 +243,11 @@ func TestQueryTimeout(t *testing.T) {

client := framework.NewClient()
err := client.Begin(false)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
_, err = client.Execute("select sleep(1) from vitess_test", nil)
if code := vterrors.Code(err); code != vtrpcpb.Code_CANCELED {
t.Errorf("Error code: %v, want %v", code, vtrpcpb.Code_CANCELED)
}
assert.Equal(t, vtrpcpb.Code_CANCELED, vterrors.Code(err))
_, err = client.Execute("select 1 from dual", nil)
if code := vterrors.Code(err); code != vtrpcpb.Code_ABORTED {
t.Errorf("Error code: %v, want %v", code, vtrpcpb.Code_ABORTED)
}
assert.Equal(t, vtrpcpb.Code_ABORTED, vterrors.Code(err))
vend := framework.DebugVars()
verifyIntValue(t, vend, "QueryTimeout", int(100*time.Millisecond))
compareIntDiff(t, vend, "Kills/Queries", vstart, 1)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/repltracker/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (r *heartbeatReader) readHeartbeat() {
lag := r.now().Sub(time.Unix(0, ts))
cumulativeLagNs.Add(lag.Nanoseconds())
currentLagNs.Set(lag.Nanoseconds())
heartbeatLagNsHistogram.Add(lag.Nanoseconds())
reads.Add(1)

r.lagMu.Lock()
Expand Down
54 changes: 28 additions & 26 deletions go/vt/vttablet/tabletserver/repltracker/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"testing"
"time"

"vitess.io/vitess/go/test/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
Expand Down Expand Up @@ -53,21 +58,25 @@ func TestReaderReadHeartbeat(t *testing.T) {
tr.readHeartbeat()
lag, err := tr.Status()

if err != nil {
t.Fatalf("Should not be in error: %v", tr.lastKnownError)
}
if got, want := lag, 10*time.Second; got != want {
t.Fatalf("wrong latest lag: got = %v, want = %v", tr.lastKnownLag, want)
}
if got, want := cumulativeLagNs.Get(), 10*time.Second.Nanoseconds(); got != want {
t.Fatalf("wrong cumulative lag: got = %v, want = %v", got, want)
}
if got, want := reads.Get(), int64(1); got != want {
t.Fatalf("wrong read count: got = %v, want = %v", got, want)
}
if got, want := readErrors.Get(), int64(0); got != want {
t.Fatalf("wrong read error count: got = %v, want = %v", got, want)
require.NoError(t, err)
expectedLag := 10 * time.Second
assert.Equal(t, expectedLag, lag, "wrong latest lag")
expectedCumLag := 10 * time.Second.Nanoseconds()
assert.Equal(t, expectedCumLag, cumulativeLagNs.Get(), "wrong cumulative lag")
assert.Equal(t, int64(1), reads.Get(), "wrong read count")
assert.Equal(t, int64(0), readErrors.Get(), "wrong read error count")
expectedHisto := map[string]int64{
"0": int64(0),
"1ms": int64(0),
"10ms": int64(0),
"100ms": int64(0),
"1s": int64(0),
"10s": int64(1),
"100s": int64(0),
"1000s": int64(0),
">1000s": int64(0),
}
utils.MustMatch(t, expectedHisto, heartbeatLagNsHistogram.Counts(), "wrong counts in histogram")
}

// TestReaderReadHeartbeatError tests that we properly account for errors
Expand All @@ -84,18 +93,11 @@ func TestReaderReadHeartbeatError(t *testing.T) {
tr.readHeartbeat()
lag, err := tr.Status()

if err == nil {
t.Fatalf("Should be in error: %v", tr.lastKnownError)
}
if got, want := lag, 0*time.Second; got != want {
t.Fatalf("wrong lastKnownLag: got = %v, want = %v", got, want)
}
if got, want := cumulativeLagNs.Get(), int64(0); got != want {
t.Fatalf("wrong cumulative lag: got = %v, want = %v", got, want)
}
if got, want := readErrors.Get(), int64(1); got != want {
t.Fatalf("wrong read error count: got = %v, want = %v", got, want)
}
require.Error(t, err)
assert.EqualError(t, err, tr.lastKnownError.Error(), "expected error")
assert.Equal(t, 0*time.Second, lag, "wrong lastKnownLag")
assert.Equal(t, int64(0), cumulativeLagNs.Get(), "wrong cumulative lag")
assert.Equal(t, int64(1), readErrors.Get(), "wrong read error count")
}

func newReader(db *fakesqldb.DB, nowFunc func() time.Time) *heartbeatReader {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ var (
cumulativeLagNs = stats.NewCounter("HeartbeatCumulativeLagNs", "Incremented by the current lag at each heartbeat read interval")
// HeartbeatCurrentLagNs is a point-in-time calculation of the lag, updated at each heartbeat read interval.
currentLagNs = stats.NewGauge("HeartbeatCurrentLagNs", "Point in time calculation of the heartbeat lag")
// HeartbeatLagNsHistogram is a histogram of the lag values. Cutoffs are 0, 1ms, 10ms, 100ms, 1s, 10s, 100s, 1000s
heartbeatLagNsHistogram = stats.NewGenericHistogram("HeartbeatLagNsHistogram",
"Histogram of lag values in nanoseconds", []int64{0, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12},
[]string{"0", "1ms", "10ms", "100ms", "1s", "10s", "100s", "1000s", ">1000s"}, "Count", "Total")
)

// ReplTracker tracks replication lag.
Expand Down

0 comments on commit 6a25895

Please sign in to comment.