Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#53445
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
crazycs520 authored and ti-chi-bot committed May 27, 2024
1 parent b511a45 commit 2653442
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 0 deletions.
2 changes: 2 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
if diff == nil {
// Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail.
// It is safe to skip the empty diff because the infoschema is new enough and consistent.
logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion))
do.infoCache.InsertEmptySchemaVersion(usedVersion)
continue
}
diffs = append(diffs, diff)
Expand Down
4 changes: 4 additions & 0 deletions executor/kvtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ go_test(
"main_test.go",
],
flaky = True,
<<<<<<< HEAD:executor/kvtest/BUILD.bazel
race = "on",
=======
shard_count = 8,
>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/test/cachetest/BUILD.bazel
deps = [
"//config",
"//meta/autoid",
Expand Down
83 changes: 83 additions & 0 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ type InfoCache struct {
mu sync.RWMutex
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
cache []schemaAndTimestamp
<<<<<<< HEAD:infoschema/cache.go
=======

// emptySchemaVersions stores schema version which has no schema_diff.
emptySchemaVersions map[int64]struct{}

r autoid.Requirement
Data *Data
>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go
}

type schemaAndTimestamp struct {
Expand All @@ -40,7 +49,14 @@ type schemaAndTimestamp struct {
// NewCache creates a new InfoCache.
func NewCache(capacity int) *InfoCache {
return &InfoCache{
<<<<<<< HEAD:infoschema/cache.go
cache: make([]schemaAndTimestamp, 0, capacity),
=======
cache: make([]schemaAndTimestamp, 0, capacity),
emptySchemaVersions: make(map[int64]struct{}),
r: r,
Data: infoData,
>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go
}
}

Expand All @@ -63,6 +79,19 @@ func (h *InfoCache) GetLatest() InfoSchema {
return nil
}

<<<<<<< HEAD:infoschema/cache.go
=======
// Len returns the size of the cache
func (h *InfoCache) Len() int {
return len(h.cache)
}

// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing.
func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} {
return h.emptySchemaVersions
}

>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go
func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts))
// search one by one instead of binary search, because the timestamp of a schema could be 0
Expand All @@ -79,6 +108,38 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
// found the largest version before the given ts
return is.infoschema, true
}
<<<<<<< HEAD:infoschema/cache.go
=======

if uint64(h.cache[i-1].timestamp) > ts {
// The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion()
currentVersion := is.infoschema.SchemaMetaVersion()
if lastVersion == currentVersion+1 {
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
// but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts.
return is.infoschema, true
}
if lastVersion > currentVersion {
found := true
for ver := currentVersion + 1; ver < lastVersion; ver++ {
_, ok := h.emptySchemaVersions[ver]
if !ok {
found = false
break
}
}
if found {
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and
// current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions
// which means those gap versions don't have schema info, then current schema is also suitable for ts.
return is.infoschema, true
}
}
}
// current schema is not suitable for ts, then break the loop to avoid the unnecessary search.
break
>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go
}

logutil.BgLogger().Debug("SCHEMA CACHE no schema found")
Expand Down Expand Up @@ -185,3 +246,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {

return true
}

// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version.
func (h *InfoCache) InsertEmptySchemaVersion(version int64) {
h.mu.Lock()
defer h.mu.Unlock()

h.emptySchemaVersions[version] = struct{}{}
if len(h.emptySchemaVersions) > cap(h.cache) {
// remove oldest version.
versions := make([]int64, 0, len(h.emptySchemaVersions))
for ver := range h.emptySchemaVersions {
versions = append(versions, ver)
}
sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] })
for _, ver := range versions {
delete(h.emptySchemaVersions, ver)
if len(h.emptySchemaVersions) <= cap(h.cache) {
break
}
}
}
}
172 changes: 172 additions & 0 deletions infoschema/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,176 @@ func TestGetByTimestamp(t *testing.T) {
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is3, ic.GetBySnapshotTS(3))
require.Equal(t, is3, ic.GetBySnapshotTS(4))
<<<<<<< HEAD:infoschema/cache_test.go
=======
require.Equal(t, 3, ic.Len())

// insert is2 again with correct timestamp, to correct previous wrong timestamp
ic.Insert(is2, 2)
require.Equal(t, is3, ic.GetLatest())
require.Equal(t, is1, ic.GetBySnapshotTS(1))
require.Equal(t, is2, ic.GetBySnapshotTS(2))
require.Equal(t, is3, ic.GetBySnapshotTS(3))
require.Equal(t, 3, ic.Len())
}

func TestReSize(t *testing.T) {
ic := infoschema.NewCache(nil, 2)
require.NotNil(t, ic)
is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1)
ic.Insert(is1, 1)
is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2)
ic.Insert(is2, 2)

