Skip to content

Commit

Permalink
executor: handle the corner case that temp index is not exist but the…
Browse files Browse the repository at this point in the history
… normal index is exist (#51862)

close #51784
  • Loading branch information
wjhuang2016 authored Mar 22, 2024
1 parent 4586b7b commit fb64325
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 25 deletions.
39 changes: 39 additions & 0 deletions pkg/ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/testutil"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -1064,6 +1066,43 @@ func TestMDLTruncateTable(t *testing.T) {
require.True(t, timetk3.After(timeMain))
}

func TestInsertIgnore(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a smallint(6) DEFAULT '-13202', b varchar(221) NOT NULL DEFAULT 'duplicatevalue', " +
"c tinyint(1) NOT NULL DEFAULT '0', PRIMARY KEY (c, b));")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &callback.TestDDLCallback{}

onJobUpdatedExportedFunc := func(job *model.Job) {
switch job.SchemaState {
case model.StateDeleteOnly:
_, err := tk1.Exec("INSERT INTO t VALUES (-18585,'aaa',1), (-18585,'0',1), (-18585,'1',1), (-18585,'duplicatevalue',1);")
assert.NoError(t, err)
case model.StateWriteReorganization:
idx := testutil.FindIdxInfo(dom, "test", "t", "idx")
if idx.BackfillState == model.BackfillStateReadyToMerge {
_, err := tk1.Exec("insert ignore into `t` values ( 234,'duplicatevalue',-2028 );")
assert.NoError(t, err)
return
}
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)

tk.MustExec("alter table t add unique index idx(b);")
tk.MustExec("admin check table t;")
}

func TestDDLJobErrEntrySizeTooLarge(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
73 changes: 48 additions & 25 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1170,6 +1170,28 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
return false
}

func (e *InsertValues) handleDuplicateKey(ctx context.Context, txn kv.Transaction, uk *keyValueWithDupInfo, replace bool, r toBeCheckedRow) (bool, error) {
if !replace {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.Ctx().GetSessionVars().TxnCtx; txnCtx.IsPessimistic && e.Ctx().GetSessionVars().LockUnchangedKeys {
txnCtx.AddUnchangedKeyForLock(uk.newKey)
}
return true, nil
}
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
if err != nil {
return false, err
}
if handle == nil {
return false, nil
}
_, err = e.removeRow(ctx, txn, handle, r, true)
if err != nil {
return false, err
}
return false, nil
}

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(
Expand Down Expand Up @@ -1214,7 +1236,6 @@ func (e *InsertValues) batchCheckAndInsert(
}

// append warnings and get no duplicated error rows
CheckAndInsert:
for i, r := range toBeCheckedRows {
if r.ignored {
continue
Expand Down Expand Up @@ -1250,42 +1271,44 @@ CheckAndInsert:
}
}

rowInserted := false
for _, uk := range r.uniqueKeys {
_, err := txn.Get(ctx, uk.newKey)
if err != nil && !kv.IsErrNotFound(err) {
return err
}
if err == nil {
if !replace {
// If duplicate keys were found in BatchGet, mark row = nil.
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.Ctx().GetSessionVars().TxnCtx; txnCtx.IsPessimistic &&
e.Ctx().GetSessionVars().LockUnchangedKeys {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedKeyForLock(uk.newKey)
}
continue CheckAndInsert
}
_, handle, err := tables.FetchDuplicatedHandle(
ctx,
uk.newKey,
true,
txn,
e.Table.Meta().ID,
uk.commonHandle,
)
rowInserted, err = e.handleDuplicateKey(ctx, txn, uk, replace, r)
if err != nil {
return err
}
if handle == nil {
continue
if rowInserted {
break
}
_, err = e.removeRow(ctx, txn, handle, r, true)
if err != nil {
continue
}
if tablecodec.IsTempIndexKey(uk.newKey) {
tablecodec.TempIndexKey2IndexKey(uk.newKey)
_, err = txn.Get(ctx, uk.newKey)
if err != nil && !kv.IsErrNotFound(err) {
return err
}
} else if !kv.IsErrNotFound(err) {
return err
if err == nil {
rowInserted, err = e.handleDuplicateKey(ctx, txn, uk, replace, r)
if err != nil {
return err
}
if rowInserted {
break
}
}
}
}

if rowInserted {
continue
}

// If row was checked with no duplicate keys,
// it should be added to values map for the further row check.
// There may be duplicate keys inside the insert statement.
Expand Down

0 comments on commit fb64325

Please sign in to comment.