Skip to content

Commit

Permalink
lightning: support index KV checking of 'replace' mode for lightning …
Browse files Browse the repository at this point in the history
…post-import conflict detection (#45926)

ref #45774
  • Loading branch information
lyzx2001 authored Sep 1, 2023
1 parent 80da849 commit c9219e4
Show file tree
Hide file tree
Showing 39 changed files with 502 additions and 43 deletions.
126 changes: 101 additions & 25 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package local
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -1007,6 +1008,7 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
logger.Warn("skipping resolution due to selected algorithm. this table will become inconsistent!", zap.String("category", "resolve-dupe"), zap.Stringer("algorithm", algorithm))
return nil
case config.DupeResAlgRemove:
case config.DupeResAlgReplace:
default:
panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm))
}
Expand All @@ -1016,7 +1018,7 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
SQLMode: mysql.ModeStrictAllTables,
}, log.FromContext(ctx))
if err != nil {
return err
return errors.Trace(err)
}

tableIDs := physicalTableIDs(tbl.Meta())
Expand All @@ -1026,30 +1028,104 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table

errLimiter := rate.NewLimiter(1, 1)
pool := utils.NewWorkerPool(uint(local.dupeConcurrency), "resolve duplicate rows")
err = local.errorMgr.ResolveAllConflictKeys(
ctx, tableName, pool,
func(ctx context.Context, handleRows [][2][]byte) error {
for {
err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable)
if err == nil {
return nil
}
if types.ErrBadNumber.Equal(err) {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(err).GenWithStackByArgs(tableName)
}
if log.IsContextCanceledError(err) {
return err

tblInfo, err := json.Marshal(tbl.Meta())
if err != nil {
return errors.Trace(err)
}
logger.Debug("got tblInfo from tbl",
zap.ByteString("tblInfo", tblInfo))

switch algorithm {
case config.DupeResAlgRemove:
err = local.errorMgr.RemoveAllConflictKeys(
ctx, tableName, pool,
func(ctx context.Context, handleRows [][2][]byte) error {
for {
err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable)
if err == nil {
return nil
}
if types.ErrBadNumber.Equal(err) {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName)
}
if log.IsContextCanceledError(err) {
return errors.Trace(err)
}
if !tikverror.IsErrWriteConflict(errors.Cause(err)) {
logger.Warn("delete duplicate rows encounter error", log.ShortError(errors.Trace(err)))
}
if err = errLimiter.Wait(ctx); err != nil {
return errors.Trace(err)
}
}
if !tikverror.IsErrWriteConflict(errors.Cause(err)) {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
},
)
case config.DupeResAlgReplace:
err = local.errorMgr.ReplaceConflictKeys(
ctx, tbl, tableName, pool,
func(ctx context.Context, key []byte) ([]byte, error) {
value, err := local.getLatestValue(ctx, logger, key)
if err != nil {
return nil, errors.Trace(err)
}
if err = errLimiter.Wait(ctx); err != nil {
return err
return value, nil
},
func(ctx context.Context, key []byte) error {
err := local.deleteDuplicateRow(ctx, logger, key)
if err != nil {
logger.Debug("delete duplicate rows encounter error", zap.Error(err))
return errors.Trace(err)
}
return nil
},
)
}

return errors.Trace(err)
}

func (local *DupeController) getLatestValue(
ctx context.Context,
logger *log.Task,
key []byte,
) ([]byte, error) {
snapshot := local.tikvCli.GetSnapshot(math.MaxUint64)
value, err := snapshot.Get(ctx, key)
logger.Debug("getLatestValue",
logutil.Key("key", key),
zap.Binary("value", value),
zap.Error(err))
if err != nil {
return nil, errors.Trace(err)
}
return value, nil
}

func (local *DupeController) deleteDuplicateRow(
ctx context.Context,
logger *log.Task,
key []byte,
) (err error) {
// Starts a Delete transaction.
txn, err := local.tikvCli.Begin()
if err != nil {
return errors.Trace(err)
}
defer func() {
if err == nil {
err = txn.Commit(ctx)
} else {
if rollbackErr := txn.Rollback(); rollbackErr != nil {
logger.Warn("failed to rollback transaction", zap.Error(rollbackErr))
}
},
)
}
}()

logger.Debug("will delete key", zap.String("category", "resolve-dupe"), logutil.Key("key", key))
err = txn.Delete(key)

return errors.Trace(err)
}