ic.ReSize(3)
require.Equal(t, 2, ic.Size())
require.Equal(t, is1, ic.GetByVersion(1))
require.Equal(t, is2, ic.GetByVersion(2))
is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3)
require.True(t, ic.Insert(is3, 3))
require.Equal(t, is1, ic.GetByVersion(1))
require.Equal(t, is2, ic.GetByVersion(2))
require.Equal(t, is3, ic.GetByVersion(3))

ic.ReSize(1)
require.Equal(t, 1, ic.Size())
require.Nil(t, ic.GetByVersion(1))
require.Nil(t, ic.GetByVersion(2))
require.Equal(t, is3, ic.GetByVersion(3))
require.False(t, ic.Insert(is2, 2))
require.Equal(t, 1, ic.Size())
is4 := infoschema.MockInfoSchemaWithSchemaVer(nil, 4)
require.True(t, ic.Insert(is4, 4))
require.Equal(t, 1, ic.Size())
require.Nil(t, ic.GetByVersion(1))
require.Nil(t, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(3))
require.Equal(t, is4, ic.GetByVersion(4))
}

func TestCacheWithSchemaTsZero(t *testing.T) {
ic := infoschema.NewCache(nil, 16)
require.NotNil(t, ic)

for i := 1; i <= 8; i++ {
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
}

checkFn := func(start, end int64, exist bool) {
require.True(t, start <= end)
latestSchemaVersion := ic.GetLatest().SchemaMetaVersion()
for ts := start; ts <= end; ts++ {
is := ic.GetBySnapshotTS(uint64(ts))
if exist {
require.NotNil(t, is, fmt.Sprintf("ts %d", ts))
if ts > latestSchemaVersion {
require.Equal(t, latestSchemaVersion, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts))
} else {
require.Equal(t, ts, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts))
}
} else {
require.Nil(t, is, fmt.Sprintf("ts %d", ts))
}
}
}
checkFn(1, 8, true)
checkFn(8, 10, true)

// mock for meet error There is no Write MVCC info for the schema version
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 0)
checkFn(1, 7, true)
checkFn(8, 9, false)
checkFn(9, 10, false)

for i := 10; i <= 16; i++ {
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
checkFn(1, 7, true)
checkFn(8, 9, false)
checkFn(10, 16, true)
}
require.Equal(t, 16, ic.Size())

// refill the cache
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 9)
checkFn(1, 16, true)
require.Equal(t, 16, ic.Size())

// Test more than capacity
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 17), 17)
checkFn(1, 1, false)
checkFn(2, 17, true)
checkFn(2, 20, true)
require.Equal(t, 16, ic.Size())

// Test for there is a hole in the middle.
ic = infoschema.NewCache(nil, 16)

// mock for restart with full load the latest version schema.
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 100), 100)
checkFn(1, 99, false)
checkFn(100, 100, true)

for i := 1; i <= 16; i++ {
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
}
checkFn(1, 1, false)
checkFn(2, 15, true)
checkFn(16, 16, false)
checkFn(100, 100, true)
require.Equal(t, 16, ic.Size())

for i := 85; i < 100; i++ {
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
}
checkFn(1, 84, false)
checkFn(85, 100, true)
require.Equal(t, 16, ic.Size())

// Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff.
ic = infoschema.NewCache(nil, 16)
require.NotNil(t, ic)
for i := 1; i <= 8; i++ {
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
}
checkFn(1, 10, true)
// mock for schema version hole, schema-version 9 is missing.
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10)
checkFn(1, 7, true)
// without empty schema version map, get snapshot by ts 8, 9 will both failed.
checkFn(8, 9, false)
checkFn(10, 10, true)
// add empty schema version 9.
ic.InsertEmptySchemaVersion(9)
// after set empty schema version, get snapshot by ts 8, 9 will both success.
checkFn(1, 8, true)
checkFn(10, 10, true)
is := ic.GetBySnapshotTS(uint64(9))
require.NotNil(t, is)
// since schema version 9 is empty, so get by ts 9 will get schema which version is 8.
require.Equal(t, int64(8), is.SchemaMetaVersion())
}

func TestCacheEmptySchemaVersion(t *testing.T) {
ic := infoschema.NewCache(nil, 16)
require.NotNil(t, ic)
require.Equal(t, 0, len(ic.GetEmptySchemaVersions()))
for i := 0; i < 16; i++ {
ic.InsertEmptySchemaVersion(int64(i))
}
emptyVersions := ic.GetEmptySchemaVersions()
require.Equal(t, 16, len(emptyVersions))
for i := 0; i < 16; i++ {
_, ok := emptyVersions[int64(i)]
require.True(t, ok)
}
for i := 16; i < 20; i++ {
ic.InsertEmptySchemaVersion(int64(i))
}
emptyVersions = ic.GetEmptySchemaVersions()
require.Equal(t, 16, len(emptyVersions))
for i := 4; i < 20; i++ {
_, ok := emptyVersions[int64(i)]
require.True(t, ok)
}
>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/test/cachetest/cache_test.go
}

0 comments on commit 2653442

Please sign in to comment.