diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go
index 64601e24a9af2..4fe01f940c6e6 100644
--- a/br/pkg/lightning/backend/local/duplicate.go
+++ b/br/pkg/lightning/backend/local/duplicate.go
@@ -17,6 +17,7 @@ package local
 import (
 	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
 	"io"
 	"math"
@@ -1007,6 +1008,7 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
 		logger.Warn("skipping resolution due to selected algorithm. this table will become inconsistent!", zap.String("category", "resolve-dupe"), zap.Stringer("algorithm", algorithm))
 		return nil
 	case config.DupeResAlgRemove:
+	case config.DupeResAlgReplace:
 	default:
 		panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm))
 	}
@@ -1016,7 +1018,7 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
 		SQLMode: mysql.ModeStrictAllTables,
 	}, log.FromContext(ctx))
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 
 	tableIDs := physicalTableIDs(tbl.Meta())
@@ -1026,30 +1028,104 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
 	errLimiter := rate.NewLimiter(1, 1)
 	pool := utils.NewWorkerPool(uint(local.dupeConcurrency), "resolve duplicate rows")
-	err = local.errorMgr.ResolveAllConflictKeys(
-		ctx, tableName, pool,
-		func(ctx context.Context, handleRows [][2][]byte) error {
-			for {
-				err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable)
-				if err == nil {
-					return nil
-				}
-				if types.ErrBadNumber.Equal(err) {
-					logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
-					return common.ErrResolveDuplicateRows.Wrap(err).GenWithStackByArgs(tableName)
-				}
-				if log.IsContextCanceledError(err) {
-					return err
+
+	tblInfo, err := json.Marshal(tbl.Meta())
+	if err != nil {
+		return errors.Trace(err)
+	}
+	logger.Debug("got tblInfo from tbl",
+		zap.ByteString("tblInfo", tblInfo))
+
+	switch algorithm {
+	case config.DupeResAlgRemove:
+		err = local.errorMgr.RemoveAllConflictKeys(
+			ctx, tableName, pool,
+			func(ctx context.Context, handleRows [][2][]byte) error {
+				for {
+					err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable)
+					if err == nil {
+						return nil
+					}
+					if types.ErrBadNumber.Equal(err) {
+						logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
+						return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName)
+					}
+					if log.IsContextCanceledError(err) {
+						return errors.Trace(err)
+					}
+					if !tikverror.IsErrWriteConflict(errors.Cause(err)) {
+						logger.Warn("delete duplicate rows encounter error", log.ShortError(errors.Trace(err)))
+					}
+					if err = errLimiter.Wait(ctx); err != nil {
+						return errors.Trace(err)
+					}
 				}
-				if !tikverror.IsErrWriteConflict(errors.Cause(err)) {
-					logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
+			},
+		)
+	case config.DupeResAlgReplace:
+		err = local.errorMgr.ReplaceConflictKeys(
+			ctx, tbl, tableName, pool,
+			func(ctx context.Context, key []byte) ([]byte, error) {
+				value, err := local.getLatestValue(ctx, logger, key)
+				if err != nil {
+					return nil, errors.Trace(err)
 				}
-				if err = errLimiter.Wait(ctx); err != nil {
-					return err
+				return value, nil
+			},
+			func(ctx context.Context, key []byte) error {
+				err := local.deleteDuplicateRow(ctx, logger, key)
+				if err != nil {
+					logger.Debug("delete duplicate rows encounter error", zap.Error(err))
+					return errors.Trace(err)
 				}
+				return nil
+			},
+		)
+	}
+
+	return errors.Trace(err)
+}
+
+func (local *DupeController) getLatestValue(
+	ctx context.Context,
+	logger *log.Task,
+	key []byte,
+) ([]byte, error) {
+	snapshot := local.tikvCli.GetSnapshot(math.MaxUint64)
+	value, err := snapshot.Get(ctx, key)
+	logger.Debug("getLatestValue",
+		logutil.Key("key", key),
+		zap.Binary("value", value),
+		zap.Error(err))
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return value, nil
+}
+
+func (local *DupeController) deleteDuplicateRow(
+	ctx context.Context,
+	logger *log.Task,
+	key []byte,
+) (err error) {
+	// Starts a Delete transaction.
+	txn, err := local.tikvCli.Begin()
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer func() {
+		if err == nil {
+			err = txn.Commit(ctx)
+		} else {
+			if rollbackErr := txn.Rollback(); rollbackErr != nil {
+				logger.Warn("failed to rollback transaction", zap.Error(rollbackErr))
 			}
-		},
-	)
+		}
+	}()
+
+	logger.Debug("will delete key", zap.String("category", "resolve-dupe"), logutil.Key("key", key))
+	err = txn.Delete(key)
+	return errors.Trace(err)
 }
