executor: show back-off details in slow log #13770

Merged on Dec 3, 2019 (10 commits)
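In short, this change makes the Backoffer record per-backoff-type sleep time and retry counts, carries them through ExecDetails into CopTasksDetails, and prints the aggregated backoff statistics (total times, total/max/avg/p90 time, and the address with the largest backoff) in the slow log. The output format is fixed by the expectations in session_test.go below; for example, one of the new lines looks like:

```
# Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2
```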
26 changes: 18 additions & 8 deletions domain/domain_test.go
@@ -317,23 +317,33 @@ func (*testSuite) TestT(c *C) {

res := dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowTop, Count: 2})
c.Assert(res, HasLen, 2)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})
c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})
c.Assert(res[0].SQL, Equals, "bbb")
c.Assert(res[0].Duration, Equals, 3*time.Second)
c.Assert(res[1].SQL, Equals, "ccc")
c.Assert(res[1].Duration, Equals, 2*time.Second)

res = dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowTop, Count: 2, Kind: ast.ShowSlowKindInternal})
c.Assert(res, HasLen, 1)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true})
c.Assert(res[0].SQL, Equals, "aaa")
c.Assert(res[0].Duration, Equals, time.Second)
c.Assert(res[0].Internal, Equals, true)

res = dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowTop, Count: 4, Kind: ast.ShowSlowKindAll})
c.Assert(res, HasLen, 3)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})
c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})
c.Assert(*res[2], Equals, SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true})
c.Assert(res[0].SQL, Equals, "bbb")
c.Assert(res[0].Duration, Equals, 3*time.Second)
c.Assert(res[1].SQL, Equals, "ccc")
c.Assert(res[1].Duration, Equals, 2*time.Second)
c.Assert(res[2].SQL, Equals, "aaa")
c.Assert(res[2].Duration, Equals, time.Second)
c.Assert(res[2].Internal, Equals, true)

res = dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowRecent, Count: 2})
c.Assert(res, HasLen, 2)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})
c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})
c.Assert(res[0].SQL, Equals, "ccc")
c.Assert(res[0].Duration, Equals, 2*time.Second)
c.Assert(res[1].SQL, Equals, "bbb")
c.Assert(res[1].Duration, Equals, 3*time.Second)

metrics.PanicCounter.Reset()
// Since the stats lease is 0 now, creating a new ticker will panic.
28 changes: 14 additions & 14 deletions domain/topn_slow_query_test.go
@@ -116,28 +116,28 @@ func (t *testTopNSlowQuerySuite) TestQueue(c *C) {
q.Append(&SlowQueryInfo{SQL: "ccc"})

query := q.recent.Query(1)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(query[0].SQL, Equals, "ccc")
query = q.recent.Query(2)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "bbb"})
c.Assert(query[0].SQL, Equals, "ccc")
c.Assert(query[1].SQL, Equals, "bbb")
query = q.recent.Query(6)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "bbb"})
c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "aaa"})
c.Assert(query[0].SQL, Equals, "ccc")
c.Assert(query[1].SQL, Equals, "bbb")
c.Assert(query[2].SQL, Equals, "aaa")

q.Append(&SlowQueryInfo{SQL: "ddd"})
q.Append(&SlowQueryInfo{SQL: "eee"})
q.Append(&SlowQueryInfo{SQL: "fff"})
q.Append(&SlowQueryInfo{SQL: "ggg"})

query = q.recent.Query(3)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ggg"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "fff"})
c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "eee"})
c.Assert(query[0].SQL, Equals, "ggg")
c.Assert(query[1].SQL, Equals, "fff")
c.Assert(query[2].SQL, Equals, "eee")
query = q.recent.Query(6)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ggg"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "fff"})
c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "eee"})
c.Assert(*query[3], Equals, SlowQueryInfo{SQL: "ddd"})
c.Assert(*query[4], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(query[0].SQL, Equals, "ggg")
c.Assert(query[1].SQL, Equals, "fff")
c.Assert(query[2].SQL, Equals, "eee")
c.Assert(query[3].SQL, Equals, "ddd")
c.Assert(query[4].SQL, Equals, "ccc")
}
56 changes: 55 additions & 1 deletion sessionctx/stmtctx/stmtctx.go
@@ -520,7 +520,15 @@ func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
sc.mu.Lock()
defer sc.mu.Unlock()
n := len(sc.mu.allExecDetails)
d := &CopTasksDetails{NumCopTasks: n}
d := &CopTasksDetails{
NumCopTasks: n,
MaxBackoffTime: make(map[string]time.Duration),
AvgBackoffTime: make(map[string]time.Duration),
P90BackoffTime: make(map[string]time.Duration),
TotBackoffTime: make(map[string]time.Duration),
TotBackoffTimes: make(map[string]int),
MaxBackoffAddress: make(map[string]string),
}
if n == 0 {
return d
}
@@ -540,6 +548,45 @@ func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
d.P90WaitTime = sc.mu.allExecDetails[n*9/10].WaitTime
d.MaxWaitTime = sc.mu.allExecDetails[n-1].WaitTime
d.MaxWaitAddress = sc.mu.allExecDetails[n-1].CalleeAddress

// Calculate per-backoff-type statistics (max/avg/p90/total) across all cop tasks.
type backoffItem struct {
callee string
sleepTime time.Duration
times int
}
backoffInfo := make(map[string][]backoffItem)
for _, ed := range sc.mu.allExecDetails {
for backoff := range ed.BackoffTimes {
backoffInfo[backoff] = append(backoffInfo[backoff], backoffItem{
callee: ed.CalleeAddress,
sleepTime: ed.BackoffSleep[backoff],
times: ed.BackoffTimes[backoff],
})
}
}
for backoff, items := range backoffInfo {
if len(items) == 0 {
continue
}
sort.Slice(items, func(i, j int) bool {
return items[i].sleepTime < items[j].sleepTime
})
n := len(items)
d.MaxBackoffAddress[backoff] = items[n-1].callee
d.MaxBackoffTime[backoff] = items[n-1].sleepTime
d.P90BackoffTime[backoff] = items[n*9/10].sleepTime

var totalTime time.Duration
totalTimes := 0
for _, it := range items {
totalTime += it.sleepTime
totalTimes += it.times
}
d.AvgBackoffTime[backoff] = totalTime / time.Duration(n)
d.TotBackoffTime[backoff] = totalTime
d.TotBackoffTimes[backoff] = totalTimes
}
return d
}
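A minimal standalone sketch of the aggregation above, using made-up addresses and durations: the entries for one backoff type are sorted ascending by sleep time, then the max, p90 (index n*9/10, matching the existing process/wait-time percentiles), average, and totals are read off.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type backoffItem struct {
	callee    string
	sleepTime time.Duration
	times     int
}

func main() {
	// Per-cop-task records for a single backoff type, e.g. "regionMiss".
	items := []backoffItem{
		{callee: "tikv-1", sleepTime: 40 * time.Millisecond, times: 2},
		{callee: "tikv-2", sleepTime: 10 * time.Millisecond, times: 1},
		{callee: "tikv-3", sleepTime: 90 * time.Millisecond, times: 3},
	}
	// Sort ascending by sleep time, as CopTasksDetails does.
	sort.Slice(items, func(i, j int) bool { return items[i].sleepTime < items[j].sleepTime })

	n := len(items)
	var totalTime time.Duration
	totalTimes := 0
	for _, it := range items {
		totalTime += it.sleepTime
		totalTimes += it.times
	}
	fmt.Println("max addr :", items[n-1].callee)          // tikv-3
	fmt.Println("max time :", items[n-1].sleepTime)       // 90ms
	fmt.Println("p90 time :", items[n*9/10].sleepTime)    // index 2 -> 90ms
	fmt.Println("avg time :", totalTime/time.Duration(n)) // ~46.67ms
	fmt.Println("totals   :", totalTime, totalTimes)      // 140ms, 6
}
```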

@@ -556,6 +603,13 @@ type CopTasksDetails struct {
P90WaitTime time.Duration
MaxWaitAddress string
MaxWaitTime time.Duration

MaxBackoffTime map[string]time.Duration
MaxBackoffAddress map[string]string
AvgBackoffTime map[string]time.Duration
P90BackoffTime map[string]time.Duration
TotBackoffTime map[string]time.Duration
TotBackoffTimes map[string]int
}

// ToZapFields wraps the CopTasksDetails as zap.Fields.
15 changes: 15 additions & 0 deletions sessionctx/stmtctx/stmtctx_test.go
@@ -33,11 +33,18 @@ var _ = Suite(&stmtctxSuit{})

func (s *stmtctxSuit) TestCopTasksDetails(c *C) {
ctx := new(stmtctx.StatementContext)
backoffs := []string{"tikvRPC", "pdRPC", "regionMiss"}
for i := 0; i < 100; i++ {
d := &execdetails.ExecDetails{
CalleeAddress: fmt.Sprintf("%v", i+1),
ProcessTime: time.Second * time.Duration(i+1),
WaitTime: time.Millisecond * time.Duration(i+1),
BackoffSleep: make(map[string]time.Duration),
BackoffTimes: make(map[string]int),
}
for _, backoff := range backoffs {
d.BackoffSleep[backoff] = time.Millisecond * 100 * time.Duration(i+1)
d.BackoffTimes[backoff] = i + 1
}
ctx.MergeExecDetails(d, nil)
}
@@ -53,6 +60,14 @@ func (s *stmtctxSuit) TestCopTasksDetails(c *C) {
c.Assert(d.MaxWaitAddress, Equals, "100")
fields := d.ToZapFields()
c.Assert(len(fields), Equals, 9)
for _, backoff := range backoffs {
c.Assert(d.MaxBackoffAddress[backoff], Equals, "100")
c.Assert(d.MaxBackoffTime[backoff], Equals, 100*time.Millisecond*100)
c.Assert(d.P90BackoffTime[backoff], Equals, time.Millisecond*100*91)
c.Assert(d.AvgBackoffTime[backoff], Equals, time.Millisecond*100*101/2)
c.Assert(d.TotBackoffTimes[backoff], Equals, 101*50)
c.Assert(d.TotBackoffTime[backoff], Equals, 101*50*100*time.Millisecond)
}
}
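A small sketch (not part of the test) that checks the arithmetic behind the assertions above: with 100 merged ExecDetails where task i contributes (i+1)*100ms of sleep and i+1 retries per backoff type, the expected totals, max, average, and p90 follow directly.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	n := 100
	unit := 100 * time.Millisecond // per-task sleep step used by the test

	var totalTime time.Duration
	totalTimes := 0
	for i := 0; i < n; i++ {
		totalTime += unit * time.Duration(i+1)
		totalTimes += i + 1
	}
	fmt.Println(totalTimes == 101*50)                                     // true: 1+2+...+100 = 5050
	fmt.Println(totalTime == 101*50*100*time.Millisecond)                 // true: 5050 * 100ms
	fmt.Println(unit*time.Duration(n) == 100*time.Millisecond*100)        // true: max, from task 100
	fmt.Println(totalTime/time.Duration(n) == time.Millisecond*100*101/2) // true: avg = 5.05s
	fmt.Println(unit*time.Duration(n*9/10+1) == time.Millisecond*100*91)  // true: p90 is the 91st smallest
}
```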

func (s *stmtctxSuit) TestStatementContextPushDownFLags(c *C) {
29 changes: 28 additions & 1 deletion sessionctx/variable/session.go
@@ -17,6 +17,7 @@ import (
"bytes"
"crypto/tls"
"fmt"
"sort"
"strconv"
"strings"
"sync"
@@ -1186,6 +1187,8 @@ const (
SlowLogCopWaitMax = "Cop_wait_max"
// SlowLogCopWaitAddr is the address of the TiKV instance where the cop-task with the longest wait time ran.
SlowLogCopWaitAddr = "Cop_wait_addr"
// SlowLogCopBackoffPrefix is the prefix of the per-backoff-type detail fields in the slow log.
SlowLogCopBackoffPrefix = "Cop_backoff_"
// SlowLogMemMax is the maximum number of bytes of memory used by this statement.
SlowLogMemMax = "Mem_max"
// SlowLogPrepared is used to indicate whether this sql execute in prepare.
@@ -1299,14 +1302,27 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
if logItems.CopTasks != nil {
writeSlowLogItem(&buf, SlowLogNumCopTasksStr, strconv.FormatInt(int64(logItems.CopTasks.NumCopTasks), 10))
if logItems.CopTasks.NumCopTasks > 0 {
// Sort the backoff types so the output order is deterministic.
backoffs := make([]string, 0, 3)
for backoff := range logItems.CopTasks.TotBackoffTimes {
backoffs = append(backoffs, backoff)
}
sort.Strings(backoffs)

if logItems.CopTasks.NumCopTasks == 1 {
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v",
SlowLogCopProcAvg, SlowLogSpaceMarkStr, logItems.CopTasks.AvgProcessTime.Seconds(),
SlowLogCopProcAddr, SlowLogSpaceMarkStr, logItems.CopTasks.MaxProcessAddress) + "\n")
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v",
SlowLogCopWaitAvg, SlowLogSpaceMarkStr, logItems.CopTasks.AvgWaitTime.Seconds(),
SlowLogCopWaitAddr, SlowLogSpaceMarkStr, logItems.CopTasks.MaxWaitAddress) + "\n")

for _, backoff := range backoffs {
backoffPrefix := SlowLogCopBackoffPrefix + backoff + "_"
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v\n",
backoffPrefix+"total_times", SlowLogSpaceMarkStr, logItems.CopTasks.TotBackoffTimes[backoff],
backoffPrefix+"total_time", SlowLogSpaceMarkStr, logItems.CopTasks.TotBackoffTime[backoff].Seconds(),
))
}
} else {
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v %v%v%v %v%v%v",
SlowLogCopProcAvg, SlowLogSpaceMarkStr, logItems.CopTasks.AvgProcessTime.Seconds(),
@@ -1318,6 +1334,17 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
SlowLogCopWaitP90, SlowLogSpaceMarkStr, logItems.CopTasks.P90WaitTime.Seconds(),
SlowLogCopWaitMax, SlowLogSpaceMarkStr, logItems.CopTasks.MaxWaitTime.Seconds(),
SlowLogCopWaitAddr, SlowLogSpaceMarkStr, logItems.CopTasks.MaxWaitAddress) + "\n")
for _, backoff := range backoffs {
backoffPrefix := SlowLogCopBackoffPrefix + backoff + "_"
buf.WriteString(SlowLogRowPrefixStr + fmt.Sprintf("%v%v%v %v%v%v %v%v%v %v%v%v %v%v%v %v%v%v\n",
backoffPrefix+"total_times", SlowLogSpaceMarkStr, logItems.CopTasks.TotBackoffTimes[backoff],
backoffPrefix+"total_time", SlowLogSpaceMarkStr, logItems.CopTasks.TotBackoffTime[backoff].Seconds(),
backoffPrefix+"max_time", SlowLogSpaceMarkStr, logItems.CopTasks.MaxBackoffTime[backoff].Seconds(),
backoffPrefix+"max_addr", SlowLogSpaceMarkStr, logItems.CopTasks.MaxBackoffAddress[backoff],
backoffPrefix+"avg_time", SlowLogSpaceMarkStr, logItems.CopTasks.AvgBackoffTime[backoff].Seconds(),
backoffPrefix+"p90_time", SlowLogSpaceMarkStr, logItems.CopTasks.P90BackoffTime[backoff].Seconds(),
))
}
}
}
}
20 changes: 20 additions & 0 deletions sessionctx/variable/session_test.go
@@ -151,7 +151,24 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
P90WaitTime: time.Millisecond * 20,
MaxWaitTime: time.Millisecond * 30,
MaxWaitAddress: "10.6.131.79",
MaxBackoffTime: make(map[string]time.Duration),
AvgBackoffTime: make(map[string]time.Duration),
P90BackoffTime: make(map[string]time.Duration),
TotBackoffTime: make(map[string]time.Duration),
TotBackoffTimes: make(map[string]int),
MaxBackoffAddress: make(map[string]string),
}

backoffs := []string{"rpcTiKV", "rpcPD", "regionMiss"}
for _, backoff := range backoffs {
copTasks.MaxBackoffTime[backoff] = time.Millisecond * 200
copTasks.MaxBackoffAddress[backoff] = "127.0.0.1"
copTasks.AvgBackoffTime[backoff] = time.Millisecond * 200
copTasks.P90BackoffTime[backoff] = time.Millisecond * 200
copTasks.TotBackoffTime[backoff] = time.Millisecond * 200
copTasks.TotBackoffTimes[backoff] = 200
}

var memMax int64 = 2333
resultString := `# Txn_start_ts: 406649736972468225
# User: root@192.168.0.1
@@ -168,6 +185,9 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# Num_cop_tasks: 10
# Cop_proc_avg: 1 Cop_proc_p90: 2 Cop_proc_max: 3 Cop_proc_addr: 10.6.131.78
# Cop_wait_avg: 0.01 Cop_wait_p90: 0.02 Cop_wait_max: 0.03 Cop_wait_addr: 10.6.131.79
# Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2
# Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2
# Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2
# Mem_max: 2333
# Prepared: true
# Has_more_results: true
11 changes: 11 additions & 0 deletions store/tikv/backoff.go
@@ -250,6 +250,9 @@ type Backoffer struct {
types []fmt.Stringer
vars *kv.Variables
noop bool

backoffSleepMS map[backoffType]int
backoffTimes map[backoffType]int
}

type txnStartCtxKeyType struct{}
@@ -332,6 +335,14 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err
realSleep := f(b.ctx, maxSleepMs)
backoffDuration.Observe(float64(realSleep) / 1000)
b.totalSleep += realSleep
if b.backoffSleepMS == nil {
b.backoffSleepMS = make(map[backoffType]int)
}
b.backoffSleepMS[typ] += realSleep
if b.backoffTimes == nil {
b.backoffTimes = make(map[backoffType]int)
}
b.backoffTimes[typ]++

var startTs interface{}
if ts := b.ctx.Value(txnStartKey); ts != nil {
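Both new maps are allocated lazily on first use, so a Backoffer that never backs off pays no extra allocation. A small isolated sketch of the same pattern (the recorder type and names here are illustrative, not part of the patch):

```go
package main

import "fmt"

type backoffType int

const boRegionMiss backoffType = 0

type recorder struct {
	backoffSleepMS map[backoffType]int
	backoffTimes   map[backoffType]int
}

// record mirrors the lazy-initialization pattern used in BackoffWithMaxSleep.
func (r *recorder) record(typ backoffType, sleptMS int) {
	if r.backoffSleepMS == nil {
		r.backoffSleepMS = make(map[backoffType]int)
	}
	r.backoffSleepMS[typ] += sleptMS
	if r.backoffTimes == nil {
		r.backoffTimes = make(map[backoffType]int)
	}
	r.backoffTimes[typ]++
}

func main() {
	var r recorder // zero value: both maps stay nil until the first backoff
	r.record(boRegionMiss, 2)
	r.record(boRegionMiss, 5)
	fmt.Println(r.backoffSleepMS[boRegionMiss], r.backoffTimes[boRegionMiss]) // 7 2
}
```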
6 changes: 6 additions & 0 deletions store/tikv/coprocessor.go
@@ -921,6 +921,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
resp.detail = new(execdetails.ExecDetails)
}
resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
resp.detail.BackoffSleep, resp.detail.BackoffTimes = make(map[string]time.Duration), make(map[string]int)
for backoff := range bo.backoffTimes {
backoffName := backoff.String()
resp.detail.BackoffTimes[backoffName] = bo.backoffTimes[backoff]
resp.detail.BackoffSleep[backoffName] = time.Duration(bo.backoffSleepMS[backoff]) * time.Millisecond
}
if rpcCtx != nil {
resp.detail.CalleeAddress = rpcCtx.Addr
}
2 changes: 2 additions & 0 deletions util/execdetails/execdetails.go
@@ -37,6 +37,8 @@ type ExecDetails struct {
ProcessTime time.Duration
WaitTime time.Duration
BackoffTime time.Duration
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
RequestCount int
TotalKeys int64
ProcessedKeys int64