Skip to content

Commit

Permalink
planner: refactor some code of cross-db binding (pingcap#58424)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Dec 20, 2024
1 parent 8388f0f commit 0f653f3
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 66 deletions.
155 changes: 108 additions & 47 deletions pkg/bindinfo/binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,108 @@ var GetBindingReturnNilAlways = stringutil.StringerStr("getBindingReturnNilAlway
// LoadBindingNothing is only for test
var LoadBindingNothing = stringutil.StringerStr("LoadBindingNothing")

// digestBiMap represents a bidirectional map between noDBDigest and sqlDigest, used to support cross-db binding.
// One noDBDigest can map to multiple sqlDigests, but one sqlDigest can only map to one noDBDigest.
type digestBiMap interface {
// Add adds a pair of noDBDigest and sqlDigest.
// noDBDigest is the digest calculated after eliminating all DB names, e.g. `select * from test.t` -> `select * from t` -> noDBDigest.
// sqlDigest is the digest where all DB names are kept, e.g. `select * from test.t` -> exactDigest.
Add(noDBDigest, sqlDigest string)

// Del deletes the pair of noDBDigest and sqlDigest.
Del(sqlDigest string)

// NoDBDigest2SQLDigest converts noDBDigest to sqlDigest.
NoDBDigest2SQLDigest(noDBDigest string) []string

// SQLDigest2NoDBDigest converts sqlDigest to noDBDigest.
SQLDigest2NoDBDigest(sqlDigest string) string

// Copy copies this digestBiMap.
Copy() digestBiMap
}

type digestBiMapImpl struct {
mu sync.RWMutex
noDBDigest2SQLDigest map[string][]string // noDBDigest --> sqlDigests
sqlDigest2noDBDigest map[string]string // sqlDigest --> noDBDigest
}

func newDigestBiMap() digestBiMap {
return &digestBiMapImpl{
noDBDigest2SQLDigest: make(map[string][]string),
sqlDigest2noDBDigest: make(map[string]string),
}
}

// Add adds a pair of noDBDigest and sqlDigest.
// noDBDigest is the digest calculated after eliminating all DB names, e.g. `select * from test.t` -> `select * from t` -> noDBDigest.
// sqlDigest is the digest where all DB names are kept, e.g. `select * from test.t` -> exactDigest.
func (b *digestBiMapImpl) Add(noDBDigest, sqlDigest string) {
b.mu.Lock()
defer b.mu.Unlock()
b.noDBDigest2SQLDigest[noDBDigest] = append(b.noDBDigest2SQLDigest[noDBDigest], sqlDigest)
b.sqlDigest2noDBDigest[sqlDigest] = noDBDigest
}

// Del deletes the pair of noDBDigest and sqlDigest.
func (b *digestBiMapImpl) Del(sqlDigest string) {
b.mu.Lock()
defer b.mu.Unlock()
noDBDigest, ok := b.sqlDigest2noDBDigest[sqlDigest]
if !ok {
return
}
digestList := b.noDBDigest2SQLDigest[noDBDigest]
for i := range digestList { // remove sqlDigest from this list
if digestList[i] == sqlDigest {
// Deleting binding is a low-frequently operation, so the O(n) performance is enough.
digestList = append(digestList[:i], digestList[i+1:]...)
break
}
}
if len(digestList) == 0 {
delete(b.noDBDigest2SQLDigest, noDBDigest)
} else {
b.noDBDigest2SQLDigest[noDBDigest] = digestList
}
delete(b.sqlDigest2noDBDigest, sqlDigest)
}

// NoDBDigest2SQLDigest converts noDBDigest to sqlDigest.
func (b *digestBiMapImpl) NoDBDigest2SQLDigest(noDBDigest string) []string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.noDBDigest2SQLDigest[noDBDigest]
}

// SQLDigest2NoDBDigest converts sqlDigest to noDBDigest.
func (b *digestBiMapImpl) SQLDigest2NoDBDigest(sqlDigest string) string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.sqlDigest2noDBDigest[sqlDigest]
}

// Copy copies this digestBiMap.
func (b *digestBiMapImpl) Copy() digestBiMap {
b.mu.RLock()
defer b.mu.RUnlock()
noDBDigest2SQLDigest := make(map[string][]string, len(b.noDBDigest2SQLDigest))
for k, list := range b.noDBDigest2SQLDigest {
newList := make([]string, len(list))
copy(newList, list)
noDBDigest2SQLDigest[k] = newList
}
sqlDigest2noDBDigest := make(map[string]string, len(b.sqlDigest2noDBDigest))
for k, v := range b.sqlDigest2noDBDigest {
sqlDigest2noDBDigest[k] = v
}
return &digestBiMapImpl{
noDBDigest2SQLDigest: noDBDigest2SQLDigest,
sqlDigest2noDBDigest: sqlDigest2noDBDigest,
}
}

