Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topsql: introduce stmtstats and sql execution count #30277

Merged
merged 62 commits into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
ee242e4
Init sql execution count
mornyx Nov 22, 2021
101fd27
Merge branch 'master' into exec-count
mornyx Nov 24, 2021
f9719b7
Merge branch 'master' into exec-count
mornyx Nov 29, 2021
91b2048
Add exec-count for prepared statements; Add comments
mornyx Nov 30, 2021
89f10ba
Add unit-tests
mornyx Nov 30, 2021
9340a27
Add todo
mornyx Nov 30, 2021
136675a
Add license
mornyx Nov 30, 2021
671ccf1
Rename
mornyx Nov 30, 2021
979efa7
Add comments
mornyx Nov 30, 2021
9868bf2
Init sql execution count for tikv
mornyx Dec 3, 2021
1c95a0a
Use exec-count as a separate package to avoid circular dependencies
mornyx Dec 3, 2021
5a53b8e
Integrate ExecCounter and KvExecCounter
mornyx Dec 3, 2021
21df3ec
Add unit-tests
mornyx Dec 3, 2021
5fb0752
Add comments
mornyx Dec 3, 2021
d6919bb
Simplify the code in distsql
mornyx Dec 3, 2021
347480c
Rename
mornyx Dec 3, 2021
684ddf4
Add comments
mornyx Dec 3, 2021
7c0889f
Introduce stmtstats
mornyx Dec 9, 2021
50629ef
Fix npe; Add comments; Remove print
mornyx Dec 9, 2021
a520036
Remove local mod replace
mornyx Dec 9, 2021
119a525
Merge branch 'master' into exec-count
mornyx Dec 9, 2021
b51b394
Fix ut
mornyx Dec 9, 2021
7bc8589
Merge branch 'master' into exec-count
crazycs520 Dec 10, 2021
8fcd43b
Merge branch 'master' into exec-count
mornyx Dec 13, 2021
99dad2c
Merge branch 'master' into exec-count
mornyx Dec 13, 2021
73e2e0a
Remove KvExecCounter pointer in session vars
mornyx Dec 14, 2021
a0f8143
Add plan digest for statement stats
mornyx Dec 14, 2021
dfa0590
Upgrade client-go
mornyx Dec 14, 2021
946ea85
Fix typo
mornyx Dec 14, 2021
1b46677
Comments
mornyx Dec 14, 2021
c2209e9
Comments
mornyx Dec 14, 2021
eed9af8
Merge branch 'master' into exec-count
crazycs520 Dec 15, 2021
f66c79b
Remove session.stmtStats
mornyx Dec 15, 2021
6f1e427
Merge remote-tracking branch 'mornyx/exec-count' into exec-count
mornyx Dec 15, 2021
2381e11
Resolve pr comments
mornyx Dec 15, 2021
bb05c16
Eliminate race
mornyx Dec 15, 2021
f99a7c9
Remove explicit ts param
mornyx Dec 16, 2021
0d74444
Add string method
mornyx Dec 16, 2021
98256cf
Merge branch 'master' into exec-count
mornyx Dec 16, 2021
de99322
Add integration tests for stmt stats
mornyx Dec 17, 2021
816bca2
Add comments
mornyx Dec 17, 2021
d1bce96
Upgrade client-go
mornyx Dec 19, 2021
3bfc8d0
Add kv exec count test case
mornyx Dec 19, 2021
32c85e5
Rename
mornyx Dec 19, 2021
474b6d2
Remove print
mornyx Dec 19, 2021
3a4ff9b
Merge branch 'master' into exec-count
mornyx Dec 19, 2021
fb9b379
Use bytes instead of string
mornyx Dec 20, 2021
78a30ca
Move String() to test file
mornyx Dec 20, 2021
467ac79
Add more UT
mornyx Dec 20, 2021
b8ed21b
Fix compile error in test
mornyx Dec 20, 2021
90d94f5
Simplify globalAggregator
mornyx Dec 20, 2021
e9ecf63
Add StatementObserver
mornyx Dec 20, 2021
699cf7e
Extract method
mornyx Dec 20, 2021
db63338
Merge branch 'master' into exec-count
crazycs520 Dec 21, 2021
b9ed865
Reduce boilerplate code
mornyx Dec 21, 2021
94803d6
Use atomic.Bool instead of uint32
mornyx Dec 21, 2021
a295cf2
Mark SetupAggregator & CloseAggregator as not thread-safe
mornyx Dec 21, 2021
f069615
Rename kvexeccount.go -> kv_exec_count.go
mornyx Dec 21, 2021
f5728d6
Resolve comments
mornyx Dec 21, 2021
b855763
Merge remote-tracking branch 'mornyx/exec-count' into exec-count
mornyx Dec 21, 2021
d6dcca9
Merge branch 'master' into exec-count
mornyx Dec 21, 2021
83dc182
Merge branch 'master' into exec-count
ti-chi-bot Dec 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ type session struct {
builtinFunctionUsage telemetry.BuiltinFunctionsUsage
// allowed when tikv disk full happened.
diskFullOpt kvrpcpb.DiskFullOpt

// execCounter is used to count the number of executions of each SQL in
// this session at each point in time. These data will be periodically
// taken away by the background goroutine. The background goroutine will
// continue to aggregate all the local data in each session, and finally
// report them to the remote regularly.
execCounter *topsql.ExecCounter
}

