Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

heartbeat: lag histogram #6634

Merged
merged 2 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go/stats/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func (h *Histogram) MarshalJSON() ([]byte, error) {
fmt.Fprintf(b, "{")
totalCount := int64(0)
for i, label := range h.labels {
totalCount += h.buckets[i].Get()
fmt.Fprintf(b, "\"%v\": %v, ", label, totalCount)
count := h.buckets[i].Get()
totalCount += count
fmt.Fprintf(b, "\"%v\": %v, ", label, count)
}
fmt.Fprintf(b, "\"%s\": %v, ", h.countLabel, totalCount)
fmt.Fprintf(b, "\"%s\": %v", h.totalLabel, h.total.Get())
Expand Down
2 changes: 1 addition & 1 deletion go/stats/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestHistogram(t *testing.T) {
for i := 0; i < 10; i++ {
h.Add(int64(i))
}
want := `{"1": 2, "5": 6, "inf": 10, "Count": 10, "Total": 45}`
want := `{"1": 2, "5": 4, "inf": 4, "Count": 10, "Total": 45}`
if h.String() != want {
t.Errorf("got %v, want %v", h.String(), want)
}
Expand Down
12 changes: 6 additions & 6 deletions go/stats/timings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestTimings(t *testing.T) {
tm.Add("tag1", 500*time.Microsecond)
tm.Add("tag1", 1*time.Millisecond)
tm.Add("tag2", 1*time.Millisecond)
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":2,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}`
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}`
if got := tm.String(); got != want {
t.Errorf("got %s, want %s", got, want)
}
Expand All @@ -43,7 +43,7 @@ func TestMultiTimings(t *testing.T) {
mtm.Add([]string{"tag1a", "tag1b"}, 500*time.Microsecond)
mtm.Add([]string{"tag1a", "tag1b"}, 1*time.Millisecond)
mtm.Add([]string{"tag2a", "tag2b"}, 1*time.Millisecond)
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":2,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}`
want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1000000}}}`
if got := mtm.String(); got != want {
t.Errorf("got %s, want %s", got, want)
}
Expand All @@ -55,7 +55,7 @@ func TestMultiTimingsDot(t *testing.T) {
mtm.Add([]string{"value.dot"}, 500*time.Microsecond)
safe := safeLabel("value.dot")
safeJSON := strings.Replace(safe, "\\", "\\\\", -1)
want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":500000}}}`
want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":500000}}}`
if got := mtm.String(); got != want {
t.Errorf("got %s, want %s", got, want)
}
Expand Down Expand Up @@ -86,16 +86,16 @@ func TestTimingsCombineDimension(t *testing.T) {

t1 := NewTimings("timing_combine_dim1", "help", "label")
t1.Add("t1", 1*time.Nanosecond)
want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1}}}`
want := `{"TotalCount":1,"TotalTime":1,"Histograms":{"t1":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}`
assert.Equal(t, want, t1.String())

t2 := NewTimings("timing_combine_dim2", "help", "a")
t2.Add("t1", 1)
want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1}}}`
want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}`
assert.Equal(t, want, t2.String())

t3 := NewMultiTimings("timing_combine_dim3", "help", []string{"a", "b", "c"})
t3.Add([]string{"c1", "c2", "c3"}, 1)
want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1}}}`
want = `{"TotalCount":1,"TotalTime":1,"Histograms":{"all.c2.all":{"500000":1,"1000000":0,"5000000":0,"10000000":0,"50000000":0,"100000000":0,"500000000":0,"1000000000":0,"5000000000":0,"10000000000":0,"inf":0,"Count":1,"Time":1}}}`
assert.Equal(t, want, t3.String())
}
6 changes: 3 additions & 3 deletions go/vt/servenv/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func TestHistogram(t *testing.T) {
ebd := NewExporter("", "")
g := ebd.NewHistogram("ghistogram", "", []int64{10})
g.Add(1)
assert.Contains(t, expvar.Get("ghistogram").String(), `{"10": 1, "inf": 1, "Count": 1, "Total": 1}`)
assert.Contains(t, expvar.Get("ghistogram").String(), `{"10": 1, "inf": 0, "Count": 1, "Total": 1}`)

ebd = NewExporter("i1", "label")

Expand All @@ -607,13 +607,13 @@ func TestHistogram(t *testing.T) {
g.Add(1)
g.Add(1)
assert.Contains(t, expvar.Get("lmtimings").String(), `i1`)
assert.Contains(t, expvar.Get("lhistogram").String(), `{"10": 2, "inf": 2, "Count": 2, "Total": 2}`)
assert.Contains(t, expvar.Get("lhistogram").String(), `{"10": 2, "inf": 0, "Count": 2, "Total": 2}`)

// Ensure var gets replaced.
g = ebd.NewHistogram("lhistogram", "", []int64{10})
g.Add(1)
assert.Contains(t, expvar.Get("lmtimings").String(), `i1`)
assert.Contains(t, expvar.Get("lhistogram").String(), `{"10": 1, "inf": 1, "Count": 1, "Total": 1}`)
assert.Contains(t, expvar.Get("lhistogram").String(), `{"10": 1, "inf": 0, "Count": 1, "Total": 1}`)
}

func TestPublish(t *testing.T) {
Expand Down
71 changes: 21 additions & 50 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package endtoend

import (
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -71,13 +72,11 @@ func TestPoolSize(t *testing.T) {
tag := "ConnPoolWaitCount"
got := framework.FetchInt(framework.DebugVars(), tag)
want := framework.FetchInt(vstart, tag)
if got <= want {
t.Errorf("%s: %d, must be greater than %d", tag, got, want)
}
assert.LessOrEqual(t, want, got)
}

func TestDisableConsolidator(t *testing.T) {
totalConsolidationsTag := "Waits/Histograms/Consolidations/inf"
totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -91,9 +90,8 @@ func TestDisableConsolidator(t *testing.T) {
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if initial+1 != afterOne {
t.Errorf("expected one consolidation, but got: before consolidation count: %v; after consolidation count: %v", initial, afterOne)
}
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

framework.Server.SetConsolidatorMode(tabletenv.Disable)
defer framework.Server.SetConsolidatorMode(tabletenv.Enable)
var wg2 sync.WaitGroup
Expand All @@ -108,13 +106,11 @@ func TestDisableConsolidator(t *testing.T) {
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if afterOne != noNewConsolidations {
t.Errorf("expected no new consolidations, but got: before consolidation count: %v; after consolidation count: %v", afterOne, noNewConsolidations)
}
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")
}

func TestConsolidatorReplicasOnly(t *testing.T) {
totalConsolidationsTag := "Waits/Histograms/Consolidations/inf"
totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -128,9 +124,7 @@ func TestConsolidatorReplicasOnly(t *testing.T) {
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if initial+1 != afterOne {
t.Errorf("expected one consolidation, but got: before consolidation count: %v; after consolidation count: %v", initial, afterOne)
}
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

framework.Server.SetConsolidatorMode(tabletenv.NotOnMaster)
defer framework.Server.SetConsolidatorMode(tabletenv.Enable)
Expand All @@ -148,22 +142,16 @@ func TestConsolidatorReplicasOnly(t *testing.T) {
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if afterOne != noNewConsolidations {
t.Errorf("expected no new consolidations, but got: before consolidation count: %v; after consolidation count: %v", afterOne, noNewConsolidations)
}
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")

// become a replica, where query consolidation should happen
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)

err := client.SetServingType(topodatapb.TabletType_REPLICA)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer func() {
err = client.SetServingType(topodatapb.TabletType_MASTER)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
}()

initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
Expand All @@ -179,9 +167,7 @@ func TestConsolidatorReplicasOnly(t *testing.T) {
}()
wg3.Wait()
afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
if initial+1 != afterOne {
t.Errorf("expected another consolidation, but got: before consolidation count: %v; after consolidation count: %v", initial, afterOne)
}
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
}

func TestQueryPlanCache(t *testing.T) {
Expand Down Expand Up @@ -221,17 +207,13 @@ func TestMaxResultSize(t *testing.T) {
client := framework.NewClient()
query := "select * from vitess_test"
_, err := client.Execute(query, nil)
assert.Error(t, err)
want := "Row count exceeded"
if err == nil || !strings.HasPrefix(err.Error(), want) {
t.Errorf("Error: %v, must start with %s", err, want)
}
assert.Contains(t, err.Error(), want, "Error: %v, must start with %s", err, want)
verifyIntValue(t, framework.DebugVars(), "MaxResultSize", 2)
framework.Server.SetMaxResultSize(10)
_, err = client.Execute(query, nil)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
}

func TestWarnResultSize(t *testing.T) {
Expand All @@ -244,18 +226,14 @@ func TestWarnResultSize(t *testing.T) {
_, _ = client.Execute(query, nil)
newWarningsResultsExceededCount := framework.FetchInt(framework.DebugVars(), "Warnings/ResultsExceeded")
exceededCountDiff := newWarningsResultsExceededCount - originalWarningsResultsExceededCount
if exceededCountDiff != 1 {
t.Errorf("Warnings.ResultsExceeded counter should have increased by 1, instead got %v", exceededCountDiff)
}
assert.Equal(t, 1, exceededCountDiff, "Warnings.ResultsExceeded counter should have increased by 1")

verifyIntValue(t, framework.DebugVars(), "WarnResultSize", 2)
framework.Server.SetWarnResultSize(10)
_, _ = client.Execute(query, nil)
newerWarningsResultsExceededCount := framework.FetchInt(framework.DebugVars(), "Warnings/ResultsExceeded")
exceededCountDiff = newerWarningsResultsExceededCount - newWarningsResultsExceededCount
if exceededCountDiff != 0 {
t.Errorf("Warnings.ResultsExceeded counter should not have increased, instead got %v", exceededCountDiff)
}
assert.Equal(t, 0, exceededCountDiff, "Warnings.ResultsExceeded counter should not have increased")
}

func TestQueryTimeout(t *testing.T) {
Expand All @@ -265,18 +243,11 @@ func TestQueryTimeout(t *testing.T) {

client := framework.NewClient()
err := client.Begin(false)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
_, err = client.Execute("select sleep(1) from vitess_test", nil)
if code := vterrors.Code(err); code != vtrpcpb.Code_CANCELED {
t.Errorf("Error code: %v, want %v", code, vtrpcpb.Code_CANCELED)
}
assert.Equal(t, vtrpcpb.Code_CANCELED, vterrors.Code(err))
_, err = client.Execute("select 1 from dual", nil)
if code := vterrors.Code(err); code != vtrpcpb.Code_ABORTED {
t.Errorf("Error code: %v, want %v", code, vtrpcpb.Code_ABORTED)
}
assert.Equal(t, vtrpcpb.Code_ABORTED, vterrors.Code(err))
vend := framework.DebugVars()
verifyIntValue(t, vend, "QueryTimeout", int(100*time.Millisecond))
compareIntDiff(t, vend, "Kills/Queries", vstart, 1)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/repltracker/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (r *heartbeatReader) readHeartbeat() {
lag := r.now().Sub(time.Unix(0, ts))
cumulativeLagNs.Add(lag.Nanoseconds())
currentLagNs.Set(lag.Nanoseconds())
heartbeatLagNsHistogram.Add(lag.Nanoseconds())
reads.Add(1)

r.lagMu.Lock()
Expand Down
54 changes: 28 additions & 26 deletions go/vt/vttablet/tabletserver/repltracker/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"testing"
"time"

"vitess.io/vitess/go/test/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
Expand Down Expand Up @@ -53,21 +58,25 @@ func TestReaderReadHeartbeat(t *testing.T) {
tr.readHeartbeat()
lag, err := tr.Status()

if err != nil {
t.Fatalf("Should not be in error: %v", tr.lastKnownError)
}
if got, want := lag, 10*time.Second; got != want {
t.Fatalf("wrong latest lag: got = %v, want = %v", tr.lastKnownLag, want)
}
if got, want := cumulativeLagNs.Get(), 10*time.Second.Nanoseconds(); got != want {
t.Fatalf("wrong cumulative lag: got = %v, want = %v", got, want)
}
if got, want := reads.Get(), int64(1); got != want {
t.Fatalf("wrong read count: got = %v, want = %v", got, want)
}
if got, want := readErrors.Get(), int64(0); got != want {
t.Fatalf("wrong read error count: got = %v, want = %v", got, want)
require.NoError(t, err)
expectedLag := 10 * time.Second
assert.Equal(t, expectedLag, lag, "wrong latest lag")
expectedCumLag := 10 * time.Second.Nanoseconds()
assert.Equal(t, expectedCumLag, cumulativeLagNs.Get(), "wrong cumulative lag")
assert.Equal(t, int64(1), reads.Get(), "wrong read count")
assert.Equal(t, int64(0), readErrors.Get(), "wrong read error count")
expectedHisto := map[string]int64{
"0": int64(0),
"1ms": int64(0),
"10ms": int64(0),
"100ms": int64(0),
"1s": int64(0),
"10s": int64(1),
"100s": int64(0),
"1000s": int64(0),
">1000s": int64(0),
}
utils.MustMatch(t, expectedHisto, heartbeatLagNsHistogram.Counts(), "wrong counts in histogram")
}

// TestReaderReadHeartbeatError tests that we properly account for errors
Expand All @@ -84,18 +93,11 @@ func TestReaderReadHeartbeatError(t *testing.T) {
tr.readHeartbeat()
lag, err := tr.Status()

if err == nil {
t.Fatalf("Should be in error: %v", tr.lastKnownError)
}
if got, want := lag, 0*time.Second; got != want {
t.Fatalf("wrong lastKnownLag: got = %v, want = %v", got, want)
}
if got, want := cumulativeLagNs.Get(), int64(0); got != want {
t.Fatalf("wrong cumulative lag: got = %v, want = %v", got, want)
}
if got, want := readErrors.Get(), int64(1); got != want {
t.Fatalf("wrong read error count: got = %v, want = %v", got, want)
}
require.Error(t, err)
assert.EqualError(t, err, tr.lastKnownError.Error(), "expected error")
assert.Equal(t, 0*time.Second, lag, "wrong lastKnownLag")
assert.Equal(t, int64(0), cumulativeLagNs.Get(), "wrong cumulative lag")
assert.Equal(t, int64(1), readErrors.Get(), "wrong read error count")
}

func newReader(db *fakesqldb.DB, nowFunc func() time.Time) *heartbeatReader {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ var (
cumulativeLagNs = stats.NewCounter("HeartbeatCumulativeLagNs", "Incremented by the current lag at each heartbeat read interval")
// HeartbeatCurrentLagNs is a point-in-time calculation of the lag, updated at each heartbeat read interval.
currentLagNs = stats.NewGauge("HeartbeatCurrentLagNs", "Point in time calculation of the heartbeat lag")
// HeartbeatLagNsHistogram is a histogram of the lag values. Cutoffs are 0, 1ms, 10ms, 100ms, 1s, 10s, 100s, 1000s
heartbeatLagNsHistogram = stats.NewGenericHistogram("HeartbeatLagNsHistogram",
"Histogram of lag values in nanoseconds", []int64{0, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12},
[]string{"0", "1ms", "10ms", "100ms", "1s", "10s", "100s", "1000s", ">1000s"}, "Count", "Total")
)

// ReplTracker tracks replication lag.
Expand Down