Skip to content

Commit

Permalink
*: reset infoschema v2 data before a full load (#54831)
Browse files Browse the repository at this point in the history
close #54796
  • Loading branch information
tiancaiamao authored Jul 24, 2024
1 parent 1cee700 commit 3148c80
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 1 deletion.
11 changes: 11 additions & 0 deletions pkg/ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/store/gcworker"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -169,6 +170,16 @@ func checkTableBundlesInPD(t *testing.T, do *domain.Domain, tt *meta.Meta, tblIn
}

func TestPlacementPolicy(t *testing.T) {
// Test for the first time
testPlacementPolicy(t)

// Test again with failpoint.
// For https://github.com/pingcap/tidb/issues/54796
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/domain/MockTryLoadDiffError", `return("exchangepartition")`)
testPlacementPolicy(t)
}

func testPlacementPolicy(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
// clearAllBundles(t)
tk := testkit.NewTestKit(t, store)
Expand Down
18 changes: 18 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,24 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
}
diffs = append(diffs, diff)
}

failpoint.Inject("MockTryLoadDiffError", func(val failpoint.Value) {
switch val.(string) {
case "exchangepartition":
if diffs[0].Type == model.ActionExchangeTablePartition {
failpoint.Return(nil, nil, nil, errors.New("mock error"))
}
case "renametable":
if diffs[0].Type == model.ActionRenameTable {
failpoint.Return(nil, nil, nil, errors.New("mock error"))
}
case "dropdatabase":
if diffs[0].Type == model.ActionDropSchema {
failpoint.Return(nil, nil, nil, errors.New("mock error"))
}
}
})