var parserPool = &sync.Pool{New: func() interface{} { return parser.New() }}
Expand Down Expand Up @@ -1537,6 +1544,9 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
normalizedSQL, digest := s.sessionVars.StmtCtx.SQLDigest()
if variable.TopSQLEnabled() {
ctx = topsql.AttachSQLInfo(ctx, normalizedSQL, digest, "", nil, s.sessionVars.InRestrictedSQL)
if s.execCounter != nil {
s.execCounter.Count(digest.String(), 1)
}
}

if err := s.validateStatementReadOnlyInStaleness(stmtNode); err != nil {
Expand Down Expand Up @@ -1968,6 +1978,11 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
return nil, errors.Errorf("invalid CachedPrepareStmt type")
}
executor.CountStmtNode(preparedStmt.PreparedAst.Stmt, s.sessionVars.InRestrictedSQL)
if variable.TopSQLEnabled() {
if s.execCounter != nil {
s.execCounter.Count(preparedStmt.SQLDigest.String(), 1)
}
}
ok, err = s.IsCachedExecOk(ctx, preparedStmt)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2207,6 +2222,9 @@ func (s *session) Close() {
s.sessionVars.WithdrawAllPreparedStmt()
}
s.ClearDiskFullOpt()
if s.execCounter != nil {
s.execCounter.Close()
}
}

