Skip to content

Commit

Permalink
executor: Optimize statements summary by using buffer pool (#58544)
Browse files Browse the repository at this point in the history
ref #56649
  • Loading branch information
crazycs520 authored Dec 27, 2024
1 parent e44c60c commit 9c2ce65
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 162 deletions.
1 change: 1 addition & 0 deletions pkg/util/stmtsummary/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
"//pkg/types",
"//pkg/util",
"//pkg/util/execdetails",
"//pkg/util/hack",
"//pkg/util/plancodec",
"//pkg/util/ppcpuusage",
"@com_github_pingcap_log//:log",
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/stmtsummary/evicted.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func newStmtSummaryByDigestEvictedElement(beginTime int64, endTime int64) *stmtS
}

// AddEvicted is used add an evicted record to stmtSummaryByDigestEvicted
func (ssbde *stmtSummaryByDigestEvicted) AddEvicted(evictedKey *stmtSummaryByDigestKey, evictedValue *stmtSummaryByDigest, historySize int) {
func (ssbde *stmtSummaryByDigestEvicted) AddEvicted(evictedKey *StmtDigestKey, evictedValue *stmtSummaryByDigest, historySize int) {
if evictedValue == nil {
return
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (ssbde *stmtSummaryByDigestEvicted) Clear() {
}

// add an evicted record to stmtSummaryByDigestEvictedElement
func (seElement *stmtSummaryByDigestEvictedElement) addEvicted(digestKey *stmtSummaryByDigestKey, digestValue *stmtSummaryByDigestElement) {
func (seElement *stmtSummaryByDigestEvictedElement) addEvicted(digestKey *StmtDigestKey, digestValue *stmtSummaryByDigestElement) {
if digestKey != nil {
seElement.count++
addInfo(seElement.otherSummary, digestValue)
Expand All @@ -169,7 +169,7 @@ const (
// if matches, it will add the digest and return enum match
// if digest too old, it will return enum tooOld and do nothing
// if digest too young, it will return enum tooYoung and do nothing
func (seElement *stmtSummaryByDigestEvictedElement) matchAndAdd(digestKey *stmtSummaryByDigestKey, digestValue *stmtSummaryByDigestElement) (statement int) {
func (seElement *stmtSummaryByDigestEvictedElement) matchAndAdd(digestKey *StmtDigestKey, digestValue *stmtSummaryByDigestElement) (statement int) {
if seElement == nil || digestValue == nil {
return isTooYoung
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/util/stmtsummary/evicted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ func newInduceSsbde(beginTime int64, endTime int64) *stmtSummaryByDigestElement
return newSsbde
}

// generate new stmtSummaryByDigestKey and stmtSummaryByDigest
func generateStmtSummaryByDigestKeyValue(schema string, beginTime int64, endTime int64) (*stmtSummaryByDigestKey, *stmtSummaryByDigest) {
key := &stmtSummaryByDigestKey{
schemaName: schema,
}
// generate new StmtDigestKey and stmtSummaryByDigest
func generateStmtSummaryByDigestKeyValue(schema string, beginTime int64, endTime int64) (*StmtDigestKey, *stmtSummaryByDigest) {
key := &StmtDigestKey{}
key.Init(schema, "", "", "", "")
value := newInduceSsbd(beginTime, endTime)
return key, value
}
Expand Down Expand Up @@ -191,7 +190,8 @@ func TestSimpleStmtSummaryByDigestEvicted(t *testing.T) {
ssbde.AddEvicted(evictedKey, evictedValue, 3)
require.Equal(t, "{begin: 8, end: 9, count: 1}, {begin: 5, end: 6, count: 1}, {begin: 2, end: 3, count: 1}", getAllEvicted(ssbde))

evictedKey = &stmtSummaryByDigestKey{schemaName: "b"}
evictedKey = &StmtDigestKey{}
evictedKey.Init("b", "", "", "", "")
ssbde.AddEvicted(evictedKey, evictedValue, 4)
require.Equal(t, "{begin: 8, end: 9, count: 2}, {begin: 5, end: 6, count: 2}, {begin: 2, end: 3, count: 2}, {begin: 1, end: 2, count: 1}", getAllEvicted(ssbde))

Expand Down Expand Up @@ -307,7 +307,7 @@ func TestEvictedCountDetailed(t *testing.T) {
ssMap.Clear()
other := ssMap.other
// test poisoning with empty-history digestValue
other.AddEvicted(new(stmtSummaryByDigestKey), new(stmtSummaryByDigest), 100)
other.AddEvicted(new(StmtDigestKey), new(stmtSummaryByDigest), 100)
require.Equal(t, 0, other.history.Len())
}

Expand Down
68 changes: 36 additions & 32 deletions pkg/util/stmtsummary/statement_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,33 +40,38 @@ import (
atomic2 "go.uber.org/atomic"
)

// stmtSummaryByDigestKey defines key for stmtSummaryByDigestMap.summaryMap.
type stmtSummaryByDigestKey struct {
// Same statements may appear in different schema, but they refer to different tables.
schemaName string
digest string
// The digest of the previous statement.
prevDigest string
// The digest of the plan of this SQL.
planDigest string
// `resourceGroupName` is the resource group's name of this statement is bind to.
resourceGroupName string
// StmtDigestKeyPool is the pool for StmtDigestKey.
var StmtDigestKeyPool = sync.Pool{
New: func() any {
return &StmtDigestKey{}
},
}

// StmtDigestKey defines key for stmtSummaryByDigestMap.summaryMap.
type StmtDigestKey struct {
// `hash` is the hash value of this object.
hash []byte
}

// Init initialize the hash key.
func (key *StmtDigestKey) Init(schemaName, digest, prevDigest, planDigest, resourceGroupName string) {
length := len(schemaName) + len(digest) + len(prevDigest) + len(planDigest) + len(resourceGroupName)
if cap(key.hash) < length {
key.hash = make([]byte, 0, length)
} else {
key.hash = key.hash[:0]
}
key.hash = append(key.hash, hack.Slice(digest)...)
key.hash = append(key.hash, hack.Slice(schemaName)...)
key.hash = append(key.hash, hack.Slice(prevDigest)...)
key.hash = append(key.hash, hack.Slice(planDigest)...)
key.hash = append(key.hash, hack.Slice(resourceGroupName)...)
}

// Hash implements SimpleLRUCache.Key.
// Only when current SQL is `commit` do we record `prevSQL`. Otherwise, `prevSQL` is empty.
// `prevSQL` is included in the key To distinguish different transactions.
func (key *stmtSummaryByDigestKey) Hash() []byte {
if len(key.hash) == 0 {
key.hash = make([]byte, 0, len(key.schemaName)+len(key.digest)+len(key.prevDigest)+len(key.planDigest)+len(key.resourceGroupName))
key.hash = append(key.hash, hack.Slice(key.digest)...)
key.hash = append(key.hash, hack.Slice(key.schemaName)...)
key.hash = append(key.hash, hack.Slice(key.prevDigest)...)
key.hash = append(key.hash, hack.Slice(key.planDigest)...)
key.hash = append(key.hash, hack.Slice(key.resourceGroupName)...)
}
func (key *StmtDigestKey) Hash() []byte {
return key.hash
}

Expand Down Expand Up @@ -308,7 +313,7 @@ func newStmtSummaryByDigestMap() *stmtSummaryByDigestMap {
}
newSsMap.summaryMap.SetOnEvict(func(k kvcache.Key, v kvcache.Value) {
historySize := newSsMap.historySize()
newSsMap.other.AddEvicted(k.(*stmtSummaryByDigestKey), v.(*stmtSummaryByDigest), historySize)
newSsMap.other.AddEvicted(k.(*StmtDigestKey), v.(*stmtSummaryByDigest), historySize)
})
return newSsMap
}
Expand All @@ -335,16 +340,11 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {
historySize = ssMap.historySize()
}

key := &stmtSummaryByDigestKey{
schemaName: sei.SchemaName,
digest: sei.Digest,
prevDigest: sei.PrevSQLDigest,
planDigest: sei.PlanDigest,
resourceGroupName: sei.ResourceGroupName,
}
// Calculate hash value in advance, to reduce the time holding the lock.
key.Hash()
key := StmtDigestKeyPool.Get().(*StmtDigestKey)
// Init hash value in advance, to reduce the time holding the lock.
key.Init(sei.SchemaName, sei.Digest, sei.PrevSQLDigest, sei.PlanDigest, sei.ResourceGroupName)

var exist bool
// Enclose the block in a function to ensure the lock will always be released.
summary, beginTime := func() (*stmtSummaryByDigest, int64) {
ssMap.Lock()
Expand All @@ -365,9 +365,10 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {
}

beginTime := ssMap.beginTimeForCurInterval
value, ok := ssMap.summaryMap.Get(key)
var value kvcache.Value
value, exist = ssMap.summaryMap.Get(key)
var summary *stmtSummaryByDigest
if !ok {
if !exist {
// Lazy initialize it to release ssMap.mutex ASAP.
summary = new(stmtSummaryByDigest)
ssMap.summaryMap.Put(key, summary)
Expand All @@ -381,6 +382,9 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {
if summary != nil {
summary.add(sei, beginTime, intervalSeconds, historySize)
}
if exist {
StmtDigestKeyPool.Put(key)
}
}

// Clear removes all statement summaries.
Expand Down
106 changes: 30 additions & 76 deletions pkg/util/stmtsummary/statement_summary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package stmtsummary

import (
"bytes"
"container/list"
"fmt"
"strings"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/plancodec"
"github.com/pingcap/tidb/pkg/util/ppcpuusage"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -75,12 +77,8 @@ func TestAddStatement(t *testing.T) {
// first statement
stmtExecInfo1 := generateAnyExecInfo()
stmtExecInfo1.ExecDetail.CommitDetail.Mu.PrewriteBackoffTypes = make([]string, 0)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
samplePlan, _, _ := stmtExecInfo1.LazyInfo.GetEncodedPlan()
stmtExecInfo1.ExecDetail.CommitDetail.Mu.Lock()
expectedSummaryElement := stmtSummaryByDigestElement{
Expand Down Expand Up @@ -454,12 +452,8 @@ func TestAddStatement(t *testing.T) {
stmtExecInfo4 := stmtExecInfo1
stmtExecInfo4.SchemaName = "schema2"
stmtExecInfo4.ExecDetail.CommitDetail = nil
key = &stmtSummaryByDigestKey{
schemaName: stmtExecInfo4.SchemaName,
digest: stmtExecInfo4.Digest,
planDigest: stmtExecInfo4.PlanDigest,
resourceGroupName: stmtExecInfo4.ResourceGroupName,
}
key = &StmtDigestKey{}
key.Init(stmtExecInfo4.SchemaName, stmtExecInfo4.Digest, "", stmtExecInfo4.PlanDigest, stmtExecInfo4.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo4)
require.Equal(t, 2, ssMap.summaryMap.Size())
_, ok = ssMap.summaryMap.Get(key)
Expand All @@ -468,12 +462,8 @@ func TestAddStatement(t *testing.T) {
// Fifth statement has a different digest.
stmtExecInfo5 := stmtExecInfo1
stmtExecInfo5.Digest = "digest2"
key = &stmtSummaryByDigestKey{
schemaName: stmtExecInfo5.SchemaName,
digest: stmtExecInfo5.Digest,
planDigest: stmtExecInfo4.PlanDigest,
resourceGroupName: stmtExecInfo5.ResourceGroupName,
}
key = &StmtDigestKey{}
key.Init(stmtExecInfo5.SchemaName, stmtExecInfo5.Digest, "", stmtExecInfo5.PlanDigest, stmtExecInfo5.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo5)
require.Equal(t, 3, ssMap.summaryMap.Size())
_, ok = ssMap.summaryMap.Get(key)
Expand All @@ -482,12 +472,8 @@ func TestAddStatement(t *testing.T) {
// Sixth statement has a different plan digest.
stmtExecInfo6 := stmtExecInfo1
stmtExecInfo6.PlanDigest = "plan_digest2"
key = &stmtSummaryByDigestKey{
schemaName: stmtExecInfo6.SchemaName,
digest: stmtExecInfo6.Digest,
planDigest: stmtExecInfo6.PlanDigest,
resourceGroupName: stmtExecInfo6.ResourceGroupName,
}
key = &StmtDigestKey{}
key.Init(stmtExecInfo6.SchemaName, stmtExecInfo6.Digest, "", stmtExecInfo6.PlanDigest, stmtExecInfo6.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo6)
require.Equal(t, 4, ssMap.summaryMap.Size())
_, ok = ssMap.summaryMap.Get(key)
Expand All @@ -507,18 +493,16 @@ func TestAddStatement(t *testing.T) {
binPlan: "",
planDigest: "",
}
key = &stmtSummaryByDigestKey{
schemaName: stmtExecInfo7.SchemaName,
digest: stmtExecInfo7.Digest,
planDigest: stmtExecInfo7.PlanDigest,
resourceGroupName: stmtExecInfo7.ResourceGroupName,
}
key = &StmtDigestKey{}
key.Init(stmtExecInfo7.SchemaName, stmtExecInfo7.Digest, "", stmtExecInfo7.PlanDigest, stmtExecInfo7.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo7)
require.Equal(t, 5, ssMap.summaryMap.Size())
v, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)
stmt := v.(*stmtSummaryByDigest)
require.Equal(t, key.digest, stmt.digest)
require.True(t, bytes.Contains(key.Hash(), hack.Slice(stmt.schemaName)))
require.True(t, bytes.Contains(key.Hash(), hack.Slice(stmt.digest)))
require.True(t, bytes.Contains(key.Hash(), hack.Slice(stmt.planDigest)))
e := stmt.history.Back()
ssElement := e.Value.(*stmtSummaryByDigestElement)
require.Equal(t, plancodec.PlanDiscardedEncoded, ssElement.samplePlan)
Expand Down Expand Up @@ -1044,12 +1028,8 @@ func TestMaxStmtCount(t *testing.T) {

// LRU cache should work.
for i := loops - 10; i < loops; i++ {
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: fmt.Sprintf("digest%d", i),
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, fmt.Sprintf("digest%d", i), "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
key.Hash()
_, ok := sm.Get(key)
require.True(t, ok)
Expand Down Expand Up @@ -1092,13 +1072,8 @@ func TestMaxSQLLength(t *testing.T) {
stmtExecInfo1.NormalizedSQL = str
ssMap.AddStatement(stmtExecInfo1)

key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
prevDigest: stmtExecInfo1.PrevSQLDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
value, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)

Expand Down Expand Up @@ -1301,12 +1276,8 @@ func TestRefreshCurrentSummary(t *testing.T) {

ssMap.beginTimeForCurInterval = now + 10
stmtExecInfo1 := generateAnyExecInfo()
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
ssMap.AddStatement(stmtExecInfo1)
require.Equal(t, 1, ssMap.summaryMap.Size())
value, ok := ssMap.summaryMap.Get(key)
Expand Down Expand Up @@ -1352,12 +1323,8 @@ func TestSummaryHistory(t *testing.T) {
}()

stmtExecInfo1 := generateAnyExecInfo()
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
for i := range 11 {
ssMap.beginTimeForCurInterval = now + int64(i+1)*10
ssMap.AddStatement(stmtExecInfo1)
Expand Down Expand Up @@ -1425,13 +1392,8 @@ func TestPrevSQL(t *testing.T) {
stmtExecInfo1.PrevSQL = "prevSQL"
stmtExecInfo1.PrevSQLDigest = "prevSQLDigest"
ssMap.AddStatement(stmtExecInfo1)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
prevDigest: stmtExecInfo1.PrevSQLDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, stmtExecInfo1.PrevSQLDigest, stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
require.Equal(t, 1, ssMap.summaryMap.Size())
_, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)
Expand All @@ -1444,9 +1406,9 @@ func TestPrevSQL(t *testing.T) {
stmtExecInfo2 := stmtExecInfo1
stmtExecInfo2.PrevSQL = "prevSQL1"
stmtExecInfo2.PrevSQLDigest = "prevSQLDigest1"
key.prevDigest = stmtExecInfo2.PrevSQLDigest
ssMap.AddStatement(stmtExecInfo2)
require.Equal(t, 2, ssMap.summaryMap.Size())
key.Init(stmtExecInfo2.SchemaName, stmtExecInfo2.Digest, stmtExecInfo2.PrevSQLDigest, stmtExecInfo2.PlanDigest, stmtExecInfo2.ResourceGroupName)
_, ok = ssMap.summaryMap.Get(key)
require.True(t, ok)
}
Expand All @@ -1458,12 +1420,8 @@ func TestEndTime(t *testing.T) {

stmtExecInfo1 := generateAnyExecInfo()
ssMap.AddStatement(stmtExecInfo1)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: stmtExecInfo1.PlanDigest,
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", stmtExecInfo1.PlanDigest, stmtExecInfo1.ResourceGroupName)
require.Equal(t, 1, ssMap.summaryMap.Size())
value, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)
Expand Down Expand Up @@ -1508,12 +1466,8 @@ func TestPointGet(t *testing.T) {
stmtExecInfo1.PlanDigest = ""
stmtExecInfo1.LazyInfo.(*mockLazyInfo).plan = fakePlanDigestGenerator()
ssMap.AddStatement(stmtExecInfo1)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: "",
resourceGroupName: stmtExecInfo1.ResourceGroupName,
}
key := &StmtDigestKey{}
key.Init(stmtExecInfo1.SchemaName, stmtExecInfo1.Digest, "", "", stmtExecInfo1.ResourceGroupName)
require.Equal(t, 1, ssMap.summaryMap.Size())
value, ok := ssMap.summaryMap.Get(key)
require.True(t, ok)
Expand Down
Loading

0 comments on commit 9c2ce65

Please sign in to comment.