Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: check delete unique key's handle to handle corner case #52975

Merged
merged 19 commits into from
May 16, 2024
8 changes: 8 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,11 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true)
return false, ver, errors.Trace(err)
case model.BackfillStateReadyToMerge:
failpoint.Inject("mockDMLExecutionStateBeforeMerge", func(_ failpoint.Value) {
if MockDMLExecutionStateBeforeMerge != nil {
MockDMLExecutionStateBeforeMerge()
}
})
logutil.DDLLogger().Info("index backfill state ready to merge",
zap.Int64("job ID", job.ID),
zap.String("table", tbl.Meta().Name.O),
Expand Down Expand Up @@ -1924,6 +1929,9 @@ var MockDMLExecutionStateMerging func()
// MockDMLExecutionStateBeforeImport is only used for test.
var MockDMLExecutionStateBeforeImport func()

// MockDMLExecutionStateBeforeMerge is only used for test.
var MockDMLExecutionStateBeforeMerge func()

func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
logutil.DDLLogger().Info("start to merge temp index", zap.Stringer("job", reorgInfo.Job), zap.Stringer("reorgInfo", reorgInfo))
Expand Down
8 changes: 8 additions & 0 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ import (
"go.uber.org/zap"
)

// MockDMLExecutionStateBeforeImport is a failpoint to mock the DML execution state before import.
var MockDMLExecutionStateBeforeImport func()

// BackendCtx is the backend context for one add index reorg task.
type BackendCtx interface {
// Register create a new engineInfo for each index ID and register it to the
Expand Down Expand Up @@ -237,6 +240,11 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID
}
}()
}
failpoint.Inject("mockDMLExecutionStateBeforeImport", func(_ failpoint.Value) {
if MockDMLExecutionStateBeforeImport != nil {
MockDMLExecutionStateBeforeImport()
}
})

for indexID, ei := range bc.engines {
if err = bc.unsafeImportAndReset(ei); err != nil {
Expand Down
25 changes: 22 additions & 3 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,28 @@ func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue

if distinct {
if len(key) > 0 {
err = txn.GetMemBuffer().DeleteWithFlags(key, kv.SetNeedLocked)
if err != nil {
return err
okToDelete := true
if c.idxInfo.BackfillState != model.BackfillStateInapplicable {
// #52914: the delete key is covered by the new ingested key, which shouldn't be deleted.
originVal, err := getKeyInTxn(context.TODO(), txn, key)
if err != nil {
return err
}
if len(originVal) > 0 {
oh, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, c.tblInfo.IsCommonHandle)
if err != nil {
return err
}
if !h.Equal(oh) {
okToDelete = false
}
}
}
if okToDelete {
err = txn.GetMemBuffer().DeleteWithFlags(key, kv.SetNeedLocked)
if err != nil {
return err
}
}
}
if len(tempKey) > 0 {
Expand Down
5 changes: 5 additions & 0 deletions tests/realtikvtest/addindextest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ go_test(
],
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/ingest",
"//pkg/ddl/util/callback",
"//pkg/parser/model",
"//pkg/testkit",
"//tests/realtikvtest",
"//tests/realtikvtest/addindextestutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
],
Expand Down
54 changes: 54 additions & 0 deletions tests/realtikvtest/addindextest/add_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ import (
"path/filepath"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/pingcap/tidb/tests/realtikvtest/addindextestutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)
Expand Down Expand Up @@ -200,3 +205,52 @@ func TestLitBackendCtxMgr(t *testing.T) {
_, ok = mgr.Load(jobID)
require.False(t, ok)
}

func TestAddUniqueDuplicateIndexes(t *testing.T) {
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int DEFAULT '-13202', b varchar(221) NOT NULL DEFAULT 'duplicatevalue', " +
"c int NOT NULL DEFAULT '0');")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &callback.TestDDLCallback{}

tk1.Exec("INSERT INTO t VALUES (-18585,'duplicatevalue',0);")

onJobUpdatedExportedFunc := func(job *model.Job) {
switch job.SchemaState {
case model.StateDeleteOnly:
_, err := tk1.Exec("delete from t where c = 0;")
assert.NoError(t, err)
_, err = tk1.Exec("insert INTO t VALUES (-18585,'duplicatevalue',1);")
assert.NoError(t, err)
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)

tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use test")
ingest.MockDMLExecutionStateBeforeImport = func() {
tk3.MustExec("replace INTO t VALUES (-18585,'duplicatevalue',4);")
tk3.MustQuery("select * from t;").Check(testkit.Rows("-18585 duplicatevalue 1", "-18585 duplicatevalue 4"))
}
ddl.MockDMLExecutionStateBeforeMerge = func() {
tk3.MustQuery("select * from t;").Check(testkit.Rows("-18585 duplicatevalue 1", "-18585 duplicatevalue 4"))
tk3.MustExec("replace into t values (-18585,'duplicatevalue',0);")
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockDMLExecutionStateBeforeImport", "1*return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionStateBeforeMerge", "return(true)"))
tk.MustExec("alter table t add unique index idx(b);")
tk.MustExec("admin check table t;")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockDMLExecutionStateBeforeImport"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionStateBeforeMerge"))
}