Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix ExistsTableRow and add tests for skip reorg checks #57778

Merged
merged 7 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 39 additions & 13 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"math"
"os"
"slices"
"strings"
Expand Down Expand Up @@ -70,6 +69,7 @@ import (
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/generatedexpr"
"github.com/pingcap/tidb/pkg/util/intest"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/size"
Expand Down Expand Up @@ -1119,44 +1119,56 @@ SwitchIndexState:

func checkIfTableReorgWorkCanSkip(
store kv.Storage,
sessCtx sessionctx.Context,
tbl table.Table,
job *model.Job,
) bool {
if job.SnapshotVer != 0 {
// Reorg work has begun.
return false
}
txn, err := sessCtx.Txn(false)
validTxn := err == nil && txn != nil && txn.Valid()
intest.Assert(validTxn)
if !validTxn {
logutil.DDLLogger().Warn("check if table is empty failed", zap.Error(err))
return false
}
startTS := txn.StartTS()
ctx := NewReorgContext()
ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName
ctx.setDDLLabelForTopSQL(job.Query)
return checkIfTableIsEmpty(ctx, store, tbl)
return checkIfTableIsEmpty(ctx, store, tbl, startTS)
}

func checkIfTableIsEmpty(
ctx *ReorgContext,
store kv.Storage,
tbl table.Table,
startTS uint64,
) bool {
if pTbl, ok := tbl.(table.PartitionedTable); ok {
for _, pid := range pTbl.GetAllPartitionIDs() {
pTbl := pTbl.GetPartition(pid)
if !checkIfPhysicalTableIsEmpty(ctx, store, pTbl) {
if !checkIfPhysicalTableIsEmpty(ctx, store, pTbl, startTS) {
return false
}
}
return true
}
//nolint:forcetypeassert
plainTbl := tbl.(table.PhysicalTable)
return checkIfPhysicalTableIsEmpty(ctx, store, plainTbl)
return checkIfPhysicalTableIsEmpty(ctx, store, plainTbl, startTS)
}

func checkIfPhysicalTableIsEmpty(
ctx *ReorgContext,
store kv.Storage,
tbl table.PhysicalTable,
startTS uint64,
) bool {
hasRecord, err := ExistsTableRow(ctx, store, math.MaxInt64, tbl)
hasRecord, err := existsTableRow(ctx, store, tbl, startTS)
intest.Assert(err == nil)
if err != nil {
logutil.DDLLogger().Info("check if table is empty failed", zap.Error(err))
return false
Expand All @@ -1166,6 +1178,7 @@ func checkIfPhysicalTableIsEmpty(

func checkIfTempIndexReorgWorkCanSkip(
store kv.Storage,
sessCtx sessionctx.Context,
tbl table.Table,
allIndexInfos []*model.IndexInfo,
job *model.Job,
Expand All @@ -1179,6 +1192,14 @@ func checkIfTempIndexReorgWorkCanSkip(
// Reorg work has begun.
return false
}
txn, err := sessCtx.Txn(false)
validTxn := err == nil && txn != nil && txn.Valid()
intest.Assert(validTxn)
if !validTxn {
logutil.DDLLogger().Warn("check if temp index is empty failed", zap.Error(err))
return false
}
startTS := txn.StartTS()
ctx := NewReorgContext()
ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName
ctx.setDDLLabelForTopSQL(job.Query)
Expand All @@ -1190,7 +1211,7 @@ func checkIfTempIndexReorgWorkCanSkip(
globalIdxIDs = append(globalIdxIDs, idxInfo.ID)
}
}
return checkIfTempIndexIsEmpty(ctx, store, tbl, firstIdxID, lastIdxID, globalIdxIDs)
return checkIfTempIndexIsEmpty(ctx, store, tbl, firstIdxID, lastIdxID, globalIdxIDs, startTS)
}

func checkIfTempIndexIsEmpty(
Expand All @@ -1199,38 +1220,41 @@ func checkIfTempIndexIsEmpty(
tbl table.Table,
firstIdxID, lastIdxID int64,
globalIdxIDs []int64,
startTS uint64,
) bool {
tblMetaID := tbl.Meta().ID
if pTbl, ok := tbl.(table.PartitionedTable); ok {
for _, pid := range pTbl.GetAllPartitionIDs() {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, pid, firstIdxID, lastIdxID) {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, pid, firstIdxID, lastIdxID, startTS) {
return false
}
}
for _, globalIdxID := range globalIdxIDs {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, globalIdxID, globalIdxID) {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, globalIdxID, globalIdxID, startTS) {
return false
}
}
return true
}
return checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, firstIdxID, lastIdxID)
return checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, firstIdxID, lastIdxID, startTS)
}

func checkIfTempIndexIsEmptyForPhysicalTable(
ctx *ReorgContext,
store kv.Storage,
pid int64,
firstIdxID, lastIdxID int64,
startTS uint64,
) bool {
start, end := encodeTempIndexRange(pid, firstIdxID, lastIdxID)
foundKey := false
idxPrefix := tablecodec.GenTableIndexPrefix(pid)
err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, idxPrefix, math.MaxUint64, start, end,
err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, idxPrefix, startTS, start, end,
func(_ kv.Handle, _ kv.Key, _ []byte) (more bool, err error) {
foundKey = true
return false, nil
})
intest.Assert(err == nil)
if err != nil {
logutil.DDLLogger().Info("check if temp index is empty failed", zap.Error(err))
return false
Expand Down Expand Up @@ -1306,7 +1330,7 @@ func doReorgWorkForCreateIndex(
return false, ver, err
}
if !reorgTp.NeedMergeProcess() {
skipReorg := checkIfTableReorgWorkCanSkip(w.store, tbl, job)
skipReorg := checkIfTableReorgWorkCanSkip(w.store, w.sess.Session(), tbl, job)
if skipReorg {
logutil.DDLLogger().Info("table is empty, skipping reorg work",
zap.Int64("jobID", job.ID),
Expand All @@ -1317,7 +1341,7 @@ func doReorgWorkForCreateIndex(
}
switch allIndexInfos[0].BackfillState {
case model.BackfillStateRunning:
skipReorg := checkIfTableReorgWorkCanSkip(w.store, tbl, job)
skipReorg := checkIfTableReorgWorkCanSkip(w.store, w.sess.Session(), tbl, job)
if !skipReorg {
logutil.DDLLogger().Info("index backfill state running",
zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
Expand All @@ -1337,6 +1361,7 @@ func doReorgWorkForCreateIndex(
return false, ver, errors.Trace(err)
}
} else {
failpoint.InjectCall("afterCheckTableReorgCanSkip")
logutil.DDLLogger().Info("table is empty, skipping reorg work",
zap.Int64("jobID", job.ID),
zap.String("table", tbl.Meta().Name.O))
Expand Down Expand Up @@ -1367,13 +1392,14 @@ func doReorgWorkForCreateIndex(
ver, err = updateVersionAndTableInfo(jobCtx, job, tbl.Meta(), true)
return false, ver, errors.Trace(err)
case model.BackfillStateMerging:
skipReorg := checkIfTempIndexReorgWorkCanSkip(w.store, tbl, allIndexInfos, job)
skipReorg := checkIfTempIndexReorgWorkCanSkip(w.store, w.sess.Session(), tbl, allIndexInfos, job)
if !skipReorg {
done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, true)
if !done {
return false, ver, err
}
} else {
failpoint.InjectCall("afterCheckTempIndexReorgCanSkip")
logutil.DDLLogger().Info("temp index is empty, skipping reorg work",
zap.Int64("jobID", job.ID),
zap.String("table", tbl.Meta().Name.O))
Expand Down
21 changes: 9 additions & 12 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,22 +767,19 @@ func GetTableMaxHandle(ctx *ReorgContext, store kv.Storage, startTS uint64, tbl
return kv.IntHandle(row.GetInt64(0)), false, nil
}

// ExistsTableRow checks if there is at least one row in the specified table.
// existsTableRow checks if there is at least one row in the specified table.
// In case of an error during the operation, it returns false along with the error.
func ExistsTableRow(ctx *ReorgContext, store kv.Storage, startTS uint64, tbl table.PhysicalTable) (bool, error) {
handleCols := buildHandleCols(tbl)
result, err := buildOneRowTableScan(ctx, store, startTS, tbl, handleCols, 1, false)
if err != nil {
return false, errors.Trace(err)
}
defer terror.Call(result.Close)

chk := chunk.New(getColumnsTypes(handleCols), 1, 1)
err = result.Next(ctx.ddlJobCtx, chk)
func existsTableRow(ctx *ReorgContext, store kv.Storage, tbl table.PhysicalTable, startTS uint64) (bool, error) {
found := false
err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, tbl.RecordPrefix(), startTS, nil, nil,
func(_ kv.Handle, _ kv.Key, _ []byte) (bool, error) {
found = true
return false, nil
})
if err != nil {
return false, errors.Trace(err)
}
return chk.NumRows() != 0, nil
return found, nil
}

func buildHandleCols(tbl table.PhysicalTable) []*model.ColumnInfo {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/tests/indexmerge/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 21,
shard_count = 23,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
74 changes: 74 additions & 0 deletions pkg/ddl/tests/indexmerge/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,3 +857,77 @@ func TestAddUniqueIndexFalsePositiveDuplicate(t *testing.T) {
tk.MustExec("alter table t add unique index idx(b);")
tk.MustExec("admin check table t;")
}

func TestAddIndexSkipReorgCheck(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")

skipTableReorg := false
skipTempIdxReorg := false
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTableReorgCanSkip", func() {
skipTableReorg = true
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTempIndexReorgCanSkip", func() {
skipTempIdxReorg = true
})
tk.MustExec("alter table t add index idx1(a);")
require.True(t, skipTableReorg)
require.True(t, skipTempIdxReorg)

skipTableReorg = false
skipTempIdxReorg = false
tk.MustExec("insert into t values (1);")
tk.MustExec("alter table t add index idx2(a);")
require.False(t, skipTableReorg)
require.True(t, skipTempIdxReorg)

skipTableReorg = false
skipTempIdxReorg = false
var runDML bool
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunAfter", func(job *model.Job) {
if t.Failed() || runDML {
return
}
switch job.SchemaState {
case model.StateWriteReorganization:
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into t values (2);")
runDML = true
}
})
tk.MustExec("alter table t add index idx3(a);")
require.False(t, skipTableReorg)
require.False(t, skipTempIdxReorg)
tk.MustQuery("select * from t;").Check(testkit.Rows("1", "2"))
tk.MustExec("admin check table t;")
}

func TestAddIndexInsertAfterReorgSkipCheck(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTableReorgCanSkip", func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into t values (1);")
})
tk.MustExec("alter table t add index idx(a);")
tk.MustQuery("select * from t;").Check(testkit.Rows("1"))
tk.MustExec("admin check table t;")
err := failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/afterCheckTableReorgCanSkip")
require.NoError(t, err)

tk.MustExec("truncate table t;")
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTempIndexReorgCanSkip", func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into t values (2);")
})
tk.MustExec("alter table t add index idx2(a);")
tk.MustQuery("select * from t;").Check(testkit.Rows("2"))
tk.MustExec("admin check table t;")
}