
ddl: refine BackendCtx.Flush to display Duplicate entry error (#54182)
close #54184
lance6716 authored Jun 25, 2024
1 parent 01cc027 commit 7211d3d
Showing 11 changed files with 88 additions and 57 deletions.
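
In short: duplicate-key errors found while importing are now converted to the user-facing kv.ErrKeyExists ("Duplicate entry ...") inside BackendCtx.Flush itself, instead of Flush handing the offending index ID back to its callers. A before/after sketch of the two touched interface methods (signatures copied from the hunks below):

    // before: callers got the failing index ID and did the conversion themselves
    Flush(mode FlushMode) (flushed, imported bool, errIdxID int64, err error)
    Register(indexIDs []int64, uniques []bool, tableName string) ([]Engine, error)

    // after: Register captures the *model.TableInfo, and Flush uses it to
    // resolve the failing index and build the "Duplicate entry" message
    Flush(mode FlushMode) (flushed, imported bool, err error)
    Register(indexIDs []int64, uniques []bool, tblInfo *model.TableInfo) ([]Engine, error)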
20 changes: 2 additions & 18 deletions pkg/ddl/backfilling_operators.go
@@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
@@ -662,7 +661,7 @@ func (w *indexIngestLocalWorker) HandleTask(rs IndexRecordChunk, send func(Index
}()
w.indexIngestBaseWorker.HandleTask(rs, send)
// needs to flush and import to avoid using too much disk space.
- _, _, _, err := w.backendCtx.Flush(ingest.FlushModeAuto)
+ _, _, err := w.backendCtx.Flush(ingest.FlushModeAuto)
if err != nil {
w.ctx.onError(err)
return
@@ -835,23 +834,8 @@ func (s *indexWriteResultSink) flush() error {
failpoint.Inject("mockFlushError", func(_ failpoint.Value) {
failpoint.Return(errors.New("mock flush error"))
})
- // TODO(lance6716): convert to ErrKeyExists inside Flush
- _, _, errIdxID, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport)
+ _, _, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport)
if err != nil {
- if common.ErrFoundDuplicateKeys.Equal(err) {
- var idxInfo table.Index
- for _, idx := range s.indexes {
- if idx.Meta().ID == errIdxID {
- idxInfo = idx
- break
- }
- }
- if idxInfo == nil {
- logutil.Logger(s.ctx).Error("index not found", zap.Int64("indexID", errIdxID))
- return kv.ErrKeyExists
- }
- return ingest.TryConvertToKeyExistsErr(err, idxInfo.Meta(), s.tbl.Meta())
- }
logutil.Logger(s.ctx).Error("flush error",
zap.String("category", "ddl"), zap.Error(err))
return err
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_read_index.go
@@ -216,7 +216,7 @@ func (r *readIndexExecutor) buildLocalStorePipeline(
indexIDs = append(indexIDs, index.ID)
uniques = append(uniques, index.Unique)
}
- engines, err := r.bc.Register(indexIDs, uniques, r.job.TableName)
+ engines, err := r.bc.Register(indexIDs, uniques, r.ptbl.Meta())
if err != nil {
tidblogutil.Logger(opCtx).Error("cannot register new engine",
zap.Error(err),
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_scheduler.go
@@ -440,7 +440,7 @@ func (b *ingestBackfillScheduler) setupWorkers() error {
default:
return errors.Errorf("unexpected argument type, got %T", job.Args[0])
}
- engines, err := b.backendCtx.Register(indexIDs, uniques, job.TableName)
+ engines, err := b.backendCtx.Register(indexIDs, uniques, b.tbl.Meta())
if err != nil {
return errors.Trace(err)
}
70 changes: 44 additions & 26 deletions pkg/ddl/ingest/backend.go
@@ -51,7 +51,7 @@ type BackendCtx interface {
// backend context. If the index ID is already registered, it will return the
// associated engines. Only one group of index IDs is allowed to register for a
// BackendCtx.
- Register(indexIDs []int64, uniques []bool, tableName string) ([]Engine, error)
+ Register(indexIDs []int64, uniques []bool, tblInfo *model.TableInfo) ([]Engine, error)
UnregisterEngines()
// FinishImport imports all Register-ed engines into the storage, collects
// the duplicate errors for unique engines.
@@ -95,6 +95,7 @@ type litBackendCtx struct {
memRoot MemRoot
diskRoot DiskRoot
jobID int64
+ tblInfo *model.TableInfo
backend *local.Backend
ctx context.Context
cfg *lightning.Config
@@ -194,29 +195,29 @@ func acquireLock(ctx context.Context, se *concurrency.Session, key string) (*con
}

// Flush implements FlushController.
- func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID int64, err error) {
+ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err error) {
shouldFlush, shouldImport := bc.checkFlush(mode)
if !shouldFlush {
- return false, false, 0, nil
+ return false, false, nil
}
if !bc.flushing.CompareAndSwap(false, true) {
- return false, false, 0, nil
+ return false, false, nil
}
defer bc.flushing.Store(false)

- for indexID, ei := range bc.engines {
+ for _, ei := range bc.engines {
ei.flushLock.Lock()
//nolint: all_revive,revive
defer ei.flushLock.Unlock()

if err = ei.Flush(); err != nil {
- return false, false, indexID, err
+ return false, false, err
}
}
bc.timeOfLastFlush.Store(time.Now())

if !shouldImport {
- return true, false, 0, nil
+ return true, false, nil
}

// Use a distributed lock if running in distributed mode.
@@ -225,7 +226,7 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID
se, _ := concurrency.NewSession(bc.etcdClient)
mu, err := acquireLock(bc.ctx, se, distLockKey)
if err != nil {
- return true, false, 0, errors.Trace(err)
+ return true, false, errors.Trace(err)
}
logutil.Logger(bc.ctx).Info("acquire distributed flush lock success", zap.Int64("jobID", bc.jobID))
defer func() {
@@ -249,11 +250,44 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID

for indexID, ei := range bc.engines {
if err = bc.unsafeImportAndReset(ei); err != nil {
- return true, false, indexID, err
+ if common.ErrFoundDuplicateKeys.Equal(err) {
+ var idxInfo *model.IndexInfo
+ for _, idx := range bc.tblInfo.Indices {
+ if idx.ID == indexID {
+ idxInfo = idx
+ break
+ }
+ }
+ if idxInfo == nil {
+ logutil.Logger(bc.ctx).Error(
+ "index not found",
+ zap.Int64("indexID", indexID))
+ err = tikv.ErrKeyExists
+ } else {
+ err = TryConvertToKeyExistsErr(err, idxInfo, bc.tblInfo)
+ }
+ }
+ return true, false, err
}
}

+ var newTS uint64
+ if mgr := bc.GetCheckpointManager(); mgr != nil {
+ // for the local disk case, we need to refresh the TS because duplicate
+ // detection requires each ingest to have a unique TS.
+ //
+ // TODO(lance6716): there's still a chance that data is imported but, because
+ // the checkpoint is a low watermark, the data will be imported again with
+ // another TS after failover. Need to refine the checkpoint mechanism.
+ newTS, err = mgr.refreshTSAndUpdateCP()
+ if err == nil {
+ for _, ei := range bc.engines {
+ ei.openedEngine.SetTS(newTS)
+ }
+ }
+ }

- return true, true, 0, nil
+ return true, true, err
}

func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
@@ -299,22 +333,6 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
ei.closedEngine = nil
return err
}
-
- if mgr == nil {
- return nil
- }
-
- // for local disk case, we need to refresh TS because duplicate detection
- // requires each ingest to have a unique TS.
- //
- // TODO(lance6716): there's still a chance that data is imported but because of
- // checkpoint is low-watermark, the data will still be imported again with
- // another TS after failover. Need to refine the checkpoint mechanism.
- newTS, err := mgr.refreshTSAndUpdateCP()
- if err != nil {
- return errors.Trace(err)
- }
- ei.openedEngine.SetTS(newTS)
return nil
}

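
Two things change in Flush above: the ErrFoundDuplicateKeys conversion moves behind the BackendCtx boundary, and the checkpoint-driven TS refresh moves out of unsafeImportAndReset so a single fresh TS is applied to every engine after each import round. For callers the contract is now simply "the returned error is already user-facing". A minimal sketch of that contract (flushAndImport is a hypothetical helper, not part of the commit):

    import "github.com/pingcap/tidb/pkg/ddl/ingest"

    // flushAndImport illustrates the new contract: no index lookup and no
    // lightning/common import is needed on the caller side anymore.
    func flushAndImport(bc ingest.BackendCtx) error {
        flushed, imported, err := bc.Flush(ingest.FlushModeForceFlushAndImport)
        if err != nil {
            // for unique-index conflicts this is already kv.ErrKeyExists,
            // e.g. "Duplicate entry '1' for key 't.uk'"
            return err
        }
        _ = flushed  // true if engine data was flushed to local disk
        _ = imported // true if the engines were also imported into the storage
        return nil
    }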
6 changes: 3 additions & 3 deletions pkg/ddl/ingest/checkpoint.go
@@ -97,7 +97,7 @@ type taskCheckpoint struct {
type FlushController interface {
// Flush checks if all engines need to be flushed and imported based on the given
// FlushMode. It's concurrency-safe.
- Flush(mode FlushMode) (flushed, imported bool, errIdxID int64, err error)
+ Flush(mode FlushMode) (flushed, imported bool, err error)
}

// NewCheckpointManager creates a new checkpoint manager.
@@ -206,7 +206,7 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error {
cp.writtenKeys += delta
s.mu.Unlock()

- flushed, imported, _, err := s.flushCtrl.Flush(FlushModeAuto)
+ flushed, imported, err := s.flushCtrl.Flush(FlushModeAuto)
if !flushed || err != nil {
return err
}
@@ -261,7 +261,7 @@ func (s *CheckpointManager) Close() {
// Flush flushes the data and updates the checkpoint.
func (s *CheckpointManager) Flush() {
// use FlushModeForceFlushNoImport to finish the flush process timely.
- _, _, _, err := s.flushCtrl.Flush(FlushModeForceFlushNoImport)
+ _, _, err := s.flushCtrl.Flush(FlushModeForceFlushNoImport)
if err != nil {
s.logger.Warn("flush local engine failed", zap.Error(err))
}
4 changes: 2 additions & 2 deletions pkg/ddl/ingest/checkpoint_test.go
@@ -198,6 +198,6 @@ type dummyFlushCtrl struct {
imported bool
}

- func (d *dummyFlushCtrl) Flush(_ ingest.FlushMode) (bool, bool, int64, error) {
- return true, d.imported, 0, nil
+ func (d *dummyFlushCtrl) Flush(mode ingest.FlushMode) (bool, bool, error) {
+ return true, d.imported, nil
}
6 changes: 4 additions & 2 deletions pkg/ddl/ingest/engine_mgr.go
@@ -17,12 +17,13 @@ package ingest
import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

// Register implements BackendCtx.
- func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tableName string) ([]Engine, error) {
+ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tblInfo *model.TableInfo) ([]Engine, error) {
ret := make([]Engine, 0, len(indexIDs))

for _, indexID := range indexIDs {
@@ -59,7 +60,7 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tableName st
openedEngines := make(map[int64]*engineInfo, numIdx)

for i, indexID := range indexIDs {
- openedEngine, err := mgr.OpenEngine(bc.ctx, cfg, tableName, int32(indexID))
+ openedEngine, err := mgr.OpenEngine(bc.ctx, cfg, tblInfo.Name.L, int32(indexID))
if err != nil {
logutil.Logger(bc.ctx).Warn(LitErrCreateEngineFail,
zap.Int64("job ID", bc.jobID),
@@ -91,6 +92,7 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tableName st
bc.engines[indexID] = ei
}
bc.memRoot.Consume(numIdx * structSizeEngineInfo)
+ bc.tblInfo = tblInfo

logutil.Logger(bc.ctx).Info(LitInfoOpenEngine, zap.Int64("job ID", bc.jobID),
zap.Int64s("index IDs", indexIDs),
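
Register now receives the whole *model.TableInfo rather than a bare table name: OpenEngine still derives the engine name from it (tblInfo.Name.L), and keeping the struct on litBackendCtx is what lets Flush map a failing index ID to its *model.IndexInfo. A call-site sketch mirroring the updated callers (tbl is assumed to be a table.Table):

    // register one engine per index, handing over the table metadata so that
    // Flush can later build a precise "Duplicate entry" error
    engines, err := bc.Register(indexIDs, uniques, tbl.Meta())
    if err != nil {
        return errors.Trace(err)
    }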
7 changes: 4 additions & 3 deletions pkg/ddl/ingest/mock.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/table"
pd "github.com/tikv/pd/client"
@@ -106,7 +107,7 @@ type MockBackendCtx struct {
}

// Register implements BackendCtx.Register interface.
- func (m *MockBackendCtx) Register(indexIDs []int64, _ []bool, _ string) ([]Engine, error) {
+ func (m *MockBackendCtx) Register(indexIDs []int64, _ []bool, _ *model.TableInfo) ([]Engine, error) {
logutil.DDLIngestLogger().Info("mock backend ctx register", zap.Int64("jobID", m.jobID), zap.Int64s("indexIDs", indexIDs))
ret := make([]Engine, 0, len(indexIDs))
for range indexIDs {
@@ -138,8 +139,8 @@ func (*MockBackendCtx) FinishImport(_ table.Table) error {
}

// Flush implements BackendCtx.Flush interface.
- func (*MockBackendCtx) Flush(_ FlushMode) (flushed bool, imported bool, errIdxID int64, err error) {
- return false, false, 0, nil
+ func (*MockBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err error) {
+ return false, false, nil
}

// Done implements BackendCtx.Done interface.
1 change: 1 addition & 0 deletions tests/realtikvtest/addindextest1/BUILD.bazel
@@ -10,6 +10,7 @@ go_test(
flaky = True,
deps = [
"//pkg/config",
"//pkg/ddl/ingest",
"//pkg/ddl/util/callback",
"//pkg/disttask/framework/storage",
"//pkg/errno",
25 changes: 25 additions & 0 deletions tests/realtikvtest/addindextest1/disttask_test.go
@@ -23,6 +23,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/errno"
@@ -262,3 +263,27 @@ func TestAddIndexForCurrentTimestampColumn(t *testing.T) {
tk.MustExec("alter table t add index idx(a);")
tk.MustExec("admin check table t;")
}
+
+ func TestAddUKErrorMessage(t *testing.T) {
+ ingest.ForceSyncFlagForTest = true
+ t.Cleanup(func() {
+ ingest.ForceSyncFlagForTest = false
+ })
+
+ store := realtikvtest.CreateMockStoreAndSetup(t)
+ tk := testkit.NewTestKit(t, store)
+ tk.MustExec("drop database if exists addindexlit;")
+ tk.MustExec("create database addindexlit;")
+ tk.MustExec("use addindexlit;")
+ t.Cleanup(func() {
+ tk.MustExec("set global tidb_enable_dist_task = off;")
+ })
+ tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+ tk.MustExec("set global tidb_enable_dist_task = on;")
+
+ tk.MustExec("create table t (a int primary key, b int);")
+ tk.MustExec("insert into t values (5, 1), (10005, 1), (20005, 1), (30005, 1);")
+ tk.MustExec("split table t between (1) and (100001) regions 10;")
+ err := tk.ExecToErr("alter table t add unique index uk(b);")
+ require.ErrorContains(t, err, "Duplicate entry '1' for key 't.uk'")
+ }
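
The new test forces synchronous flush and import via ingest.ForceSyncFlagForTest and splits the table across ten regions, so the four rows with b = 1 only collide at import time, which is exactly the path where Flush now performs the conversion; the assertion pins the user-visible "Duplicate entry '1' for key 't.uk'" message.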
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest3/functional_test.go
@@ -89,7 +89,7 @@ func TestBackendCtxConcurrentUnregister(t *testing.T) {
for range idxIDs {
uniques = append(uniques, false)
}
- _, err = bCtx.Register([]int64{1, 2, 3, 4, 5, 6, 7}, uniques, "t")
+ _, err = bCtx.Register([]int64{1, 2, 3, 4, 5, 6, 7}, uniques, &model.TableInfo{})
require.NoError(t, err)

wg := sync.WaitGroup{}