builder, err := infoschema.NewBuilder(do, do.sysFacHack, do.infoCache.Data).InitWithOldInfoSchema(do.infoCache.GetLatest())
if err != nil {
return nil, nil, nil, errors.Trace(err)
Expand Down
19 changes: 19 additions & 0 deletions pkg/infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,25 @@ func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, policies []*model.Pol

b.initMisc(dbInfos, policies, resourceGroups)

if b.enableV2 {
// We must not clear the historial versions like b.infoData = NewData(), because losing
// the historial versions would cause applyDiff get db not exist error and fail, then
// infoschema reloading retries with full load every time.
// See https://github.com/pingcap/tidb/issues/53442
//
// We must reset it, otherwise the stale tables remain and cause bugs later.
// For example, schema version 59:
// 107: t1
// 112: t2 (partitions p0=113, p1=114, p2=115)
// operation: alter table t2 exchange partition p0 with table t1
// schema version 60 if we do not reset:
// 107: t1 <- stale
// 112: t2 (partition p0=107, p1=114, p2=115)
// 113: t1
// See https://github.com/pingcap/tidb/issues/54796
b.infoData.resetBeforeFullLoad(schemaVersion)
}

for _, di := range dbInfos {
err := b.createSchemaTablesForDB(di, b.tableFromMeta, schemaVersion)
if err != nil {
Expand Down
200 changes: 200 additions & 0 deletions pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,206 @@ func (isd *Data) deleteDB(dbInfo *model.DBInfo, schemaVersion int64) {
isd.schemaID2Name.Set(schemaIDName{schemaVersion: schemaVersion, id: dbInfo.ID, name: dbInfo.Name.O, tomb: true})
}

// resetBeforeFullLoad is called before a full recreate operation within builder.InitWithDBInfos().
// TODO: write a generics version to avoid repeated code.
func (isd *Data) resetBeforeFullLoad(schemaVersion int64) {
resetTableInfoResidentBeforeFullLoad(isd.tableInfoResident, schemaVersion)

resetByIDBeforeFullLoad(isd.byID, schemaVersion)
resetByNameBeforeFullLoad(isd.byName, schemaVersion)

resetSchemaMapBeforeFullLoad(isd.schemaMap, schemaVersion)
resetSchemaID2NameBeforeFullLoad(isd.schemaID2Name, schemaVersion)

resetPID2TIDBeforeFullLoad(isd.pid2tid, schemaVersion)
}

func resetByIDBeforeFullLoad(bt *btree.BTreeG[tableItem], schemaVersion int64) {
pivot, ok := bt.Max()
if !ok {
return
}

batchSize := 1000
if bt.Len() < batchSize {
batchSize = bt.Len()
}
items := make([]tableItem, 0, batchSize)
items = append(items, pivot)
for {
bt.Descend(pivot, func(item tableItem) bool {
if pivot.tableID == item.tableID {
return true // skip MVCC version
}
pivot = item
items = append(items, pivot)
return len(items) < cap(items)
})
if len(items) == 0 {
break
}
for _, item := range items {
bt.Set(tableItem{
dbName: item.dbName,
dbID: item.dbID,
tableName: item.tableName,
tableID: item.tableID,
schemaVersion: schemaVersion,
tomb: true,
})
}
items = items[:0]
}
}

func resetByNameBeforeFullLoad(bt *btree.BTreeG[tableItem], schemaVersion int64) {
pivot, ok := bt.Max()
if !ok {
return
}

batchSize := 1000
if bt.Len() < batchSize {
batchSize = bt.Len()
}
items := make([]tableItem, 0, batchSize)
items = append(items, pivot)
for {
bt.Descend(pivot, func(item tableItem) bool {
if pivot.dbName == item.dbName && pivot.tableName == item.tableName {
return true // skip MVCC version
}
pivot = item
items = append(items, pivot)
return len(items) < cap(items)
})
if len(items) == 0 {
break
}
for _, item := range items {
bt.Set(tableItem{
dbName: item.dbName,
dbID: item.dbID,
tableName: item.tableName,
tableID: item.tableID,
schemaVersion: schemaVersion,
tomb: true,
})
}
items = items[:0]
}
}

func resetTableInfoResidentBeforeFullLoad(bt *btree.BTreeG[tableInfoItem], schemaVersion int64) {
pivot, ok := bt.Max()
if !ok {
return
}
items := make([]tableInfoItem, 0, bt.Len())
items = append(items, pivot)
bt.Descend(pivot, func(item tableInfoItem) bool {
if pivot.dbName == item.dbName && pivot.tableID == item.tableID {
return true // skip MVCC version
}
pivot = item
items = append(items, pivot)
return true
})
for _, item := range items {
bt.Set(tableInfoItem{
dbName: item.dbName,
tableID: item.tableID,
schemaVersion: schemaVersion,
tomb: true,
})
}
}

func resetSchemaMapBeforeFullLoad(bt *btree.BTreeG[schemaItem], schemaVersion int64) {
pivot, ok := bt.Max()
if !ok {
return
}
items := make([]schemaItem, 0, bt.Len())
items = append(items, pivot)
bt.Descend(pivot, func(item schemaItem) bool {
if pivot.Name() == item.Name() {
return true // skip MVCC version
}
pivot = item
items = append(items, pivot)
return true
})
for _, item := range items {
bt.Set(schemaItem{
dbInfo: item.dbInfo,
schemaVersion: schemaVersion,
tomb: true,
})
}
}

func resetSchemaID2NameBeforeFullLoad(bt *btree.BTreeG[schemaIDName], schemaVersion int64) {
pivot, ok := bt.Max()
if !ok {
return
}
items := make([]schemaIDName, 0, bt.Len())
items = append(items, pivot)
bt.Descend(pivot, func(item schemaIDName) bool {
if pivot.id == item.id {
return true // skip MVCC version
}
pivot = item
items = append(items, pivot)
return true
})
for _, item := range items {
bt.Set(schemaIDName{
id: item.id,
name: item.name,
schemaVersion: schemaVersion,
tomb: true,
})
}
}

func resetPID2TIDBeforeFullLoad(bt *btree.BTreeG[partitionItem], schemaVersion int64) {
pivot, ok := bt.Max()
if !ok {
return
}

batchSize := 1000
if bt.Len() < batchSize {
batchSize = bt.Len()
}
items := make([]partitionItem, 0, batchSize)
items = append(items, pivot)
for {
bt.Descend(pivot, func(item partitionItem) bool {
if pivot.partitionID == item.partitionID {
return true // skip MVCC version
}
pivot = item
items = append(items, pivot)
return len(items) < cap(items)
})
if len(items) == 0 {
break
}
for _, item := range items {
bt.Set(partitionItem{
partitionID: item.partitionID,
tableID: item.tableID,
schemaVersion: schemaVersion,
tomb: true,
})
}
items = items[:0]
}
}

func compareByID(a, b tableItem) bool {
if a.tableID < b.tableID {
return true
Expand Down
3 changes: 3 additions & 0 deletions pkg/infoschema/test/clustertablestest/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,13 +674,15 @@ func TestSelectHiddenColumn(t *testing.T) {
// Set column b to hidden
colInfo[1].Hidden = true
updateTableMeta(t, store, tbInfo.DBID, tbInfo)
dom.Reload()

tk.MustQuery("select count(*) from INFORMATION_SCHEMA.COLUMNS where table_name = 'hidden'").Check(testkit.Rows("2"))
tk.MustQuery("select count(*) from INFORMATION_SCHEMA.COLUMNS where table_name = 'hidden' and column_name = 'b'").Check(testkit.Rows("0"))

// Set column b to visible
colInfo[1].Hidden = false
updateTableMeta(t, store, tbInfo.DBID, tbInfo)
dom.Reload()

tk.MustQuery("select count(*) from INFORMATION_SCHEMA.COLUMNS where table_name = 'hidden' and column_name = 'b'").Check(testkit.Rows("1"))

Expand All @@ -689,6 +691,7 @@ func TestSelectHiddenColumn(t *testing.T) {
colInfo[1].Hidden = true
colInfo[2].Hidden = true
updateTableMeta(t, store, tbInfo.DBID, tbInfo)
dom.Reload()

tk.MustQuery("select count(*) from INFORMATION_SCHEMA.COLUMNS where table_name = 'hidden'").Check(testkit.Rows("0"))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/infoschema/test/infoschemav2test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"v2_test.go",
],
flaky = True,
shard_count = 6,
shard_count = 7,
deps = [
"//pkg/domain",
"//pkg/domain/infosync",
Expand Down
47 changes: 47 additions & 0 deletions pkg/infoschema/test/infoschemav2test/v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/infosync"
Expand Down Expand Up @@ -328,3 +329,49 @@ func BenchmarkTableByName(t *testing.B) {
}
t.StopTimer()
}

func TestFullLoadAndSnapshot(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_schema_cache_size = 512 * 1024 * 1024")

// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
timeSafe := time.Now().Add(-48 * 60 * 60 * time.Second).Format("20060102-15:04:05 -0700 MST")
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
tk.MustExec(fmt.Sprintf(safePointSQL, timeSafe))

tk.MustExec("create database db1")
tk.MustExec("create database db2")
tk.MustExec("use db1")
tk.MustExec("create table t (id int)")

timestamp := time.Now().Format(time.RFC3339Nano)
time.Sleep(100 * time.Millisecond)

testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/domain/MockTryLoadDiffError", `return("renametable")`)
tk.MustExec("rename table db1.t to db2.t")

tk.MustQuery("select * from db2.t").Check(testkit.Rows())
tk.MustExecToErr("select * from db1.t")
tk.MustExec("use db2")
tk.MustQuery("show tables").Check(testkit.Rows("t"))
tk.MustExec("use db1")
tk.MustQuery("show tables").Check(testkit.Rows())

testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/domain/MockTryLoadDiffError", `return("dropdatabase")`)
tk.MustExec("drop database db1")
tk.MustExecToErr("use db1")
tk.MustQuery("select table_schema from information_schema.tables where table_schema = 'db2'").Check(testkit.Rows("db2"))
tk.MustQuery("select * from information_schema.tables where table_schema = 'db1'").Check(testkit.Rows())

// Set snapthost and read old schema.
tk.MustExec(fmt.Sprintf("set @@tidb_snapshot= '%s'", timestamp))
tk.MustQuery("select * from db1.t").Check(testkit.Rows())
tk.MustExecToErr("select * from db2.t")
tk.MustExec("use db2")
tk.MustQuery("show tables").Check(testkit.Rows())
tk.MustExec("use db1")
tk.MustQuery("show tables").Check(testkit.Rows("t"))
}

0 comments on commit 3148c80

Please sign in to comment.