@@ -1063,7 +1139,7 @@ func (local *DupeController) deleteDuplicateRows(
 	// Starts a Delete transaction.
 	txn, err := local.tikvCli.Begin()
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	defer func() {
 		if err == nil {
@@ -1095,17 +1171,17 @@ func (local *DupeController) deleteDuplicateRows(
 				logutil.Key("row", handleRow[1]))
 
 			if err := deleteKey(handleRow[0]); err != nil {
-				return err
+				return errors.Trace(err)
 			}
 
 			handle, err := decoder.DecodeHandleFromRowKey(handleRow[0])
 			if err != nil {
-				return err
+				return errors.Trace(err)
 			}
 
 			err = decoder.IterRawIndexKeys(handle, handleRow[1], deleteKey)
 			if err != nil {
-				return err
+				return errors.Trace(err)
 			}
 		}
diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go
index 0fc5ae66e28ef..d293a41f7916d 100644
--- a/br/pkg/lightning/config/config.go
+++ b/br/pkg/lightning/config/config.go
@@ -595,6 +595,12 @@ const (
 	// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
 	DupeResAlgRemove
 
+	// DupeResAlgReplace records all duplicate records like the 'record' algorithm, removes the rows that cause
+	// conflicts, and keeps the rows that no longer conflict with each other. Users need to analyze the
+	// lightning_task_info.conflict_error_v1 table to check whether the kept data meets their needs and whether
+	// they need to add back the correct rows.
+	DupeResAlgReplace
+
 	// DupeResAlgErr reports an error and stops the import process.
 	// Note: this value is only used for internal.
 	DupeResAlgErr
@@ -622,6 +628,8 @@ func (dra *DuplicateResolutionAlgorithm) FromStringValue(s string) error {
 		*dra = DupeResAlgNone
 	case "remove":
 		*dra = DupeResAlgRemove
+	case "replace":
+		*dra = DupeResAlgReplace
 	default:
 		return errors.Errorf("invalid duplicate-resolution '%s', please choose valid option between ['record', 'none', 'remove']", s)
 	}
@@ -647,6 +655,8 @@ func (dra DuplicateResolutionAlgorithm) String() string {
 		return "none"
 	case DupeResAlgRemove:
 		return "remove"
+	case DupeResAlgReplace:
+		return "replace"
 	default:
 		panic(fmt.Sprintf("invalid duplicate-resolution type '%d'", dra))
 	}
diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go
index 5094203c2cbac..19ef66b8aeed9 100644
--- a/br/pkg/lightning/config/config_test.go
+++ b/br/pkg/lightning/config/config_test.go
@@ -743,10 +743,13 @@ func TestDuplicateResolutionAlgorithm(t *testing.T) {
 	require.Equal(t, DupeResAlgNone, dra)
 	require.NoError(t, dra.FromStringValue("remove"))
 	require.Equal(t, DupeResAlgRemove, dra)
+	require.NoError(t, dra.FromStringValue("replace"))
+	require.Equal(t, DupeResAlgReplace, dra)
 
 	require.Equal(t, "record", DupeResAlgRecord.String())
 	require.Equal(t, "none", DupeResAlgNone.String())
 	require.Equal(t, "remove", DupeResAlgRemove.String())
+	require.Equal(t, "replace", DupeResAlgReplace.String())
 }
 
 func TestLoadConfig(t *testing.T) {
diff --git a/br/pkg/lightning/errormanager/BUILD.bazel b/br/pkg/lightning/errormanager/BUILD.bazel
index 80f8147d7c2c1..4ccac3cff1f00 100644
--- a/br/pkg/lightning/errormanager/BUILD.bazel
+++ b/br/pkg/lightning/errormanager/BUILD.bazel
@@ -6,14 +6,22 @@ go_library(
     importpath = "github.com/pingcap/tidb/br/pkg/lightning/errormanager",
     visibility = ["//visibility:public"],
     deps = [
+        "//br/pkg/lightning/backend/encode",
+        "//br/pkg/lightning/backend/kv",
         "//br/pkg/lightning/common",
         "//br/pkg/lightning/config",
         "//br/pkg/lightning/log",
+        "//br/pkg/logutil",
         "//br/pkg/redact",
         "//br/pkg/utils",
+        "//parser/mysql",
+        "//table",
+        "//table/tables",
+        "//tablecodec",
         "@com_github_jedib0t_go_pretty_v6//table",
         "@com_github_jedib0t_go_pretty_v6//text",
         "@com_github_pingcap_errors//:errors",
+        "@com_github_tikv_client_go_v2//error",
         "@org_golang_x_sync//errgroup",
         "@org_uber_go_atomic//:atomic",
         "@org_uber_go_multierr//:multierr",
@@ -27,11 +35,16 @@ go_test(
     srcs = ["errormanager_test.go"],
    embed = [":errormanager"],
     flaky = True,
-    shard_count = 4,
+    shard_count = 5,
     deps = [
+        "//br/pkg/lightning/backend/kv",
         "//br/pkg/lightning/config",
         "//br/pkg/lightning/log",
         "//br/pkg/utils",
+        "//parser/model",
+        "//parser/mysql",
+        "//table/tables",
+        "//types",
         "@com_github_data_dog_go_sqlmock//:go-sqlmock",
         "@com_github_stretchr_testify//require",
         "@org_uber_go_atomic//:atomic",
diff --git a/br/pkg/lightning/errormanager/errormanager.go b/br/pkg/lightning/errormanager/errormanager.go
index d8a25ebe40d6d..269514977f892 100644
--- a/br/pkg/lightning/errormanager/errormanager.go
+++ b/br/pkg/lightning/errormanager/errormanager.go
@@ -15,6 +15,7 @@
 package errormanager
 
 import (
+	"bytes"
 	"context"
 	"database/sql"
 	"fmt"
@@ -25,11 +26,19 @@ import (
 	"github.com/jedib0t/go-pretty/v6/table"
 	"github.com/jedib0t/go-pretty/v6/text"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/redact"
 	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/pingcap/tidb/parser/mysql"
+	tidbtbl "github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/table/tables"
+	"github.com/pingcap/tidb/tablecodec"
+	tikverr "github.com/tikv/client-go/v2/error"
 	"go.uber.org/atomic"
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
@@ -125,13 +134,20 @@
 
 	sqlValuesConflictErrorIndex = "(?,?,?,?,?,?,?,?,?)"
 
-	selectConflictKeys = `
+	selectConflictKeysRemove = `
 		SELECT _tidb_rowid, raw_handle, raw_row
 		FROM %s.` + ConflictErrorTableName + `
 		WHERE table_name = ? AND _tidb_rowid >= ? and _tidb_rowid < ?
 		ORDER BY _tidb_rowid LIMIT ?;
 	`
+
+	selectIndexConflictKeysReplace = `
+		SELECT raw_key, index_name, raw_value, raw_handle
+		FROM %s.` + ConflictErrorTableName + `
+		WHERE table_name = ? AND index_name <> 'PRIMARY'
+		ORDER BY raw_key;
+	`
+
 	insertIntoDupRecord = `
 		INSERT INTO %s.` + DupRecordTable + `
 		(task_id, table_name, path, offset, error, row_id, row_data)
@@ -424,9 +440,9 @@ func (em *ErrorManager) RecordIndexConflictError(
 	return gerr
 }
 
-// ResolveAllConflictKeys query all conflicting rows (handle and their
-// values) from the current error report and resolve them concurrently.
-func (em *ErrorManager) ResolveAllConflictKeys(
+// RemoveAllConflictKeys queries all conflicting rows (handle and their
+// values) from the current error report and resolves them concurrently by removing all of them.
+func (em *ErrorManager) RemoveAllConflictKeys(
 	ctx context.Context,
 	tableName string,
 	pool *utils.WorkerPool,
@@ -458,7 +474,7 @@
 			var handleRows [][2][]byte
 			for start < end {
 				rows, err := em.db.QueryContext(
-					gCtx, fmt.Sprintf(selectConflictKeys, em.schemaEscaped),
+					gCtx, fmt.Sprintf(selectConflictKeysRemove, em.schemaEscaped),
 					tableName, start, end, rowLimit)
 				if err != nil {
 					return errors.Trace(err)
@@ -504,8 +520,142 @@
 	return errors.Trace(g.Wait())
 }
 
+// ReplaceConflictKeys queries all conflicting rows (handle and their
+// values) from the current error report and resolves them
+// by replacing the necessary rows and keeping the others.
+func (em *ErrorManager) ReplaceConflictKeys(
+	ctx context.Context,
+	tbl tidbtbl.Table,
+	tableName string,
+	pool *utils.WorkerPool,
+	fnGetLatest func(ctx context.Context, key []byte) ([]byte, error),
+	fnDeleteKey func(ctx context.Context, key []byte) error,
+) error {
+	if em.db == nil {
+		return nil
+	}
+
+	sessionOpts := encode.SessionOptions{
+		// TODO: need to find the correct value for SQLMode
+		SQLMode: mysql.ModeStrictAllTables,
+	}
+	encoder, err := kv.NewBaseKVEncoder(&encode.EncodingConfig{
+		Table:          tbl,
+		SessionOptions: sessionOpts,
+		Logger:         em.logger,
+	})
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	g, gCtx := errgroup.WithContext(ctx)
+	pool.ApplyOnErrorGroup(g, func() error {
+		// TODO: provide a detailed document to explain the algorithm and link it here
+		// demo for "replace" algorithm: https://github.com/lyzx2001/tidb-conflict-replace
+		// check index KV first
+		rawKeyRows, err := em.db.QueryContext(
+			gCtx, fmt.Sprintf(selectIndexConflictKeysReplace, em.schemaEscaped),
+			tableName)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		defer rawKeyRows.Close()
+		for rawKeyRows.Next() {
+			var rawKey, rawValue, rawHandle []byte
+			var indexName string
+			if err := rawKeyRows.Scan(&rawKey, &indexName, &rawValue, &rawHandle); err != nil {
+				return errors.Trace(err)
+			}
+			em.logger.Debug("got raw_key, index_name, raw_value, raw_handle from table",
+				zap.Binary("raw_key", rawKey),
+				zap.String("index_name", indexName),
+				zap.Binary("raw_value", rawValue),
+				zap.Binary("raw_handle", rawHandle))
+
+			// get the latest value of rawKey from downstream TiDB
+			value, err := fnGetLatest(gCtx, rawKey)
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			// if the latest value of rawKey equals rawValue, this index KV is maintained in downstream TiDB;
+			// if not, this index KV has been overwritten, and its corresponding data KV needs to be deleted
+			if bytes.Equal(rawValue, value) {
+				continue
+			}
+
+			// rawHandle is the row key of the data KV that needs to be deleted
+			// get the latest value of the row key of the data KV that needs to be deleted
+			overwritten, err := fnGetLatest(gCtx, rawHandle)
+			// if the latest value cannot be found, the data KV has already been deleted
+			if tikverr.IsErrNotFound(err) || overwritten == nil {
+				continue
+			}
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			overwrittenHandle, err := tablecodec.DecodeRowKey(rawHandle)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			decodedData, _, err := tables.DecodeRawRowData(encoder.SessionCtx,
+				tbl.Meta(), overwrittenHandle, tbl.Cols(), overwritten)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			_, err = encoder.Table.AddRecord(encoder.SessionCtx, decodedData)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			// find out all the KV pairs that are contained in the data KV
+			kvPairs := encoder.SessionCtx.TakeKvPairs()
+			for _, kvPair := range kvPairs.Pairs {
+				em.logger.Debug("got encoded KV",
+					logutil.Key("key", kvPair.Key),
+					zap.Binary("value", kvPair.Val),
+					logutil.Key("rawKey", rawKey),
+					zap.Binary("rawValue", rawValue))
+
+				// If rawKey equals the KV pair's key and rawValue equals the KV pair's value,
+				// this latest data KV of the index KV needs to be deleted;
+				// if not, this latest data KV of the index KV was inserted by other rows,
+				// so it is unrelated to the index KV that needs to be deleted, and we cannot delete it.
+
+				// An example is:
+				// (pk, uk)
+				// (1, a)
+				// (1, b)
+				// (2, a)
+
+				// (1, a) is overwritten by (2, a). We found a->1 is an overwritten index KV,
+				// and we are considering if its data KV with key "1" can be deleted.
+				// We got the latest value of key "1" which is (1, b),
+				// and encode it to get all KV pairs which is [1->b, b->1].
+				// Only if a->1 is among them do we dare to delete the data KV with key "1".
+
+				if bytes.Equal(kvPair.Key, rawKey) && bytes.Equal(kvPair.Val, rawValue) {
+					if err := fnDeleteKey(gCtx, rawHandle); err != nil {
+						return errors.Trace(err)
+					}
+					break
+				}
+			}
+		}
+		if err := rawKeyRows.Err(); err != nil {
+			return errors.Trace(err)
+		}
+
+		// TODO: check data KV
+
+		return nil
+	})
+
+	return errors.Trace(g.Wait())
+}
+
 // RecordDuplicateCount reduce the counter of "duplicate entry" errors.
-// Currently the count will not be shared for multiple lightning instances.
+// Currently, the count will not be shared for multiple lightning instances.
 func (em *ErrorManager) RecordDuplicateCount(cnt int64) error {
 	if em.conflictErrRemain.Sub(cnt) < 0 {
 		threshold := em.configConflict.Threshold
@@ -517,7 +667,7 @@ func (em *ErrorManager) RecordDuplicateCount(cnt int64) error {
 }
 
 // RecordDuplicate records a "duplicate entry" error so user can query them later.
-// Currently the error will not be shared for multiple lightning instances.
+// Currently, the error will not be shared for multiple lightning instances.
 func (em *ErrorManager) RecordDuplicate(
 	ctx context.Context,
 	logger log.Logger,
diff --git a/br/pkg/lightning/errormanager/errormanager_test.go b/br/pkg/lightning/errormanager/errormanager_test.go
index 2e5c913b71f40..f05c465d2ec21 100644
--- a/br/pkg/lightning/errormanager/errormanager_test.go
+++ b/br/pkg/lightning/errormanager/errormanager_test.go
@@ -15,18 +15,26 @@
 package errormanager
 
 import (
+	"bytes"
 	"context"
 	"database/sql"
 	"database/sql/driver"
+	"encoding/base64"
+	"fmt"
 	"io"
 	"math/rand"
 	"strconv"
 	"testing"
 
 	"github.com/DATA-DOG/go-sqlmock"
+	tidbkv "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/table/tables"
+	"github.com/pingcap/tidb/types"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 )
@@ -149,7 +157,7 @@ func (c mockConn) QueryContext(_ context.Context, query string, args []driver.Na
 	return &mockRows{start: start, end: end}, nil
 }
 
-func TestResolveAllConflictKeys(t *testing.T) {
+func TestRemoveAllConflictKeys(t *testing.T) {
 	const totalRows = int64(1 << 18)
 	driverName := "errmgr-mock-" + strconv.Itoa(rand.Int())
 	sql.Register(driverName, mockDriver{totalRows: totalRows})
@@ -166,8 +174,8 @@ func TestRemoveAllConflictKeys(t *testing.T) {
 	require.NoError(t, err)
 
 	resolved := atomic.NewInt64(0)
-	pool := utils.NewWorkerPool(16, "resolve duplicate rows")
-	err = em.ResolveAllConflictKeys(
+	pool := utils.NewWorkerPool(16, "resolve duplicate rows by remove")
+	err = em.RemoveAllConflictKeys(
 		ctx, "test", pool,
 		func(ctx context.Context, handleRows [][2][]byte) error {
 			resolved.Add(int64(len(handleRows)))
@@ -178,6 +186,142 @@ func TestRemoveAllConflictKeys(t *testing.T) {
 	require.Equal(t, totalRows, resolved.Load())
 }
 
+func TestReplaceConflictKeysIndexKvChecking(t *testing.T) {
+	column1 := &model.ColumnInfo{
+		ID:           1,
+		Name:         model.NewCIStr("a"),
+		Offset:       0,
+		DefaultValue: 0,
+		FieldType:    *types.NewFieldType(mysql.TypeLong),
+		Hidden:       true,
+		State:        model.StatePublic,
+	}
+	column1.AddFlag(mysql.PriKeyFlag)
+
+	column2 := &model.ColumnInfo{
+		ID:           2,
+		Name:         model.NewCIStr("b"),
+		Offset:       1,
+		DefaultValue: 0,
+		FieldType:    *types.NewFieldType(mysql.TypeLong),
+		Hidden:       true,
+		State:        model.StatePublic,
+	}
+	column2.AddFlag(mysql.UniqueKeyFlag)
+
+	column3 := &model.ColumnInfo{
+		ID:           3,
+		Name:         model.NewCIStr("c"),
+		Offset:       2,
+		DefaultValue: 0,
+		FieldType:    *types.NewFieldType(mysql.TypeBlob),
+		Hidden:       true,
+		State:        model.StatePublic,
+	}
+
+	index := &model.IndexInfo{
+		ID:    1,
+		Name:  model.NewCIStr("uni_b"),
+		Table: model.NewCIStr(""),
+		Columns: []*model.IndexColumn{
+			{
+				Name:   model.NewCIStr("b"),
+				Offset: 1,
+				Length: -1,
+			}},
+		Unique:  true,
+		Primary: false,
+		State:   model.StatePublic,
+	}
+
+	table := &model.TableInfo{
+		ID:         75,
+		Name:       model.NewCIStr("a"),
+		Charset:    "utf8mb4",
+		Collate:    "utf8mb4_bin",
+		Columns:    []*model.ColumnInfo{column1, column2, column3},
+		Indices:    []*model.IndexInfo{index},
+		PKIsHandle: true,
+		State:      model.StatePublic,
+	}
+
+	tbl, err := tables.TableFromMeta(tidbkv.NewPanickingAllocators(0), table)
+	require.NoError(t, err)
+
+	db, mockDB, err := sqlmock.New()
+	require.NoError(t, err)
+	defer func() {
+		_ = db.Close()
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`").
+		WillReturnResult(sqlmock.NewResult(1, 1))
+	mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v1.*").
+		WillReturnResult(sqlmock.NewResult(2, 1))
+	rawKeyBase64 := "dIAAAAAAAABLX2mAAAAAAAAAAQOAAAAAAAAABg=="
+	rawKey, err := base64.StdEncoding.DecodeString(rawKeyBase64)
+	require.NoError(t, err)
+	rawValue1Base64 := "AAAAAAAAAAE="
+	rawValue1, err := base64.StdEncoding.DecodeString(rawValue1Base64)
+	require.NoError(t, err)
+	rawValue2Base64 := "AAAAAAAAAAI="
+	rawValue2, err := base64.StdEncoding.DecodeString(rawValue2Base64)
+	require.NoError(t, err)
+	rawHandle1Base64 := "dIAAAAAAAABLX3KAAAAAAAAAAQ=="
+	rawHandle1, err := base64.StdEncoding.DecodeString(rawHandle1Base64)
+	require.NoError(t, err)
+	rawHandle2Base64 := "dIAAAAAAAABLX3KAAAAAAAAAAg=="
+	rawHandle2, err := base64.StdEncoding.DecodeString(rawHandle2Base64)
+	require.NoError(t, err)
+	mockDB.ExpectQuery("\\QSELECT raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v1 WHERE table_name = ? AND index_name <> 'PRIMARY' ORDER BY raw_key\\E").
+		WillReturnRows(sqlmock.NewRows([]string{"raw_key", "index_name", "raw_value", "raw_handle"}).
+			AddRow(rawKey, "uni_b", rawValue1, rawHandle1).
+			AddRow(rawKey, "uni_b", rawValue2, rawHandle2))
+	rawRowBase64 := "gAACAAAAAgMBAAYABjIuY3N2"
+	rawRow, err := base64.StdEncoding.DecodeString(rawRowBase64)
+	require.NoError(t, err)
+
+	cfg := config.NewConfig()
+	cfg.TikvImporter.DuplicateResolution = config.DupeResAlgReplace
+	cfg.App.TaskInfoSchemaName = "lightning_task_info"
+	em := New(db, cfg, log.L())
+	err = em.Init(ctx)
+	require.NoError(t, err)
+
+	fnGetLatestCount := atomic.NewInt32(0)
+	fnDeleteKeyCount := atomic.NewInt32(0)
+	pool := utils.NewWorkerPool(16, "resolve duplicate rows by replace")
+	err = em.ReplaceConflictKeys(
+		ctx, tbl, "test", pool,
+		func(ctx context.Context, key []byte) ([]byte, error) {
+			fnGetLatestCount.Add(1)
+			switch {
+			case bytes.Equal(key, rawKey):
+				return rawValue1, nil
+			case bytes.Equal(key, rawHandle2):
+				return rawRow, nil
+			default:
+				return nil, fmt.Errorf("key %v is not expected", key)
+			}
+		},
+		func(ctx context.Context, key []byte) error {
+			fnDeleteKeyCount.Add(1)
+			if !bytes.Equal(key, rawHandle2) {
+				return fmt.Errorf("key %v is not expected", key)
+			}
+			return nil
+		},
+	)
+	require.NoError(t, err)
+	require.Equal(t, int32(3), fnGetLatestCount.Load())
+	require.Equal(t, int32(1), fnDeleteKeyCount.Load())
+	err = mockDB.ExpectationsWereMet()
+	require.NoError(t, err)
+}
+
 func TestErrorMgrHasError(t *testing.T) {
 	cfg := &config.Config{}
 	cfg.App.MaxError = config.MaxError{
diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go
index 2229f37579963..ae4b5d7f87857 100644
--- a/br/pkg/lightning/importer/table_import.go
+++ b/br/pkg/lightning/importer/table_import.go
@@ -951,7 +951,7 @@ func (tr *TableImporter) postProcess(
 		rc.alterTableLock.Unlock()
 		saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAlteredAutoInc)
 		if err = firstErr(err, saveCpErr); err != nil {
-			return false, err
+			return false, errors.Trace(err)
 		}
 		cp.Status = checkpoints.CheckpointStatusAlteredAutoInc
 	}
@@ -997,7 +997,7 @@ func (tr *TableImporter) postProcess(
 			hasLocalDupe, err := dupeController.CollectLocalDuplicateRows(ctx, tr.encTable, tr.tableName, opts)
 			if err != nil {
 				tr.logger.Error("collect local duplicate keys failed", log.ShortError(err))
-				return false, err
+				return false, errors.Trace(err)
 			}
 			hasDupe = hasLocalDupe
 		}
@@ -1010,7 +1010,7 @@ func (tr *TableImporter) postProcess(
 		otherHasDupe, needRemoteDupe, baseTotalChecksum, err := metaMgr.CheckAndUpdateLocalChecksum(ctx, &localChecksum, hasDupe)
 		if err != nil {
-			return false, err
+			return false, errors.Trace(err)
 		}
 		needChecksum := !otherHasDupe && needRemoteDupe
 		hasDupe = hasDupe || otherHasDupe
@@ -1023,14 +1023,14 @@ func (tr *TableImporter) postProcess(
 			hasRemoteDupe, e := dupeController.CollectRemoteDuplicateRows(ctx, tr.encTable, tr.tableName, opts)
 			if e != nil {
 				tr.logger.Error("collect remote duplicate keys failed", log.ShortError(e))
-				return false, e
+				return false, errors.Trace(e)
 			}
 			hasDupe = hasDupe || hasRemoteDupe
 
 			if hasDupe {
 				if err = dupeController.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil {
 					tr.logger.Error("resolve remote duplicate keys failed", log.ShortError(err))
-					return false, err
+					return false, errors.Trace(err)
 				}
 			}
 		}
@@ -1057,7 +1057,7 @@ func (tr *TableImporter) postProcess(
 		})
 		if err != nil {
 			if rc.cfg.PostRestore.Checksum != config.OpLevelOptional {
-				return false, err
+				return false, errors.Trace(err)
 			}
 			tr.logger.Warn("do checksum failed, will skip this error and go on", log.ShortError(err))
 			err = nil
diff --git a/br/tests/lightning_duplicate_resolution/config.toml b/br/tests/lightning_duplicate_resolution_remove/config.toml
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/config.toml
rename to br/tests/lightning_duplicate_resolution_remove/config.toml
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve-schema-create.sql b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve-schema-create.sql
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve-schema-create.sql
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve-schema-create.sql
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a-schema.sql b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a-schema.sql
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a-schema.sql
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a-schema.sql
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a.1.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.1.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a.1.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.1.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a.2.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.2.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a.2.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.2.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a.3.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.3.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a.3.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.3.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a.4.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.4.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a.4.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.4.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a.5.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.5.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a.5.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.5.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a.6.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.6.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a.6.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.6.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a.7.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.7.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a.7.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.7.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a.8.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.8.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a.8.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.8.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.a.9.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.9.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.a.9.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.a.9.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b-schema.sql b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b-schema.sql
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b-schema.sql
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b-schema.sql
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b.1.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.1.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b.1.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.1.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b.2.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.2.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b.2.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.2.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b.3.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.3.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b.3.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.3.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b.4.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.4.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b.4.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.4.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b.5.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.5.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b.5.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.5.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b.6.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.6.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b.6.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.6.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b.7.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.7.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b.7.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.7.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b.8.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.8.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b.8.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.8.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.b.9.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.9.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.b.9.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.b.9.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.c-schema.sql b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.c-schema.sql
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.c-schema.sql
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.c-schema.sql
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.c.1.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.c.1.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.c.1.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.c.1.csv
diff --git a/br/tests/lightning_duplicate_resolution/data/dup_resolve.c.2.csv b/br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.c.2.csv
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/data/dup_resolve.c.2.csv
rename to br/tests/lightning_duplicate_resolution_remove/data/dup_resolve.c.2.csv
diff --git a/br/tests/lightning_duplicate_resolution/run.sh b/br/tests/lightning_duplicate_resolution_remove/run.sh
similarity index 100%
rename from br/tests/lightning_duplicate_resolution/run.sh
rename to br/tests/lightning_duplicate_resolution_remove/run.sh
diff --git a/br/tests/lightning_duplicate_resolution_replace/config.toml b/br/tests/lightning_duplicate_resolution_replace/config.toml
new file mode 100644
index 0000000000000..d49b2583e944c
--- /dev/null
+++ b/br/tests/lightning_duplicate_resolution_replace/config.toml
@@ -0,0 +1,17 @@
+[lightning]
+task-info-schema-name = 'lightning_task_info'
+
+[tikv-importer]
+backend = 'local'
+duplicate-resolution = 'replace'
+add-index-by-sql = false
+
+[checkpoint]
+enable = false
+
+[mydumper]
+batch-size = 1
+# ensure each file is its own engine to facilitate cross-engine detection.
+
+[mydumper.csv]
+header = true
diff --git a/br/tests/lightning_duplicate_resolution_replace/data/dup_resolve-schema-create.sql b/br/tests/lightning_duplicate_resolution_replace/data/dup_resolve-schema-create.sql
new file mode 100644
index 0000000000000..f8d42367a3d4c
--- /dev/null
+++ b/br/tests/lightning_duplicate_resolution_replace/data/dup_resolve-schema-create.sql
@@ -0,0 +1 @@
+create schema dup_resolve;
diff --git a/br/tests/lightning_duplicate_resolution_replace/data/dup_resolve.a-schema.sql b/br/tests/lightning_duplicate_resolution_replace/data/dup_resolve.a-schema.sql
new file mode 100644
index 0000000000000..a55ebf84d8db4
--- /dev/null
+++ b/br/tests/lightning_duplicate_resolution_replace/data/dup_resolve.a-schema.sql
@@ -0,0 +1,6 @@
+create table a (
+    a int primary key clustered,
+    b int not null,
+    c text,
+    unique key uni_b(b)
+);
diff --git a/br/tests/lightning_duplicate_resolution_replace/data/dup_resolve.a.1.csv b/br/tests/lightning_duplicate_resolution_replace/data/dup_resolve.a.1.csv
new file mode 100644
index 0000000000000..88a29a5489f14
--- /dev/null
+++ b/br/tests/lightning_duplicate_resolution_replace/data/dup_resolve.a.1.csv
@@ -0,0 +1,4 @@
+a,b,c
+1,6,1.csv
+2,6,2.csv
+3,3,3.csv
diff --git a/br/tests/lightning_duplicate_resolution_replace/run.sh b/br/tests/lightning_duplicate_resolution_replace/run.sh
new file mode 100644
index 0000000000000..71df204b868b8
--- /dev/null
+++ b/br/tests/lightning_duplicate_resolution_replace/run.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+#
+# Copyright 2021 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eux
+
+check_cluster_version 5 2 0 'duplicate detection' || exit 0
+
+run_lightning
+
+# Ensure all tables are consistent.
+run_sql 'admin check table dup_resolve.a'
+
+run_sql 'select count(*) from dup_resolve.a'
+check_contains 'count(*): 2'
+
+run_sql 'select * from dup_resolve.a'
+check_contains 'a: 1'
+check_contains 'b: 6'
+check_contains 'c: 1.csv'
+check_contains 'a: 3'
+check_contains 'b: 3'
+check_contains 'c: 3.csv'
diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh
index 58fe387d4be6a..a3a8ec6afa234 100755
--- a/br/tests/run_group.sh
+++ b/br/tests/run_group.sh
@@ -32,7 +32,7 @@ groups=(
 	["G09"]='lightning_auto_random_default lightning_bom_file lightning_character_sets lightning_check_partial_imported lightning_checkpoint lightning_checkpoint_chunks lightning_checkpoint_columns lightning_checkpoint_dirty_tableid'
 	["G10"]='lightning_checkpoint_engines lightning_checkpoint_engines_order lightning_checkpoint_error_destroy lightning_checkpoint_parquet lightning_checkpoint_timestamp lightning_checksum_mismatch lightning_cmdline_override lightning_column_permutation lightning_common_handle'
 	["G11"]='lightning_compress lightning_concurrent-restore lightning_config_max_error lightning_config_skip_csv_header lightning_csv lightning_default-columns lightning_disable_scheduler_by_key_range lightning_disk_quota lightning_distributed_import'
-	["G12"]='lightning_drop_other_tables_halfway lightning_duplicate_detection lightning_duplicate_detection_new lightning_duplicate_resolution lightning_duplicate_resolution_incremental lightning_error_summary lightning_examples lightning_exotic_filenames lightning_extend_routes lightning_fail_fast'
+	["G12"]='lightning_drop_other_tables_halfway lightning_duplicate_detection lightning_duplicate_detection_new lightning_duplicate_resolution_incremental lightning_duplicate_resolution_remove lightning_duplicate_resolution_replace lightning_error_summary lightning_examples lightning_exotic_filenames lightning_extend_routes lightning_fail_fast'
 	["G13"]='lightning_fail_fast_on_nonretry_err lightning_file_routing lightning_foreign_key lightning_gcs lightning_generated_columns lightning_ignore_columns lightning_import_compress lightning_incremental lightning_issue_282'
 	["G14"]='lightning_issue_40657 lightning_issue_410 lightning_issue_519 lightning_local_backend lightning_max_incr lightning_max_random lightning_multi_valued_index lightning_new_collation lightning_no_schema'
 	["G15"]='lightning_parquet lightning_partition_incremental lightning_partitioned-table lightning_record_network lightning_reload_cert lightning_restore lightning_routes lightning_routes_panic lightning_row-format-v2 lightning_s3'