Skip to content

Commit

Permalink
ddl: fix ExistsTableRow and add tests for skip reorg checks (#57778)
Browse files Browse the repository at this point in the history
close #57769
  • Loading branch information
tangenta authored Nov 28, 2024
1 parent ce5ec3f commit b114219
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 26 deletions.
52 changes: 39 additions & 13 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"math"
"os"
"slices"
"strings"
Expand Down Expand Up @@ -70,6 +69,7 @@ import (
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/generatedexpr"
"github.com/pingcap/tidb/pkg/util/intest"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/size"
Expand Down Expand Up @@ -1119,44 +1119,56 @@ SwitchIndexState:

func checkIfTableReorgWorkCanSkip(
store kv.Storage,
sessCtx sessionctx.Context,
tbl table.Table,
job *model.Job,
) bool {
if job.SnapshotVer != 0 {
// Reorg work has begun.
return false
}
txn, err := sessCtx.Txn(false)
validTxn := err == nil && txn != nil && txn.Valid()
intest.Assert(validTxn)
if !validTxn {
logutil.DDLLogger().Warn("check if table is empty failed", zap.Error(err))
return false
}
startTS := txn.StartTS()
ctx := NewReorgContext()
ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName
ctx.setDDLLabelForTopSQL(job.Query)
return checkIfTableIsEmpty(ctx, store, tbl)
return checkIfTableIsEmpty(ctx, store, tbl, startTS)
}

func checkIfTableIsEmpty(
ctx *ReorgContext,
store kv.Storage,
tbl table.Table,
startTS uint64,
) bool {
if pTbl, ok := tbl.(table.PartitionedTable); ok {
for _, pid := range pTbl.GetAllPartitionIDs() {
pTbl := pTbl.GetPartition(pid)
if !checkIfPhysicalTableIsEmpty(ctx, store, pTbl) {
if !checkIfPhysicalTableIsEmpty(ctx, store, pTbl, startTS) {
return false
}
}
return true
}
//nolint:forcetypeassert
plainTbl := tbl.(table.PhysicalTable)
return checkIfPhysicalTableIsEmpty(ctx, store, plainTbl)
return checkIfPhysicalTableIsEmpty(ctx, store, plainTbl, startTS)
}

func checkIfPhysicalTableIsEmpty(
ctx *ReorgContext,
store kv.Storage,
tbl table.PhysicalTable,
startTS uint64,
) bool {
hasRecord, err := ExistsTableRow(ctx, store, math.MaxInt64, tbl)
hasRecord, err := existsTableRow(ctx, store, tbl, startTS)
intest.Assert(err == nil)
if err != nil {
logutil.DDLLogger().Info("check if table is empty failed", zap.Error(err))
return false
Expand All @@ -1166,6 +1178,7 @@ func checkIfPhysicalTableIsEmpty(

func checkIfTempIndexReorgWorkCanSkip(
store kv.Storage,
sessCtx sessionctx.Context,
tbl table.Table,
allIndexInfos []*model.IndexInfo,
job *model.Job,
Expand All @@ -1179,6 +1192,14 @@ func checkIfTempIndexReorgWorkCanSkip(
// Reorg work has begun.
return false
}
txn, err := sessCtx.Txn(false)
validTxn := err == nil && txn != nil && txn.Valid()
intest.Assert(validTxn)
if !validTxn {
logutil.DDLLogger().Warn("check if temp index is empty failed", zap.Error(err))
return false
}
startTS := txn.StartTS()
ctx := NewReorgContext()
ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName
ctx.setDDLLabelForTopSQL(job.Query)
Expand All @@ -1190,7 +1211,7 @@ func checkIfTempIndexReorgWorkCanSkip(
globalIdxIDs = append(globalIdxIDs, idxInfo.ID)
}
}
return checkIfTempIndexIsEmpty(ctx, store, tbl, firstIdxID, lastIdxID, globalIdxIDs)
return checkIfTempIndexIsEmpty(ctx, store, tbl, firstIdxID, lastIdxID, globalIdxIDs, startTS)
}

func checkIfTempIndexIsEmpty(
Expand All @@ -1199,38 +1220,41 @@ func checkIfTempIndexIsEmpty(
tbl table.Table,
firstIdxID, lastIdxID int64,
globalIdxIDs []int64,
startTS uint64,
) bool {
tblMetaID := tbl.Meta().ID
if pTbl, ok := tbl.(table.PartitionedTable); ok {
for _, pid := range pTbl.GetAllPartitionIDs() {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, pid, firstIdxID, lastIdxID) {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, pid, firstIdxID, lastIdxID, startTS) {
return false
}
}
for _, globalIdxID := range globalIdxIDs {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, globalIdxID, globalIdxID) {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, globalIdxID, globalIdxID, startTS) {
return false
}
}
return true
}
return checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, firstIdxID, lastIdxID)
return checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, firstIdxID, lastIdxID, startTS)
}

func checkIfTempIndexIsEmptyForPhysicalTable(
ctx *ReorgContext,
store kv.Storage,
pid int64,
firstIdxID, lastIdxID int64,
startTS uint64,
) bool {
start, end := encodeTempIndexRange(pid, firstIdxID, lastIdxID)
foundKey := false
idxPrefix := tablecodec.GenTableIndexPrefix(pid)
err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, idxPrefix, math.MaxUint64, start, end,
err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, idxPrefix, startTS, start, end,
func(_ kv.Handle, _ kv.Key, _ []byte) (more bool, err error) {
foundKey = true
return false, nil
})
intest.Assert(err == nil)
if err != nil {
logutil.DDLLogger().Info("check if temp index is empty failed", zap.Error(err))
return false
Expand Down Expand Up @@ -1306,7 +1330,7 @@ func doReorgWorkForCreateIndex(
return false, ver, err
}
if !reorgTp.NeedMergeProcess() {
skipReorg := checkIfTableReorgWorkCanSkip(w.store, tbl, job)
skipReorg := checkIfTableReorgWorkCanSkip(w.store, w.sess.Session(), tbl, job)
if skipReorg {
logutil.DDLLogger().Info("table is empty, skipping reorg work",
zap.Int64("jobID", job.ID),
Expand All @@ -1317,7 +1341,7 @@ func doReorgWorkForCreateIndex(
}
switch allIndexInfos[0].BackfillState {
case model.BackfillStateRunning:
skipReorg := checkIfTableReorgWorkCanSkip(w.store, tbl, job)
skipReorg := checkIfTableReorgWorkCanSkip(w.store, w.sess.Session(), tbl, job)
if !skipReorg {
logutil.DDLLogger().Info("index backfill state running",
zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
Expand All @@ -1337,6 +1361,7 @@ func doReorgWorkForCreateIndex(
return false, ver, errors.Trace(err)
}
} else {
failpoint.InjectCall("afterCheckTableReorgCanSkip")
logutil.DDLLogger().Info("table is empty, skipping reorg work",
zap.Int64("jobID", job.ID),
zap.String("table", tbl.Meta().Name.O))
Expand Down Expand Up @@ -1367,13 +1392,14 @@ func doReorgWorkForCreateIndex(
ver, err = updateVersionAndTableInfo(jobCtx, job, tbl.Meta(), true)
return false, ver, errors.Trace(err)
case model.BackfillStateMerging:
skipReorg := checkIfTempIndexReorgWorkCanSkip(w.store, tbl, allIndexInfos, job)
skipReorg := checkIfTempIndexReorgWorkCanSkip(w.store, w.sess.Session(), tbl, allIndexInfos, job)
if !skipReorg {
done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, true)
if !done {
return false, ver, err
}
} else {
failpoint.InjectCall("afterCheckTempIndexReorgCanSkip")
logutil.DDLLogger().Info("temp index is empty, skipping reorg work",
zap.Int64("jobID", job.ID),
zap.String("table", tbl.Meta().Name.O))
Expand Down
21 changes: 9 additions & 12 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,22 +767,19 @@ func GetTableMaxHandle(ctx *ReorgContext, store kv.Storage, startTS uint64, tbl
return kv.IntHandle(row.GetInt64(0)), false, nil
}

// ExistsTableRow checks if there is at least one row in the specified table.
// existsTableRow checks if there is at least one row in the specified table.
// In case of an error during the operation, it returns false along with the error.
func ExistsTableRow(ctx *ReorgContext, store kv.Storage, startTS uint64, tbl table.PhysicalTable) (bool, error) {
handleCols := buildHandleCols(tbl)
result, err := buildOneRowTableScan(ctx, store, startTS, tbl, handleCols, 1, false)
if err != nil {
return false, errors.Trace(err)
}
defer terror.Call(result.Close)

chk := chunk.New(getColumnsTypes(handleCols), 1, 1)
err = result.Next(ctx.ddlJobCtx, chk)
func existsTableRow(ctx *ReorgContext, store kv.Storage, tbl table.PhysicalTable, startTS uint64) (bool, error) {
found := false
err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, tbl.RecordPrefix(), startTS, nil, nil,
func(_ kv.Handle, _ kv.Key, _ []byte) (bool, error) {
found = true
return false, nil
})
if err != nil {
return false, errors.Trace(err)
}
return chk.NumRows() != 0, nil
return found, nil
}

func buildHandleCols(tbl table.PhysicalTable) []*model.ColumnInfo {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/tests/indexmerge/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 21,
shard_count = 23,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
74 changes: 74 additions & 0 deletions pkg/ddl/tests/indexmerge/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,3 +857,77 @@ func TestAddUniqueIndexFalsePositiveDuplicate(t *testing.T) {
tk.MustExec("alter table t add unique index idx(b);")
tk.MustExec("admin check table t;")
}

func TestAddIndexSkipReorgCheck(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")

skipTableReorg := false
skipTempIdxReorg := false
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTableReorgCanSkip", func() {
skipTableReorg = true
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTempIndexReorgCanSkip", func() {
skipTempIdxReorg = true
})
tk.MustExec("alter table t add index idx1(a);")
require.True(t, skipTableReorg)
require.True(t, skipTempIdxReorg)

skipTableReorg = false
skipTempIdxReorg = false
tk.MustExec("insert into t values (1);")
tk.MustExec("alter table t add index idx2(a);")
require.False(t, skipTableReorg)
require.True(t, skipTempIdxReorg)

skipTableReorg = false
skipTempIdxReorg = false
var runDML bool
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunAfter", func(job *model.Job) {
if t.Failed() || runDML {
return
}
switch job.SchemaState {
case model.StateWriteReorganization:
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into t values (2);")
runDML = true
}
})
tk.MustExec("alter table t add index idx3(a);")
require.False(t, skipTableReorg)
require.False(t, skipTempIdxReorg)
tk.MustQuery("select * from t;").Check(testkit.Rows("1", "2"))
tk.MustExec("admin check table t;")
}

func TestAddIndexInsertAfterReorgSkipCheck(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTableReorgCanSkip", func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into t values (1);")
})
tk.MustExec("alter table t add index idx(a);")
tk.MustQuery("select * from t;").Check(testkit.Rows("1"))
tk.MustExec("admin check table t;")
err := failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/afterCheckTableReorgCanSkip")
require.NoError(t, err)

tk.MustExec("truncate table t;")
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTempIndexReorgCanSkip", func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into t values (2);")
})
tk.MustExec("alter table t add index idx2(a);")
tk.MustQuery("select * from t;").Check(testkit.Rows("2"))
tk.MustExec("admin check table t;")
}

0 comments on commit b114219

Please sign in to comment.