From 5f028d220bcdbfe2483764f5b6dc853ed0961b99 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 8 Sep 2018 16:57:53 +0800 Subject: [PATCH 1/9] domain,executor: store topN slow query in domain --- domain/domain.go | 46 +++++++++++++++ domain/topn_slow_query.go | 114 ++++++++++++++++++++++++++++++++++++++ executor/adapter.go | 4 ++ 3 files changed, 164 insertions(+) create mode 100644 domain/topn_slow_query.go diff --git a/domain/domain.go b/domain/domain.go index 1f6a5a341eac6..5169cf07fd67f 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/execdetails" log "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" @@ -60,6 +61,7 @@ type Domain struct { etcdClient *clientv3.Client wg sync.WaitGroup gvc GlobalVariableCache + slowQuery *topNSlowQuery MockReloadFailed MockFailure // It mocks reload failed. } @@ -329,6 +331,46 @@ func (do *Domain) Reload() error { return nil } +// LogTopNSlowQuery keeps topN recent slow queries in domain. +func (do *Domain) LogTopNSlowQuery(sql string, start time.Time, duration time.Duration, + detail execdetails.ExecDetails, + succ bool, connID, txnTS uint64, + user, db, tableIDs, indexIDs string) { + select { + case do.slowQuery.ch <- &slowQueryInfo{ + sql: sql, + start: start, + duration: duration, + detail: detail, + succ: succ, + connID: connID, + txnTS: txnTS, + user: user, + db: db, + tableIDs: tableIDs, + indexIDs: indexIDs, + }: + default: + } +} + +func (do *Domain) topNSlowQueryLoop() { + defer do.wg.Done() + ticker := time.NewTicker(time.Minute * 10) + defer ticker.Stop() + for { + select { + case now := <-ticker.C: + do.slowQuery.Refresh(now) + case info, ok := <-do.slowQuery.ch: + if !ok { + return + } + do.slowQuery.Push(info) + } + } +} + func (do *Domain) loadSchemaInLoop(lease time.Duration) { defer do.wg.Done() // Lease renewal can run at any frequency. @@ -408,6 +450,7 @@ func (do *Domain) Close() { if do.etcdClient != nil { terror.Log(errors.Trace(do.etcdClient.Close())) } + do.slowQuery.Close() do.sysSessionPool.Close() do.wg.Wait() log.Info("[domain] close") @@ -471,6 +514,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout), statsLease: statsLease, infoHandle: infoschema.NewHandle(store), + slowQuery: newTopNSlowQuery(30, time.Hour*24*7), } } @@ -529,6 +573,8 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R // Local store needs to get the change information for every DDL state in each session. go do.loadSchemaInLoop(ddlLease) } + do.wg.Add(1) + go do.topNSlowQueryLoop() return nil } diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go new file mode 100644 index 0000000000000..d4faeef847943 --- /dev/null +++ b/domain/topn_slow_query.go @@ -0,0 +1,114 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package domain + +import ( + "time" + + "github.com/pingcap/tidb/util/execdetails" +) + +// topNSlowQuery maintains a heap to store recent slow queries. +// N = 30, recent = 7 days by default. +type topNSlowQuery struct { + data []*slowQueryInfo + offset int + recent time.Duration + ch chan *slowQueryInfo +} + +func newTopNSlowQuery(topN int, recent time.Duration) *topNSlowQuery { + return &topNSlowQuery{ + data: make([]*slowQueryInfo, topN), + recent: recent, + ch: make(chan *slowQueryInfo, 1000), + } +} + +func (q *topNSlowQuery) Close() { + close(q.ch) +} + +func (q *topNSlowQuery) Push(info *slowQueryInfo) { + // Heap is not full, append to it and shift up. + if q.offset < len(q.data) { + q.data[q.offset] = info + q.shiftUp(q.offset) + q.offset++ + return + } + + // Replace the heap top and shift down. + if info.duration > q.data[0].duration { + q.data[0] = info + for i := 0; i < q.offset; { + left := 2*i + 1 + right := 2 * (i + 1) + if q.data[i].duration > q.data[left].duration { + q.data[i], q.data[left] = q.data[left], q.data[i] + i = left + } else if q.data[i].duration > q.data[right].duration { + q.data[i], q.data[right] = q.data[right], q.data[i] + i = right + } else { + break + } + } + } +} + +func (q *topNSlowQuery) shiftUp(end int) { + for i := end; i > 0; { + j := (i - 1) / 2 + if q.data[j].duration < q.data[i].duration { + break + } + q.data[i], q.data[j] = q.data[j], q.data[i] + i = j + } +} + +func (q *topNSlowQuery) Refresh(now time.Time) { + // Remove outdated slow query element. + idx := 0 + for i := 0; i < q.offset; i++ { + if q.data[i].start.Add(q.recent).Before(now) { + q.data[idx] = q.data[i] + idx++ + } + } + if q.offset == idx { + return + } + q.offset = idx + + // Rebuild the heap. + for i := 1; i < q.offset; i++ { + q.shiftUp(i) + } +} + +type slowQueryInfo struct { + sql string + start time.Time + duration time.Duration + detail execdetails.ExecDetails + succ bool + connID uint64 + txnTS uint64 + user string + db string + tableIDs string + indexIDs string +} diff --git a/executor/adapter.go b/executor/adapter.go index 13d74fc21206b..4d485d34415d9 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -22,6 +22,7 @@ import ( "github.com/juju/errors" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -367,6 +368,9 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) { logutil.SlowQueryLogger.Warnf( "[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v", internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql) + if !sessVars.InRestrictedSQL { + domain.GetDomain(a.Ctx).LogTopNSlowQuery(sql, a.startTime, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, txnTS, user.String(), currentDB, tableIDs, indexIDs) + } } } From 5f3878e2f1d83f86ab9eab5b22c91a8dcd12bbf5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 10 Sep 2018 16:25:47 +0800 Subject: [PATCH 2/9] add test case and tiny fix --- domain/topn_slow_query.go | 20 +++--- domain/topn_slow_query_test.go | 110 +++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 8 deletions(-) create mode 100644 domain/topn_slow_query_test.go diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index d4faeef847943..480686c71a458 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -55,15 +55,18 @@ func (q *topNSlowQuery) Push(info *slowQueryInfo) { for i := 0; i < q.offset; { left := 2*i + 1 right := 2 * (i + 1) - if q.data[i].duration > q.data[left].duration { - q.data[i], q.data[left] = q.data[left], q.data[i] - i = left - } else if q.data[i].duration > q.data[right].duration { - q.data[i], q.data[right] = q.data[right], q.data[i] - i = right - } else { + if left >= q.offset { break } + smaller := left + if right < q.offset && q.data[right].duration < q.data[left].duration { + smaller = right + } + if q.data[i].duration <= q.data[smaller].duration { + break + } + q.data[i], q.data[smaller] = q.data[smaller], q.data[i] + i = smaller } } } @@ -83,7 +86,8 @@ func (q *topNSlowQuery) Refresh(now time.Time) { // Remove outdated slow query element. idx := 0 for i := 0; i < q.offset; i++ { - if q.data[i].start.Add(q.recent).Before(now) { + outdateTime := q.data[i].start.Add(q.recent) + if outdateTime.After(now) { q.data[idx] = q.data[i] idx++ } diff --git a/domain/topn_slow_query_test.go b/domain/topn_slow_query_test.go new file mode 100644 index 0000000000000..ccaa14e5e5519 --- /dev/null +++ b/domain/topn_slow_query_test.go @@ -0,0 +1,110 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package domain + +import ( + "time" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testTopNSlowQuerySuite{}) + +type testTopNSlowQuerySuite struct{} + +func (t *testTopNSlowQuerySuite) TestPush(c *C) { + slowQuery := newTopNSlowQuery(10, 0) + // Insert data into the heap. + slowQuery.Push(&slowQueryInfo{duration: 300 * time.Millisecond}) + slowQuery.Push(&slowQueryInfo{duration: 400 * time.Millisecond}) + slowQuery.Push(&slowQueryInfo{duration: 500 * time.Millisecond}) + slowQuery.Push(&slowQueryInfo{duration: 600 * time.Millisecond}) + slowQuery.Push(&slowQueryInfo{duration: 700 * time.Millisecond}) + slowQuery.Push(&slowQueryInfo{duration: 800 * time.Millisecond}) + slowQuery.Push(&slowQueryInfo{duration: 900 * time.Millisecond}) + slowQuery.Push(&slowQueryInfo{duration: 1000 * time.Millisecond}) + slowQuery.Push(&slowQueryInfo{duration: 1100 * time.Millisecond}) + slowQuery.Push(&slowQueryInfo{duration: 1200 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 300*time.Millisecond) + checkHeap(slowQuery, c) + + // Update all data in the heap. + slowQuery.Push(&slowQueryInfo{duration: 1300 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 400*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 1400 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 500*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 1500 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 600*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 1500 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 700*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 1600 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 800*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 1700 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 900*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 1800 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 1000*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 1900 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 1100*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 2000 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 1200*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 2100 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) + checkHeap(slowQuery, c) + + // Data smaller than heap top will not be inserted. + slowQuery.Push(&slowQueryInfo{duration: 1200 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) + slowQuery.Push(&slowQueryInfo{duration: 666 * time.Millisecond}) + c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) +} + +func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { + now := time.Now() + slowQuery := newTopNSlowQuery(6, 3*time.Second) + + slowQuery.Push(&slowQueryInfo{start: now, duration: 6}) + slowQuery.Push(&slowQueryInfo{start: now.Add(1 * time.Second), duration: 5}) + slowQuery.Push(&slowQueryInfo{start: now.Add(2 * time.Second), duration: 4}) + slowQuery.Push(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) + slowQuery.Push(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) + c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) + + slowQuery.Refresh(now.Add(5 * time.Second)) + c.Assert(slowQuery.offset, Equals, 2) + c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) + + slowQuery.Push(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) + slowQuery.Push(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) + slowQuery.Push(&slowQueryInfo{start: now.Add(5 * time.Second), duration: 1}) + slowQuery.Push(&slowQueryInfo{start: now.Add(6 * time.Second), duration: 0}) + c.Assert(slowQuery.offset, Equals, 6) + c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) + + slowQuery.Refresh(now.Add(6 * time.Second)) + c.Assert(slowQuery.offset, Equals, 4) + c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) +} + +func checkHeap(q *topNSlowQuery, c *C) { + for i := 0; i < q.offset; i++ { + left := 2*i + 1 + right := 2*i + 2 + if left < q.offset { + c.Assert(q.data[i].duration, LessEqual, q.data[left].duration) + } + if right < q.offset { + c.Assert(q.data[i].duration, LessEqual, q.data[right].duration) + } + } +} From 537e26d03a7b7209ea414be060298ae7b2b44844 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 10 Sep 2018 16:36:44 +0800 Subject: [PATCH 3/9] fix CI --- executor/adapter.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/executor/adapter.go b/executor/adapter.go index a30c538437bef..c5cfbb2d8f721 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -373,7 +373,11 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) { "[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v", internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql) if !sessVars.InRestrictedSQL { - domain.GetDomain(a.Ctx).LogTopNSlowQuery(sql, a.startTime, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, txnTS, user.String(), currentDB, tableIDs, indexIDs) + var userString string + if user != nil { + userString = user.String() + } + domain.GetDomain(a.Ctx).LogTopNSlowQuery(sql, a.startTime, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, txnTS, userString, currentDB, tableIDs, indexIDs) } } } From 85d5f0d7279facf83d115470f5342454f539fc2b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 10 Sep 2018 20:25:36 +0800 Subject: [PATCH 4/9] fix typo --- domain/topn_slow_query.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index 480686c71a458..e2e798eaee98c 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -41,15 +41,15 @@ func (q *topNSlowQuery) Close() { } func (q *topNSlowQuery) Push(info *slowQueryInfo) { - // Heap is not full, append to it and shift up. + // Heap is not full, append to it and sift up. if q.offset < len(q.data) { q.data[q.offset] = info - q.shiftUp(q.offset) + q.siftUp(q.offset) q.offset++ return } - // Replace the heap top and shift down. + // Replace the heap top and sift down. if info.duration > q.data[0].duration { q.data[0] = info for i := 0; i < q.offset; { @@ -71,7 +71,7 @@ func (q *topNSlowQuery) Push(info *slowQueryInfo) { } } -func (q *topNSlowQuery) shiftUp(end int) { +func (q *topNSlowQuery) siftUp(end int) { for i := end; i > 0; { j := (i - 1) / 2 if q.data[j].duration < q.data[i].duration { @@ -99,7 +99,7 @@ func (q *topNSlowQuery) Refresh(now time.Time) { // Rebuild the heap. for i := 1; i < q.offset; i++ { - q.shiftUp(i) + q.siftUp(i) } } From 3b126bf92138153191b5a8c9bfebf280399c9b9b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 10 Sep 2018 20:55:51 +0800 Subject: [PATCH 5/9] use standard library for heap implementation --- domain/topn_slow_query.go | 74 +++++++++++++++------------------- domain/topn_slow_query_test.go | 12 +++--- 2 files changed, 39 insertions(+), 47 deletions(-) diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index e2e798eaee98c..829f2f8a72c33 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -14,23 +14,45 @@ package domain import ( + "container/heap" "time" "github.com/pingcap/tidb/util/execdetails" ) +type slowQueryInfoHeap []*slowQueryInfo + +func (h slowQueryInfoHeap) Len() int { return len(h) } +func (h slowQueryInfoHeap) Less(i, j int) bool { return h[i].duration < h[j].duration } +func (h slowQueryInfoHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *slowQueryInfoHeap) Push(x interface{}) { + // Push and Pop use pointer receivers because they modify the slice's length, + // not just its contents. + *h = append(*h, x.(*slowQueryInfo)) +} + +func (h *slowQueryInfoHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + // topNSlowQuery maintains a heap to store recent slow queries. // N = 30, recent = 7 days by default. type topNSlowQuery struct { - data []*slowQueryInfo - offset int + data slowQueryInfoHeap + topN int recent time.Duration ch chan *slowQueryInfo } func newTopNSlowQuery(topN int, recent time.Duration) *topNSlowQuery { return &topNSlowQuery{ - data: make([]*slowQueryInfo, topN), + data: make([]*slowQueryInfo, 0, topN), + topN: topN, recent: recent, ch: make(chan *slowQueryInfo, 1000), } @@ -42,65 +64,35 @@ func (q *topNSlowQuery) Close() { func (q *topNSlowQuery) Push(info *slowQueryInfo) { // Heap is not full, append to it and sift up. - if q.offset < len(q.data) { - q.data[q.offset] = info - q.siftUp(q.offset) - q.offset++ + if len(q.data) < q.topN { + heap.Push(&q.data, info) return } // Replace the heap top and sift down. if info.duration > q.data[0].duration { - q.data[0] = info - for i := 0; i < q.offset; { - left := 2*i + 1 - right := 2 * (i + 1) - if left >= q.offset { - break - } - smaller := left - if right < q.offset && q.data[right].duration < q.data[left].duration { - smaller = right - } - if q.data[i].duration <= q.data[smaller].duration { - break - } - q.data[i], q.data[smaller] = q.data[smaller], q.data[i] - i = smaller - } - } -} - -func (q *topNSlowQuery) siftUp(end int) { - for i := end; i > 0; { - j := (i - 1) / 2 - if q.data[j].duration < q.data[i].duration { - break - } - q.data[i], q.data[j] = q.data[j], q.data[i] - i = j + heap.Pop(&q.data) + heap.Push(&q.data, info) } } func (q *topNSlowQuery) Refresh(now time.Time) { // Remove outdated slow query element. idx := 0 - for i := 0; i < q.offset; i++ { + for i := 0; i < len(q.data); i++ { outdateTime := q.data[i].start.Add(q.recent) if outdateTime.After(now) { q.data[idx] = q.data[i] idx++ } } - if q.offset == idx { + if len(q.data) == idx { return } - q.offset = idx // Rebuild the heap. - for i := 1; i < q.offset; i++ { - q.siftUp(i) - } + q.data = q.data[:idx] + heap.Init(&q.data) } type slowQueryInfo struct { diff --git a/domain/topn_slow_query_test.go b/domain/topn_slow_query_test.go index ccaa14e5e5519..d86c6fb05f0d2 100644 --- a/domain/topn_slow_query_test.go +++ b/domain/topn_slow_query_test.go @@ -81,29 +81,29 @@ func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) slowQuery.Refresh(now.Add(5 * time.Second)) - c.Assert(slowQuery.offset, Equals, 2) + c.Assert(len(slowQuery.data), Equals, 2) c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) slowQuery.Push(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) slowQuery.Push(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) slowQuery.Push(&slowQueryInfo{start: now.Add(5 * time.Second), duration: 1}) slowQuery.Push(&slowQueryInfo{start: now.Add(6 * time.Second), duration: 0}) - c.Assert(slowQuery.offset, Equals, 6) + c.Assert(len(slowQuery.data), Equals, 6) c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) slowQuery.Refresh(now.Add(6 * time.Second)) - c.Assert(slowQuery.offset, Equals, 4) + c.Assert(len(slowQuery.data), Equals, 4) c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) } func checkHeap(q *topNSlowQuery, c *C) { - for i := 0; i < q.offset; i++ { + for i := 0; i < len(q.data); i++ { left := 2*i + 1 right := 2*i + 2 - if left < q.offset { + if left < len(q.data) { c.Assert(q.data[i].duration, LessEqual, q.data[left].duration) } - if right < q.offset { + if right < len(q.data) { c.Assert(q.data[i].duration, LessEqual, q.data[right].duration) } } From 5941733d7e592abc98dbf42422722c5996f53ebe Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 10 Sep 2018 23:56:32 +0800 Subject: [PATCH 6/9] address comment --- domain/domain.go | 6 +-- domain/topn_slow_query.go | 60 ++++++++++++++---------------- domain/topn_slow_query_test.go | 68 +++++++++++++++++----------------- 3 files changed, 65 insertions(+), 69 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index c401ccf55e069..4f3298ce29cb7 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -61,7 +61,7 @@ type Domain struct { etcdClient *clientv3.Client wg sync.WaitGroup gvc GlobalVariableCache - slowQuery *topNSlowQuery + slowQuery *topNSlowQueries MockReloadFailed MockFailure // It mocks reload failed. } @@ -366,7 +366,7 @@ func (do *Domain) topNSlowQueryLoop() { if !ok { return } - do.slowQuery.Push(info) + do.slowQuery.Append(info) } } } @@ -514,7 +514,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout), statsLease: statsLease, infoHandle: infoschema.NewHandle(store), - slowQuery: newTopNSlowQuery(30, time.Hour*24*7), + slowQuery: newTopNSlowQueries(30, time.Hour*24*7), } } diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index 829f2f8a72c33..218d11c8384c5 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -20,37 +20,17 @@ import ( "github.com/pingcap/tidb/util/execdetails" ) -type slowQueryInfoHeap []*slowQueryInfo - -func (h slowQueryInfoHeap) Len() int { return len(h) } -func (h slowQueryInfoHeap) Less(i, j int) bool { return h[i].duration < h[j].duration } -func (h slowQueryInfoHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } - -func (h *slowQueryInfoHeap) Push(x interface{}) { - // Push and Pop use pointer receivers because they modify the slice's length, - // not just its contents. - *h = append(*h, x.(*slowQueryInfo)) -} - -func (h *slowQueryInfoHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - -// topNSlowQuery maintains a heap to store recent slow queries. +// topNSlowQueries maintains a heap to store recent slow queries. // N = 30, recent = 7 days by default. -type topNSlowQuery struct { - data slowQueryInfoHeap +type topNSlowQueries struct { + data []*slowQueryInfo topN int recent time.Duration ch chan *slowQueryInfo } -func newTopNSlowQuery(topN int, recent time.Duration) *topNSlowQuery { - return &topNSlowQuery{ +func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries { + return &topNSlowQueries{ data: make([]*slowQueryInfo, 0, topN), topN: topN, recent: recent, @@ -58,25 +38,41 @@ func newTopNSlowQuery(topN int, recent time.Duration) *topNSlowQuery { } } -func (q *topNSlowQuery) Close() { +func (q *topNSlowQueries) Close() { close(q.ch) } -func (q *topNSlowQuery) Push(info *slowQueryInfo) { +func (q *topNSlowQueries) Len() int { return len(q.data) } +func (q *topNSlowQueries) Less(i, j int) bool { return q.data[i].duration < q.data[j].duration } +func (q *topNSlowQueries) Swap(i, j int) { q.data[i], q.data[j] = q.data[j], q.data[i] } + +func (q *topNSlowQueries) Push(x interface{}) { + q.data = append(q.data, x.(*slowQueryInfo)) +} + +func (q *topNSlowQueries) Pop() interface{} { + old := q.data + n := len(old) + x := old[n-1] + q.data = old[0 : n-1] + return x +} + +func (q *topNSlowQueries) Append(info *slowQueryInfo) { // Heap is not full, append to it and sift up. if len(q.data) < q.topN { - heap.Push(&q.data, info) + heap.Push(q, info) return } // Replace the heap top and sift down. if info.duration > q.data[0].duration { - heap.Pop(&q.data) - heap.Push(&q.data, info) + heap.Pop(q) + heap.Push(q, info) } } -func (q *topNSlowQuery) Refresh(now time.Time) { +func (q *topNSlowQueries) Refresh(now time.Time) { // Remove outdated slow query element. idx := 0 for i := 0; i < len(q.data); i++ { @@ -92,7 +88,7 @@ func (q *topNSlowQuery) Refresh(now time.Time) { // Rebuild the heap. q.data = q.data[:idx] - heap.Init(&q.data) + heap.Init(q) } type slowQueryInfo struct { diff --git a/domain/topn_slow_query_test.go b/domain/topn_slow_query_test.go index d86c6fb05f0d2..5819b9a214a05 100644 --- a/domain/topn_slow_query_test.go +++ b/domain/topn_slow_query_test.go @@ -24,70 +24,70 @@ var _ = Suite(&testTopNSlowQuerySuite{}) type testTopNSlowQuerySuite struct{} func (t *testTopNSlowQuerySuite) TestPush(c *C) { - slowQuery := newTopNSlowQuery(10, 0) + slowQuery := newTopNSlowQueries(10, 0) // Insert data into the heap. - slowQuery.Push(&slowQueryInfo{duration: 300 * time.Millisecond}) - slowQuery.Push(&slowQueryInfo{duration: 400 * time.Millisecond}) - slowQuery.Push(&slowQueryInfo{duration: 500 * time.Millisecond}) - slowQuery.Push(&slowQueryInfo{duration: 600 * time.Millisecond}) - slowQuery.Push(&slowQueryInfo{duration: 700 * time.Millisecond}) - slowQuery.Push(&slowQueryInfo{duration: 800 * time.Millisecond}) - slowQuery.Push(&slowQueryInfo{duration: 900 * time.Millisecond}) - slowQuery.Push(&slowQueryInfo{duration: 1000 * time.Millisecond}) - slowQuery.Push(&slowQueryInfo{duration: 1100 * time.Millisecond}) - slowQuery.Push(&slowQueryInfo{duration: 1200 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 300 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 400 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 500 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 600 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 700 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 800 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 900 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1000 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1100 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1200 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 300*time.Millisecond) checkHeap(slowQuery, c) // Update all data in the heap. - slowQuery.Push(&slowQueryInfo{duration: 1300 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1300 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 400*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 1400 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1400 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 500*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 1500 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1500 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 600*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 1500 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1500 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 700*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 1600 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1600 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 800*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 1700 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1700 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 900*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 1800 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1800 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 1000*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 1900 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1900 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 1100*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 2000 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 2000 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 1200*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 2100 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 2100 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) checkHeap(slowQuery, c) // Data smaller than heap top will not be inserted. - slowQuery.Push(&slowQueryInfo{duration: 1200 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 1200 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) - slowQuery.Push(&slowQueryInfo{duration: 666 * time.Millisecond}) + slowQuery.Append(&slowQueryInfo{duration: 666 * time.Millisecond}) c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) } func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { now := time.Now() - slowQuery := newTopNSlowQuery(6, 3*time.Second) + slowQuery := newTopNSlowQueries(6, 3*time.Second) - slowQuery.Push(&slowQueryInfo{start: now, duration: 6}) - slowQuery.Push(&slowQueryInfo{start: now.Add(1 * time.Second), duration: 5}) - slowQuery.Push(&slowQueryInfo{start: now.Add(2 * time.Second), duration: 4}) - slowQuery.Push(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) - slowQuery.Push(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) + slowQuery.Append(&slowQueryInfo{start: now, duration: 6}) + slowQuery.Append(&slowQueryInfo{start: now.Add(1 * time.Second), duration: 5}) + slowQuery.Append(&slowQueryInfo{start: now.Add(2 * time.Second), duration: 4}) + slowQuery.Append(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) + slowQuery.Append(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) slowQuery.Refresh(now.Add(5 * time.Second)) c.Assert(len(slowQuery.data), Equals, 2) c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) - slowQuery.Push(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) - slowQuery.Push(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) - slowQuery.Push(&slowQueryInfo{start: now.Add(5 * time.Second), duration: 1}) - slowQuery.Push(&slowQueryInfo{start: now.Add(6 * time.Second), duration: 0}) + slowQuery.Append(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) + slowQuery.Append(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) + slowQuery.Append(&slowQueryInfo{start: now.Add(5 * time.Second), duration: 1}) + slowQuery.Append(&slowQueryInfo{start: now.Add(6 * time.Second), duration: 0}) c.Assert(len(slowQuery.data), Equals, 6) c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) @@ -96,7 +96,7 @@ func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) } -func checkHeap(q *topNSlowQuery, c *C) { +func checkHeap(q *topNSlowQueries, c *C) { for i := 0; i < len(q.data); i++ { left := 2*i + 1 right := 2*i + 2 From c388314b9a20f990ef313b92692bb3cc669ed083 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 11 Sep 2018 11:28:36 +0800 Subject: [PATCH 7/9] address comment --- domain/domain.go | 1 + 1 file changed, 1 insertion(+) diff --git a/domain/domain.go b/domain/domain.go index 4f3298ce29cb7..d0e6d79c7528d 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -355,6 +355,7 @@ func (do *Domain) LogTopNSlowQuery(sql string, start time.Time, duration time.Du } func (do *Domain) topNSlowQueryLoop() { + defer recoverInDomain("topNSlowQueryLoop", false) defer do.wg.Done() ticker := time.NewTicker(time.Minute * 10) defer ticker.Stop() From 8dc40fb801f7f68a0fe86b2e15efdad8b48e3d41 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 11 Sep 2018 13:46:52 +0800 Subject: [PATCH 8/9] split into two heaps: user and interal --- domain/domain.go | 3 +- domain/topn_slow_query.go | 118 +++++++++++++++++++-------------- domain/topn_slow_query_test.go | 46 ++++++------- executor/adapter.go | 10 ++- 4 files changed, 98 insertions(+), 79 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index d0e6d79c7528d..63ec375c171d9 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -335,7 +335,7 @@ func (do *Domain) Reload() error { func (do *Domain) LogTopNSlowQuery(sql string, start time.Time, duration time.Duration, detail execdetails.ExecDetails, succ bool, connID, txnTS uint64, - user, db, tableIDs, indexIDs string) { + user, db, tableIDs, indexIDs string, internal bool) { select { case do.slowQuery.ch <- &slowQueryInfo{ sql: sql, @@ -349,6 +349,7 @@ func (do *Domain) LogTopNSlowQuery(sql string, start time.Time, duration time.Du db: db, tableIDs: tableIDs, indexIDs: indexIDs, + internal: internal, }: default: } diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index 218d11c8384c5..d03222d305efc 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -20,75 +20,94 @@ import ( "github.com/pingcap/tidb/util/execdetails" ) -// topNSlowQueries maintains a heap to store recent slow queries. -// N = 30, recent = 7 days by default. -type topNSlowQueries struct { - data []*slowQueryInfo - topN int - recent time.Duration - ch chan *slowQueryInfo -} - -func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries { - return &topNSlowQueries{ - data: make([]*slowQueryInfo, 0, topN), - topN: topN, - recent: recent, - ch: make(chan *slowQueryInfo, 1000), - } -} - -func (q *topNSlowQueries) Close() { - close(q.ch) +type slowQueryHeap struct { + data []*slowQueryInfo } -func (q *topNSlowQueries) Len() int { return len(q.data) } -func (q *topNSlowQueries) Less(i, j int) bool { return q.data[i].duration < q.data[j].duration } -func (q *topNSlowQueries) Swap(i, j int) { q.data[i], q.data[j] = q.data[j], q.data[i] } +func (h *slowQueryHeap) Len() int { return len(h.data) } +func (h *slowQueryHeap) Less(i, j int) bool { return h.data[i].duration < h.data[j].duration } +func (h *slowQueryHeap) Swap(i, j int) { h.data[i], h.data[j] = h.data[j], h.data[i] } -func (q *topNSlowQueries) Push(x interface{}) { - q.data = append(q.data, x.(*slowQueryInfo)) +func (h *slowQueryHeap) Push(x interface{}) { + h.data = append(h.data, x.(*slowQueryInfo)) } -func (q *topNSlowQueries) Pop() interface{} { - old := q.data +func (h *slowQueryHeap) Pop() interface{} { + old := h.data n := len(old) x := old[n-1] - q.data = old[0 : n-1] + h.data = old[0 : n-1] return x } -func (q *topNSlowQueries) Append(info *slowQueryInfo) { - // Heap is not full, append to it and sift up. - if len(q.data) < q.topN { - heap.Push(q, info) - return - } - - // Replace the heap top and sift down. - if info.duration > q.data[0].duration { - heap.Pop(q) - heap.Push(q, info) - } -} - -func (q *topNSlowQueries) Refresh(now time.Time) { +func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) { // Remove outdated slow query element. idx := 0 - for i := 0; i < len(q.data); i++ { - outdateTime := q.data[i].start.Add(q.recent) + for i := 0; i < len(h.data); i++ { + outdateTime := h.data[i].start.Add(recent) if outdateTime.After(now) { - q.data[idx] = q.data[i] + h.data[idx] = h.data[i] idx++ } } - if len(q.data) == idx { + if len(h.data) == idx { return } // Rebuild the heap. - q.data = q.data[:idx] - heap.Init(q) + h.data = h.data[:idx] + heap.Init(h) +} + +// topNSlowQueries maintains two heaps to store recent slow queries: one for user's and one for internal. +// N = 30, recent = 7 days by default. +type topNSlowQueries struct { + user slowQueryHeap + internal slowQueryHeap + topN int + recent time.Duration + ch chan *slowQueryInfo +} + +func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries { + ret := &topNSlowQueries{ + topN: topN, + recent: recent, + ch: make(chan *slowQueryInfo, 1000), + } + ret.user.data = make([]*slowQueryInfo, 0, topN) + ret.internal.data = make([]*slowQueryInfo, 0, topN) + return ret +} + +func (q *topNSlowQueries) Append(info *slowQueryInfo) { + var h *slowQueryHeap + if info.internal { + h = &q.internal + } else { + h = &q.user + } + + // Heap is not full. + if len(h.data) < q.topN { + heap.Push(h, info) + return + } + + // Replace the heap top. + if info.duration > h.data[0].duration { + heap.Pop(h) + heap.Push(h, info) + } +} + +func (q *topNSlowQueries) Refresh(now time.Time) { + q.user.Refresh(now, q.recent) + q.internal.Refresh(now, q.recent) +} + +func (q *topNSlowQueries) Close() { + close(q.ch) } type slowQueryInfo struct { @@ -103,4 +122,5 @@ type slowQueryInfo struct { db string tableIDs string indexIDs string + internal bool } diff --git a/domain/topn_slow_query_test.go b/domain/topn_slow_query_test.go index 5819b9a214a05..653cd484226b5 100644 --- a/domain/topn_slow_query_test.go +++ b/domain/topn_slow_query_test.go @@ -36,37 +36,37 @@ func (t *testTopNSlowQuerySuite) TestPush(c *C) { slowQuery.Append(&slowQueryInfo{duration: 1000 * time.Millisecond}) slowQuery.Append(&slowQueryInfo{duration: 1100 * time.Millisecond}) slowQuery.Append(&slowQueryInfo{duration: 1200 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 300*time.Millisecond) - checkHeap(slowQuery, c) + c.Assert(slowQuery.user.data[0].duration, Equals, 300*time.Millisecond) + checkHeap(&slowQuery.user, c) // Update all data in the heap. slowQuery.Append(&slowQueryInfo{duration: 1300 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 400*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 400*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 1400 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 500*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 500*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 1500 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 600*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 600*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 1500 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 700*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 700*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 1600 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 800*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 800*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 1700 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 900*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 900*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 1800 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 1000*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 1000*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 1900 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 1100*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 1100*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 2000 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 1200*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 1200*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 2100 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) - checkHeap(slowQuery, c) + c.Assert(slowQuery.user.data[0].duration, Equals, 1300*time.Millisecond) + checkHeap(&slowQuery.user, c) // Data smaller than heap top will not be inserted. slowQuery.Append(&slowQueryInfo{duration: 1200 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 1300*time.Millisecond) slowQuery.Append(&slowQueryInfo{duration: 666 * time.Millisecond}) - c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 1300*time.Millisecond) } func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { @@ -78,25 +78,25 @@ func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { slowQuery.Append(&slowQueryInfo{start: now.Add(2 * time.Second), duration: 4}) slowQuery.Append(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) slowQuery.Append(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) - c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) + c.Assert(slowQuery.user.data[0].duration, Equals, 2*time.Nanosecond) slowQuery.Refresh(now.Add(5 * time.Second)) - c.Assert(len(slowQuery.data), Equals, 2) - c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) + c.Assert(len(slowQuery.user.data), Equals, 2) + c.Assert(slowQuery.user.data[0].duration, Equals, 2*time.Nanosecond) slowQuery.Append(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) slowQuery.Append(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) slowQuery.Append(&slowQueryInfo{start: now.Add(5 * time.Second), duration: 1}) slowQuery.Append(&slowQueryInfo{start: now.Add(6 * time.Second), duration: 0}) - c.Assert(len(slowQuery.data), Equals, 6) - c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) + c.Assert(len(slowQuery.user.data), Equals, 6) + c.Assert(slowQuery.user.data[0].duration, Equals, 0*time.Nanosecond) slowQuery.Refresh(now.Add(6 * time.Second)) - c.Assert(len(slowQuery.data), Equals, 4) - c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) + c.Assert(len(slowQuery.user.data), Equals, 4) + c.Assert(slowQuery.user.data[0].duration, Equals, 0*time.Nanosecond) } -func checkHeap(q *topNSlowQueries, c *C) { +func checkHeap(q *slowQueryHeap, c *C) { for i := 0; i < len(q.data); i++ { left := 2*i + 1 right := 2*i + 2 diff --git a/executor/adapter.go b/executor/adapter.go index c5cfbb2d8f721..cf68209ac71d2 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -372,13 +372,11 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) { logutil.SlowQueryLogger.Warnf( "[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v", internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql) - if !sessVars.InRestrictedSQL { - var userString string - if user != nil { - userString = user.String() - } - domain.GetDomain(a.Ctx).LogTopNSlowQuery(sql, a.startTime, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, txnTS, userString, currentDB, tableIDs, indexIDs) + var userString string + if user != nil { + userString = user.String() } + domain.GetDomain(a.Ctx).LogTopNSlowQuery(sql, a.startTime, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, txnTS, userString, currentDB, tableIDs, indexIDs, sessVars.InRestrictedSQL) } } From 907530764479f3c36ba0f906d447c628b4fb324b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 11 Sep 2018 15:19:20 +0800 Subject: [PATCH 9/9] address comment --- domain/domain.go | 21 +------ domain/topn_slow_query.go | 49 ++++++++-------- domain/topn_slow_query_test.go | 100 ++++++++++++++++----------------- executor/adapter.go | 15 ++++- 4 files changed, 91 insertions(+), 94 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 63ec375c171d9..6e5ba7104ead5 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util" - "github.com/pingcap/tidb/util/execdetails" log "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" @@ -332,25 +331,9 @@ func (do *Domain) Reload() error { } // LogTopNSlowQuery keeps topN recent slow queries in domain. -func (do *Domain) LogTopNSlowQuery(sql string, start time.Time, duration time.Duration, - detail execdetails.ExecDetails, - succ bool, connID, txnTS uint64, - user, db, tableIDs, indexIDs string, internal bool) { +func (do *Domain) LogTopNSlowQuery(query *SlowQueryInfo) { select { - case do.slowQuery.ch <- &slowQueryInfo{ - sql: sql, - start: start, - duration: duration, - detail: detail, - succ: succ, - connID: connID, - txnTS: txnTS, - user: user, - db: db, - tableIDs: tableIDs, - indexIDs: indexIDs, - internal: internal, - }: + case do.slowQuery.ch <- query: default: } } diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index d03222d305efc..b97a9181663ba 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -21,15 +21,15 @@ import ( ) type slowQueryHeap struct { - data []*slowQueryInfo + data []*SlowQueryInfo } func (h *slowQueryHeap) Len() int { return len(h.data) } -func (h *slowQueryHeap) Less(i, j int) bool { return h.data[i].duration < h.data[j].duration } +func (h *slowQueryHeap) Less(i, j int) bool { return h.data[i].Duration < h.data[j].Duration } func (h *slowQueryHeap) Swap(i, j int) { h.data[i], h.data[j] = h.data[j], h.data[i] } func (h *slowQueryHeap) Push(x interface{}) { - h.data = append(h.data, x.(*slowQueryInfo)) + h.data = append(h.data, x.(*SlowQueryInfo)) } func (h *slowQueryHeap) Pop() interface{} { @@ -44,7 +44,7 @@ func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) { // Remove outdated slow query element. idx := 0 for i := 0; i < len(h.data); i++ { - outdateTime := h.data[i].start.Add(recent) + outdateTime := h.data[i].Start.Add(recent) if outdateTime.After(now) { h.data[idx] = h.data[i] idx++ @@ -66,23 +66,23 @@ type topNSlowQueries struct { internal slowQueryHeap topN int recent time.Duration - ch chan *slowQueryInfo + ch chan *SlowQueryInfo } func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries { ret := &topNSlowQueries{ topN: topN, recent: recent, - ch: make(chan *slowQueryInfo, 1000), + ch: make(chan *SlowQueryInfo, 1000), } - ret.user.data = make([]*slowQueryInfo, 0, topN) - ret.internal.data = make([]*slowQueryInfo, 0, topN) + ret.user.data = make([]*SlowQueryInfo, 0, topN) + ret.internal.data = make([]*SlowQueryInfo, 0, topN) return ret } -func (q *topNSlowQueries) Append(info *slowQueryInfo) { +func (q *topNSlowQueries) Append(info *SlowQueryInfo) { var h *slowQueryHeap - if info.internal { + if info.Internal { h = &q.internal } else { h = &q.user @@ -95,7 +95,7 @@ func (q *topNSlowQueries) Append(info *slowQueryInfo) { } // Replace the heap top. - if info.duration > h.data[0].duration { + if info.Duration > h.data[0].Duration { heap.Pop(h) heap.Push(h, info) } @@ -110,17 +110,18 @@ func (q *topNSlowQueries) Close() { close(q.ch) } -type slowQueryInfo struct { - sql string - start time.Time - duration time.Duration - detail execdetails.ExecDetails - succ bool - connID uint64 - txnTS uint64 - user string - db string - tableIDs string - indexIDs string - internal bool +// SlowQueryInfo is a struct to record slow query info. +type SlowQueryInfo struct { + SQL string + Start time.Time + Duration time.Duration + Detail execdetails.ExecDetails + Succ bool + ConnID uint64 + TxnTS uint64 + User string + DB string + TableIDs string + IndexIDs string + Internal bool } diff --git a/domain/topn_slow_query_test.go b/domain/topn_slow_query_test.go index 653cd484226b5..c804e8ba75b17 100644 --- a/domain/topn_slow_query_test.go +++ b/domain/topn_slow_query_test.go @@ -26,74 +26,74 @@ type testTopNSlowQuerySuite struct{} func (t *testTopNSlowQuerySuite) TestPush(c *C) { slowQuery := newTopNSlowQueries(10, 0) // Insert data into the heap. - slowQuery.Append(&slowQueryInfo{duration: 300 * time.Millisecond}) - slowQuery.Append(&slowQueryInfo{duration: 400 * time.Millisecond}) - slowQuery.Append(&slowQueryInfo{duration: 500 * time.Millisecond}) - slowQuery.Append(&slowQueryInfo{duration: 600 * time.Millisecond}) - slowQuery.Append(&slowQueryInfo{duration: 700 * time.Millisecond}) - slowQuery.Append(&slowQueryInfo{duration: 800 * time.Millisecond}) - slowQuery.Append(&slowQueryInfo{duration: 900 * time.Millisecond}) - slowQuery.Append(&slowQueryInfo{duration: 1000 * time.Millisecond}) - slowQuery.Append(&slowQueryInfo{duration: 1100 * time.Millisecond}) - slowQuery.Append(&slowQueryInfo{duration: 1200 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 300*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 300 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 400 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 500 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 600 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 700 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 800 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 900 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 1000 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 1100 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 1200 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 300*time.Millisecond) checkHeap(&slowQuery.user, c) // Update all data in the heap. - slowQuery.Append(&slowQueryInfo{duration: 1300 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 400*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 1400 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 500*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 1500 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 600*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 1500 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 700*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 1600 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 800*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 1700 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 900*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 1800 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 1000*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 1900 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 1100*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 2000 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 1200*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 2100 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 1300*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1300 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 400*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1400 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 500*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1500 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 600*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1500 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 700*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1600 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 800*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1700 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 900*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1800 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1000*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1900 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1100*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 2000 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1200*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 2100 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond) checkHeap(&slowQuery.user, c) // Data smaller than heap top will not be inserted. - slowQuery.Append(&slowQueryInfo{duration: 1200 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 1300*time.Millisecond) - slowQuery.Append(&slowQueryInfo{duration: 666 * time.Millisecond}) - c.Assert(slowQuery.user.data[0].duration, Equals, 1300*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1200 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 666 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond) } func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { now := time.Now() slowQuery := newTopNSlowQueries(6, 3*time.Second) - slowQuery.Append(&slowQueryInfo{start: now, duration: 6}) - slowQuery.Append(&slowQueryInfo{start: now.Add(1 * time.Second), duration: 5}) - slowQuery.Append(&slowQueryInfo{start: now.Add(2 * time.Second), duration: 4}) - slowQuery.Append(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) - slowQuery.Append(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) - c.Assert(slowQuery.user.data[0].duration, Equals, 2*time.Nanosecond) + slowQuery.Append(&SlowQueryInfo{Start: now, Duration: 6}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(1 * time.Second), Duration: 5}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(2 * time.Second), Duration: 4}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(3 * time.Second), Duration: 3}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond) slowQuery.Refresh(now.Add(5 * time.Second)) c.Assert(len(slowQuery.user.data), Equals, 2) - c.Assert(slowQuery.user.data[0].duration, Equals, 2*time.Nanosecond) + c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond) - slowQuery.Append(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) - slowQuery.Append(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) - slowQuery.Append(&slowQueryInfo{start: now.Add(5 * time.Second), duration: 1}) - slowQuery.Append(&slowQueryInfo{start: now.Add(6 * time.Second), duration: 0}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(3 * time.Second), Duration: 3}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(5 * time.Second), Duration: 1}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(6 * time.Second), Duration: 0}) c.Assert(len(slowQuery.user.data), Equals, 6) - c.Assert(slowQuery.user.data[0].duration, Equals, 0*time.Nanosecond) + c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond) slowQuery.Refresh(now.Add(6 * time.Second)) c.Assert(len(slowQuery.user.data), Equals, 4) - c.Assert(slowQuery.user.data[0].duration, Equals, 0*time.Nanosecond) + c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond) } func checkHeap(q *slowQueryHeap, c *C) { @@ -101,10 +101,10 @@ func checkHeap(q *slowQueryHeap, c *C) { left := 2*i + 1 right := 2*i + 2 if left < len(q.data) { - c.Assert(q.data[i].duration, LessEqual, q.data[left].duration) + c.Assert(q.data[i].Duration, LessEqual, q.data[left].Duration) } if right < len(q.data) { - c.Assert(q.data[i].duration, LessEqual, q.data[right].duration) + c.Assert(q.data[i].Duration, LessEqual, q.data[right].Duration) } } } diff --git a/executor/adapter.go b/executor/adapter.go index cf68209ac71d2..642bdb2b86c16 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -376,7 +376,20 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) { if user != nil { userString = user.String() } - domain.GetDomain(a.Ctx).LogTopNSlowQuery(sql, a.startTime, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, txnTS, userString, currentDB, tableIDs, indexIDs, sessVars.InRestrictedSQL) + domain.GetDomain(a.Ctx).LogTopNSlowQuery(&domain.SlowQueryInfo{ + SQL: sql, + Start: a.startTime, + Duration: costTime, + Detail: sessVars.StmtCtx.GetExecDetails(), + Succ: succ, + ConnID: connID, + TxnTS: txnTS, + User: userString, + DB: currentDB, + TableIDs: tableIDs, + IndexIDs: indexIDs, + Internal: sessVars.InRestrictedSQL, + }) } }