Expand All @@ -1063,7 +1139,7 @@ func (local *DupeController) deleteDuplicateRows(
// Starts a Delete transaction.
txn, err := local.tikvCli.Begin()
if err != nil {
return err
return errors.Trace(err)
}
defer func() {
if err == nil {
Expand Down Expand Up @@ -1095,17 +1171,17 @@ func (local *DupeController) deleteDuplicateRows(
logutil.Key("row", handleRow[1]))

if err := deleteKey(handleRow[0]); err != nil {
return err
return errors.Trace(err)
}

handle, err := decoder.DecodeHandleFromRowKey(handleRow[0])
if err != nil {
return err
return errors.Trace(err)
}

err = decoder.IterRawIndexKeys(handle, handleRow[1], deleteKey)
if err != nil {
return err
return errors.Trace(err)
}
}

Expand Down
10 changes: 10 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,12 @@ const (
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
DupeResAlgRemove

// DupeResAlgReplace records all duplicate records like the 'record' algorithm, and remove some rows with conflict
// and reserve other rows that can be kept and not cause conflict anymore. Users need to analyze the
// lightning_task_info.conflict_error_v1 table to check whether the reserved data cater to their need and check whether
// they need to add back the correct rows.
DupeResAlgReplace

// DupeResAlgErr reports an error and stops the import process.
// Note: this value is only used for internal.
DupeResAlgErr
Expand Down Expand Up @@ -622,6 +628,8 @@ func (dra *DuplicateResolutionAlgorithm) FromStringValue(s string) error {
*dra = DupeResAlgNone
case "remove":
*dra = DupeResAlgRemove
case "replace":
*dra = DupeResAlgReplace
default:
return errors.Errorf("invalid duplicate-resolution '%s', please choose valid option between ['record', 'none', 'remove']", s)
}
Expand All @@ -647,6 +655,8 @@ func (dra DuplicateResolutionAlgorithm) String() string {
return "none"
case DupeResAlgRemove:
return "remove"
case DupeResAlgReplace:
return "replace"
default:
panic(fmt.Sprintf("invalid duplicate-resolution type '%d'", dra))
}
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,10 +743,13 @@ func TestDuplicateResolutionAlgorithm(t *testing.T) {
require.Equal(t, DupeResAlgNone, dra)
require.NoError(t, dra.FromStringValue("remove"))
require.Equal(t, DupeResAlgRemove, dra)
require.NoError(t, dra.FromStringValue("replace"))
require.Equal(t, DupeResAlgReplace, dra)

require.Equal(t, "record", DupeResAlgRecord.String())
require.Equal(t, "none", DupeResAlgNone.String())
require.Equal(t, "remove", DupeResAlgRemove.String())
require.Equal(t, "replace", DupeResAlgReplace.String())
}

func TestLoadConfig(t *testing.T) {
Expand Down
15 changes: 14 additions & 1 deletion br/pkg/lightning/errormanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/errormanager",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/utils",
"//parser/mysql",
"//table",
"//table/tables",
"//tablecodec",
"@com_github_jedib0t_go_pretty_v6//table",
"@com_github_jedib0t_go_pretty_v6//text",
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//error",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
Expand All @@ -27,11 +35,16 @@ go_test(
srcs = ["errormanager_test.go"],
embed = [":errormanager"],
flaky = True,
shard_count = 4,
shard_count = 5,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/utils",
"//parser/model",
"//parser/mysql",
"//table/tables",
"//types",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_stretchr_testify//require",
"@org_uber_go_atomic//:atomic",
Expand Down
Loading

0 comments on commit c9219e4

Please sign in to comment.