Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain,executor: store topN slow query in domain #7646

Merged
merged 13 commits into from
Sep 12, 2018
31 changes: 31 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Domain struct {
etcdClient *clientv3.Client
wg sync.WaitGroup
gvc GlobalVariableCache
slowQuery *topNSlowQueries

MockReloadFailed MockFailure // It mocks reload failed.
}
Expand Down Expand Up @@ -329,6 +330,32 @@ func (do *Domain) Reload() error {
return nil
}

// LogTopNSlowQuery hands a slow query record to the domain's top-N slow query
// collector.
//
// The send is non-blocking: when the collector's channel buffer is full the
// record is dropped, so the caller is never stalled. A nil query is ignored,
// because the consumer side (topNSlowQueries.Append) dereferences the record
// unconditionally.
//
// NOTE(review): Domain.Close closes the same channel; calling this method
// after (or concurrently with) Close would send on a closed channel and
// panic. Confirm that callers are stopped before the domain is closed.
func (do *Domain) LogTopNSlowQuery(query *SlowQueryInfo) {
	if query == nil {
		return
	}
	select {
	case do.slowQuery.ch <- query:
	default:
		// Buffer is full: drop the record rather than block the caller.
	}
}

// topNSlowQueryLoop is the background worker of the top-N slow query
// collector. Every record received from the collector's channel is appended to
// the heaps, and on every ticker tick (each 10 minutes) expired entries are
// evicted. The loop returns once the channel has been closed and drained.
func (do *Domain) topNSlowQueryLoop() {
	// Deferred calls run in reverse order: the wait group is released first,
	// then recoverInDomain runs.
	defer recoverInDomain("topNSlowQueryLoop", false)
	defer do.wg.Done()

	const refreshInterval = 10 * time.Minute
	refreshTicker := time.NewTicker(refreshInterval)
	defer refreshTicker.Stop()

	for {
		select {
		case tick := <-refreshTicker.C:
			do.slowQuery.Refresh(tick)
		case query, open := <-do.slowQuery.ch:
			if open {
				do.slowQuery.Append(query)
				continue
			}
			// Channel closed and empty: stop the worker.
			return
		}
	}
}

func (do *Domain) loadSchemaInLoop(lease time.Duration) {
defer do.wg.Done()
// Lease renewal can run at any frequency.
Expand Down Expand Up @@ -408,6 +435,7 @@ func (do *Domain) Close() {
if do.etcdClient != nil {
terror.Log(errors.Trace(do.etcdClient.Close()))
}
do.slowQuery.Close()
do.sysSessionPool.Close()
do.wg.Wait()
log.Info("[domain] close")
Expand Down Expand Up @@ -471,6 +499,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout),
statsLease: statsLease,
infoHandle: infoschema.NewHandle(store),
slowQuery: newTopNSlowQueries(30, time.Hour*24*7),
}
}

Expand Down Expand Up @@ -529,6 +558,8 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ddlLease)
}
do.wg.Add(1)
go do.topNSlowQueryLoop()

return nil
}
Expand Down
127 changes: 127 additions & 0 deletions domain/topn_slow_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
"container/heap"
"time"

"github.com/pingcap/tidb/util/execdetails"
)

// slowQueryHeap is a heap of slow query records ordered by Duration: its Less
// method compares Duration with "<", so under container/heap the entry with
// the smallest Duration sits at index 0.
type slowQueryHeap struct {
// data is the backing slice of the heap.
data []*SlowQueryInfo
}

// Len implements sort.Interface (embedded in heap.Interface) and reports the
// number of entries currently stored.
func (h *slowQueryHeap) Len() int {
	return len(h.data)
}
// Less implements sort.Interface. Entries are ordered by ascending Duration,
// which makes the shortest query the heap top.
func (h *slowQueryHeap) Less(i, j int) bool {
	a, b := h.data[i], h.data[j]
	return a.Duration < b.Duration
}
// Swap implements sort.Interface and exchanges the entries at i and j.
func (h *slowQueryHeap) Swap(i, j int) {
	tmp := h.data[i]
	h.data[i] = h.data[j]
	h.data[j] = tmp
}

// Push implements heap.Interface by appending x to the backing slice. x must
// be a *SlowQueryInfo; any other dynamic type makes the type assertion panic.
// Use heap.Push, not this method, to keep the heap ordered.
func (h *slowQueryHeap) Push(x interface{}) {
	info := x.(*SlowQueryInfo)
	h.data = append(h.data, info)
}

