-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
domain,executor: store topN slow query in domain #7646
Changes from 7 commits
5f028d2
5f3878e
b2af9cb
537e26d
85d5f0d
3b126bf
5941733
c388314
8dc40fb
9075307
d88c674
94f4ab8
e01a1f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ import ( | |
"github.com/pingcap/tidb/statistics" | ||
"github.com/pingcap/tidb/terror" | ||
"github.com/pingcap/tidb/util" | ||
"github.com/pingcap/tidb/util/execdetails" | ||
log "github.com/sirupsen/logrus" | ||
"golang.org/x/net/context" | ||
"google.golang.org/grpc" | ||
|
@@ -60,6 +61,7 @@ type Domain struct { | |
etcdClient *clientv3.Client | ||
wg sync.WaitGroup | ||
gvc GlobalVariableCache | ||
slowQuery *topNSlowQueries | ||
|
||
MockReloadFailed MockFailure // It mocks reload failed. | ||
} | ||
|
@@ -329,6 +331,46 @@ func (do *Domain) Reload() error { | |
return nil | ||
} | ||
|
||
// LogTopNSlowQuery keeps topN recent slow queries in domain. | ||
func (do *Domain) LogTopNSlowQuery(sql string, start time.Time, duration time.Duration, | ||
detail execdetails.ExecDetails, | ||
succ bool, connID, txnTS uint64, | ||
user, db, tableIDs, indexIDs string) { | ||
select { | ||
case do.slowQuery.ch <- &slowQueryInfo{ | ||
sql: sql, | ||
start: start, | ||
duration: duration, | ||
detail: detail, | ||
succ: succ, | ||
connID: connID, | ||
txnTS: txnTS, | ||
user: user, | ||
db: db, | ||
tableIDs: tableIDs, | ||
indexIDs: indexIDs, | ||
}: | ||
default: | ||
} | ||
} | ||
|
||
func (do *Domain) topNSlowQueryLoop() { | ||
defer do.wg.Done() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. recover this goroutine. |
||
ticker := time.NewTicker(time.Minute * 10) | ||
defer ticker.Stop() | ||
for { | ||
select { | ||
case now := <-ticker.C: | ||
do.slowQuery.Refresh(now) | ||
case info, ok := <-do.slowQuery.ch: | ||
if !ok { | ||
return | ||
} | ||
do.slowQuery.Append(info) | ||
} | ||
} | ||
} | ||
|
||
func (do *Domain) loadSchemaInLoop(lease time.Duration) { | ||
defer do.wg.Done() | ||
// Lease renewal can run at any frequency. | ||
|
@@ -408,6 +450,7 @@ func (do *Domain) Close() { | |
if do.etcdClient != nil { | ||
terror.Log(errors.Trace(do.etcdClient.Close())) | ||
} | ||
do.slowQuery.Close() | ||
do.sysSessionPool.Close() | ||
do.wg.Wait() | ||
log.Info("[domain] close") | ||
|
@@ -471,6 +514,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio | |
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout), | ||
statsLease: statsLease, | ||
infoHandle: infoschema.NewHandle(store), | ||
slowQuery: newTopNSlowQueries(30, time.Hour*24*7), | ||
} | ||
} | ||
|
||
|
@@ -529,6 +573,8 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R | |
// Local store needs to get the change information for every DDL state in each session. | ||
go do.loadSchemaInLoop(ddlLease) | ||
} | ||
do.wg.Add(1) | ||
go do.topNSlowQueryLoop() | ||
|
||
return nil | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
// Copyright 2018 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package domain | ||
|
||
import ( | ||
"container/heap" | ||
"time" | ||
|
||
"github.com/pingcap/tidb/util/execdetails" | ||
) | ||
|
||
// topNSlowQueries maintains a heap to store recent slow queries. | ||
// N = 30, recent = 7 days by default. | ||
type topNSlowQueries struct { | ||
data []*slowQueryInfo | ||
topN int | ||
recent time.Duration | ||
ch chan *slowQueryInfo | ||
} | ||
|
||
func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries { | ||
return &topNSlowQueries{ | ||
data: make([]*slowQueryInfo, 0, topN), | ||
topN: topN, | ||
recent: recent, | ||
ch: make(chan *slowQueryInfo, 1000), | ||
} | ||
} | ||
|
||
func (q *topNSlowQueries) Close() { | ||
close(q.ch) | ||
} | ||
|
||
func (q *topNSlowQueries) Len() int { return len(q.data) } | ||
func (q *topNSlowQueries) Less(i, j int) bool { return q.data[i].duration < q.data[j].duration } | ||
func (q *topNSlowQueries) Swap(i, j int) { q.data[i], q.data[j] = q.data[j], q.data[i] } | ||
|
||
func (q *topNSlowQueries) Push(x interface{}) { | ||
q.data = append(q.data, x.(*slowQueryInfo)) | ||
} | ||
|
||
func (q *topNSlowQueries) Pop() interface{} { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pop can only return the minimum duration query, how can we implement topn, for example, the n is 30, and I wanna get top 3 query. and how can we just peek the heap? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe a b-tree is better? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The read operation is not implemented in this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Read will not be a frequent operation, so we just copy on read. |
||
old := q.data | ||
n := len(old) | ||
x := old[n-1] | ||
q.data = old[0 : n-1] | ||
return x | ||
} | ||
|
||
func (q *topNSlowQueries) Append(info *slowQueryInfo) { | ||
// Heap is not full, append to it and sift up. | ||
if len(q.data) < q.topN { | ||
heap.Push(q, info) | ||
return | ||
} | ||
|
||
// Replace the heap top and sift down. | ||
if info.duration > q.data[0].duration { | ||
heap.Pop(q) | ||
heap.Push(q, info) | ||
} | ||
} | ||
|
||
func (q *topNSlowQueries) Refresh(now time.Time) { | ||
// Remove outdated slow query element. | ||
idx := 0 | ||
for i := 0; i < len(q.data); i++ { | ||
outdateTime := q.data[i].start.Add(q.recent) | ||
if outdateTime.After(now) { | ||
q.data[idx] = q.data[i] | ||
idx++ | ||
} | ||
} | ||
if len(q.data) == idx { | ||
return | ||
} | ||
|
||
// Rebuild the heap. | ||
q.data = q.data[:idx] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You must use a lock to protect q.data, as long as you need to read the slice later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can do reading in the same goroutine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll use copy on read, and they will be in one goroutine, no lock. @winkyao |
||
heap.Init(q) | ||
} | ||
|
||
type slowQueryInfo struct { | ||
sql string | ||
start time.Time | ||
duration time.Duration | ||
detail execdetails.ExecDetails | ||
succ bool | ||
connID uint64 | ||
txnTS uint64 | ||
user string | ||
db string | ||
tableIDs string | ||
indexIDs string | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
// Copyright 2018 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package domain | ||
|
||
import ( | ||
"time" | ||
|
||
. "github.com/pingcap/check" | ||
) | ||
|
||
var _ = Suite(&testTopNSlowQuerySuite{}) | ||
|
||
type testTopNSlowQuerySuite struct{} | ||
|
||
func (t *testTopNSlowQuerySuite) TestPush(c *C) { | ||
slowQuery := newTopNSlowQueries(10, 0) | ||
// Insert data into the heap. | ||
slowQuery.Append(&slowQueryInfo{duration: 300 * time.Millisecond}) | ||
slowQuery.Append(&slowQueryInfo{duration: 400 * time.Millisecond}) | ||
slowQuery.Append(&slowQueryInfo{duration: 500 * time.Millisecond}) | ||
slowQuery.Append(&slowQueryInfo{duration: 600 * time.Millisecond}) | ||
slowQuery.Append(&slowQueryInfo{duration: 700 * time.Millisecond}) | ||
slowQuery.Append(&slowQueryInfo{duration: 800 * time.Millisecond}) | ||
slowQuery.Append(&slowQueryInfo{duration: 900 * time.Millisecond}) | ||
slowQuery.Append(&slowQueryInfo{duration: 1000 * time.Millisecond}) | ||
slowQuery.Append(&slowQueryInfo{duration: 1100 * time.Millisecond}) | ||
slowQuery.Append(&slowQueryInfo{duration: 1200 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 300*time.Millisecond) | ||
checkHeap(slowQuery, c) | ||
|
||
// Update all data in the heap. | ||
slowQuery.Append(&slowQueryInfo{duration: 1300 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 400*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 1400 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 500*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 1500 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 600*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 1500 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 700*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 1600 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 800*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 1700 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 900*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 1800 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 1000*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 1900 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 1100*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 2000 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 1200*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 2100 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) | ||
checkHeap(slowQuery, c) | ||
|
||
// Data smaller than heap top will not be inserted. | ||
slowQuery.Append(&slowQueryInfo{duration: 1200 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) | ||
slowQuery.Append(&slowQueryInfo{duration: 666 * time.Millisecond}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond) | ||
} | ||
|
||
func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { | ||
now := time.Now() | ||
slowQuery := newTopNSlowQueries(6, 3*time.Second) | ||
|
||
slowQuery.Append(&slowQueryInfo{start: now, duration: 6}) | ||
slowQuery.Append(&slowQueryInfo{start: now.Add(1 * time.Second), duration: 5}) | ||
slowQuery.Append(&slowQueryInfo{start: now.Add(2 * time.Second), duration: 4}) | ||
slowQuery.Append(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) | ||
slowQuery.Append(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) | ||
c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) | ||
|
||
slowQuery.Refresh(now.Add(5 * time.Second)) | ||
c.Assert(len(slowQuery.data), Equals, 2) | ||
c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond) | ||
|
||
slowQuery.Append(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3}) | ||
slowQuery.Append(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2}) | ||
slowQuery.Append(&slowQueryInfo{start: now.Add(5 * time.Second), duration: 1}) | ||
slowQuery.Append(&slowQueryInfo{start: now.Add(6 * time.Second), duration: 0}) | ||
c.Assert(len(slowQuery.data), Equals, 6) | ||
c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) | ||
|
||
slowQuery.Refresh(now.Add(6 * time.Second)) | ||
c.Assert(len(slowQuery.data), Equals, 4) | ||
c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond) | ||
} | ||
|
||
func checkHeap(q *topNSlowQueries, c *C) { | ||
for i := 0; i < len(q.data); i++ { | ||
left := 2*i + 1 | ||
right := 2*i + 2 | ||
if left < len(q.data) { | ||
c.Assert(q.data[i].duration, LessEqual, q.data[left].duration) | ||
} | ||
if right < len(q.data) { | ||
c.Assert(q.data[i].duration, LessEqual, q.data[right].duration) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ import ( | |
"github.com/juju/errors" | ||
"github.com/pingcap/tidb/ast" | ||
"github.com/pingcap/tidb/config" | ||
"github.com/pingcap/tidb/domain" | ||
"github.com/pingcap/tidb/expression" | ||
"github.com/pingcap/tidb/infoschema" | ||
"github.com/pingcap/tidb/kv" | ||
|
@@ -371,6 +372,13 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) { | |
logutil.SlowQueryLogger.Warnf( | ||
"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v", | ||
internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql) | ||
if !sessVars.InRestrictedSQL { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just log general sql? I prefer to keep two heap to log the general sql and internal sql. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Internal SQL are always the same, it will not give us too much information. |
||
var userString string | ||
if user != nil { | ||
userString = user.String() | ||
} | ||
domain.GetDomain(a.Ctx).LogTopNSlowQuery(sql, a.startTime, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, txnTS, userString, currentDB, tableIDs, indexIDs) | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function takes so many parameters, which makes it hard to read and maintain, could you extract a struct to store all the parameters and pass the struct as the parameter instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe
slowQueryInfo
is 🐶There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about exporting
slowQueryInfo
and useslowQueryInfo
instead?