Commit ccc453b

lightning: improve post-import conflict detection 'error' semantic (p…

lyzx2001 authored Mar 13, 2024
1 parent 97bb8d8 commit ccc453b
Showing 29 changed files with 489 additions and 263 deletions.
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -50,8 +50,11 @@ go_library(
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/sessionctx/variable",
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/util",
"//pkg/util/codec",
@@ -141,6 +144,7 @@ go_test(
"//pkg/parser/mysql",
"//pkg/sessionctx/stmtctx",
"//pkg/store/pdtypes",
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/testkit/testsetup",
186 changes: 145 additions & 41 deletions br/pkg/lightning/backend/local/duplicate.go
@@ -17,6 +17,7 @@ package local
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
@@ -42,7 +43,10 @@ import (
"github.com/pingcap/tidb/pkg/distsql"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/hack"
@@ -459,7 +463,7 @@ func (m *DupeDetector) HasDuplicate() bool {
}

// RecordDataConflictError records data conflicts to errorMgr. The key received from stream must be a row key.
func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKVStream) error {
func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKVStream, algorithm config.DuplicateResolutionAlgorithm) error {
//nolint: errcheck
defer stream.Close()
var dataConflictInfos []errormanager.DataConflictInfo
@@ -481,6 +485,7 @@ func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKV
if err != nil {
return errors.Trace(err)
}

conflictInfo := errormanager.DataConflictInfo{
RawKey: key,
RawValue: val,
@@ -494,6 +499,10 @@ func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKV
}
dataConflictInfos = dataConflictInfos[:0]
}

if algorithm == config.DupeResAlgErr {
return errors.Trace(common.ErrFoundDataConflictRecords.FastGenByArgs(m.tbl.Meta().Name, h.String(), m.decoder.DecodeRawRowDataAsStr(h, val)))
}
}
if len(dataConflictInfos) > 0 {
if err := m.errorMgr.RecordDataConflictError(ctx, m.logger, m.tableName, dataConflictInfos); err != nil {
@@ -528,7 +537,7 @@ func (m *DupeDetector) saveIndexHandles(ctx context.Context, handles pendingInde
}

// RecordIndexConflictError records index conflicts to errorMgr. The key received from stream must be an index key.
func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupKVStream, tableID int64, indexInfo *model.IndexInfo) error {
func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupKVStream, tableID int64, indexInfo *model.IndexInfo, algorithm config.DuplicateResolutionAlgorithm) error {
//nolint: errcheck
defer stream.Close()
indexHandles := makePendingIndexHandlesWithCapacity(0)
@@ -550,6 +559,7 @@ func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupK
if err != nil {
return errors.Trace(err)
}

conflictInfo := errormanager.DataConflictInfo{
RawKey: key,
RawValue: val,
@@ -564,6 +574,10 @@ func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupK
}
indexHandles.truncate()
}

if algorithm == config.DupeResAlgErr {
return newErrFoundIndexConflictRecords(key, val, m.tbl, indexInfo)
}
}
if indexHandles.Len() > 0 {
if err := m.saveIndexHandles(ctx, indexHandles); err != nil {
@@ -573,6 +587,99 @@ func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupK
return nil
}
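
// Sketch (not part of this commit): the fail-fast semantic added above, in
// isolation. With DupeResAlgErr the detector aborts on the first conflict it
// sees instead of recording it and scanning on; handleConflict is a
// hypothetical helper written only to illustrate the dispatch.
func handleConflict(algorithm config.DuplicateResolutionAlgorithm, conflictErr error) error {
	switch algorithm {
	case config.DupeResAlgErr:
		// Fail fast: surface the first conflicting row to the user.
		return conflictErr
	case config.DupeResAlgReplace, config.DupeResAlgNone:
		// Record the conflict (or skip it) and keep scanning.
		return nil
	default:
		return fmt.Errorf("unknown duplicate resolution algorithm %v", algorithm)
	}
}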

// RetrieveKeyAndValueFromErrFoundDuplicateKeys retrieves the key and value
// from an ErrFoundDuplicateKeys error.
func RetrieveKeyAndValueFromErrFoundDuplicateKeys(err error) ([]byte, []byte, error) {
if !common.ErrFoundDuplicateKeys.Equal(err) {
return nil, nil, err
}
tErr, ok := errors.Cause(err).(*terror.Error)
if !ok {
return nil, nil, err
}
if len(tErr.Args()) != 2 {
return nil, nil, err
}
key, keyIsByte := tErr.Args()[0].([]byte)
value, valIsByte := tErr.Args()[1].([]byte)
if !keyIsByte || !valIsByte {
return nil, nil, err
}
return key, value, nil
}
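
// Sketch (not part of this commit): the unwrap contract in action. rawKey and
// rawValue are placeholder byte slices. A well-formed ErrFoundDuplicateKeys
// carries exactly two []byte args; anything else is passed through unchanged.
//
//	dupErr := common.ErrFoundDuplicateKeys.FastGenByArgs(rawKey, rawValue)
//	k, v, err := RetrieveKeyAndValueFromErrFoundDuplicateKeys(dupErr)
//	// err == nil, k == rawKey, v == rawValue.
//	// For any other error shape, the original error is returned as-is.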

// newErrFoundConflictRecords generates an ErrFoundDataConflictRecords or ErrFoundIndexConflictRecords
// error according to the key and value.
func newErrFoundConflictRecords(key []byte, value []byte, tbl table.Table) error {
sessionOpts := encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
}

decoder, err := kv.NewTableKVDecoder(tbl, tbl.Meta().Name.L, &sessionOpts, log.L())
if err != nil {
return errors.Trace(err)
}

if tablecodec.IsRecordKey(key) {
// for data KV
handle, err := tablecodec.DecodeRowKey(key)
if err != nil {
return errors.Trace(err)
}

rowData := decoder.DecodeRawRowDataAsStr(handle, value)

return errors.Trace(common.ErrFoundDataConflictRecords.FastGenByArgs(tbl.Meta().Name, handle.String(), rowData))
}

// for index KV
_, idxID, _, err := tablecodec.DecodeIndexKey(key)
if err != nil {
return errors.Trace(err)
}

idxInfo := model.FindIndexInfoByID(tbl.Meta().Indices, idxID)
return newErrFoundIndexConflictRecords(key, value, tbl, idxInfo)
}

// newErrFoundIndexConflictRecords generates an ErrFoundIndexConflictRecords error
// according to the key and value.
func newErrFoundIndexConflictRecords(key []byte, value []byte, tbl table.Table, idxInfo *model.IndexInfo) error {
sessionOpts := encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
}

decoder, err := kv.NewTableKVDecoder(tbl, tbl.Meta().Name.L, &sessionOpts, log.L())
if err != nil {
return errors.Trace(err)
}

indexName := fmt.Sprintf("%s.%s", tbl.Meta().Name.String(), idxInfo.Name.String())
valueStr, err := tables.GenIndexValueFromIndex(key, value, tbl.Meta(), idxInfo)
if err != nil {
log.L().Warn("decode index key value / column value failed", zap.String("index", indexName),
zap.String("key", hex.EncodeToString(key)), zap.String("value", hex.EncodeToString(value)), zap.Error(err))
return errors.Trace(common.ErrFoundIndexConflictRecords.FastGenByArgs(tbl.Meta().Name, indexName, key, value))
}

h, err := decoder.DecodeHandleFromIndex(idxInfo, key, value)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(common.ErrFoundIndexConflictRecords.FastGenByArgs(tbl.Meta().Name, indexName, valueStr, h))
}

// ConvertToErrFoundConflictRecords converts an ErrFoundDuplicateKeys error
// to an ErrFoundDataConflictRecords or ErrFoundIndexConflictRecords error.
func ConvertToErrFoundConflictRecords(originalErr error, tbl table.Table) error {
rawKey, rawValue, err := RetrieveKeyAndValueFromErrFoundDuplicateKeys(originalErr)
if err != nil {
return errors.Trace(err)
}

return newErrFoundConflictRecords(rawKey, rawValue, tbl)
}
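
// Sketch (not part of this commit): the intended call-site shape. ingestEngine
// is a hypothetical stand-in for the ingest step that can fail with the
// generic ErrFoundDuplicateKeys; the conversion turns it into a message naming
// the table, the handle or index, and the decoded row.
//
//	if err := ingestEngine(ctx); err != nil {
//		if common.ErrFoundDuplicateKeys.Equal(err) {
//			return ConvertToErrFoundConflictRecords(err, tbl)
//		}
//		return errors.Trace(err)
//	}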

// BuildDuplicateTaskForTest is only used for test.
var BuildDuplicateTaskForTest = func(m *DupeDetector) ([]dupTask, error) {
return m.buildDupTasks()
@@ -702,7 +809,7 @@ func (m *DupeDetector) buildLocalDupTasks(dupDB *pebble.DB, keyAdapter common.Ke
}

// CollectDuplicateRowsFromDupDB collects duplicates from the duplicate DB and records all duplicate row info into errorMgr.
func (m *DupeDetector) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB *pebble.DB, keyAdapter common.KeyAdapter) error {
func (m *DupeDetector) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB *pebble.DB, keyAdapter common.KeyAdapter, algorithm config.DuplicateResolutionAlgorithm) error {
tasks, err := m.buildLocalDupTasks(dupDB, keyAdapter)
if err != nil {
return errors.Trace(err)
@@ -719,9 +826,9 @@ func (m *DupeDetector) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB
stream := NewLocalDupKVStream(dupDB, keyAdapter, task.KeyRange)
var err error
if task.indexInfo == nil {
err = m.RecordDataConflictError(gCtx, stream)
err = m.RecordDataConflictError(gCtx, stream, algorithm)
} else {
err = m.RecordIndexConflictError(gCtx, stream, task.tableID, task.indexInfo)
err = m.RecordIndexConflictError(gCtx, stream, task.tableID, task.indexInfo, algorithm)
}
return errors.Trace(err)
}); err != nil {
@@ -788,6 +895,7 @@ func (m *DupeDetector) processRemoteDupTaskOnce(
importClientFactory ImportClientFactory,
regionPool *utils.WorkerPool,
remainKeyRanges *pendingKeyRanges,
algorithm config.DuplicateResolutionAlgorithm,
) (madeProgress bool, err error) {
//nolint: prealloc
var regions []*split.RegionInfo
@@ -828,9 +936,9 @@
return errors.Annotatef(err, "failed to create remote duplicate kv stream")
}
if task.indexInfo == nil {
err = m.RecordDataConflictError(ctx, stream)
err = m.RecordDataConflictError(ctx, stream, algorithm)
} else {
err = m.RecordIndexConflictError(ctx, stream, task.tableID, task.indexInfo)
err = m.RecordIndexConflictError(ctx, stream, task.tableID, task.indexInfo, algorithm)
}
if err != nil {
return errors.Annotatef(err, "failed to record conflict errors")
@@ -864,12 +972,13 @@ func (m *DupeDetector) processRemoteDupTask(
logger log.Logger,
importClientFactory ImportClientFactory,
regionPool *utils.WorkerPool,
algorithm config.DuplicateResolutionAlgorithm,
) error {
regionErrRetryAttempts := split.WaitRegionOnlineAttemptTimes
remainAttempts := maxDupCollectAttemptTimes
remainKeyRanges := newPendingKeyRanges(task.KeyRange)
for {
madeProgress, err := m.processRemoteDupTaskOnce(ctx, task, logger, importClientFactory, regionPool, remainKeyRanges)
madeProgress, err := m.processRemoteDupTaskOnce(ctx, task, logger, importClientFactory, regionPool, remainKeyRanges, algorithm)
if err == nil {
if !remainKeyRanges.empty() {
remainKeyRanges.list()
@@ -904,7 +1013,7 @@ func (m *DupeDetector) processRemoteDupTask(
}

// CollectDuplicateRowsFromTiKV collects duplicates from the remote TiKV and records all duplicate row info into errorMgr.
func (m *DupeDetector) CollectDuplicateRowsFromTiKV(ctx context.Context, importClientFactory ImportClientFactory) error {
func (m *DupeDetector) CollectDuplicateRowsFromTiKV(ctx context.Context, importClientFactory ImportClientFactory, algorithm config.DuplicateResolutionAlgorithm) error {
tasks, err := m.buildDupTasks()
if err != nil {
return errors.Trace(err)
@@ -929,7 +1038,7 @@ func (m *DupeDetector) CollectDuplicateRowsFromTiKV(ctx context.Context, importC
zap.Int64("indexID", task.indexInfo.ID),
)
}
err := m.processRemoteDupTask(gCtx, task, taskLogger, importClientFactory, regionPool)
err := m.processRemoteDupTask(gCtx, task, taskLogger, importClientFactory, regionPool, algorithm)
return errors.Trace(err)
})
}
@@ -954,7 +1063,7 @@ type DupeController struct {

// CollectLocalDuplicateRows collects duplicate keys from the local DB. We will store the duplicate keys which
// may conflict with other keys in the local data source.
func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) {
func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions, algorithm config.DuplicateResolutionAlgorithm) (hasDupe bool, err error) {
logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect local duplicate keys")
defer func() {
logger.End(zap.ErrorLevel, err)
@@ -965,15 +1074,16 @@ func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl
if err != nil {
return false, errors.Trace(err)
}
if err := duplicateManager.CollectDuplicateRowsFromDupDB(ctx, local.duplicateDB, local.keyAdapter); err != nil {
if err := duplicateManager.CollectDuplicateRowsFromDupDB(ctx, local.duplicateDB, local.keyAdapter, algorithm); err != nil {
return false, errors.Trace(err)
}
return duplicateManager.HasDuplicate(), nil
}

// CollectRemoteDuplicateRows collects duplicate keys from remote TiKV storage. These keys may conflict with
// the data imported by other Lightning instances.
func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error) {
// TODO: revise the return values to (hasDupe bool, dupInfo *DupInfo, err error) to distinguish conflict errors from common errors
func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions, algorithm config.DuplicateResolutionAlgorithm) (hasDupe bool, err error) {
logger := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "[detect-dupe] collect remote duplicate keys")
defer func() {
logger.End(zap.ErrorLevel, err)
@@ -984,8 +1094,9 @@ func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl
if err != nil {
return false, errors.Trace(err)
}
if err := duplicateManager.CollectDuplicateRowsFromTiKV(ctx, local.importClientFactory); err != nil {
return false, errors.Trace(err)
err = duplicateManager.CollectDuplicateRowsFromTiKV(ctx, local.importClientFactory, algorithm)
if err != nil {
return common.ErrFoundDataConflictRecords.Equal(err) || common.ErrFoundIndexConflictRecords.Equal(err), errors.Trace(err)
}
return duplicateManager.HasDuplicate(), nil
}
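
// Sketch (not part of this commit): caller-side handling of the changed return
// contract above. Under DupeResAlgErr the collector can return hasDupe == true
// together with a conflict error, so conflict errors must be distinguished
// from genuine scan failures; dupeController is a hypothetical variable.
//
//	hasDupe, err := dupeController.CollectRemoteDuplicateRows(ctx, tbl, tableName, opts, config.DupeResAlgErr)
//	switch {
//	case common.ErrFoundDataConflictRecords.Equal(err) || common.ErrFoundIndexConflictRecords.Equal(err):
//		return err // first conflicting row, already decoded for display
//	case err != nil:
//		return errors.Trace(err) // infrastructure failure while scanning TiKV
//	case hasDupe:
//		// conflicts were recorded to errorMgr for later resolution
//	}
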
@@ -1002,7 +1113,7 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
case config.DupeResAlgNone:
logger.Warn("skipping resolution due to selected algorithm. this table will become inconsistent!", zap.String("category", "resolve-dupe"), zap.Stringer("algorithm", algorithm))
return nil
case config.DupeResAlgReplace, config.DupeResAlgErr:
case config.DupeResAlgReplace:
default:
panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm))
}
@@ -1016,31 +1127,24 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
logger.Debug("got tblInfo from tbl",
zap.ByteString("tblInfo", tblInfo))

switch algorithm {
case config.DupeResAlgReplace:
err = local.errorMgr.ReplaceConflictKeys(
ctx, tbl, tableName, pool,
func(ctx context.Context, key []byte) ([]byte, error) {
value, err := local.getLatestValue(ctx, logger, key)
if err != nil {
return nil, errors.Trace(err)
}
return value, nil
},
func(ctx context.Context, key []byte) error {
err := local.deleteDuplicateRow(ctx, logger, key)
if err != nil {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName)
}
return nil
},
)
case config.DupeResAlgErr:
err = local.errorMgr.ResolveConflictKeysError(
ctx, tableName,
)
}
err = local.errorMgr.ReplaceConflictKeys(
ctx, tbl, tableName, pool,
func(ctx context.Context, key []byte) ([]byte, error) {
value, err := local.getLatestValue(ctx, logger, key)
if err != nil {
return nil, errors.Trace(err)
}
return value, nil
},
func(ctx context.Context, key []byte) error {
err := local.deleteDuplicateRow(ctx, logger, key)
if err != nil {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName)
}
return nil
},
)

return errors.Trace(err)
}
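
// Note (not part of this commit): because the 'error' algorithm now fails fast
// inside RecordDataConflictError/RecordIndexConflictError, a DupeResAlgErr run
// never reaches ResolveDuplicateRows; the dispatch above therefore keeps only
// the replace path.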