Skip to content

Commit

Permalink
*: check delete unique key's handle to handle corner case (#52975) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 28, 2024
1 parent 318ffba commit 588e3b4
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 4 deletions.
8 changes: 8 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,11 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
case model.BackfillStateReadyToMerge:
logutil.BgLogger().Info("index backfill state ready to merge", zap.String("category", "ddl"), zap.Int64("job ID", job.ID),
zap.String("table", tbl.Meta().Name.O), zap.String("index", allIndexInfos[0].Name.O))
failpoint.Inject("mockDMLExecutionStateBeforeMerge", func(_ failpoint.Value) {
if MockDMLExecutionStateBeforeMerge != nil {
MockDMLExecutionStateBeforeMerge()
}
})
for _, indexInfo := range allIndexInfos {
indexInfo.BackfillState = model.BackfillStateMerging
}
Expand Down Expand Up @@ -1964,6 +1969,9 @@ var MockDMLExecutionStateMerging func()
// MockDMLExecutionStateBeforeImport is only used for test.
var MockDMLExecutionStateBeforeImport func()

// MockDMLExecutionStateBeforeMerge is only used for test.
var MockDMLExecutionStateBeforeMerge func()

func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
logutil.BgLogger().Info("start to merge temp index", zap.String("category", "ddl"), zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ import (
"go.uber.org/zap"
)

// BackendCtx is the backend context for add index reorg task.
// MockDMLExecutionStateBeforeImport is a failpoint to mock the DML execution state before import.
var MockDMLExecutionStateBeforeImport func()

// BackendCtx is the backend context for one add index reorg task.
type BackendCtx interface {
Register(jobID, indexID int64, schemaName, tableName string) (Engine, error)
Unregister(jobID, indexID int64)
Expand Down
6 changes: 6 additions & 0 deletions pkg/ddl/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync/atomic"

"github.com/google/uuid"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand Down Expand Up @@ -144,6 +145,11 @@ func (ei *engineInfo) ImportAndClean() error {
ei.closedEngine = closeEngine
}
if ei.closedEngine != nil {
failpoint.Inject("mockDMLExecutionStateBeforeImport", func(_ failpoint.Value) {
if MockDMLExecutionStateBeforeImport != nil {
MockDMLExecutionStateBeforeImport()
}
})
// Ingest data to TiKV.
logutil.Logger(ei.ctx).Info(LitInfoStartImport, zap.Int64("job ID", ei.jobID),
zap.Int64("index ID", ei.indexID),
Expand Down
25 changes: 22 additions & 3 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,28 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed

if distinct {
if len(key) > 0 {
err = txn.GetMemBuffer().DeleteWithFlags(key, kv.SetNeedLocked)
if err != nil {
return err
okToDelete := true
if c.idxInfo.BackfillState != model.BackfillStateInapplicable {
// #52914: the delete key is covered by the new ingested key, which shouldn't be deleted.
originVal, err := getKeyInTxn(context.TODO(), txn, key)
if err != nil {
return err
}
if len(originVal) > 0 {
oh, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, c.tblInfo.IsCommonHandle)
if err != nil {
return err
}
if !h.Equal(oh) {
okToDelete = false
}
}
}
if okToDelete {
err = txn.GetMemBuffer().DeleteWithFlags(key, kv.SetNeedLocked)
if err != nil {
return err
}
}
}
if len(tempKey) > 0 {
Expand Down
6 changes: 6 additions & 0 deletions tests/realtikvtest/addindextest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ go_test(
embed = [":addindextest"],
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/ingest",
"//pkg/ddl/util/callback",
"//pkg/parser/model",
"//pkg/testkit",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
56 changes: 56 additions & 0 deletions tests/realtikvtest/addindextest/add_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@ package addindextest
import (
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func init() {
Expand Down Expand Up @@ -161,3 +168,52 @@ func TestAddUKWithSmallIntHandles(t *testing.T) {
tk.MustExec("insert into t values (-9223372036854775808, 1),(-9223372036854775807, 1)")
tk.MustContainErrMsg("alter table t add unique index uk(b)", "Duplicate entry '1' for key 't.uk'")
}

func TestAddUniqueDuplicateIndexes(t *testing.T) {
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int DEFAULT '-13202', b varchar(221) NOT NULL DEFAULT 'duplicatevalue', " +
"c int NOT NULL DEFAULT '0');")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &callback.TestDDLCallback{}

tk1.Exec("INSERT INTO t VALUES (-18585,'duplicatevalue',0);")

onJobUpdatedExportedFunc := func(job *model.Job) {
switch job.SchemaState {
case model.StateDeleteOnly:
_, err := tk1.Exec("delete from t where c = 0;")
assert.NoError(t, err)
_, err = tk1.Exec("insert INTO t VALUES (-18585,'duplicatevalue',1);")
assert.NoError(t, err)
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)

tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use test")
ingest.MockDMLExecutionStateBeforeImport = func() {
tk3.MustExec("replace INTO t VALUES (-18585,'duplicatevalue',4);")
tk3.MustQuery("select * from t;").Check(testkit.Rows("-18585 duplicatevalue 1", "-18585 duplicatevalue 4"))
}
ddl.MockDMLExecutionStateBeforeMerge = func() {
tk3.MustQuery("select * from t;").Check(testkit.Rows("-18585 duplicatevalue 1", "-18585 duplicatevalue 4"))
tk3.MustExec("replace into t values (-18585,'duplicatevalue',0);")
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockDMLExecutionStateBeforeImport", "1*return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionStateBeforeMerge", "return(true)"))
tk.MustExec("alter table t add unique index idx(b);")
tk.MustExec("admin check table t;")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockDMLExecutionStateBeforeImport"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionStateBeforeMerge"))
}

0 comments on commit 588e3b4

Please sign in to comment.