lightning: add complex integration tests for lightning post-import conflict detection "replace" mode (#47460)

ref #45774
lyzx2001 authored Nov 3, 2023
1 parent 3e57546 commit 37965e5
Showing 66 changed files with 932 additions and 74 deletions.
6 changes: 3 additions & 3 deletions br/pkg/lightning/config/config.go
@@ -587,16 +587,16 @@ const (
// DupeResAlgNone doesn't detect duplicate.
DupeResAlgNone DuplicateResolutionAlgorithm = iota

// DupeResAlgRecord only records duplicate records to `lightning_task_info.conflict_error_v1` table on the target TiDB.
// DupeResAlgRecord only records duplicate records to `lightning_task_info.conflict_error_v2` table on the target TiDB.
DupeResAlgRecord

// DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v2 table to add back the correct rows.
DupeResAlgRemove

// DupeResAlgReplace records all duplicate records like the 'record' algorithm, and remove some rows with conflict
// and reserve other rows that can be kept and not cause conflict anymore. Users need to analyze the
// lightning_task_info.conflict_error_v1 table to check whether the reserved data cater to their need and check whether
// lightning_task_info.conflict_error_v2 table to check whether the reserved data cater to their need and check whether
// they need to add back the correct rows.
DupeResAlgReplace

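For orientation, here is a minimal, hypothetical sketch of how these three modes might be chosen from Lightning's duplicate-resolution setting. The type and constants are redeclared locally so the snippet compiles on its own, and the real parsing in config.go may differ.

package main

import (
	"fmt"
	"strings"
)

// Local stand-ins for the constants above, so the sketch is self-contained.
type DuplicateResolutionAlgorithm int

const (
	DupeResAlgNone DuplicateResolutionAlgorithm = iota
	DupeResAlgRecord
	DupeResAlgRemove
	DupeResAlgReplace
)

// parseDupeResAlg is a hypothetical helper, not Lightning's actual parser.
func parseDupeResAlg(s string) (DuplicateResolutionAlgorithm, error) {
	switch strings.ToLower(s) {
	case "", "none":
		return DupeResAlgNone, nil // no duplicate detection
	case "record":
		return DupeResAlgRecord, nil // only record conflicts to conflict_error_v2
	case "remove":
		return DupeResAlgRemove, nil // record conflicts and delete all conflicting rows
	case "replace":
		return DupeResAlgReplace, nil // record conflicts, keep one non-conflicting copy
	default:
		return DupeResAlgNone, fmt.Errorf("unknown duplicate-resolution %q", s)
	}
}

func main() {
	alg, err := parseDupeResAlg("replace")
	fmt.Println(alg, err) // 3 <nil>
}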
1 change: 1 addition & 0 deletions br/pkg/lightning/errormanager/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/types",
"@com_github_jedib0t_go_pretty_v6//table",
"@com_github_jedib0t_go_pretty_v6//text",
"@com_github_pingcap_errors//:errors",
96 changes: 72 additions & 24 deletions br/pkg/lightning/errormanager/errormanager.go
@@ -38,6 +38,7 @@ import (
tidbtbl "github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
tikverr "github.com/tikv/client-go/v2/error"
"go.uber.org/atomic"
"go.uber.org/multierr"
@@ -54,7 +55,7 @@ const (
syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v1"
ConflictErrorTableName = "conflict_error_v2"
// DupRecordTable is the table name to record duplicate data that displayed to user.
DupRecordTable = "conflict_records"

@@ -94,6 +95,7 @@ const (
raw_value mediumblob NOT NULL COMMENT 'the value of the conflicted key',
raw_handle mediumblob NOT NULL COMMENT 'the data handle derived from the conflicted key or value',
raw_row mediumblob NOT NULL COMMENT 'the data retrieved from the handle',
is_data_kv tinyint(1) NOT NULL,
INDEX (task_id, table_name),
INDEX (index_name),
INDEX (table_name, index_name)
@@ -122,19 +124,19 @@ const (

insertIntoConflictErrorData = `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row, is_data_kv)
VALUES
`

sqlValuesConflictErrorData = "(?,?,'PRIMARY',?,?,?,?,raw_key,raw_value)"
sqlValuesConflictErrorData = "(?,?,'PRIMARY',?,?,?,?,raw_key,raw_value,?)"

insertIntoConflictErrorIndex = `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row, is_data_kv)
VALUES
`

sqlValuesConflictErrorIndex = "(?,?,?,?,?,?,?,?,?)"
sqlValuesConflictErrorIndex = "(?,?,?,?,?,?,?,?,?,?)"

selectConflictKeysRemove = `
SELECT _tidb_rowid, raw_handle, raw_row
@@ -146,14 +148,14 @@ const (
selectIndexConflictKeysReplace = `
SELECT raw_key, index_name, raw_value, raw_handle
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND index_name <> 'PRIMARY'
WHERE table_name = ? AND is_data_kv = 0
ORDER BY raw_key;
`

selectDataConflictKeysReplace = `
SELECT raw_key, raw_value, raw_handle
SELECT raw_key, raw_value
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND index_name = 'PRIMARY'
WHERE table_name = ? AND is_data_kv = 1
ORDER BY raw_key;
`

@@ -252,7 +254,7 @@ func (em *ErrorManager) Init(ctx context.Context) error {
sqls = append(sqls, [2]string{"create type error table", createTypeErrorTable})
}
if em.conflictV1Enabled {
sqls = append(sqls, [2]string{"create conflict error v1 table", createConflictErrorTable})
sqls = append(sqls, [2]string{"create conflict error v2 table", createConflictErrorTable})
}
if em.conflictV2Enabled {
sqls = append(sqls, [2]string{"create duplicate records table", createDupRecordTable})
@@ -378,6 +380,7 @@ func (em *ErrorManager) RecordDataConflictError(
conflictInfo.Row,
conflictInfo.RawKey,
conflictInfo.RawValue,
tablecodec.IsRecordKey(conflictInfo.RawKey),
)
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
@@ -439,6 +442,7 @@ func (em *ErrorManager) RecordIndexConflictError(
conflictInfo.RawValue,
rawHandles[i],
rawRows[i],
tablecodec.IsRecordKey(conflictInfo.RawKey),
)
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
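For context on the new is_data_kv column populated in the two functions above: whether a conflicting KV is a data (record) KV or an index KV can be read off the raw key itself, which is what the calls to tablecodec.IsRecordKey feed into the column. A minimal sketch, assuming TiDB's pkg/tablecodec and pkg/kv packages; the table ID, index ID, and handle are made up for illustration only.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/tablecodec"
)

func main() {
	// A data (record) key for table 100, row handle 1.
	rowKey := tablecodec.EncodeRowKeyWithHandle(100, kv.IntHandle(1))
	// An index key for table 100, index 1 (empty index values, for illustration).
	idxKey := tablecodec.EncodeIndexSeekKey(100, 1, nil)

	fmt.Println(tablecodec.IsRecordKey(rowKey)) // true  -> stored as is_data_kv = 1
	fmt.Println(tablecodec.IsRecordKey(idxKey)) // false -> stored as is_data_kv = 0
}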
@@ -561,7 +565,7 @@ func (em *ErrorManager) ReplaceConflictKeys(
pool.ApplyOnErrorGroup(g, func() error {
// TODO: provide a detailed document to explain the algorithm and link it here
// demo for "replace" algorithm: https://github.com/lyzx2001/tidb-conflict-replace
// check index KV first
// check index KV
indexKvRows, err := em.db.QueryContext(
gCtx, fmt.Sprintf(selectIndexConflictKeysReplace, em.schemaEscaped),
tableName)
@@ -583,7 +587,10 @@

// get the latest value of rawKey from downstream TiDB
latestValue, err := fnGetLatest(gCtx, rawKey)
if err != nil && !tikverr.IsErrNotFound(err) {
if tikverr.IsErrNotFound(err) {
continue
}
if err != nil {
return errors.Trace(err)
}

Expand All @@ -597,7 +604,7 @@ func (em *ErrorManager) ReplaceConflictKeys(
// get the latest value of the row key of the data KV that needs to be deleted
overwritten, err := fnGetLatest(gCtx, rawHandle)
// if the latest value cannot be found, that means the data KV has been deleted
if tikverr.IsErrNotFound(err) || overwritten == nil {
if tikverr.IsErrNotFound(err) {
continue
}
if err != nil {
@@ -613,12 +620,24 @@
if err != nil {
return errors.Trace(err)
}
if !tbl.Meta().HasClusteredIndex() {
// for nonclustered PK, need to append handle to decodedData for AddRecord
decodedData = append(decodedData, types.NewIntDatum(overwrittenHandle.IntValue()))
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx, decodedData)
if err != nil {
return errors.Trace(err)
}

// find out all the KV pairs that are contained in the data KV
kvPairs := encoder.SessionCtx.TakeKvPairs()

exec := common.SQLWithRetry{
DB: em.db,
Logger: em.logger,
HideQueryLog: redact.NeedRedact(),
}

for _, kvPair := range kvPairs.Pairs {
em.logger.Debug("got encoded KV",
logutil.Key("key", kvPair.Key),
@@ -644,6 +663,26 @@
// Only if there is a->1 we dare to delete data KV with key "1".

if bytes.Equal(kvPair.Key, rawKey) && bytes.Equal(kvPair.Val, rawValue) {
if err := exec.Transact(ctx, "insert data conflict error record for conflict detection 'replace' mode",
func(c context.Context, txn *sql.Tx) error {
sb := &strings.Builder{}
fmt.Fprintf(sb, insertIntoConflictErrorData, em.schemaEscaped)
var sqlArgs []interface{}
sb.WriteString(sqlValuesConflictErrorData)
sqlArgs = append(sqlArgs,
em.taskID,
tableName,
nil,
nil,
rawHandle,
overwritten,
1,
)
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
return err
}); err != nil {
return err
}
if err := fnDeleteKey(gCtx, rawHandle); err != nil {
return errors.Trace(err)
}
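To restate the a->1 / b->1 comment above in code: for each conflicting index KV, the old row it points at is deleted only when re-encoding that row reproduces exactly the conflicting key/value pair; otherwise the index entry already belongs to a different, surviving row. A hedged sketch with a hypothetical helper (common.KvPair is Lightning's key/value pair type; the downstream latest-value checks are omitted):

package main

import (
	"bytes"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/common"
)

// shouldDeleteOldRow is a hypothetical helper, not part of errormanager.go.
// reencoded holds the KV pairs produced by re-encoding the old row that the
// conflicting index KV points at.
func shouldDeleteOldRow(reencoded []common.KvPair, conflictKey, conflictVal []byte) bool {
	for _, p := range reencoded {
		if bytes.Equal(p.Key, conflictKey) && bytes.Equal(p.Val, conflictVal) {
			return true // the row really owns this index entry ("a -> 1"); safe to delete the row
		}
	}
	return false // the index entry belongs to another row ("b -> 1"); keep the row
}

func main() {
	pairs := []common.KvPair{{Key: []byte("a"), Val: []byte("1")}}
	fmt.Println(shouldDeleteOldRow(pairs, []byte("a"), []byte("1"))) // true
	fmt.Println(shouldDeleteOldRow(pairs, []byte("b"), []byte("1"))) // false
}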
@@ -668,14 +707,13 @@
var mustKeepKvPairs *kv.Pairs

for dataKvRows.Next() {
var rawKey, rawValue, rawHandle []byte
if err := dataKvRows.Scan(&rawKey, &rawValue, &rawHandle); err != nil {
var rawKey, rawValue []byte
if err := dataKvRows.Scan(&rawKey, &rawValue); err != nil {
return errors.Trace(err)
}
em.logger.Debug("got group raw_key, raw_value, raw_handle from table",
em.logger.Debug("got group raw_key, raw_value from table",
logutil.Key("raw_key", rawKey),
zap.Binary("raw_value", rawValue),
zap.Binary("raw_handle", rawHandle))
zap.Binary("raw_value", rawValue))

if !bytes.Equal(rawKey, previousRawKey) {
previousRawKey = rawKey
@@ -694,6 +732,10 @@
if err != nil {
return errors.Trace(err)
}
if !tbl.Meta().HasClusteredIndex() {
// for nonclustered PK, need to append handle to decodedData for AddRecord
decodedData = append(decodedData, types.NewIntDatum(handle.IntValue()))
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx, decodedData)
if err != nil {
return errors.Trace(err)
@@ -710,7 +752,7 @@
continue
}

handle, err := tablecodec.DecodeRowKey(rawHandle)
handle, err := tablecodec.DecodeRowKey(rawKey)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -719,6 +761,10 @@ func (em *ErrorManager) ReplaceConflictKeys(
if err != nil {
return errors.Trace(err)
}
if !tbl.Meta().HasClusteredIndex() {
// for nonclustered PK, need to append handle to decodedData for AddRecord
decodedData = append(decodedData, types.NewIntDatum(handle.IntValue()))
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx, decodedData)
if err != nil {
return errors.Trace(err)
@@ -731,7 +777,7 @@
logutil.Key("key", kvPair.Key),
zap.Binary("value", kvPair.Val))
kvLatestValue, err := fnGetLatest(gCtx, kvPair.Key)
if tikverr.IsErrNotFound(err) || kvLatestValue == nil {
if tikverr.IsErrNotFound(err) {
continue
}
if err != nil {
Expand All @@ -746,11 +792,13 @@ func (em *ErrorManager) ReplaceConflictKeys(

// if the KV pair is contained in mustKeepKvPairs, we cannot delete it
// if not, delete the KV pair
isContained := slices.ContainsFunc(mustKeepKvPairs.Pairs, func(mustKeepKvPair common.KvPair) bool {
return bytes.Equal(mustKeepKvPair.Key, kvPair.Key) && bytes.Equal(mustKeepKvPair.Val, kvPair.Val)
})
if isContained {
continue
if mustKeepKvPairs != nil {
isContained := slices.ContainsFunc(mustKeepKvPairs.Pairs, func(mustKeepKvPair common.KvPair) bool {
return bytes.Equal(mustKeepKvPair.Key, kvPair.Key) && bytes.Equal(mustKeepKvPair.Val, kvPair.Val)
})
if isContained {
continue
}
}

if err := fnDeleteKey(gCtx, kvPair.Key); err != nil {
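Similarly, the data-KV pass above groups conflicting rows that share the same raw_key: the first row of each group (in raw_key order) is treated as the row to keep and its re-encoded KV pairs become mustKeepKvPairs, while every later row in the group has its re-encoded KV pairs deleted unless the kept row also produces them. A hedged sketch of that grouping with hypothetical callbacks; the downstream latest-value comparison and conflict-record bookkeeping are omitted.

package main

import (
	"bytes"
	"slices"

	"github.com/pingcap/tidb/br/pkg/lightning/common"
)

// resolveDataKeyGroups is a hypothetical helper sketching the grouping logic of
// the data-KV pass; it is not the code in errormanager.go.
func resolveDataKeyGroups(
	rows []common.KvPair, // conflicting data KVs, ordered by key
	encodeRow func(key, value []byte) []common.KvPair, // re-encode one row into all its KV pairs
	deleteKey func(key []byte) error,
) error {
	var previousKey []byte
	var mustKeep []common.KvPair
	for _, row := range rows {
		if !bytes.Equal(row.Key, previousKey) {
			// First row of a new group: keep it and remember the KV pairs it produces.
			previousKey = row.Key
			mustKeep = encodeRow(row.Key, row.Val)
			continue
		}
		// Later rows in the group: delete every KV pair the kept row does not share.
		for _, p := range encodeRow(row.Key, row.Val) {
			kept := slices.ContainsFunc(mustKeep, func(k common.KvPair) bool {
				return bytes.Equal(k.Key, p.Key) && bytes.Equal(k.Val, p.Val)
			})
			if kept {
				continue
			}
			if err := deleteKey(p.Key); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	rows := []common.KvPair{
		{Key: []byte("k1"), Val: []byte("v1")},
		{Key: []byte("k1"), Val: []byte("v2")}, // collides with the kept row above
	}
	identity := func(key, value []byte) []common.KvPair {
		return []common.KvPair{{Key: key, Val: value}}
	}
	_ = resolveDataKeyGroups(rows, identity, func(key []byte) error { return nil })
}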