// CrossDBBindingCache is based on BindingCache, and provide some more advanced features, like
// cross-db matching, loading binding if cache miss automatically (TODO).
type CrossDBBindingCache interface {
Expand All @@ -63,14 +165,7 @@ type CrossDBBindingCache interface {
type crossDBBindingCache struct {
BindingCache

mu sync.RWMutex

// noDBDigest2SQLDigest is used to support cross-db matching.
// noDBDigest is the digest calculated after eliminating all DB names, e.g. `select * from test.t` -> `select * from t` -> noDBDigest.
// sqlDigest is the digest where all DB names are kept, e.g. `select * from test.t` -> exactDigest.
noDBDigest2SQLDigest map[string][]string // noDBDigest --> sqlDigests

sqlDigest2noDBDigest map[string]string // sqlDigest --> noDBDigest
digestBiMap digestBiMap

// loadBindingFromStorageFunc is used to load binding from storage if cache miss.
loadBindingFromStorageFunc func(sctx sessionctx.Context, sqlDigest string) (Bindings, error)
Expand All @@ -79,8 +174,7 @@ type crossDBBindingCache struct {
func newCrossDBBindingCache(loadBindingFromStorageFunc func(sessionctx.Context, string) (Bindings, error)) CrossDBBindingCache {
return &crossDBBindingCache{
BindingCache: newBindCache(),
noDBDigest2SQLDigest: make(map[string][]string),
sqlDigest2noDBDigest: make(map[string]string),
digestBiMap: newDigestBiMap(),
loadBindingFromStorageFunc: loadBindingFromStorageFunc,
}
}
Expand Down Expand Up @@ -109,15 +203,13 @@ func (cc *crossDBBindingCache) MatchingBinding(sctx sessionctx.Context, noDBDige
}

func (cc *crossDBBindingCache) getFromMemory(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool, missingSQLDigest []string) {
cc.mu.RLock()
defer cc.mu.RUnlock()
bindingCache := cc.BindingCache
if bindingCache.Size() == 0 {
return
}
leastWildcards := len(tableNames) + 1
enableCrossDBBinding := sctx.GetSessionVars().EnableFuzzyBinding
for _, sqlDigest := range cc.noDBDigest2SQLDigest[noDBDigest] {
for _, sqlDigest := range cc.digestBiMap.NoDBDigest2SQLDigest(noDBDigest) {
bindings := bindingCache.GetBinding(sqlDigest)
if intest.InTest {
if sctx.Value(GetBindingReturnNil) != nil {
Expand Down Expand Up @@ -184,9 +276,6 @@ func (cc *crossDBBindingCache) loadFromStore(sctx sessionctx.Context, missingSQL
}

func (cc *crossDBBindingCache) SetBinding(sqlDigest string, bindings Bindings) (err error) {
cc.mu.Lock()
defer cc.mu.Unlock()

// prepare noDBDigests for all bindings
noDBDigests := make([]string, 0, len(bindings))
p := parser.New()
Expand All @@ -200,54 +289,26 @@ func (cc *crossDBBindingCache) SetBinding(sqlDigest string, bindings Bindings) (
}

for i, binding := range bindings {
cc.noDBDigest2SQLDigest[noDBDigests[i]] = append(cc.noDBDigest2SQLDigest[noDBDigests[i]], binding.SQLDigest)
cc.sqlDigest2noDBDigest[binding.SQLDigest] = noDBDigests[i]
cc.digestBiMap.Add(noDBDigests[i], binding.SQLDigest)
}
// NOTE: due to LRU eviction, the underlying BindingCache state might be inconsistent with noDBDigest2SQLDigest and
// sqlDigest2noDBDigest, but it's acceptable, just return cache-miss in that case.
return cc.BindingCache.SetBinding(sqlDigest, bindings)
}

func (cc *crossDBBindingCache) RemoveBinding(sqlDigest string) {
cc.mu.Lock()
defer cc.mu.Unlock()
noDBDigest, ok := cc.sqlDigest2noDBDigest[sqlDigest]
if !ok {
return
}
digestList := cc.noDBDigest2SQLDigest[noDBDigest]
for i := range digestList { // remove sqlDigest from this list
if digestList[i] == sqlDigest {
digestList = append(digestList[:i], digestList[i+1:]...)
break
}
}
cc.noDBDigest2SQLDigest[noDBDigest] = digestList
delete(cc.sqlDigest2noDBDigest, sqlDigest)
cc.digestBiMap.Del(sqlDigest)
cc.BindingCache.RemoveBinding(sqlDigest)
}

func (cc *crossDBBindingCache) Copy() (c CrossDBBindingCache, err error) {
cc.mu.RLock()
defer cc.mu.RUnlock()
bc, err := cc.BindingCache.CopyBindingCache()
if err != nil {
return nil, err
}
sql2noDBDigest := make(map[string]string, len(cc.sqlDigest2noDBDigest))
for k, v := range cc.sqlDigest2noDBDigest {
sql2noDBDigest[k] = v
}
noDBDigest2SQLDigest := make(map[string][]string, len(cc.noDBDigest2SQLDigest))
for k, list := range cc.noDBDigest2SQLDigest {
newList := make([]string, len(list))
copy(newList, list)
noDBDigest2SQLDigest[k] = newList
}
return &crossDBBindingCache{
BindingCache: bc,
noDBDigest2SQLDigest: noDBDigest2SQLDigest,
sqlDigest2noDBDigest: sql2noDBDigest,
digestBiMap: cc.digestBiMap.Copy(),
loadBindingFromStorageFunc: cc.loadBindingFromStorageFunc,
}, nil
}
Expand Down
38 changes: 19 additions & 19 deletions pkg/bindinfo/binding_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func bindingNoDBDigest(t *testing.T, b Binding) string {
return noDBDigest
}

func TestFuzzyBindingCache(t *testing.T) {
func TestCrossDBBindingCache(t *testing.T) {
fbc := newCrossDBBindingCache(nil).(*crossDBBindingCache)
b1 := Binding{BindSQL: "SELECT * FROM db1.t1", SQLDigest: "b1"}
fDigest1 := bindingNoDBDigest(t, b1)
Expand All @@ -46,38 +46,38 @@ func TestFuzzyBindingCache(t *testing.T) {
require.NoError(t, fbc.SetBinding(b1.SQLDigest, []Binding{b1}))
require.NoError(t, fbc.SetBinding(b2.SQLDigest, []Binding{b2}))
require.NoError(t, fbc.SetBinding(b3.SQLDigest, []Binding{b3}))
require.Equal(t, len(fbc.noDBDigest2SQLDigest), 2) // b1 and b2 have the same noDBDigest
require.Equal(t, len(fbc.noDBDigest2SQLDigest[fDigest1]), 2)
require.Equal(t, len(fbc.noDBDigest2SQLDigest[fDigest3]), 1)
require.Equal(t, len(fbc.sqlDigest2noDBDigest), 3)
_, ok := fbc.sqlDigest2noDBDigest[b1.SQLDigest]
require.Equal(t, len(fbc.digestBiMap.(*digestBiMapImpl).noDBDigest2SQLDigest), 2) // b1 and b2 have the same noDBDigest
require.Equal(t, len(fbc.digestBiMap.NoDBDigest2SQLDigest(fDigest1)), 2)
require.Equal(t, len(fbc.digestBiMap.NoDBDigest2SQLDigest(fDigest3)), 1)
require.Equal(t, len(fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest), 3)
_, ok := fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest[b1.SQLDigest]
require.True(t, ok)
_, ok = fbc.sqlDigest2noDBDigest[b2.SQLDigest]
_, ok = fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest[b2.SQLDigest]
require.True(t, ok)
_, ok = fbc.sqlDigest2noDBDigest[b3.SQLDigest]
_, ok = fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest[b3.SQLDigest]
require.True(t, ok)

// remove b2
fbc.RemoveBinding(b2.SQLDigest)
require.Equal(t, len(fbc.noDBDigest2SQLDigest), 2)
require.Equal(t, len(fbc.noDBDigest2SQLDigest[fDigest1]), 1)
require.Equal(t, len(fbc.noDBDigest2SQLDigest[fDigest3]), 1)
require.Equal(t, len(fbc.sqlDigest2noDBDigest), 2)
_, ok = fbc.sqlDigest2noDBDigest[b1.SQLDigest]
require.Equal(t, len(fbc.digestBiMap.(*digestBiMapImpl).noDBDigest2SQLDigest), 2)
require.Equal(t, len(fbc.digestBiMap.(*digestBiMapImpl).noDBDigest2SQLDigest[fDigest1]), 1)
require.Equal(t, len(fbc.digestBiMap.(*digestBiMapImpl).noDBDigest2SQLDigest[fDigest3]), 1)
require.Equal(t, len(fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest), 2)
_, ok = fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest[b1.SQLDigest]
require.True(t, ok)
_, ok = fbc.sqlDigest2noDBDigest[b2.SQLDigest]
_, ok = fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest[b2.SQLDigest]
require.False(t, ok) // can't find b2 now
_, ok = fbc.sqlDigest2noDBDigest[b3.SQLDigest]
_, ok = fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest[b3.SQLDigest]
require.True(t, ok)

// test deep copy
newCache, err := fbc.Copy()
require.NoError(t, err)
newFBC := newCache.(*crossDBBindingCache)
newFBC.noDBDigest2SQLDigest[fDigest1] = nil
delete(newFBC.sqlDigest2noDBDigest, b1.SQLDigest)
require.Equal(t, len(fbc.noDBDigest2SQLDigest[fDigest1]), 1) // no impact to the original cache
_, ok = fbc.sqlDigest2noDBDigest[b1.SQLDigest]
newFBC.digestBiMap.(*digestBiMapImpl).noDBDigest2SQLDigest[fDigest1] = nil
delete(newFBC.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest, b1.SQLDigest)
require.Equal(t, len(fbc.digestBiMap.(*digestBiMapImpl).noDBDigest2SQLDigest[fDigest1]), 1) // no impact to the original cache
_, ok = fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest[b1.SQLDigest]
require.True(t, ok)
}

Expand Down

0 comments on commit 0f653f3

Please sign in to comment.