diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index ba37cc20f2c61..9d0e96a140fe1 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -50,8 +50,11 @@ go_library( "//pkg/kv", "//pkg/metrics", "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/parser/terror", "//pkg/sessionctx/variable", "//pkg/table", + "//pkg/table/tables", "//pkg/tablecodec", "//pkg/util", "//pkg/util/codec", @@ -141,6 +144,7 @@ go_test( "//pkg/parser/mysql", "//pkg/sessionctx/stmtctx", "//pkg/store/pdtypes", + "//pkg/table", "//pkg/table/tables", "//pkg/tablecodec", "//pkg/testkit/testsetup", diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index 6cd78cd254934..162513c3896b7 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -17,6 +17,7 @@ package local import ( "bytes" "context" + "encoding/hex" "encoding/json" "fmt" "io" @@ -42,7 +43,10 @@ import ( "github.com/pingcap/tidb/pkg/distsql" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/table" + "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/hack" @@ -459,7 +463,7 @@ func (m *DupeDetector) HasDuplicate() bool { } // RecordDataConflictError records data conflicts to errorMgr. The key received from stream must be a row key. -func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKVStream) error { +func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKVStream, algorithm config.DuplicateResolutionAlgorithm) error { //nolint: errcheck defer stream.Close() var dataConflictInfos []errormanager.DataConflictInfo @@ -481,6 +485,7 @@ func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKV if err != nil { return errors.Trace(err) } + conflictInfo := errormanager.DataConflictInfo{ RawKey: key, RawValue: val, @@ -494,6 +499,10 @@ func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKV } dataConflictInfos = dataConflictInfos[:0] } + + if algorithm == config.DupeResAlgErr { + return errors.Trace(common.ErrFoundDataConflictRecords.FastGenByArgs(m.tbl.Meta().Name, h.String(), m.decoder.DecodeRawRowDataAsStr(h, val))) + } } if len(dataConflictInfos) > 0 { if err := m.errorMgr.RecordDataConflictError(ctx, m.logger, m.tableName, dataConflictInfos); err != nil { @@ -528,7 +537,7 @@ func (m *DupeDetector) saveIndexHandles(ctx context.Context, handles pendingInde } // RecordIndexConflictError records index conflicts to errorMgr. The key received from stream must be an index key. 
-func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupKVStream, tableID int64, indexInfo *model.IndexInfo) error { +func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupKVStream, tableID int64, indexInfo *model.IndexInfo, algorithm config.DuplicateResolutionAlgorithm) error { //nolint: errcheck defer stream.Close() indexHandles := makePendingIndexHandlesWithCapacity(0) @@ -550,6 +559,7 @@ func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupK if err != nil { return errors.Trace(err) } + conflictInfo := errormanager.DataConflictInfo{ RawKey: key, RawValue: val, @@ -564,6 +574,10 @@ func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupK } indexHandles.truncate() } + + if algorithm == config.DupeResAlgErr { + return newErrFoundIndexConflictRecords(key, val, m.tbl, indexInfo) + } } if indexHandles.Len() > 0 { if err := m.saveIndexHandles(ctx, indexHandles); err != nil { @@ -573,6 +587,99 @@ func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupK return nil } +// RetrieveKeyAndValueFromErrFoundDuplicateKeys retrieves the key and value +// from an ErrFoundDuplicateKeys error. +func RetrieveKeyAndValueFromErrFoundDuplicateKeys(err error) ([]byte, []byte, error) { + if !common.ErrFoundDuplicateKeys.Equal(err) { + return nil, nil, err + } + tErr, ok := errors.Cause(err).(*terror.Error) + if !ok { + return nil, nil, err + } + if len(tErr.Args()) != 2 { + return nil, nil, err + } + key, keyIsByte := tErr.Args()[0].([]byte) + value, valIsByte := tErr.Args()[1].([]byte) + if !keyIsByte || !valIsByte { + return nil, nil, err + } + return key, value, nil } + +// newErrFoundConflictRecords generates an ErrFoundDataConflictRecords / ErrFoundIndexConflictRecords error +// according to the key and value. +func newErrFoundConflictRecords(key []byte, value []byte, tbl table.Table) error { + sessionOpts := encode.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + } + + decoder, err := kv.NewTableKVDecoder(tbl, tbl.Meta().Name.L, &sessionOpts, log.L()) + if err != nil { + return errors.Trace(err) + } + + if tablecodec.IsRecordKey(key) { + // for data KV + handle, err := tablecodec.DecodeRowKey(key) + if err != nil { + return errors.Trace(err) + } + + rowData := decoder.DecodeRawRowDataAsStr(handle, value) + + return errors.Trace(common.ErrFoundDataConflictRecords.FastGenByArgs(tbl.Meta().Name, handle.String(), rowData)) + } + + // for index KV + _, idxID, _, err := tablecodec.DecodeIndexKey(key) + if err != nil { + return errors.Trace(err) + } + + idxInfo := model.FindIndexInfoByID(tbl.Meta().Indices, idxID) + return newErrFoundIndexConflictRecords(key, value, tbl, idxInfo) +} + +// newErrFoundIndexConflictRecords generates an ErrFoundIndexConflictRecords error +// according to the key and value.
+func newErrFoundIndexConflictRecords(key []byte, value []byte, tbl table.Table, idxInfo *model.IndexInfo) error { + sessionOpts := encode.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + } + + decoder, err := kv.NewTableKVDecoder(tbl, tbl.Meta().Name.L, &sessionOpts, log.L()) + if err != nil { + return errors.Trace(err) + } + + indexName := fmt.Sprintf("%s.%s", tbl.Meta().Name.String(), idxInfo.Name.String()) + valueStr, err := tables.GenIndexValueFromIndex(key, value, tbl.Meta(), idxInfo) + if err != nil { + log.L().Warn("decode index key value / column value failed", zap.String("index", indexName), + zap.String("key", hex.EncodeToString(key)), zap.String("value", hex.EncodeToString(value)), zap.Error(err)) + return errors.Trace(common.ErrFoundIndexConflictRecords.FastGenByArgs(tbl.Meta().Name, indexName, key, value)) + } + + h, err := decoder.DecodeHandleFromIndex(idxInfo, key, value) + if err != nil { + return errors.Trace(err) + } + return errors.Trace(common.ErrFoundIndexConflictRecords.FastGenByArgs(tbl.Meta().Name, indexName, valueStr, h)) +} + +// ConvertToErrFoundConflictRecords converts an ErrFoundDuplicateKeys error +// to an ErrFoundDataConflictRecords or ErrFoundIndexConflictRecords error. +func ConvertToErrFoundConflictRecords(originalErr error, tbl table.Table) error { + rawKey, rawValue, err := RetrieveKeyAndValueFromErrFoundDuplicateKeys(originalErr) + if err != nil { + return errors.Trace(err) + } + + return newErrFoundConflictRecords(rawKey, rawValue, tbl) +} + // BuildDuplicateTaskForTest is only used for test. var BuildDuplicateTaskForTest = func(m *DupeDetector) ([]dupTask, error) { return m.buildDupTasks() } @@ -702,7 +809,7 @@ func (m *DupeDetector) buildLocalDupTasks(dupDB *pebble.DB, keyAdapter common.Ke } // CollectDuplicateRowsFromDupDB collects duplicates from the duplicate DB and records all duplicate row info into errorMgr.
-func (m *DupeDetector) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB *pebble.DB, keyAdapter common.KeyAdapter) error { +func (m *DupeDetector) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB *pebble.DB, keyAdapter common.KeyAdapter, algorithm config.DuplicateResolutionAlgorithm) error { tasks, err := m.buildLocalDupTasks(dupDB, keyAdapter) if err != nil { return errors.Trace(err) @@ -719,9 +826,9 @@ func (m *DupeDetector) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB stream := NewLocalDupKVStream(dupDB, keyAdapter, task.KeyRange) var err error if task.indexInfo == nil { - err = m.RecordDataConflictError(gCtx, stream) + err = m.RecordDataConflictError(gCtx, stream, algorithm) } else { - err = m.RecordIndexConflictError(gCtx, stream, task.tableID, task.indexInfo) + err = m.RecordIndexConflictError(gCtx, stream, task.tableID, task.indexInfo, algorithm) } return errors.Trace(err) }); err != nil { @@ -788,6 +895,7 @@ func (m *DupeDetector) processRemoteDupTaskOnce( importClientFactory ImportClientFactory, regionPool *utils.WorkerPool, remainKeyRanges *pendingKeyRanges, + algorithm config.DuplicateResolutionAlgorithm, ) (madeProgress bool, err error) { //nolint: prealloc var regions []*split.RegionInfo @@ -828,9 +936,9 @@ func (m *DupeDetector) processRemoteDupTaskOnce( return errors.Annotatef(err, "failed to create remote duplicate kv stream") } if task.indexInfo == nil { - err = m.RecordDataConflictError(ctx, stream) + err = m.RecordDataConflictError(ctx, stream, algorithm) } else { - err = m.RecordIndexConflictError(ctx, stream, task.tableID, task.indexInfo) + err = m.RecordIndexConflictError(ctx, stream, task.tableID, task.indexInfo, algorithm) } if err != nil { return errors.Annotatef(err, "failed to record conflict errors") @@ -864,12 +972,13 @@ func (m *DupeDetector) processRemoteDupTask( logger log.Logger, importClientFactory ImportClientFactory, regionPool *utils.WorkerPool, + algorithm config.DuplicateResolutionAlgorithm, ) error { regionErrRetryAttempts := split.WaitRegionOnlineAttemptTimes remainAttempts := maxDupCollectAttemptTimes remainKeyRanges := newPendingKeyRanges(task.KeyRange) for { - madeProgress, err := m.processRemoteDupTaskOnce(ctx, task, logger, importClientFactory, regionPool, remainKeyRanges) + madeProgress, err := m.processRemoteDupTaskOnce(ctx, task, logger, importClientFactory, regionPool, remainKeyRanges, algorithm) if err == nil { if !remainKeyRanges.empty() { remainKeyRanges.list() @@ -904,7 +1013,7 @@ func (m *DupeDetector) processRemoteDupTask( } // CollectDuplicateRowsFromTiKV collects duplicates from the remote TiKV and records all duplicate row info into errorMgr. -func (m *DupeDetector) CollectDuplicateRowsFromTiKV(ctx context.Context, importClientFactory ImportClientFactory) error { +func (m *DupeDetector) CollectDuplicateRowsFromTiKV(ctx context.Context, importClientFactory ImportClientFactory, algorithm config.DuplicateResolutionAlgorithm) error { tasks, err := m.buildDupTasks() if err != nil { return errors.Trace(err) @@ -929,7 +1038,7 @@ func (m *DupeDetector) CollectDuplicateRowsFromTiKV(ctx context.Context, importC zap.Int64("indexID", task.indexInfo.ID), ) } - err := m.processRemoteDupTask(gCtx, task, taskLogger, importClientFactory, regionPool) + err := m.processRemoteDupTask(gCtx, task, taskLogger, importClientFactory, regionPool, algorithm) return errors.Trace(err) }) } @@ -954,7 +1063,7 @@ type DupeController struct { // CollectLocalDuplicateRows collect duplicate keys from local db. 
We will store the duplicate keys which // may be repeated with other keys in local data source. -func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) { +func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions, algorithm config.DuplicateResolutionAlgorithm) (hasDupe bool, err error) { logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect local duplicate keys") defer func() { logger.End(zap.ErrorLevel, err) }() @@ -965,7 +1074,7 @@ func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl if err != nil { return false, errors.Trace(err) } - if err := duplicateManager.CollectDuplicateRowsFromDupDB(ctx, local.duplicateDB, local.keyAdapter); err != nil { + if err := duplicateManager.CollectDuplicateRowsFromDupDB(ctx, local.duplicateDB, local.keyAdapter, algorithm); err != nil { return false, errors.Trace(err) } return duplicateManager.HasDuplicate(), nil @@ -973,7 +1082,8 @@ func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl // CollectRemoteDuplicateRows collect duplicate keys from remote TiKV storage. This keys may be duplicate with // the data import by other lightning. +// TODO: revise the return values to (hasDupe bool, dupInfo *DupInfo, err error) to distinguish conflict errors from common errors -func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) { +func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions, algorithm config.DuplicateResolutionAlgorithm) (hasDupe bool, err error) { logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect remote duplicate keys") defer func() { logger.End(zap.ErrorLevel, err) }() @@ -984,8 +1094,9 @@ func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl if err != nil { return false, errors.Trace(err) } - if err := duplicateManager.CollectDuplicateRowsFromTiKV(ctx, local.importClientFactory); err != nil { - return false, errors.Trace(err) + err = duplicateManager.CollectDuplicateRowsFromTiKV(ctx, local.importClientFactory, algorithm) + if err != nil { + return common.ErrFoundDataConflictRecords.Equal(err) || common.ErrFoundIndexConflictRecords.Equal(err), errors.Trace(err) } return duplicateManager.HasDuplicate(), nil } @@ -1002,7 +1113,7 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table case config.DupeResAlgNone: logger.Warn("skipping resolution due to selected algorithm.
this table will become inconsistent!", zap.String("category", "resolve-dupe"), zap.Stringer("algorithm", algorithm)) return nil - case config.DupeResAlgReplace, config.DupeResAlgErr: + case config.DupeResAlgReplace: default: panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm)) } @@ -1016,31 +1127,24 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table logger.Debug("got tblInfo from tbl", zap.ByteString("tblInfo", tblInfo)) - switch algorithm { - case config.DupeResAlgReplace: - err = local.errorMgr.ReplaceConflictKeys( - ctx, tbl, tableName, pool, - func(ctx context.Context, key []byte) ([]byte, error) { - value, err := local.getLatestValue(ctx, logger, key) - if err != nil { - return nil, errors.Trace(err) - } - return value, nil - }, - func(ctx context.Context, key []byte) error { - err := local.deleteDuplicateRow(ctx, logger, key) - if err != nil { - logger.Warn("delete duplicate rows encounter error", log.ShortError(err)) - return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName) - } - return nil - }, - ) - case config.DupeResAlgErr: - err = local.errorMgr.ResolveConflictKeysError( - ctx, tableName, - ) - } + err = local.errorMgr.ReplaceConflictKeys( + ctx, tbl, tableName, pool, + func(ctx context.Context, key []byte) ([]byte, error) { + value, err := local.getLatestValue(ctx, logger, key) + if err != nil { + return nil, errors.Trace(err) + } + return value, nil + }, + func(ctx context.Context, key []byte) error { + err := local.deleteDuplicateRow(ctx, logger, key) + if err != nil { + logger.Warn("delete duplicate rows encounter error", log.ShortError(err)) + return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName) + } + return nil + }, + ) return errors.Trace(err) } diff --git a/br/pkg/lightning/backend/local/duplicate_test.go b/br/pkg/lightning/backend/local/duplicate_test.go index 268e19b8047c6..fd244c53e3eb3 100644 --- a/br/pkg/lightning/backend/local/duplicate_test.go +++ b/br/pkg/lightning/backend/local/duplicate_test.go @@ -21,14 +21,18 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" lkv "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/keyspace" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" ) @@ -70,3 +74,115 @@ func TestBuildDupTask(t *testing.T) { require.Equal(t, tc.hasTableRange, hasRecordKey) } } + +func buildTableForTestConvertToErrFoundConflictRecords(t *testing.T, node []ast.StmtNode) (table.Table, *lkv.Pairs) { + mockSctx := mock.NewContext() + info, err := ddl.MockTableInfo(mockSctx, node[0].(*ast.CreateTableStmt), 108) + require.NoError(t, err) + info.State = model.StatePublic + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), info) + require.NoError(t, err) + + sessionOpts := encode.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + Timestamp: 1234567890, + } + + encoder, err := lkv.NewBaseKVEncoder(&encode.EncodingConfig{ + 
Table: tbl, + SessionOptions: sessionOpts, + Logger: log.L(), + }) + require.NoError(t, err) + encoder.SessionCtx.GetSessionVars().RowEncoder.Enable = true + + data1 := []types.Datum{ + types.NewIntDatum(1), + types.NewIntDatum(6), + types.NewStringDatum("1.csv"), + types.NewIntDatum(101), + } + data2 := []types.Datum{ + types.NewIntDatum(2), + types.NewIntDatum(6), + types.NewStringDatum("2.csv"), + types.NewIntDatum(102), + } + data3 := []types.Datum{ + types.NewIntDatum(3), + types.NewIntDatum(7), + types.NewStringDatum("3.csv"), + types.NewIntDatum(103), + } + tctx := encoder.SessionCtx.GetTableCtx() + _, err = encoder.Table.AddRecord(tctx, data1) + require.NoError(t, err) + _, err = encoder.Table.AddRecord(tctx, data2) + require.NoError(t, err) + _, err = encoder.Table.AddRecord(tctx, data3) + require.NoError(t, err) + return tbl, encoder.SessionCtx.TakeKvPairs() +} + +func TestRetrieveKeyAndValueFromErrFoundDuplicateKeys(t *testing.T) { + p := parser.New() + node, _, err := p.ParseSQL("create table a (a int primary key, b int not null, c text, d int, key key_b(b));") + require.NoError(t, err) + + _, kvPairs := buildTableForTestConvertToErrFoundConflictRecords(t, node) + + data1RowKey := kvPairs.Pairs[0].Key + data1RowValue := kvPairs.Pairs[0].Val + + originalErr := common.ErrFoundDuplicateKeys.FastGenByArgs(data1RowKey, data1RowValue) + rawKey, rawValue, err := local.RetrieveKeyAndValueFromErrFoundDuplicateKeys(originalErr) + require.NoError(t, err) + require.Equal(t, data1RowKey, rawKey) + require.Equal(t, data1RowValue, rawValue) +} + +func TestConvertToErrFoundConflictRecordsSingleColumnsIndex(t *testing.T) { + p := parser.New() + node, _, err := p.ParseSQL("create table a (a int primary key, b int not null, c text, d int, unique key key_b(b));") + require.NoError(t, err) + + tbl, kvPairs := buildTableForTestConvertToErrFoundConflictRecords(t, node) + + data2RowKey := kvPairs.Pairs[2].Key + data2RowValue := kvPairs.Pairs[2].Val + data3IndexKey := kvPairs.Pairs[5].Key + data3IndexValue := kvPairs.Pairs[5].Val + + originalErr := common.ErrFoundDuplicateKeys.FastGenByArgs(data2RowKey, data2RowValue) + + newErr := local.ConvertToErrFoundConflictRecords(originalErr, tbl) + require.EqualError(t, newErr, "[Lightning:Restore:ErrFoundDataConflictRecords]found data conflict records in table a, primary key is '2', row data is '(2, 6, \"2.csv\", 102)'") + + originalErr = common.ErrFoundDuplicateKeys.FastGenByArgs(data3IndexKey, data3IndexValue) + + newErr = local.ConvertToErrFoundConflictRecords(originalErr, tbl) + require.EqualError(t, newErr, "[Lightning:Restore:ErrFoundIndexConflictRecords]found index conflict records in table a, index name is 'a.key_b', unique key is '[7]', primary key is '3'") +} + +func TestConvertToErrFoundConflictRecordsMultipleColumnsIndex(t *testing.T) { + p := parser.New() + node, _, err := p.ParseSQL("create table a (a int primary key, b int not null, c text, d int, unique key key_bd(b,d));") + require.NoError(t, err) + + tbl, kvPairs := buildTableForTestConvertToErrFoundConflictRecords(t, node) + + data2RowKey := kvPairs.Pairs[2].Key + data2RowValue := kvPairs.Pairs[2].Val + data3IndexKey := kvPairs.Pairs[5].Key + data3IndexValue := kvPairs.Pairs[5].Val + + originalErr := common.ErrFoundDuplicateKeys.FastGenByArgs(data2RowKey, data2RowValue) + + newErr := local.ConvertToErrFoundConflictRecords(originalErr, tbl) + require.EqualError(t, newErr, "[Lightning:Restore:ErrFoundDataConflictRecords]found data conflict records in table a, primary key is '2', row data is 
'(2, 6, \"2.csv\", 102)'") + + originalErr = common.ErrFoundDuplicateKeys.FastGenByArgs(data3IndexKey, data3IndexValue) + + newErr = local.ConvertToErrFoundConflictRecords(originalErr, tbl) + require.EqualError(t, newErr, "[Lightning:Restore:ErrFoundIndexConflictRecords]found index conflict records in table a, index name is 'a.key_bd', unique key is '[7 103]', primary key is '3'") +} diff --git a/br/pkg/lightning/common/errors.go b/br/pkg/lightning/common/errors.go index 4cf79ccac0457..2ba7bc95e01f1 100644 --- a/br/pkg/lightning/common/errors.go +++ b/br/pkg/lightning/common/errors.go @@ -99,9 +99,12 @@ var ( ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus")) ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming")) ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows")) - ErrFoundDuplicateKeys = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey")) - ErrAddIndexFailed = errors.Normalize("add index on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrAddIndexFailed")) - ErrDropIndexFailed = errors.Normalize("drop index %s on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrDropIndexFailed")) + // ErrFoundDuplicateKeys shoud be replaced with ErrFoundDataConflictRecords and ErrFoundIndexConflictRecords (TODO) + ErrFoundDuplicateKeys = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey")) + ErrAddIndexFailed = errors.Normalize("add index on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrAddIndexFailed")) + ErrDropIndexFailed = errors.Normalize("drop index %s on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrDropIndexFailed")) + ErrFoundDataConflictRecords = errors.Normalize("found data conflict records in table %s, primary key is '%s', row data is '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDataConflictRecords")) + ErrFoundIndexConflictRecords = errors.Normalize("found index conflict records in table %s, index name is '%s', unique key is '%s', primary key is '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundIndexConflictRecords")) ) type withStack struct { diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 8aa6b3ecd3dfd..67e12926a134c 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -600,7 +600,7 @@ const ( // DupeResAlgNone doesn't detect duplicate. DupeResAlgNone DuplicateResolutionAlgorithm = iota - // DupeResAlgReplace records all duplicate records like the 'record' algorithm, and remove some rows with conflict + // DupeResAlgReplace records all duplicate records like the 'record' algorithm, remove some rows with conflict // and reserve other rows that can be kept and not cause conflict anymore. Users need to analyze the // lightning_task_info.conflict_error_v2 table to check whether the reserved data cater to their need and check whether // they need to add back the correct rows. 
diff --git a/br/pkg/lightning/errormanager/BUILD.bazel b/br/pkg/lightning/errormanager/BUILD.bazel index 0c37f9aea9882..0bea375253695 100644 --- a/br/pkg/lightning/errormanager/BUILD.bazel +++ b/br/pkg/lightning/errormanager/BUILD.bazel @@ -40,11 +40,10 @@ go_test( ], embed = [":errormanager"], flaky = True, - shard_count = 10, + shard_count = 9, deps = [ "//br/pkg/lightning/backend/encode", "//br/pkg/lightning/backend/kv", - "//br/pkg/lightning/common", "//br/pkg/lightning/config", "//br/pkg/lightning/log", "//br/pkg/utils", diff --git a/br/pkg/lightning/errormanager/errormanager.go b/br/pkg/lightning/errormanager/errormanager.go index 86a08c0f8cb47..31c5b46daeed6 100644 --- a/br/pkg/lightning/errormanager/errormanager.go +++ b/br/pkg/lightning/errormanager/errormanager.go @@ -155,19 +155,6 @@ const ( WHERE key_data = "" and row_data = ""; ` - selectConflictKeysCountError = ` - SELECT COUNT(*) - FROM %s.` + ConflictErrorTableName + ` - WHERE table_name = ?; - ` - - selectConflictKeysError = ` - SELECT raw_key, raw_row - FROM %s.` + ConflictErrorTableName + ` - WHERE table_name = ? - LIMIT 1; - ` - insertIntoDupRecord = ` INSERT INTO %s.` + DupRecordTable + ` (task_id, table_name, path, offset, error, row_id, row_data) @@ -769,64 +756,6 @@ func (em *ErrorManager) ReplaceConflictKeys( return errors.Trace(g.Wait()) } -// ResolveConflictKeysError query all conflicting rows (handle and their -// values) from the current error report and return error -// if the number of the conflicting rows is larger than 0. -func (em *ErrorManager) ResolveConflictKeysError( - ctx context.Context, - tableName string, -) error { - if em.db == nil { - return nil - } - - _, gCtx := errgroup.WithContext(ctx) - - kvRows, err := em.db.QueryContext( - gCtx, common.SprintfWithIdentifiers(selectConflictKeysCountError, em.schema), - tableName) - if err != nil { - return errors.Trace(err) - } - defer kvRows.Close() - var kvRowsCount int64 - for kvRows.Next() { - if err := kvRows.Scan(&kvRowsCount); err != nil { - return errors.Trace(err) - } - } - if err := kvRows.Err(); err != nil { - return errors.Trace(err) - } - - em.logger.Debug("got kv rows count from table", - zap.Int64("kv rows count", kvRowsCount)) - if kvRowsCount > 0 { - rows, err := em.db.QueryContext( - gCtx, common.SprintfWithIdentifiers(selectConflictKeysError, em.schema), - tableName) - if err != nil { - return errors.Trace(err) - } - defer rows.Close() - - var rawKey, rawRow []byte - for rows.Next() { - if err := rows.Scan(&rawKey, &rawRow); err != nil { - return errors.Trace(err) - } - em.logger.Debug("got raw_key, raw_row from table", - logutil.Key("raw_key", rawKey), - zap.Binary("raw_row", rawRow)) - } - if err := rows.Err(); err != nil { - return errors.Trace(err) - } - return common.ErrFoundDuplicateKeys.FastGenByArgs(rawKey, rawRow) - } - return nil -} - // RecordDuplicateCount reduce the counter of "duplicate entry" errors. // Currently, the count will not be shared for multiple lightning instances. 
func (em *ErrorManager) RecordDuplicateCount(cnt int64) error { diff --git a/br/pkg/lightning/errormanager/resolveconflict_test.go b/br/pkg/lightning/errormanager/resolveconflict_test.go index f62cc19f389d3..6feb5a02cc5ee 100644 --- a/br/pkg/lightning/errormanager/resolveconflict_test.go +++ b/br/pkg/lightning/errormanager/resolveconflict_test.go @@ -24,7 +24,6 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" tidbkv "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" - "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/errormanager" "github.com/pingcap/tidb/br/pkg/lightning/log" @@ -813,92 +812,3 @@ func TestReplaceConflictOneUniqueKeyNonclusteredVarcharPk(t *testing.T) { err = mockDB.ExpectationsWereMet() require.NoError(t, err) } - -func TestResolveConflictKeysError(t *testing.T) { - p := parser.New() - node, _, err := p.ParseSQL("create table a (a varchar(20) primary key clustered, b int not null, c text, key uni_b(b));") - require.NoError(t, err) - mockSctx := mock.NewContext() - info, err := ddl.MockTableInfo(mockSctx, node[0].(*ast.CreateTableStmt), 108) - require.NoError(t, err) - info.State = model.StatePublic - tbl, err := tables.TableFromMeta(tidbkv.NewPanickingAllocators(0), info) - require.NoError(t, err) - - sessionOpts := encode.SessionOptions{ - SQLMode: mysql.ModeStrictAllTables, - Timestamp: 1234567890, - } - - encoder, err := tidbkv.NewBaseKVEncoder(&encode.EncodingConfig{ - Table: tbl, - SessionOptions: sessionOpts, - Logger: log.L(), - }) - require.NoError(t, err) - encoder.SessionCtx.GetSessionVars().RowEncoder.Enable = true - - data1 := []types.Datum{ - types.NewIntDatum(1), - types.NewIntDatum(6), - types.NewStringDatum("1.csv"), - types.NewIntDatum(1), - } - data2 := []types.Datum{ - types.NewIntDatum(1), - types.NewIntDatum(6), - types.NewStringDatum("2.csv"), - types.NewIntDatum(2), - } - data3 := []types.Datum{ - types.NewIntDatum(3), - types.NewIntDatum(3), - types.NewStringDatum("3.csv"), - types.NewIntDatum(3), - } - tctx := encoder.SessionCtx.GetTableCtx() - _, err = encoder.Table.AddRecord(tctx, data1) - require.NoError(t, err) - _, err = encoder.Table.AddRecord(tctx, data2) - require.NoError(t, err) - _, err = encoder.Table.AddRecord(tctx, data3) - require.NoError(t, err) - kvPairs := encoder.SessionCtx.TakeKvPairs() - - data1RowKey := kvPairs.Pairs[0].Key - data1RowValue := kvPairs.Pairs[0].Val - - db, mockDB, err := sqlmock.New() - require.NoError(t, err) - defer func() { - _ = db.Close() - }() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). - WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v2.*"). - WillReturnResult(sqlmock.NewResult(2, 1)) - mockDB.ExpectQuery("\\QSELECT COUNT(*) FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"COUNT(*)"}). - AddRow(2)) - mockDB.ExpectQuery("\\QSELECT raw_key, raw_row FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? LIMIT 1\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "raw_value"}). 
- AddRow(data1RowKey, data1RowValue)) - - cfg := config.NewConfig() - cfg.TikvImporter.DuplicateResolution = config.DupeResAlgErr - cfg.App.TaskInfoSchemaName = "lightning_task_info" - em := errormanager.New(db, cfg, log.L()) - err = em.Init(ctx) - require.NoError(t, err) - - err = em.ResolveConflictKeysError( - ctx, "a", - ) - require.Error(t, err, common.ErrFoundDuplicateKeys) - err = mockDB.ExpectationsWereMet() - require.NoError(t, err) -} diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go index 2308e3e47520f..b002d41b81218 100644 --- a/br/pkg/lightning/importer/table_import.go +++ b/br/pkg/lightning/importer/table_import.go @@ -1026,7 +1026,7 @@ func (tr *TableImporter) postProcess( SysVars: rc.sysVars, } var err error - hasLocalDupe, err := dupeController.CollectLocalDuplicateRows(ctx, tr.encTable, tr.tableName, opts) + hasLocalDupe, err := dupeController.CollectLocalDuplicateRows(ctx, tr.encTable, tr.tableName, opts, rc.cfg.TikvImporter.DuplicateResolution) if err != nil { tr.logger.Error("collect local duplicate keys failed", log.ShortError(err)) return false, errors.Trace(err) @@ -1052,7 +1052,7 @@ func (tr *TableImporter) postProcess( SQLMode: mysql.ModeStrictAllTables, SysVars: rc.sysVars, } - hasRemoteDupe, e := dupeController.CollectRemoteDuplicateRows(ctx, tr.encTable, tr.tableName, opts) + hasRemoteDupe, e := dupeController.CollectRemoteDuplicateRows(ctx, tr.encTable, tr.tableName, opts, rc.cfg.TikvImporter.DuplicateResolution) if e != nil { tr.logger.Error("collect remote duplicate keys failed", log.ShortError(e)) return false, errors.Trace(e) @@ -1340,6 +1340,9 @@ func (tr *TableImporter) importKV( } } err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys) + if common.ErrFoundDuplicateKeys.Equal(err) { + err = local.ConvertToErrFoundConflictRecords(err, tr.encTable) + } saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, closedEngine.GetID(), err, checkpoints.CheckpointStatusImported) // Don't clean up when save checkpoint failed, because we will verifyLocalFile and import engine again after restart. if err == nil && saveCpErr == nil { diff --git a/br/tests/lightning_duplicate_resolution_error/run.sh b/br/tests/lightning_duplicate_resolution_error/run.sh index 6584eb48ee70a..d6ae96f6b6b72 100644 --- a/br/tests/lightning_duplicate_resolution_error/run.sh +++ b/br/tests/lightning_duplicate_resolution_error/run.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright 2021 PingCAP, Inc. +# Copyright 2024 PingCAP, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,6 +26,6 @@ run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' ! run_lightning --backend local --config "${mydir}/config.toml" [ $? 
-eq 0 ] -tail -n 10 $TEST_DIR/lightning.log | grep "ERROR" | tail -n 1 | grep -Fq "[Lightning:Restore:ErrFoundDuplicateKey]found duplicate key" +tail -n 10 $TEST_DIR/lightning.log | grep "ERROR" | tail -n 1 | grep -Fq "[Lightning:Restore:ErrFoundDataConflictRecords]found data conflict records in table a, primary key is '3', row data is '(3, 3, \\\"3.csv\\\")'" check_not_contains "the whole procedure completed" $TEST_DIR/lightning.log diff --git a/br/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh b/br/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh index 6584eb48ee70a..2eab9d2bbe77b 100644 --- a/br/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh +++ b/br/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright 2021 PingCAP, Inc. +# Copyright 2024 PingCAP, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,6 +26,6 @@ run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' ! run_lightning --backend local --config "${mydir}/config.toml" [ $? -eq 0 ] -tail -n 10 $TEST_DIR/lightning.log | grep "ERROR" | tail -n 1 | grep -Fq "[Lightning:Restore:ErrFoundDuplicateKey]found duplicate key" +tail -n 10 $TEST_DIR/lightning.log | grep "ERROR" | tail -n 1 | grep -Fq "[Lightning:Restore:ErrFoundDataConflictRecords]found data conflict records in table a" check_not_contains "the whole procedure completed" $TEST_DIR/lightning.log diff --git a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/data/dup_resolve.a.1.csv b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/data/dup_resolve.a.1.csv index c46afaa19de4e..6e7a35d1a685f 100644 --- a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/data/dup_resolve.a.1.csv +++ b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/data/dup_resolve.a.1.csv @@ -1,5 +1,5 @@ a,b,c -1,1,1.csv +1,101,1.csv 2,2,2.csv 3,3,3.csv 4,4,4.csv diff --git a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/data/dup_resolve.a.2.csv b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/data/dup_resolve.a.2.csv index 16fa5cf896a6a..786ce23c44791 100644 --- a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/data/dup_resolve.a.2.csv +++ b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/data/dup_resolve.a.2.csv @@ -1,5 +1,5 @@ a,b,c -6,1,1.csv +6,101,1.csv 7,7,7.csv 8,8,8.csv 9,9,9.csv diff --git a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh index 6584eb48ee70a..73cb0c301c7c6 100644 --- a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh +++ b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright 2021 PingCAP, Inc. +# Copyright 2024 PingCAP, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,6 +26,6 @@ run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' ! run_lightning --backend local --config "${mydir}/config.toml" [ $? 
-eq 0 ] -tail -n 10 $TEST_DIR/lightning.log | grep "ERROR" | tail -n 1 | grep -Fq "[Lightning:Restore:ErrFoundDuplicateKey]found duplicate key" +tail -n 10 $TEST_DIR/lightning.log | grep "ERROR" | tail -n 1 | grep -Fq "[Lightning:Restore:ErrFoundIndexConflictRecords]found index conflict records in table a, index name is 'a.key_b', unique key is '[101]', primary key is '1'" check_not_contains "the whole procedure completed" $TEST_DIR/lightning.log diff --git a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/config.toml b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/config.toml new file mode 100644 index 0000000000000..41ab032d9d6fa --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/config.toml @@ -0,0 +1,17 @@ +[lightning] +task-info-schema-name = 'lightning_task_info' + +[tikv-importer] +backend = 'local' +duplicate-resolution = 'error' +add-index-by-sql = false + +[checkpoint] +enable = false + +[mydumper] +batch-size = 2 +# ensure each file is its own engine to facilitate cross-engine detection. + +[mydumper.csv] +header = true diff --git a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve-schema-create.sql b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve-schema-create.sql new file mode 100644 index 0000000000000..f8d42367a3d4c --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve-schema-create.sql @@ -0,0 +1 @@ +create schema dup_resolve; diff --git a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve.a-schema.sql b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve.a-schema.sql new file mode 100644 index 0000000000000..c3cc966e2d793 --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve.a-schema.sql @@ -0,0 +1,7 @@ +create table a ( + a int primary key, + b int not null, + c text, + d int, + unique key key_bd(b,d) +); diff --git a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve.a.1.csv b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve.a.1.csv new file mode 100644 index 0000000000000..89fc791fff938 --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve.a.1.csv @@ -0,0 +1,6 @@ +a,b,c,d +1,101,1.csv,9 +2,2,2.csv,0 +3,3,3.csv,0 +4,4,4.csv,0 +5,5,5.csv,0 diff --git a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve.a.2.csv b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve.a.2.csv new file mode 100644 index 0000000000000..1dea5aa0f5d36 --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/data/dup_resolve.a.2.csv @@ -0,0 +1,5 @@ +a,b,c,d +6,101,6.csv,9 +7,7,7.csv,0 +8,8,8.csv,0 +9,9,9.csv,0 diff --git a/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh new file mode 100644 index 0000000000000..ca9fe272272ee --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# +# Copyright 2024 
PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +check_cluster_version 5 2 0 'duplicate detection' || exit 0 + +mydir=$(dirname "${BASH_SOURCE[0]}") + +run_sql 'DROP TABLE IF EXISTS dup_resolve.a' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' + +! run_lightning --backend local --config "${mydir}/config.toml" +[ $? -eq 0 ] + +tail -n 10 $TEST_DIR/lightning.log | grep "ERROR" | tail -n 1 | grep -Fq "[Lightning:Restore:ErrFoundIndexConflictRecords]found index conflict records in table a, index name is 'a.key_bd', unique key is '[101 9]', primary key is '1'" + +check_not_contains "the whole procedure completed" $TEST_DIR/lightning.log diff --git a/br/tests/run_group_lightning_tests.sh b/br/tests/run_group_lightning_tests.sh index abee364fe2d74..eea5b542b6f1a 100755 --- a/br/tests/run_group_lightning_tests.sh +++ b/br/tests/run_group_lightning_tests.sh @@ -22,7 +22,7 @@ declare -A groups groups=( ["G00"]='lightning_auto_random_default lightning_bom_file lightning_character_sets lightning_check_partial_imported lightning_checkpoint lightning_checkpoint_chunks lightning_checkpoint_columns lightning_checkpoint_dirty_tableid' ["G01"]='lightning_checkpoint_engines lightning_checkpoint_engines_order lightning_checkpoint_error_destroy lightning_checkpoint_parquet lightning_checkpoint_timestamp lightning_checksum_mismatch lightning_cmdline_override lightning_column_permutation lightning_common_handle lightning_compress lightning_concurrent-restore' - ["G02"]='lightning_config_max_error lightning_config_skip_csv_header lightning_csv lightning_default-columns lightning_disable_scheduler_by_key_range lightning_disk_quota lightning_distributed_import lightning_drop_other_tables_halfway lightning_duplicate_detection lightning_duplicate_detection_new lightning_duplicate_resolution_error lightning_duplicate_resolution_error_pk_multiple_files lightning_duplicate_resolution_error_uk_multiple_files lightning_duplicate_resolution_incremental' + ["G02"]='lightning_config_max_error lightning_config_skip_csv_header lightning_csv lightning_default-columns lightning_disable_scheduler_by_key_range lightning_disk_quota lightning_distributed_import lightning_drop_other_tables_halfway lightning_duplicate_detection lightning_duplicate_detection_new lightning_duplicate_resolution_error lightning_duplicate_resolution_error_pk_multiple_files lightning_duplicate_resolution_error_uk_multiple_files lightning_duplicate_resolution_error_uk_multiple_files_multicol_index lightning_duplicate_resolution_incremental' ["G03"]='lightning_duplicate_resolution_replace_multiple_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_keys_nonclustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_nonclustered_pk lightning_duplicate_resolution_replace_one_key lightning_duplicate_resolution_replace_one_key_multiple_conflicts_clustered_pk 
lightning_duplicate_resolution_replace_one_key_multiple_conflicts_nonclustered_pk' ["G04"]='lightning_duplicate_resolution_replace_one_unique_key_clustered_pk lightning_duplicate_resolution_replace_one_unique_key_multiple_conflicts_clustered_pk lightning_duplicate_resolution_replace_one_unique_key_multiple_conflicts_nonclustered_pk lightning_duplicate_resolution_replace_one_unique_key_nonclustered_varchar_pk lightning_error_summary lightning_examples lightning_exotic_filenames lightning_extend_routes' ["G05"]='lightning_fail_fast lightning_fail_fast_on_nonretry_err lightning_file_routing lightning_foreign_key lightning_gcs lightning_generated_columns lightning_ignore_columns lightning_import_compress lightning_incremental lightning_issue_282 lightning_issue_40657 lightning_issue_410 lightning_issue_519 lightning_local_backend lightning_max_incr' diff --git a/errors.toml b/errors.toml index c7ddce74e0275..956a28a5dfe6b 100644 --- a/errors.toml +++ b/errors.toml @@ -566,11 +566,21 @@ error = ''' encode kv error in file %s at offset %d ''' +["Lightning:Restore:ErrFoundDataConflictRecords"] +error = ''' +found data conflict records in table %s, primary key is '%s', row data is '%s' +''' + ["Lightning:Restore:ErrFoundDuplicateKey"] error = ''' found duplicate key '%s', value '%s' ''' +["Lightning:Restore:ErrFoundIndexConflictRecords"] +error = ''' +found index conflict records in table %s, index name is '%s', unique key is '%s', primary key is '%s' +''' + ["Lightning:Restore:ErrInvalidMetaStatus"] error = ''' invalid meta status: '%s' diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index ca7e11b46ce31..5445a14cd2574 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1599,31 +1599,12 @@ func (w *addIndexTxnWorker) checkHandleExists(idxInfo *model.IndexInfo, key kv.K } func genKeyExistsErr(key, value []byte, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error { - idxColLen := len(idxInfo.Columns) indexName := fmt.Sprintf("%s.%s", tblInfo.Name.String(), idxInfo.Name.String()) - colInfos := tables.BuildRowcodecColInfoForIndexColumns(idxInfo, tblInfo) - values, err := tablecodec.DecodeIndexKV(key, value, idxColLen, tablecodec.HandleNotNeeded, colInfos) + valueStr, err := tables.GenIndexValueFromIndex(key, value, tblInfo, idxInfo) if err != nil { - logutil.BgLogger().Warn("decode index key value failed", zap.String("index", indexName), + logutil.BgLogger().Warn("decode index key value / column value failed", zap.String("index", indexName), zap.String("key", hex.EncodeToString(key)), zap.String("value", hex.EncodeToString(value)), zap.Error(err)) - return kv.ErrKeyExists.FastGenByArgs(key, indexName) - } - valueStr := make([]string, 0, idxColLen) - for i, val := range values[:idxColLen] { - d, err := tablecodec.DecodeColumnValue(val, colInfos[i].Ft, time.Local) - if err != nil { - logutil.BgLogger().Warn("decode column value failed", zap.String("index", indexName), - zap.String("key", hex.EncodeToString(key)), zap.String("value", hex.EncodeToString(value)), zap.Error(err)) - return kv.ErrKeyExists.FastGenByArgs(key, indexName) - } - str, err := d.ToString() - if err != nil { - str = string(val) - } - if types.IsBinaryStr(colInfos[i].Ft) || types.IsTypeBit(colInfos[i].Ft) { - str = util.FmtNonASCIIPrintableCharToHex(str) - } - valueStr = append(valueStr, str) + return errors.Trace(kv.ErrKeyExists.FastGenByArgs(key, indexName)) } return kv.ErrKeyExists.FastGenByArgs(strings.Join(valueStr, "-"), indexName) } diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index 
fca91d77303f6..324710f35bee6 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "//pkg/kv", "//pkg/meta", "//pkg/parser/mysql", + "//pkg/parser/terror", "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/table", diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index 7e79d5d50bfba..b8e6a2bfa0209 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -24,11 +24,13 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" + "github.com/pingcap/tidb/br/pkg/lightning/common" lightning "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/errormanager" "github.com/pingcap/tidb/br/pkg/lightning/log" tikv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/generic" @@ -90,6 +92,33 @@ type litBackendCtx struct { etcdClient *clientv3.Client } +func (bc *litBackendCtx) handleErrorAfterCollectRemoteDuplicateRows(err error, indexID int64, tbl table.Table, hasDupe bool) error { + if err != nil && !common.ErrFoundIndexConflictRecords.Equal(err) { + logutil.Logger(bc.ctx).Error(LitInfoRemoteDupCheck, zap.Error(err), + zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) + return errors.Trace(err) + } else if hasDupe { + logutil.Logger(bc.ctx).Error(LitErrRemoteDupExistErr, + zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) + + if common.ErrFoundIndexConflictRecords.Equal(err) { + tErr, ok := errors.Cause(err).(*terror.Error) + if !ok { + return errors.Trace(tikv.ErrKeyExists) + } + if len(tErr.Args()) != 4 { + return errors.Trace(tikv.ErrKeyExists) + } + indexName := tErr.Args()[1] + valueStr := tErr.Args()[2] + + return errors.Trace(tikv.ErrKeyExists.FastGenByArgs(valueStr, indexName)) + } + return errors.Trace(tikv.ErrKeyExists) + } + return nil +} + // CollectRemoteDuplicateRows collects duplicate rows from remote TiKV. 
func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error { errorMgr := errormanager.New(nil, bc.cfg, log.Logger{Logger: logutil.Logger(bc.ctx)}) @@ -99,17 +128,8 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab SQLMode: mysql.ModeStrictAllTables, SysVars: bc.sysVars, IndexID: indexID, - }) - if err != nil { - logutil.Logger(bc.ctx).Error(LitInfoRemoteDupCheck, zap.Error(err), - zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) - return err - } else if hasDupe { - logutil.Logger(bc.ctx).Error(LitErrRemoteDupExistErr, - zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) - return tikv.ErrKeyExists - } - return nil + }, lightning.DupeResAlgErr) + return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe) } // FinishImport imports all the key-values in engine into the storage, collects the duplicate errors if any, and @@ -140,16 +160,8 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl SQLMode: mysql.ModeStrictAllTables, SysVars: bc.sysVars, IndexID: ei.indexID, - }) - if err != nil { - logutil.Logger(bc.ctx).Error(LitInfoRemoteDupCheck, zap.Error(err), - zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) - return err - } else if hasDupe { - logutil.Logger(bc.ctx).Error(LitErrRemoteDupExistErr, - zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID)) - return tikv.ErrKeyExists - } + }, lightning.DupeResAlgErr) + return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe) } return nil } diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index 6afb4c089c2da..5454c480d9304 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -517,6 +517,9 @@ func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*b func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) (int64, error) { var kvCount int64 importErr := closedEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys) + if common.ErrFoundDuplicateKeys.Equal(importErr) { + importErr = local.ConvertToErrFoundConflictRecords(importErr, ti.encTable) + } if closedEngine.GetID() != common.IndexEngineID { // todo: change to a finer-grain progress later. 
// each row is encoded into 1 data key @@ -605,6 +608,9 @@ func (ti *TableImporter) CheckDiskQuota(ctx context.Context) { int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio), ); err != nil { + if common.ErrFoundDuplicateKeys.Equal(err) { + err = local.ConvertToErrFoundConflictRecords(err, ti.encTable) + } importErr = multierr.Append(importErr, err) } } @@ -699,6 +705,9 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C failpoint.Return(nil, errors.New("mock import from select error")) }) if err = closedDataEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys); err != nil { + if common.ErrFoundDuplicateKeys.Equal(err) { + err = local.ConvertToErrFoundConflictRecords(err, ti.encTable) + } return nil, err } dataKVCount := ti.backend.GetImportedKVCount(closedDataEngine.GetUUID()) @@ -708,6 +717,9 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C return nil, err } if err = closedIndexEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys); err != nil { + if common.ErrFoundDuplicateKeys.Equal(err) { + err = local.ConvertToErrFoundConflictRecords(err, ti.encTable) + } return nil, err } diff --git a/pkg/table/tables/BUILD.bazel b/pkg/table/tables/BUILD.bazel index 3bc808a3f12ba..fff0c106c3ff1 100644 --- a/pkg/table/tables/BUILD.bazel +++ b/pkg/table/tables/BUILD.bazel @@ -74,8 +74,10 @@ go_test( ], embed = [":tables"], flaky = True, - shard_count = 30, + shard_count = 31, deps = [ + "//br/pkg/lightning/backend/encode", + "//br/pkg/lightning/backend/kv", "//pkg/ddl", "//pkg/ddl/util/callback", "//pkg/domain", diff --git a/pkg/table/tables/index.go b/pkg/table/tables/index.go index c90eb900b21af..56a0c511bda1e 100644 --- a/pkg/table/tables/index.go +++ b/pkg/table/tables/index.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" @@ -27,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/rowcodec" "github.com/pingcap/tidb/pkg/util/tracing" @@ -708,3 +710,30 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * } return colInfo } + +// GenIndexValueFromIndex generates the index values from the index key and value.
+func GenIndexValueFromIndex(key []byte, value []byte, tblInfo *model.TableInfo, idxInfo *model.IndexInfo) ([]string, error) { + idxColLen := len(idxInfo.Columns) + colInfos := BuildRowcodecColInfoForIndexColumns(idxInfo, tblInfo) + values, err := tablecodec.DecodeIndexKV(key, value, idxColLen, tablecodec.HandleNotNeeded, colInfos) + if err != nil { + return nil, errors.Trace(err) + } + valueStr := make([]string, 0, idxColLen) + for i, val := range values[:idxColLen] { + d, err := tablecodec.DecodeColumnValue(val, colInfos[i].Ft, time.Local) + if err != nil { + return nil, errors.Trace(err) + } + str, err := d.ToString() + if err != nil { + str = string(val) + } + if types.IsBinaryStr(colInfos[i].Ft) || types.IsTypeBit(colInfos[i].Ft) { + str = util.FmtNonASCIIPrintableCharToHex(str) + } + valueStr = append(valueStr, str) + } + + return valueStr, nil +} diff --git a/pkg/table/tables/index_test.go b/pkg/table/tables/index_test.go index 889a341a87cc4..d9a212f13e86c 100644 --- a/pkg/table/tables/index_test.go +++ b/pkg/table/tables/index_test.go @@ -18,11 +18,14 @@ import ( "context" "testing" + "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" + lkv "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" @@ -168,3 +171,44 @@ func buildTableInfo(t *testing.T, sql string) *model.TableInfo { require.NoError(t, err) return tblInfo } + +func TestGenIndexValueFromIndex(t *testing.T) { + tblInfo := buildTableInfo(t, "create table a (a int primary key, b int not null, c text, unique key key_b(b));") + tblInfo.State = model.StatePublic + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + require.NoError(t, err) + + sessionOpts := encode.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + Timestamp: 1234567890, + } + + encoder, err := lkv.NewBaseKVEncoder(&encode.EncodingConfig{ + Table: tbl, + SessionOptions: sessionOpts, + }) + require.NoError(t, err) + encoder.SessionCtx.GetSessionVars().RowEncoder.Enable = true + + data1 := []types.Datum{ + types.NewIntDatum(1), + types.NewIntDatum(23), + types.NewStringDatum("4.csv"), + } + tctx := encoder.SessionCtx.GetTableCtx() + _, err = encoder.Table.AddRecord(tctx, data1) + require.NoError(t, err) + kvPairs := encoder.SessionCtx.TakeKvPairs() + + indexKey := kvPairs.Pairs[1].Key + indexValue := kvPairs.Pairs[1].Val + + _, idxID, _, err := tablecodec.DecodeIndexKey(indexKey) + require.NoError(t, err) + + idxInfo := model.FindIndexInfoByID(tbl.Meta().Indices, idxID) + + valueStr, err := tables.GenIndexValueFromIndex(indexKey, indexValue, tbl.Meta(), idxInfo) + require.NoError(t, err) + require.Equal(t, []string{"23"}, valueStr) +}
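The caller-side pattern this change introduces is deliberately small. A hedged sketch of the conversion step; convertImportErr is a hypothetical helper mirroring what importKV and ImportAndCleanup now do inline:

	package sketch // hypothetical package, for illustration only

	import (
		"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
		"github.com/pingcap/tidb/br/pkg/lightning/common"
		"github.com/pingcap/tidb/pkg/table"
	)

	// convertImportErr rewrites the low-level duplicate-key error from an
	// engine import into the row- or index-oriented conflict error, so users
	// see decoded column values instead of raw KV bytes; any other error
	// passes through unchanged.
	func convertImportErr(err error, tbl table.Table) error {
		if common.ErrFoundDuplicateKeys.Equal(err) {
			return local.ConvertToErrFoundConflictRecords(err, tbl)
		}
		return err
	}

If the key and value cannot be parsed back out of the error (wrong type or arity of the terror args), RetrieveKeyAndValueFromErrFoundDuplicateKeys returns the original error unchanged, so the conversion degrades gracefully to the old ErrFoundDuplicateKeys message.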