Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: add complex integration tests for lightning post-import conflict detection "replace" mode #47460

Merged
merged 45 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
61218e7
lightning: support data KV checking of 'replace' mode for lightning p…
lyzx2001 Sep 7, 2023
4ac742a
update unit tests
lyzx2001 Sep 7, 2023
e653151
update unit tests
lyzx2001 Sep 8, 2023
f881845
update unit tests
lyzx2001 Sep 9, 2023
3075acc
update according to comments
lyzx2001 Sep 13, 2023
bb1ddd4
add more INDEX
lyzx2001 Sep 13, 2023
7d55ced
update algorithms of checking data KV
lyzx2001 Sep 15, 2023
30cfa1e
Merge branch 'master' into issue45774-step3
lyzx2001 Sep 19, 2023
19ec0e4
update createConflictErrorTable
lyzx2001 Sep 19, 2023
614f92a
update error checking
lyzx2001 Sep 19, 2023
d0b86df
add some comments
lyzx2001 Sep 22, 2023
4462832
Merge branch 'master' into issue45774-step3
lyzx2001 Sep 25, 2023
a2f1414
add unit tests
lyzx2001 Sep 27, 2023
5429f4e
lightning: add unit tests and integration tests for lightning post-im…
lyzx2001 Sep 27, 2023
e2d0c8b
merge master
lyzx2001 Oct 8, 2023
b59dc5f
modify unit tests
lyzx2001 Oct 8, 2023
4269ff3
add multiple unit tests and integration tests
lyzx2001 Oct 9, 2023
2a1cef4
modify run_group.sh
lyzx2001 Oct 9, 2023
861c95e
update tests
lyzx2001 Oct 9, 2023
eacaf98
update tests
lyzx2001 Oct 10, 2023
3279804
update tests
lyzx2001 Oct 10, 2023
a4ad0f8
update tests
lyzx2001 Oct 12, 2023
4a08d15
choose the needed integration tests
lyzx2001 Oct 12, 2023
6bbf2d4
modify ReplaceConflictKeys algorithms
lyzx2001 Oct 16, 2023
009bd9f
fix some bugs
lyzx2001 Oct 16, 2023
87a6c2b
update IT
lyzx2001 Oct 16, 2023
5020400
fix unit tests
lyzx2001 Oct 16, 2023
967e677
Merge branch 'master' into issue45774-step4
lyzx2001 Oct 17, 2023
68c8313
modify run_group.sh
lyzx2001 Oct 17, 2023
3353949
fix IT bug
lyzx2001 Oct 18, 2023
5dcc797
fix UT
lyzx2001 Oct 18, 2023
11218f2
fix IT
lyzx2001 Oct 18, 2023
09b277f
fix IT
lyzx2001 Oct 18, 2023
3327d85
add nonclustered pk cases
lyzx2001 Oct 18, 2023
9593a85
update bazel
lyzx2001 Oct 18, 2023
77a002b
update IT
lyzx2001 Oct 19, 2023
a616dfa
update IT
lyzx2001 Oct 19, 2023
5b42f9f
fix nonclustered pk by adding column is_data_kv
lyzx2001 Oct 25, 2023
040e081
fix UT
lyzx2001 Oct 26, 2023
73ac949
update DupKVStream
lyzx2001 Oct 26, 2023
98e8239
update ConflictErrorTableName
lyzx2001 Oct 26, 2023
c5eaaf3
update ConflictErrorTableName
lyzx2001 Oct 26, 2023
e0d2f49
update ReplaceConflictKeys
lyzx2001 Oct 30, 2023
5ea8c8d
update UT
lyzx2001 Oct 30, 2023
26f5578
update IT
lyzx2001 Nov 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,16 +588,16 @@ const (
// DupeResAlgNone doesn't detect duplicate.
DupeResAlgNone DuplicateResolutionAlgorithm = iota

// DupeResAlgRecord only records duplicate records to `lightning_task_info.conflict_error_v1` table on the target TiDB.
// DupeResAlgRecord only records duplicate records to `lightning_task_info.conflict_error_v2` table on the target TiDB.
DupeResAlgRecord

// DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v2 table to add back the correct rows.
DupeResAlgRemove

// DupeResAlgReplace records all duplicate records like the 'record' algorithm, and remove some rows with conflict
// and reserve other rows that can be kept and not cause conflict anymore. Users need to analyze the
// lightning_task_info.conflict_error_v1 table to check whether the reserved data cater to their need and check whether
// lightning_task_info.conflict_error_v2 table to check whether the reserved data cater to their need and check whether
// they need to add back the correct rows.
DupeResAlgReplace

Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/errormanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/types",
"@com_github_jedib0t_go_pretty_v6//table",
"@com_github_jedib0t_go_pretty_v6//text",
"@com_github_pingcap_errors//:errors",
Expand Down
96 changes: 72 additions & 24 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
tidbtbl "github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
tikverr "github.com/tikv/client-go/v2/error"
"go.uber.org/atomic"
"go.uber.org/multierr"
Expand All @@ -54,7 +55,7 @@ const (
syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v1"
ConflictErrorTableName = "conflict_error_v2"
// DupRecordTable is the table name to record duplicate data that displayed to user.
DupRecordTable = "conflict_records"

Expand Down Expand Up @@ -94,6 +95,7 @@ const (
raw_value mediumblob NOT NULL COMMENT 'the value of the conflicted key',
raw_handle mediumblob NOT NULL COMMENT 'the data handle derived from the conflicted key or value',
raw_row mediumblob NOT NULL COMMENT 'the data retrieved from the handle',
is_data_kv tinyint(1) NOT NULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe kv_group, values can be data/index, or if we want finer grain, we can use index id as the name.

current is lgtm too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe current is_data_kv is simpler in implemention, it seems like it is not necessary to determine whether the value is data or index based on the result tablecodec.IsRecordKey(conflictInfo.RawKey)

INDEX (task_id, table_name),
INDEX (index_name),
INDEX (table_name, index_name)
Expand Down Expand Up @@ -122,19 +124,19 @@ const (

insertIntoConflictErrorData = `
INSERT INTO %s.` + ConflictErrorTableName + `
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row, is_data_kv)
VALUES
`

sqlValuesConflictErrorData = "(?,?,'PRIMARY',?,?,?,?,raw_key,raw_value)"
sqlValuesConflictErrorData = "(?,?,'PRIMARY',?,?,?,?,raw_key,raw_value,?)"

insertIntoConflictErrorIndex = `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row, is_data_kv)
VALUES
`

sqlValuesConflictErrorIndex = "(?,?,?,?,?,?,?,?,?)"
sqlValuesConflictErrorIndex = "(?,?,?,?,?,?,?,?,?,?)"

selectConflictKeysRemove = `
SELECT _tidb_rowid, raw_handle, raw_row
Expand All @@ -146,14 +148,14 @@ const (
selectIndexConflictKeysReplace = `
SELECT raw_key, index_name, raw_value, raw_handle
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND index_name <> 'PRIMARY'
WHERE table_name = ? AND is_data_kv = 0
ORDER BY raw_key;
`

selectDataConflictKeysReplace = `
SELECT raw_key, raw_value, raw_handle
SELECT raw_key, raw_value
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND index_name = 'PRIMARY'
WHERE table_name = ? AND is_data_kv = 1
ORDER BY raw_key;
`

Expand Down Expand Up @@ -252,7 +254,7 @@ func (em *ErrorManager) Init(ctx context.Context) error {
sqls = append(sqls, [2]string{"create type error table", createTypeErrorTable})
}
if em.conflictV1Enabled {
sqls = append(sqls, [2]string{"create conflict error v1 table", createConflictErrorTable})
sqls = append(sqls, [2]string{"create conflict error v2 table", createConflictErrorTable})
}
if em.conflictV2Enabled {
sqls = append(sqls, [2]string{"create duplicate records table", createDupRecordTable})
Expand Down Expand Up @@ -378,6 +380,7 @@ func (em *ErrorManager) RecordDataConflictError(
conflictInfo.Row,
conflictInfo.RawKey,
conflictInfo.RawValue,
tablecodec.IsRecordKey(conflictInfo.RawKey),
)
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
Expand Down Expand Up @@ -439,6 +442,7 @@ func (em *ErrorManager) RecordIndexConflictError(
conflictInfo.RawValue,
rawHandles[i],
rawRows[i],
tablecodec.IsRecordKey(conflictInfo.RawKey),
)
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
Expand Down Expand Up @@ -561,7 +565,7 @@ func (em *ErrorManager) ReplaceConflictKeys(
pool.ApplyOnErrorGroup(g, func() error {
// TODO: provide a detailed document to explain the algorithm and link it here
// demo for "replace" algorithm: https://github.com/lyzx2001/tidb-conflict-replace
// check index KV first
// check index KV
indexKvRows, err := em.db.QueryContext(
gCtx, fmt.Sprintf(selectIndexConflictKeysReplace, em.schemaEscaped),
tableName)
Expand All @@ -583,7 +587,10 @@ func (em *ErrorManager) ReplaceConflictKeys(

// get the latest value of rawKey from downstream TiDB
latestValue, err := fnGetLatest(gCtx, rawKey)
if err != nil && !tikverr.IsErrNotFound(err) {
if tikverr.IsErrNotFound(err) {
continue
}
if err != nil {
return errors.Trace(err)
}

Expand All @@ -597,7 +604,7 @@ func (em *ErrorManager) ReplaceConflictKeys(
// get the latest value of the row key of the data KV that needs to be deleted
overwritten, err := fnGetLatest(gCtx, rawHandle)
// if the latest value cannot be found, that means the data KV has been deleted
if tikverr.IsErrNotFound(err) || overwritten == nil {
if tikverr.IsErrNotFound(err) {
continue
}
if err != nil {
Expand All @@ -613,12 +620,24 @@ func (em *ErrorManager) ReplaceConflictKeys(
if err != nil {
return errors.Trace(err)
}
if !tbl.Meta().HasClusteredIndex() {
// for nonclustered PK, need to append handle to decodedData for AddRecord
decodedData = append(decodedData, types.NewIntDatum(overwrittenHandle.IntValue()))
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx, decodedData)
if err != nil {
return errors.Trace(err)
}

// find out all the KV pairs that are contained in the data KV
kvPairs := encoder.SessionCtx.TakeKvPairs()

exec := common.SQLWithRetry{
DB: em.db,
Logger: em.logger,
HideQueryLog: redact.NeedRedact(),
}

for _, kvPair := range kvPairs.Pairs {
em.logger.Debug("got encoded KV",
logutil.Key("key", kvPair.Key),
Expand All @@ -644,6 +663,26 @@ func (em *ErrorManager) ReplaceConflictKeys(
// Only if there is a->1 we dare to delete data KV with key "1".

if bytes.Equal(kvPair.Key, rawKey) && bytes.Equal(kvPair.Val, rawValue) {
if err := exec.Transact(ctx, "insert data conflict error record for conflict detection 'replace' mode",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move the creating of exec closer to its usage.

func(c context.Context, txn *sql.Tx) error {
sb := &strings.Builder{}
fmt.Fprintf(sb, insertIntoConflictErrorData, em.schemaEscaped)
var sqlArgs []interface{}
sb.WriteString(sqlValuesConflictErrorData)
sqlArgs = append(sqlArgs,
em.taskID,
tableName,
nil,
nil,
rawHandle,
overwritten,
1,
)
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
return err
}); err != nil {
return err
}
if err := fnDeleteKey(gCtx, rawHandle); err != nil {
return errors.Trace(err)
}
Expand All @@ -668,14 +707,13 @@ func (em *ErrorManager) ReplaceConflictKeys(
var mustKeepKvPairs *kv.Pairs

for dataKvRows.Next() {
var rawKey, rawValue, rawHandle []byte
if err := dataKvRows.Scan(&rawKey, &rawValue, &rawHandle); err != nil {
var rawKey, rawValue []byte
if err := dataKvRows.Scan(&rawKey, &rawValue); err != nil {
return errors.Trace(err)
}
em.logger.Debug("got group raw_key, raw_value, raw_handle from table",
em.logger.Debug("got group raw_key, raw_value from table",
logutil.Key("raw_key", rawKey),
zap.Binary("raw_value", rawValue),
zap.Binary("raw_handle", rawHandle))
zap.Binary("raw_value", rawValue))

if !bytes.Equal(rawKey, previousRawKey) {
previousRawKey = rawKey
Expand All @@ -694,6 +732,10 @@ func (em *ErrorManager) ReplaceConflictKeys(
if err != nil {
return errors.Trace(err)
}
if !tbl.Meta().HasClusteredIndex() {
// for nonclustered PK, need to append handle to decodedData for AddRecord
decodedData = append(decodedData, types.NewIntDatum(handle.IntValue()))
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx, decodedData)
if err != nil {
return errors.Trace(err)
Expand All @@ -710,7 +752,7 @@ func (em *ErrorManager) ReplaceConflictKeys(
continue
}

handle, err := tablecodec.DecodeRowKey(rawHandle)
handle, err := tablecodec.DecodeRowKey(rawKey)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -719,6 +761,10 @@ func (em *ErrorManager) ReplaceConflictKeys(
if err != nil {
return errors.Trace(err)
}
if !tbl.Meta().HasClusteredIndex() {
// for nonclustered PK, need to append handle to decodedData for AddRecord
decodedData = append(decodedData, types.NewIntDatum(handle.IntValue()))
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx, decodedData)
if err != nil {
return errors.Trace(err)
Expand All @@ -731,7 +777,7 @@ func (em *ErrorManager) ReplaceConflictKeys(
logutil.Key("key", kvPair.Key),
zap.Binary("value", kvPair.Val))
kvLatestValue, err := fnGetLatest(gCtx, kvPair.Key)
if tikverr.IsErrNotFound(err) || kvLatestValue == nil {
if tikverr.IsErrNotFound(err) {
continue
}
if err != nil {
Expand All @@ -746,11 +792,13 @@ func (em *ErrorManager) ReplaceConflictKeys(

// if the KV pair is contained in mustKeepKvPairs, we cannot delete it
// if not, delete the KV pair
isContained := slices.ContainsFunc(mustKeepKvPairs.Pairs, func(mustKeepKvPair common.KvPair) bool {
return bytes.Equal(mustKeepKvPair.Key, kvPair.Key) && bytes.Equal(mustKeepKvPair.Val, kvPair.Val)
})
if isContained {
continue
if mustKeepKvPairs != nil {
isContained := slices.ContainsFunc(mustKeepKvPairs.Pairs, func(mustKeepKvPair common.KvPair) bool {
return bytes.Equal(mustKeepKvPair.Key, kvPair.Key) && bytes.Equal(mustKeepKvPair.Val, kvPair.Val)
})
if isContained {
continue
}
}

if err := fnDeleteKey(gCtx, kvPair.Key); err != nil {
Expand Down
Loading
Loading