// GetSessionVars implements the context.Context interface.
Expand Down Expand Up @@ -2619,6 +2637,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
client: store.GetClient(),
mppClient: store.GetMPPClient(),
builtinFunctionUsage: make(telemetry.BuiltinFunctionsUsage),
execCounter: topsql.CreateExecCounter(),
}
if plannercore.PreparedPlanCacheEnabled() {
if opt != nil && opt.PreparedPlanCache != nil {
Expand Down Expand Up @@ -2652,6 +2671,7 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
client: store.GetClient(),
mppClient: store.GetMPPClient(),
builtinFunctionUsage: make(telemetry.BuiltinFunctionsUsage),
execCounter: topsql.CreateExecCounter(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand Down
188 changes: 188 additions & 0 deletions util/topsql/exec_count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package topsql

import (
"sync"
"time"

"go.uber.org/atomic"
)

const (
// execCounterManagerCollectDuration is the time period for execCounterManager
// to collect data from all ExecCounter s.
execCounterManagerCollectDuration = 3 * time.Second

// execCounterManagerUploadDuration is the time period for execCounterManager
// to report all aggregated data.
execCounterManagerUploadDuration = 30 * time.Second
)

// ExecCountMap represents Map<SQLDigest, Map<Timestamp, Count>>.
// We put SQLDigest in front of the two-dimensional map, because SQLDigest
// is larger than Timestamp. This can reduce unnecessary memory usage.
type ExecCountMap map[string]map[int64]uint64

// Merge merges other into ExecCountMap.
// Values with the same SQL and same timestamp will be added.
func (m ExecCountMap) Merge(other ExecCountMap) {
for newSQL, newTsCount := range other {
tsCount, ok := m[newSQL]
if !ok {
m[newSQL] = newTsCount
continue
}
for ts, count := range newTsCount {
tsCount[ts] += count
}
}
}

// ExecCounter is a counter used locally in each session.
// We can count the number of SQL executions on ExecCounter, and it
// is expected that these statistics will eventually be collected
// and merged in the background.
type ExecCounter struct {
mu sync.Mutex
execCount ExecCountMap
closed *atomic.Bool
}

// CreateExecCounter try to create and register an ExecCounter.
// If we are in the initialization phase and have not yet called SetupTopSQL
// to initialize the top-sql, nothing will happen, and we will get nil. But
// this scene should never appear, because we always call SetupTopSQL before
// starting the server, at this moment we cannot receive connections and will
// not create a valid session. So this case will never happen: "This function
// returns nil, so this session will never count execution of SQL sent by peer
// client".
func CreateExecCounter() *ExecCounter {
if globalExecCounterManager == nil {
return nil
}
counter := &ExecCounter{
execCount: ExecCountMap{},
closed: atomic.NewBool(false),
}
globalExecCounterManager.register(counter)
return counter
}

// Count is used to count the number of executions of a certain SQL.
// You don't need to provide execution time. By default, the time when
// you call Count is considered the time when SQL is ready to execute.
// The parameter sql is a universal string, and ExecCounter does not care
// whether it is a normalized SQL or a stringified SQL digest.
// Count is thread-safe.
func (c *ExecCounter) Count(sql string, n uint64) {
ts := time.Now().Unix()
c.mu.Lock()
defer c.mu.Unlock()
tsCount, ok := c.execCount[sql]
if !ok {
c.execCount[sql] = map[int64]uint64{}
tsCount = c.execCount[sql]
}
tsCount[ts] += n
}

// Take removes all existing data from ExecCounter.
// Take is thread-safe.
func (c *ExecCounter) Take() ExecCountMap {
c.mu.Lock()
defer c.mu.Unlock()
execCount := c.execCount
c.execCount = ExecCountMap{}
return execCount
}

// Close marks ExecCounter as "closed".
// The background goroutine will periodically detect whether each ExecCounter
// has been closed, and if so, it will be cleaned up.
func (c *ExecCounter) Close() {
c.closed.Store(true)
}

// Closed returns whether the ExecCounter has been closed.
func (c *ExecCounter) Closed() bool {
return c.closed.Load()
}

// execCounterManager is used to manage all ExecCounter s.
// It is responsible for collecting data from all ExecCounter s, aggregating
// them together, and regularly cleaning up the closed ExecCounter s.
type execCounterManager struct {
counters sync.Map // map[uint64]*ExecCounter
curCounterID atomic.Uint64
execCount ExecCountMap
closeCh chan struct{}
}

// newExecCountManager creates an empty execCounterManager.
func newExecCountManager() *execCounterManager {
return &execCounterManager{
execCount: ExecCountMap{},
closeCh: make(chan struct{}),
}
}

// Run will block the current goroutine and execute the main task of execCounterManager.
func (m *execCounterManager) Run() {
collectTicker := time.NewTicker(execCounterManagerCollectDuration)
defer collectTicker.Stop()

uploadTicker := time.NewTicker(execCounterManagerUploadDuration)
defer uploadTicker.Stop()

for {
select {
case <-m.closeCh:
return
case <-collectTicker.C:
m.collect()
case <-uploadTicker.C:
// TODO(mornyx): upload m.execCount. Here is a bridge connecting the
// exec-count module with the existing top-sql cpu reporter.
m.execCount = ExecCountMap{}
}
}
}

// collect data from all associated ExecCounter s.
// If an ExecCounter is closed, then remove it from the map.
func (m *execCounterManager) collect() {
m.counters.Range(func(idRaw, counterRaw interface{}) bool {
id := idRaw.(uint64)
counter := counterRaw.(*ExecCounter)
if counter.Closed() {
m.counters.Delete(id)
}
execCount := counter.Take()
m.execCount.Merge(execCount)
return true
})
}

// register binds ExecCounter to execCounterManager.
// register is thread-safe.
func (m *execCounterManager) register(counter *ExecCounter) {
m.counters.Store(m.curCounterID.Add(1), counter)
}

// close ends the execution of the current execCounterManager.
func (m *execCounterManager) close() {
m.closeCh <- struct{}{}
}
106 changes: 106 additions & 0 deletions util/topsql/exec_count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package topsql

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestExecCountMap_Merge(t *testing.T) {
m1 := ExecCountMap{
"SQL-1": {
10001: 1,
10002: 2,
10003: 3,
},
"SQL-2": {
10001: 1,
10002: 2,
10003: 3,
},
}
m2 := ExecCountMap{
"SQL-2": {
10001: 1,
10002: 2,
10003: 3,
},
"SQL-3": {
10001: 1,
10002: 2,
10003: 3,
},
}
assert.Len(t, m1, 2)
assert.Len(t, m2, 2)
m1.Merge(m2)
assert.Len(t, m1, 3)
assert.Len(t, m2, 2)
assert.Equal(t, uint64(1), m1["SQL-1"][10001])
assert.Equal(t, uint64(2), m1["SQL-1"][10002])
assert.Equal(t, uint64(3), m1["SQL-1"][10003])
assert.Equal(t, uint64(2), m1["SQL-2"][10001])
assert.Equal(t, uint64(4), m1["SQL-2"][10002])
assert.Equal(t, uint64(6), m1["SQL-2"][10003])
assert.Equal(t, uint64(1), m1["SQL-3"][10001])
assert.Equal(t, uint64(2), m1["SQL-3"][10002])
assert.Equal(t, uint64(3), m1["SQL-3"][10003])
m1.Merge(nil)
assert.Len(t, m1, 3)
}

func TestCreateExecCounter(t *testing.T) {
globalExecCounterManager = nil
counter := CreateExecCounter()
assert.Nil(t, counter)
globalExecCounterManager = newExecCountManager()
counter = CreateExecCounter()
assert.NotNil(t, counter)
counter2Raw, ok := globalExecCounterManager.counters.Load(globalExecCounterManager.curCounterID.Load())
assert.True(t, ok)
counter2 := counter2Raw.(*ExecCounter)
assert.Equal(t, counter, counter2)
assert.False(t, counter.Closed())
counter2.Close()
assert.True(t, counter.Closed())
}

func TestExecCounter_Count_Take(t *testing.T) {
globalExecCounterManager = newExecCountManager()
counter := CreateExecCounter()
m := counter.Take()
assert.Len(t, m, 0)
counter.Count("SQL-1", 1)
counter.Count("SQL-2", 2)
counter.Count("SQL-3", 3)
m = counter.Take()
assert.Len(t, m, 3)
assert.Len(t, m["SQL-1"], 1)
assert.Len(t, m["SQL-2"], 1)
assert.Len(t, m["SQL-3"], 1)
for _, v := range m["SQL-1"] {
assert.Equal(t, uint64(1), v)
}
for _, v := range m["SQL-2"] {
assert.Equal(t, uint64(2), v)
}
for _, v := range m["SQL-3"] {
assert.Equal(t, uint64(3), v)
}
m = counter.Take()
assert.Len(t, m, 0)
}
10 changes: 9 additions & 1 deletion util/topsql/topsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,29 @@ const (
MaxPlanTextSize = 32 * 1024
)

var globalTopSQLReport reporter.TopSQLReporter
var (
globalTopSQLReport reporter.TopSQLReporter
globalExecCounterManager *execCounterManager
)

// SetupTopSQL sets up the top-sql worker.
func SetupTopSQL() {
rc := reporter.NewGRPCReportClient(plancodec.DecodeNormalizedPlan)
globalTopSQLReport = reporter.NewRemoteTopSQLReporter(rc)
tracecpu.GlobalSQLCPUProfiler.SetCollector(globalTopSQLReport)
tracecpu.GlobalSQLCPUProfiler.Run()
globalExecCounterManager = newExecCountManager()
go globalExecCounterManager.Run()
}

// Close uses to close and release the top sql resource.
func Close() {
if globalTopSQLReport != nil {
globalTopSQLReport.Close()
}
if globalExecCounterManager != nil {
globalExecCounterManager.close()
}
}

// AttachSQLInfo attach the sql information info top sql.
Expand Down