From 7211d3d828d3c944e28cfb2f80f94a8a006a5d59 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 25 Jun 2024 11:37:21 +0800 Subject: [PATCH] ddl: refine `BackendCtx.Flush` to display Duplicate entry error (#54182) close pingcap/tidb#54184 --- pkg/ddl/backfilling_operators.go | 20 +----- pkg/ddl/backfilling_read_index.go | 2 +- pkg/ddl/backfilling_scheduler.go | 2 +- pkg/ddl/ingest/backend.go | 70 ++++++++++++------- pkg/ddl/ingest/checkpoint.go | 6 +- pkg/ddl/ingest/checkpoint_test.go | 4 +- pkg/ddl/ingest/engine_mgr.go | 6 +- pkg/ddl/ingest/mock.go | 7 +- tests/realtikvtest/addindextest1/BUILD.bazel | 1 + .../addindextest1/disttask_test.go | 25 +++++++ .../addindextest3/functional_test.go | 2 +- 11 files changed, 88 insertions(+), 57 deletions(-) diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index e1c75a4ab76c0..68382c46fede7 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/tidb/pkg/disttask/operator" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/lightning/backend/external" - "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/terror" @@ -662,7 +661,7 @@ func (w *indexIngestLocalWorker) HandleTask(rs IndexRecordChunk, send func(Index }() w.indexIngestBaseWorker.HandleTask(rs, send) // needs to flush and import to avoid too much use of disk. - _, _, _, err := w.backendCtx.Flush(ingest.FlushModeAuto) + _, _, err := w.backendCtx.Flush(ingest.FlushModeAuto) if err != nil { w.ctx.onError(err) return @@ -835,23 +834,8 @@ func (s *indexWriteResultSink) flush() error { failpoint.Inject("mockFlushError", func(_ failpoint.Value) { failpoint.Return(errors.New("mock flush error")) }) - // TODO(lance6716): convert to ErrKeyExists inside Flush - _, _, errIdxID, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport) + _, _, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport) if err != nil { - if common.ErrFoundDuplicateKeys.Equal(err) { - var idxInfo table.Index - for _, idx := range s.indexes { - if idx.Meta().ID == errIdxID { - idxInfo = idx - break - } - } - if idxInfo == nil { - logutil.Logger(s.ctx).Error("index not found", zap.Int64("indexID", errIdxID)) - return kv.ErrKeyExists - } - return ingest.TryConvertToKeyExistsErr(err, idxInfo.Meta(), s.tbl.Meta()) - } logutil.Logger(s.ctx).Error("flush error", zap.String("category", "ddl"), zap.Error(err)) return err diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index c25990ad5784c..db2992c25ef63 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -216,7 +216,7 @@ func (r *readIndexExecutor) buildLocalStorePipeline( indexIDs = append(indexIDs, index.ID) uniques = append(uniques, index.Unique) } - engines, err := r.bc.Register(indexIDs, uniques, r.job.TableName) + engines, err := r.bc.Register(indexIDs, uniques, r.ptbl.Meta()) if err != nil { tidblogutil.Logger(opCtx).Error("cannot register new engine", zap.Error(err), diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 3601ed16cd6cb..eb16019b28a71 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -440,7 +440,7 @@ func (b *ingestBackfillScheduler) setupWorkers() error { default: return errors.Errorf("unexpected argument type, got %T", job.Args[0]) } - engines, err := b.backendCtx.Register(indexIDs, 
uniques, job.TableName) + engines, err := b.backendCtx.Register(indexIDs, uniques, b.tbl.Meta()) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index 6e5f36fa4b4f5..c5bad57153c51 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -51,7 +51,7 @@ type BackendCtx interface { // backend context. If the index ID is already registered, it will return the // associated engines. Only one group of index ID is allowed to register for a // BackendCtx. - Register(indexIDs []int64, uniques []bool, tableName string) ([]Engine, error) + Register(indexIDs []int64, uniques []bool, tblInfo *model.TableInfo) ([]Engine, error) UnregisterEngines() // FinishImport imports all Register-ed engines of into the storage, collects // the duplicate errors for unique engines. @@ -95,6 +95,7 @@ type litBackendCtx struct { memRoot MemRoot diskRoot DiskRoot jobID int64 + tblInfo *model.TableInfo backend *local.Backend ctx context.Context cfg *lightning.Config @@ -194,29 +195,29 @@ func acquireLock(ctx context.Context, se *concurrency.Session, key string) (*con } // Flush implements FlushController. -func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID int64, err error) { +func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err error) { shouldFlush, shouldImport := bc.checkFlush(mode) if !shouldFlush { - return false, false, 0, nil + return false, false, nil } if !bc.flushing.CompareAndSwap(false, true) { - return false, false, 0, nil + return false, false, nil } defer bc.flushing.Store(false) - for indexID, ei := range bc.engines { + for _, ei := range bc.engines { ei.flushLock.Lock() //nolint: all_revive,revive defer ei.flushLock.Unlock() if err = ei.Flush(); err != nil { - return false, false, indexID, err + return false, false, err } } bc.timeOfLastFlush.Store(time.Now()) if !shouldImport { - return true, false, 0, nil + return true, false, nil } // Use distributed lock if run in distributed mode). @@ -225,7 +226,7 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID se, _ := concurrency.NewSession(bc.etcdClient) mu, err := acquireLock(bc.ctx, se, distLockKey) if err != nil { - return true, false, 0, errors.Trace(err) + return true, false, errors.Trace(err) } logutil.Logger(bc.ctx).Info("acquire distributed flush lock success", zap.Int64("jobID", bc.jobID)) defer func() { @@ -249,11 +250,44 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID for indexID, ei := range bc.engines { if err = bc.unsafeImportAndReset(ei); err != nil { - return true, false, indexID, err + if common.ErrFoundDuplicateKeys.Equal(err) { + var idxInfo *model.IndexInfo + for _, idx := range bc.tblInfo.Indices { + if idx.ID == indexID { + idxInfo = idx + break + } + } + if idxInfo == nil { + logutil.Logger(bc.ctx).Error( + "index not found", + zap.Int64("indexID", indexID)) + err = tikv.ErrKeyExists + } else { + err = TryConvertToKeyExistsErr(err, idxInfo, bc.tblInfo) + } + } + return true, false, err + } + } + + var newTS uint64 + if mgr := bc.GetCheckpointManager(); mgr != nil { + // for local disk case, we need to refresh TS because duplicate detection + // requires each ingest to have a unique TS. + // + // TODO(lance6716): there's still a chance that data is imported but because of + // checkpoint is low-watermark, the data will still be imported again with + // another TS after failover. Need to refine the checkpoint mechanism. 
+ newTS, err = mgr.refreshTSAndUpdateCP() + if err == nil { + for _, ei := range bc.engines { + ei.openedEngine.SetTS(newTS) + } } } - return true, true, 0, nil + return true, true, err } func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error { @@ -299,22 +333,6 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error { ei.closedEngine = nil return err } - - if mgr == nil { - return nil - } - - // for local disk case, we need to refresh TS because duplicate detection - // requires each ingest to have a unique TS. - // - // TODO(lance6716): there's still a chance that data is imported but because of - // checkpoint is low-watermark, the data will still be imported again with - // another TS after failover. Need to refine the checkpoint mechanism. - newTS, err := mgr.refreshTSAndUpdateCP() - if err != nil { - return errors.Trace(err) - } - ei.openedEngine.SetTS(newTS) return nil } diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index 49fe71f2dfa6e..ff40c283aabe0 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -97,7 +97,7 @@ type taskCheckpoint struct { type FlushController interface { // Flush checks if al engines need to be flushed and imported based on given // FlushMode. It's concurrent safe. - Flush(mode FlushMode) (flushed, imported bool, errIdxID int64, err error) + Flush(mode FlushMode) (flushed, imported bool, err error) } // NewCheckpointManager creates a new checkpoint manager. @@ -206,7 +206,7 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error { cp.writtenKeys += delta s.mu.Unlock() - flushed, imported, _, err := s.flushCtrl.Flush(FlushModeAuto) + flushed, imported, err := s.flushCtrl.Flush(FlushModeAuto) if !flushed || err != nil { return err } @@ -261,7 +261,7 @@ func (s *CheckpointManager) Close() { // Flush flushed the data and updates checkpoint. func (s *CheckpointManager) Flush() { // use FlushModeForceFlushNoImport to finish the flush process timely. - _, _, _, err := s.flushCtrl.Flush(FlushModeForceFlushNoImport) + _, _, err := s.flushCtrl.Flush(FlushModeForceFlushNoImport) if err != nil { s.logger.Warn("flush local engine failed", zap.Error(err)) } diff --git a/pkg/ddl/ingest/checkpoint_test.go b/pkg/ddl/ingest/checkpoint_test.go index fa4f4059ff16b..805a276dc0ac4 100644 --- a/pkg/ddl/ingest/checkpoint_test.go +++ b/pkg/ddl/ingest/checkpoint_test.go @@ -198,6 +198,6 @@ type dummyFlushCtrl struct { imported bool } -func (d *dummyFlushCtrl) Flush(_ ingest.FlushMode) (bool, bool, int64, error) { - return true, d.imported, 0, nil +func (d *dummyFlushCtrl) Flush(mode ingest.FlushMode) (bool, bool, error) { + return true, d.imported, nil } diff --git a/pkg/ddl/ingest/engine_mgr.go b/pkg/ddl/ingest/engine_mgr.go index a9c3fd9081a02..0da62d4f78a46 100644 --- a/pkg/ddl/ingest/engine_mgr.go +++ b/pkg/ddl/ingest/engine_mgr.go @@ -17,12 +17,13 @@ package ingest import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/lightning/backend" + "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" ) // Register implements BackendCtx. 
-func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tableName string) ([]Engine, error) { +func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tblInfo *model.TableInfo) ([]Engine, error) { ret := make([]Engine, 0, len(indexIDs)) for _, indexID := range indexIDs { @@ -59,7 +60,7 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tableName st openedEngines := make(map[int64]*engineInfo, numIdx) for i, indexID := range indexIDs { - openedEngine, err := mgr.OpenEngine(bc.ctx, cfg, tableName, int32(indexID)) + openedEngine, err := mgr.OpenEngine(bc.ctx, cfg, tblInfo.Name.L, int32(indexID)) if err != nil { logutil.Logger(bc.ctx).Warn(LitErrCreateEngineFail, zap.Int64("job ID", bc.jobID), @@ -91,6 +92,7 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tableName st bc.engines[indexID] = ei } bc.memRoot.Consume(numIdx * structSizeEngineInfo) + bc.tblInfo = tblInfo logutil.Logger(bc.ctx).Info(LitInfoOpenEngine, zap.Int64("job ID", bc.jobID), zap.Int64s("index IDs", indexIDs), diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index 4c60ceafa088e..6a127dfd4f26d 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/lightning/backend" "github.com/pingcap/tidb/pkg/lightning/backend/local" + "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" pd "github.com/tikv/pd/client" @@ -106,7 +107,7 @@ type MockBackendCtx struct { } // Register implements BackendCtx.Register interface. -func (m *MockBackendCtx) Register(indexIDs []int64, _ []bool, _ string) ([]Engine, error) { +func (m *MockBackendCtx) Register(indexIDs []int64, _ []bool, _ *model.TableInfo) ([]Engine, error) { logutil.DDLIngestLogger().Info("mock backend ctx register", zap.Int64("jobID", m.jobID), zap.Int64s("indexIDs", indexIDs)) ret := make([]Engine, 0, len(indexIDs)) for range indexIDs { @@ -138,8 +139,8 @@ func (*MockBackendCtx) FinishImport(_ table.Table) error { } // Flush implements BackendCtx.Flush interface. -func (*MockBackendCtx) Flush(_ FlushMode) (flushed bool, imported bool, errIdxID int64, err error) { - return false, false, 0, nil +func (*MockBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err error) { + return false, false, nil } // Done implements BackendCtx.Done interface. 
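A minimal caller-side sketch of the narrowed contract introduced above (hypothetical helper; `onError` stands in for the operator error hook used in backfilling_operators.go, and the names are illustrative, not part of the patch):

// flushAndReport sketches how callers consume the new Flush signature: the
// per-index errIdxID return value is gone, and duplicate keys already arrive
// converted to the user-facing "Duplicate entry" (ErrKeyExists) form, built
// from the *model.TableInfo captured at Register time.
func flushAndReport(bc ingest.BackendCtx, onError func(error)) (imported bool, err error) {
	_, imported, err = bc.Flush(ingest.FlushModeForceFlushAndImport)
	if err != nil {
		onError(err) // no index-ID lookup needed at the call site any more
	}
	return imported, err
}
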
diff --git a/tests/realtikvtest/addindextest1/BUILD.bazel b/tests/realtikvtest/addindextest1/BUILD.bazel index 6908b78ab9ddc..0c3baac3f93ec 100644 --- a/tests/realtikvtest/addindextest1/BUILD.bazel +++ b/tests/realtikvtest/addindextest1/BUILD.bazel @@ -10,6 +10,7 @@ go_test( flaky = True, deps = [ "//pkg/config", + "//pkg/ddl/ingest", "//pkg/ddl/util/callback", "//pkg/disttask/framework/storage", "//pkg/errno", diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go index 70914eee446d1..601867e9d44f7 100644 --- a/tests/realtikvtest/addindextest1/disttask_test.go +++ b/tests/realtikvtest/addindextest1/disttask_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/ddl/util/callback" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/errno" @@ -262,3 +263,27 @@ func TestAddIndexForCurrentTimestampColumn(t *testing.T) { tk.MustExec("alter table t add index idx(a);") tk.MustExec("admin check table t;") } + +func TestAddUKErrorMessage(t *testing.T) { + ingest.ForceSyncFlagForTest = true + t.Cleanup(func() { + ingest.ForceSyncFlagForTest = false + }) + + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + t.Cleanup(func() { + tk.MustExec("set global tidb_enable_dist_task = off;") + }) + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("set global tidb_enable_dist_task = on;") + + tk.MustExec("create table t (a int primary key, b int);") + tk.MustExec("insert into t values (5, 1), (10005, 1), (20005, 1), (30005, 1);") + tk.MustExec("split table t between (1) and (100001) regions 10;") + err := tk.ExecToErr("alter table t add unique index uk(b);") + require.ErrorContains(t, err, "Duplicate entry '1' for key 't.uk'") +} diff --git a/tests/realtikvtest/addindextest3/functional_test.go b/tests/realtikvtest/addindextest3/functional_test.go index fda8df2197fca..92ce602c037c9 100644 --- a/tests/realtikvtest/addindextest3/functional_test.go +++ b/tests/realtikvtest/addindextest3/functional_test.go @@ -89,7 +89,7 @@ func TestBackendCtxConcurrentUnregister(t *testing.T) { for range idxIDs { uniques = append(uniques, false) } - _, err = bCtx.Register([]int64{1, 2, 3, 4, 5, 6, 7}, uniques, "t") + _, err = bCtx.Register([]int64{1, 2, 3, 4, 5, 6, 7}, uniques, &model.TableInfo{}) require.NoError(t, err) wg := sync.WaitGroup{}