// Pop implements heap.Interface by removing and returning the last element of
// the backing slice. Use heap.Pop, not this method, to extract the heap top;
// heap.Pop moves the top element to the end before calling Pop.
func (h *slowQueryHeap) Pop() interface{} {
	old := h.data
	n := len(old)
	x := old[n-1]
	// Clear the vacated slot: the backing array outlives the shrink below and
	// would otherwise keep the popped record reachable.
	old[n-1] = nil
	h.data = old[:n-1]
	return x
}

// Refresh evicts outdated entries. An entry is kept only while its Start time
// plus recent is strictly after now. When at least one entry is evicted the
// heap order is rebuilt; otherwise the heap is left untouched.
func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) {
	// Compact the surviving entries to the front of the slice, in place.
	kept := 0
	for _, info := range h.data {
		if info.Start.Add(recent).After(now) {
			h.data[kept] = info
			kept++
		}
	}
	if kept == len(h.data) {
		// Nothing expired.
		return
	}

	// Clear the vacated tail: the backing array is reused after truncation and
	// would otherwise keep the evicted records reachable.
	for i := kept; i < len(h.data); i++ {
		h.data[i] = nil
	}

	// Compaction broke the heap order, so rebuild it over the survivors.
	h.data = h.data[:kept]
	heap.Init(h)
}

// topNSlowQueries maintains two heaps to store recent slow queries: one for
// user queries and one for internal queries.
// N = 30, recent = 7 days by default (the values NewDomain passes in).
type topNSlowQueries struct {
// user holds records whose Internal flag is false.
user slowQueryHeap
// internal holds records whose Internal flag is true.
internal slowQueryHeap
// topN is the maximum number of records kept in each heap.
topN int
// recent is the retention window: Refresh evicts a record once its Start
// time plus recent is no longer after the refresh time.
recent time.Duration
// ch is the buffered channel through which Domain.LogTopNSlowQuery hands
// records to Domain.topNSlowQueryLoop; Close closes it.
ch chan *SlowQueryInfo
}

// newTopNSlowQueries builds a collector that keeps at most topN records per
// heap (user and internal) and treats records older than recent as outdated.
// Both heaps are pre-sized to topN and the input channel is buffered with room
// for 1000 pending records.
func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries {
	return &topNSlowQueries{
		user:     slowQueryHeap{data: make([]*SlowQueryInfo, 0, topN)},
		internal: slowQueryHeap{data: make([]*SlowQueryInfo, 0, topN)},
		topN:     topN,
		recent:   recent,
		ch:       make(chan *SlowQueryInfo, 1000),
	}
}

// Append offers info to the heap selected by info.Internal (internal heap when
// true, user heap otherwise).
//
// While the heap holds fewer than topN records, info is always inserted. Once
// it is full, info replaces the current minimum only if its Duration is
// strictly greater; otherwise info is discarded. A collector created with a
// non-positive topN keeps nothing.
func (q *topNSlowQueries) Append(info *SlowQueryInfo) {
	h := &q.user
	if info.Internal {
		h = &q.internal
	}

	// Heap is not full.
	if len(h.data) < q.topN {
		heap.Push(h, info)
		return
	}

	// Only reachable with topN <= 0; there is no heap top to compare against,
	// and indexing h.data[0] below would panic.
	if len(h.data) == 0 {
		return
	}

	// Replace the heap top. Overwriting the root and calling heap.Fix is
	// equivalent to Pop followed by Push, but cheaper.
	if info.Duration > h.data[0].Duration {
		h.data[0] = info
		heap.Fix(h, 0)
	}
}

// Refresh evicts outdated records from the user heap and then from the
// internal heap, using now as the reference time and q.recent as the
// retention window.
func (q *topNSlowQueries) Refresh(now time.Time) {
	for _, h := range []*slowQueryHeap{&q.user, &q.internal} {
		h.Refresh(now, q.recent)
	}
}

// Close closes the input channel, which makes Domain.topNSlowQueryLoop return
// after the remaining buffered records have been received. It must be called
// at most once: closing an already closed channel panics.
func (q *topNSlowQueries) Close() {
	close(q.ch)
}

// SlowQueryInfo is a struct to record slow query info.
// Field notes below describe how the executor's logSlowQuery fills the record.
type SlowQueryInfo struct {
// SQL is the statement text, as written to the slow query log.
SQL string
// Start is the statement's start time; heap eviction is based on it.
Start time.Time
// Duration is the statement's cost time; the heaps are ordered by it.
Duration time.Duration
// Detail is the execution detail taken from the statement context.
Detail execdetails.ExecDetails
// Succ is the success flag passed to logSlowQuery.
Succ bool
// ConnID is the ID of the connection that ran the statement.
ConnID uint64
// TxnTS is logged as txn_start_ts in the slow query log.
TxnTS uint64
// User is the string form of the session user, empty when there is none.
User string
// DB is the current database of the session.
DB string
// TableIDs and IndexIDs are the pre-formatted strings that are also written
// to the slow query log.
TableIDs string
IndexIDs string
// Internal is set from the session's InRestrictedSQL flag and selects the
// internal heap instead of the user heap.
Internal bool
}
110 changes: 110 additions & 0 deletions domain/topn_slow_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
"time"

