Skip to content

Commit

Permalink
lightning: support data KV checking of 'replace' mode for lightning post-import conflict detection (#46763)
Browse files Browse the repository at this point in the history

close #45774
  • Loading branch information
lyzx2001 authored Oct 7, 2023
1 parent fec9a5b commit 32d7626
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 36 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,9 @@ func (local *DupeController) deleteDuplicateRow(
}
}()

logger.Debug("will delete key", zap.String("category", "resolve-dupe"), logutil.Key("key", key))
logger.Debug("deleteDuplicateRow will delete key",
zap.String("category", "resolve-dupe"),
logutil.Key("key", key))
err = txn.Delete(key)

return errors.Trace(err)
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/errormanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"@com_github_jedib0t_go_pretty_v6//text",
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//error",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
Expand All @@ -35,8 +36,9 @@ go_test(
srcs = ["errormanager_test.go"],
embed = [":errormanager"],
flaky = True,
shard_count = 5,
shard_count = 6,
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
Expand Down
136 changes: 125 additions & 11 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -93,8 +94,9 @@ const (
raw_value mediumblob NOT NULL COMMENT 'the value of the conflicted key',
raw_handle mediumblob NOT NULL COMMENT 'the data handle derived from the conflicted key or value',
raw_row mediumblob NOT NULL COMMENT 'the data retrieved from the handle',
KEY (task_id, table_name),
INDEX (index_name)
INDEX (task_id, table_name),
INDEX (index_name),
INDEX (table_name, index_name)
);
`

Expand Down Expand Up @@ -148,6 +150,13 @@ const (
ORDER BY raw_key;
`

// selectDataConflictKeysReplace is the query template used by the
// "check data KV" step of ReplaceConflictKeys. The %s verb is filled with
// the schema name via fmt.Sprintf, and the single ? placeholder is bound to
// the table name. It returns raw_key, raw_value and raw_handle for every
// recorded conflict whose index_name is 'PRIMARY' (presumably the rows that
// describe data KVs rather than secondary-index KVs; confirm against the
// code that writes index_name). Rows are ordered by raw_key, so rows that
// share a key are adjacent and the caller can fetch the latest value once
// per distinct key.
selectDataConflictKeysReplace = `
SELECT raw_key, raw_value, raw_handle
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND index_name = 'PRIMARY'
ORDER BY raw_key;
`

insertIntoDupRecord = `
INSERT INTO %s.` + DupRecordTable + `
(task_id, table_name, path, offset, error, row_id, row_data)
Expand Down Expand Up @@ -553,17 +562,17 @@ func (em *ErrorManager) ReplaceConflictKeys(
// TODO: provide a detailed document to explain the algorithm and link it here
// demo for "replace" algorithm: https://github.com/lyzx2001/tidb-conflict-replace
// check index KV first
rawKeyRows, err := em.db.QueryContext(
indexKvRows, err := em.db.QueryContext(
gCtx, fmt.Sprintf(selectIndexConflictKeysReplace, em.schemaEscaped),
tableName)
if err != nil {
return errors.Trace(err)
}
defer rawKeyRows.Close()
for rawKeyRows.Next() {
defer indexKvRows.Close()
for indexKvRows.Next() {
var rawKey, rawValue, rawHandle []byte
var indexName string
if err := rawKeyRows.Scan(&rawKey, &indexName, &rawValue, &rawHandle); err != nil {
if err := indexKvRows.Scan(&rawKey, &indexName, &rawValue, &rawHandle); err != nil {
return errors.Trace(err)
}
em.logger.Debug("got raw_key, index_name, raw_value, raw_handle from table",
Expand All @@ -573,14 +582,14 @@ func (em *ErrorManager) ReplaceConflictKeys(
zap.Binary("raw_handle", rawHandle))

// get the latest value of rawKey from downstream TiDB
value, err := fnGetLatest(gCtx, rawKey)
if err != nil {
latestValue, err := fnGetLatest(gCtx, rawKey)
if err != nil && !tikverr.IsErrNotFound(err) {
return errors.Trace(err)
}

// if the latest value of rawKey equals rawValue, this index KV is still maintained in downstream TiDB
// if not, this index KV has been overwritten, and its corresponding data KV needs to be deleted
if bytes.Equal(rawValue, value) {
if bytes.Equal(rawValue, latestValue) {
continue
}

Expand Down Expand Up @@ -642,11 +651,116 @@ func (em *ErrorManager) ReplaceConflictKeys(
}
}
}
if err := rawKeyRows.Err(); err != nil {
if err := indexKvRows.Err(); err != nil {
return errors.Trace(err)
}

// check data KV
dataKvRows, err := em.db.QueryContext(
gCtx, fmt.Sprintf(selectDataConflictKeysReplace, em.schemaEscaped),
tableName)
if err != nil {
return errors.Trace(err)
}
defer dataKvRows.Close()

var previousRawKey, latestValue []byte
var mustKeepKvPairs *kv.Pairs

for dataKvRows.Next() {
var rawKey, rawValue, rawHandle []byte
if err := dataKvRows.Scan(&rawKey, &rawValue, &rawHandle); err != nil {
return errors.Trace(err)
}
em.logger.Debug("got group raw_key, raw_value, raw_handle from table",
logutil.Key("raw_key", rawKey),
zap.Binary("raw_value", rawValue),
zap.Binary("raw_handle", rawHandle))

if !bytes.Equal(rawKey, previousRawKey) {
previousRawKey = rawKey
// get the latest value of rawKey from downstream TiDB
latestValue, err = fnGetLatest(gCtx, rawKey)
if err != nil && !tikverr.IsErrNotFound(err) {
return errors.Trace(err)
}
if latestValue != nil {
handle, err := tablecodec.DecodeRowKey(rawKey)
if err != nil {
return errors.Trace(err)
}
decodedData, _, err := tables.DecodeRawRowData(encoder.SessionCtx,
tbl.Meta(), handle, tbl.Cols(), latestValue)
if err != nil {
return errors.Trace(err)
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx, decodedData)
if err != nil {
return errors.Trace(err)
}
// calculate the new mustKeepKvPairs corresponding to the new rawKey
// find out all the KV pairs that are contained in the data KV
mustKeepKvPairs = encoder.SessionCtx.TakeKvPairs()
}
}

// TODO: check data KV
// if the latest value of rawKey equals rawValue, this data KV is still maintained in downstream TiDB
// if not, this data KV has been deleted because its index KV was overwritten
if bytes.Equal(rawValue, latestValue) {
continue
}

handle, err := tablecodec.DecodeRowKey(rawHandle)
if err != nil {
return errors.Trace(err)
}
decodedData, _, err := tables.DecodeRawRowData(encoder.SessionCtx,
tbl.Meta(), handle, tbl.Cols(), rawValue)
if err != nil {
return errors.Trace(err)
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx, decodedData)
if err != nil {
return errors.Trace(err)
}

// find out all the KV pairs that are contained in the data KV
kvPairs := encoder.SessionCtx.TakeKvPairs()
for _, kvPair := range kvPairs.Pairs {
em.logger.Debug("got encoded KV",
logutil.Key("key", kvPair.Key),
zap.Binary("value", kvPair.Val))
kvLatestValue, err := fnGetLatest(gCtx, kvPair.Key)
if tikverr.IsErrNotFound(err) || kvLatestValue == nil {
continue
}
if err != nil {
return errors.Trace(err)
}

// if the value of the KV pair is not equal to the latest value of the key of the KV pair
// that means the value of the KV pair has been overwritten, so it needs no extra operation
if !bytes.Equal(kvLatestValue, kvPair.Val) {
continue
}

// if the KV pair is contained in mustKeepKvPairs, we cannot delete it
// if not, delete the KV pair
isContained := slices.ContainsFunc(mustKeepKvPairs.Pairs, func(mustKeepKvPair common.KvPair) bool {
return bytes.Equal(mustKeepKvPair.Key, kvPair.Key) && bytes.Equal(mustKeepKvPair.Val, kvPair.Val)
})
if isContained {
continue
}

if err := fnDeleteKey(gCtx, kvPair.Key); err != nil {
return errors.Trace(err)
}
}
}
if err := dataKvRows.Err(); err != nil {
return errors.Trace(err)
}

return nil
})
Expand Down
Loading

0 comments on commit 32d7626

Please sign in to comment.