. "github.com/pingcap/check"
)

var _ = Suite(&testTopNSlowQuerySuite{})

type testTopNSlowQuerySuite struct{}

// TestPush checks that Append fills the heap up to topN, that longer queries
// then evict the current minimum one by one, and that queries not longer than
// the minimum are rejected.
func (t *testTopNSlowQuerySuite) TestPush(c *C) {
	const ms = time.Millisecond
	slowQuery := newTopNSlowQueries(10, 0)
	top := func() time.Duration { return slowQuery.user.data[0].Duration }

	// Insert data into the heap: 300ms, 400ms, ..., 1200ms fill all 10 slots.
	for d := 300 * ms; d <= 1200*ms; d += 100 * ms {
		slowQuery.Append(&SlowQueryInfo{Duration: d})
	}
	c.Assert(top(), Equals, 300*ms)
	checkHeap(&slowQuery.user, c)

	// Update all data in the heap: each longer query replaces the minimum.
	replacements := []struct {
		add     time.Duration
		wantTop time.Duration
	}{
		{1300 * ms, 400 * ms},
		{1400 * ms, 500 * ms},
		{1500 * ms, 600 * ms},
		{1500 * ms, 700 * ms},
		{1600 * ms, 800 * ms},
		{1700 * ms, 900 * ms},
		{1800 * ms, 1000 * ms},
		{1900 * ms, 1100 * ms},
		{2000 * ms, 1200 * ms},
		{2100 * ms, 1300 * ms},
	}
	for _, r := range replacements {
		slowQuery.Append(&SlowQueryInfo{Duration: r.add})
		c.Assert(top(), Equals, r.wantTop)
	}
	checkHeap(&slowQuery.user, c)

	// Data smaller than heap top will not be inserted.
	for _, d := range []time.Duration{1200 * ms, 666 * ms} {
		slowQuery.Append(&SlowQueryInfo{Duration: d})
		c.Assert(top(), Equals, 1300*ms)
	}
}

// TestRefresh checks that Refresh evicts exactly the records whose start time
// lies outside the 3-second retention window and that the heap top is still
// the shortest remaining query afterwards.
func (t *testTopNSlowQuerySuite) TestRefresh(c *C) {
	now := time.Now()
	slowQuery := newTopNSlowQueries(6, 3*time.Second)

	// add appends a user query that started startSec seconds after now.
	add := func(startSec int, d time.Duration) {
		start := now.Add(time.Duration(startSec) * time.Second)
		slowQuery.Append(&SlowQueryInfo{Start: start, Duration: d})
	}

	add(0, 6)
	add(1, 5)
	add(2, 4)
	add(3, 3)
	add(4, 2)
	c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond)

	// At now+5s only the records started at +3s and +4s are still recent.
	slowQuery.Refresh(now.Add(5 * time.Second))
	c.Assert(len(slowQuery.user.data), Equals, 2)
	c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond)

	add(3, 3)
	add(4, 2)
	add(5, 1)
	add(6, 0)
	c.Assert(len(slowQuery.user.data), Equals, 6)
	c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond)

	// At now+6s the two records started at +3s expire as well.
	slowQuery.Refresh(now.Add(6 * time.Second))
	c.Assert(len(slowQuery.user.data), Equals, 4)
	c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond)
}

// checkHeap asserts the min-heap invariant on q: the Duration of every node is
// less than or equal to the Duration of each of its children.
func checkHeap(q *slowQueryHeap, c *C) {
	// Walk the children; the parent of the node at index child is (child-1)/2.
	for child := 1; child < len(q.data); child++ {
		parent := (child - 1) / 2
		c.Assert(q.data[parent].Duration, LessEqual, q.data[child].Duration)
	}
}
19 changes: 19 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -371,6 +372,24 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
var userString string
if user != nil {
userString = user.String()
}
domain.GetDomain(a.Ctx).LogTopNSlowQuery(&domain.SlowQueryInfo{
SQL: sql,
Start: a.startTime,
Duration: costTime,
Detail: sessVars.StmtCtx.GetExecDetails(),
Succ: succ,
ConnID: connID,
TxnTS: txnTS,
User: userString,
DB: currentDB,
TableIDs: tableIDs,
IndexIDs: indexIDs,
Internal: sessVars.InRestrictedSQL,
})
}
}

Expand Down