diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 2d9875ad56960..7f1546d552aa7 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -87,7 +87,7 @@ type tableMetaMgr interface { UpdateTableStatus(ctx context.Context, status metaStatus) error UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) ( - needChecksum bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error) + otherHasDupe bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error) FinishTable(ctx context.Context) error } @@ -370,7 +370,7 @@ func (m *dbTableMetaMgr) UpdateTableStatus(ctx context.Context, status metaStatu } func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) ( - needChecksum bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error, + otherHasDupe bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error, ) { conn, err := m.session.Conn(ctx) if err != nil { @@ -393,7 +393,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks taskHasDuplicates bool ) newStatus := metaStatusChecksuming - needChecksum = true + otherHasDupe = false needRemoteDupe = true err = exec.Transact(ctx, "checksum pre-check", func(ctx context.Context, tx *sql.Tx) error { rows, err := tx.QueryContext( @@ -423,9 +423,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks return err } - if taskHasDuplicates { - needChecksum = false - } + otherHasDupe = otherHasDupe || taskHasDuplicates // skip finished meta if status >= metaStatusFinished { @@ -436,7 +434,6 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks if status >= metaStatusChecksuming { newStatus = status needRemoteDupe = status == metaStatusChecksuming - needChecksum = needChecksum && needRemoteDupe return nil } @@ -445,7 +442,6 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks if status < metaStatusChecksuming { newStatus = metaStatusChecksumSkipped - needChecksum = false needRemoteDupe = false break } else if status == metaStatusChecksuming { @@ -475,12 +471,13 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks return false, false, nil, err } - if needChecksum { + if !otherHasDupe && needRemoteDupe { ck := verify.MakeKVChecksum(totalBytes, totalKvs, totalChecksum) baseTotalChecksum = &ck } log.FromContext(ctx).Info("check table checksum", zap.String("table", m.tr.tableName), - zap.Bool("checksum", needChecksum), zap.String("new_status", newStatus.String())) + zap.Bool("otherHasDupe", otherHasDupe), zap.Bool("needRemoteDupe", needRemoteDupe), + zap.String("new_status", newStatus.String())) return } @@ -1073,7 +1070,7 @@ func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum } func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (bool, bool, *verify.KVChecksum, error) { - return true, true, &verify.KVChecksum{}, nil + return false, true, &verify.KVChecksum{}, nil } func (m noopTableMetaMgr) FinishTable(ctx context.Context) error { diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 86d3ed2622ddc..fc669a271b2a6 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -794,11 +794,19 @@ func (tr *TableRestore) postProcess( } hasDupe = hasLocalDupe } + failpoint.Inject("SlowDownCheckDupe", func(v failpoint.Value) { + sec := v.(int) + tr.logger.Warn("start to sleep several seconds before checking other dupe", + zap.Int("seconds", sec)) + time.Sleep(time.Duration(sec) * time.Second) + }) - needChecksum, needRemoteDupe, baseTotalChecksum, err := metaMgr.CheckAndUpdateLocalChecksum(ctx, &localChecksum, hasDupe) + otherHasDupe, needRemoteDupe, baseTotalChecksum, err := metaMgr.CheckAndUpdateLocalChecksum(ctx, &localChecksum, hasDupe) if err != nil { return false, err } + needChecksum := !otherHasDupe && needRemoteDupe + hasDupe = hasDupe || otherHasDupe if needRemoteDupe && rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone { opts := &kv.SessionOptions{ diff --git a/br/tests/lightning_duplicate_resolution_incremental/config1.toml b/br/tests/lightning_duplicate_resolution_incremental/config1.toml new file mode 100644 index 0000000000000..a72bc7a3718cb --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_incremental/config1.toml @@ -0,0 +1,41 @@ +[lightning] +task-info-schema-name = 'lightning_task_info_dupe_resolve_incremental' +index-concurrency = 10 +table-concurrency = 10 + +[tikv-importer] +backend = "local" +on-duplicate = "replace" +duplicate-resolution = "remove" +incremental-import = true + +[checkpoint] +enable = true +schema = "tidb_lightning_checkpoint_dupe_resolve_incremental1" +driver = "mysql" + +[[mydumper.files]] +pattern = '(?i).*(-schema-trigger|-schema-post)\.sql$' +type = 'ignore' + +[[mydumper.files]] +pattern = '(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$' +schema = '$1' +type = 'schema-schema' + +[[mydumper.files]] +pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$' +schema = '$1' +table = '$2' +type = 'table-schema' + +[[mydumper.files]] +pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)\.0\.sql$' +schema = '$1' +table = '$2' +key = '0' +type = 'sql' + +[post-restore] +analyze = false +checksum = "optional" diff --git a/br/tests/lightning_duplicate_resolution_incremental/config2.toml b/br/tests/lightning_duplicate_resolution_incremental/config2.toml new file mode 100644 index 0000000000000..bb29511a9b432 --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_incremental/config2.toml @@ -0,0 +1,41 @@ +[lightning] +task-info-schema-name = 'lightning_task_info_dupe_resolve_incremental' +index-concurrency = 10 +table-concurrency = 10 + +[tikv-importer] +backend = "local" +on-duplicate = "replace" +duplicate-resolution = "remove" +incremental-import = true + +[checkpoint] +enable = true +schema = "tidb_lightning_checkpoint_dupe_resolve_incremental2" +driver = "mysql" + +[[mydumper.files]] +pattern = '(?i).*(-schema-trigger|-schema-post)\.sql$' +type = 'ignore' + +[[mydumper.files]] +pattern = '(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$' +schema = '$1' +type = 'schema-schema' + +[[mydumper.files]] +pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$' +schema = '$1' +table = '$2' +type = 'table-schema' + +[[mydumper.files]] +pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)\.1\.sql$' +schema = '$1' +table = '$2' +key = '1' +type = 'sql' + +[post-restore] +analyze = false +checksum = "optional" diff --git a/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect-schema-create.sql b/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect-schema-create.sql new file mode 100644 index 0000000000000..202de81067861 --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect-schema-create.sql @@ -0,0 +1 @@ +create schema dup_resolve_detect; diff --git a/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect.ta-schema.sql b/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect.ta-schema.sql new file mode 100644 index 0000000000000..fb6cf2d5a7651 --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect.ta-schema.sql @@ -0,0 +1,6 @@ +create table ta ( + id varchar(11) not null primary key nonclustered, -- use varchar here to make sure _tidb_rowid will be generated + name varchar(20) not null, + size bigint not null, + unique key uni_name(name) +); diff --git a/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect.ta.0.sql b/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect.ta.0.sql new file mode 100644 index 0000000000000..ee29f689e8792 --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect.ta.0.sql @@ -0,0 +1,20 @@ +insert into ta values (3, '3c49f3bd', 6194643990092531757); +insert into ta values (13, '1da87b44', 3724743701402246028); +insert into ta values (6, '8b080186', 4840750639653607661); +insert into ta values (1, 'c83c0e6a', 5057094372111243649); +insert into ta values (12, 'dd73baf5', 2295098755414696158); +insert into ta values (4, '1cf99fa1', 2520784525406914042); +insert into ta values (11, 'b238a0e6', 3314555604794199537); +insert into ta values (10, 'a489c47a', 7706822128523578708); +insert into ta values (10, '9a54941e', 4969369552499069659); +insert into ta values (2, 'e7c90179', 1305347797378229715); +insert into ta values (9, '75e0344a', 500154046394880294); +insert into ta values (9, 'c3e8fc36', 5880042654284780409); +insert into ta values (6, 'd6835599', 2703142091339420770); +insert into ta values (5, 'c4a9c3a3', 6725275961959702206); +insert into ta values (14, 'eb1ab0dd', 5442878220607642694); +insert into ta values (7, '78e166f4', 7062852002089313920); +insert into ta values (8, '20986b65', 5485014514564267319); +insert into ta values (8, '9bd4d7a9', 9085469020413045798); +insert into ta values (15, 'd4aa9a8a', 546189610059969690); +insert into ta values (7, 'a7870c06', 3615729521258364152); diff --git a/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect.ta.1.sql b/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect.ta.1.sql new file mode 100644 index 0000000000000..88b67b051fe6e --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_incremental/data/dup_resolve_detect.ta.1.sql @@ -0,0 +1,16 @@ +insert into ta values (111, 'bcf4e75f', 3304674741328415661); +insert into ta values (112, 'c08078e9', 7464585077725645791); +insert into ta values (113, 'ca05b4b2', 1280363363179468054); +insert into ta values (114, '8a094c96', 107578474892900608); +insert into ta values (115, 'f38efac2', 5273601814057696410); +insert into ta values (116, '5bf0cb56', 7276272767003446282); +insert into ta values (117, 'c8836b45', 653431702983792793); +insert into ta values (118, '7470ba67', 5617407618564683998); +insert into ta values (119, '466e1e95', 6827370124386922419); +insert into ta values (120, '41df97f3', 2296443172527920942); +insert into ta values (121, 'bd644f43', 6038622426427289955); +insert into ta values (122, '96aeb918', 1496857236328804363); +insert into ta values (123, '232448f7', 1199921720244646472); +insert into ta values (124, 'd296d6e4', 5705035255191089143); +insert into ta values (125, '194ec1d8', 6895413645725179445); +insert into ta values (126, 'a53238ec', 1527836891202149330); diff --git a/br/tests/lightning_duplicate_resolution_incremental/run.sh b/br/tests/lightning_duplicate_resolution_incremental/run.sh new file mode 100644 index 0000000000000..b1bf1e3869d27 --- /dev/null +++ b/br/tests/lightning_duplicate_resolution_incremental/run.sh @@ -0,0 +1,62 @@ +#!/bin/bash +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +check_cluster_version 5 2 0 'duplicate detection' || exit 0 + +LOG_FILE1="$TEST_DIR/lightning-duplicate-resolution1.log" +LOG_FILE2="$TEST_DIR/lightning-duplicate-resolution2.log" + +# let lightning run a bit slow to avoid some table in the first lightning finish too fast. +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownCheckDupe=return(10)" +run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_incremental.sorted1" \ + --enable-checkpoint=1 --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config1.toml" & + +counter=0 +while [ $counter -lt 10 ]; do + if grep -Fq "start to sleep several seconds before checking other dupe" "$LOG_FILE1"; then + echo "lightning 1 already starts waiting for dupe" + break + fi + ((counter += 1)) + echo "waiting for lightning 1 starts" + sleep 1 +done + +if [ $counter -ge 10 ]; then + echo "fail to wait for lightning 1 starts" + exit 1 +fi + +run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_incremental.sorted2" \ + --enable-checkpoint=1 --log-file "$LOG_FILE2" --config "tests/$TEST_NAME/config2.toml" & + +wait + +export GO_FAILPOINTS="" + +# Ensure table is consistent. +run_sql 'admin check table dup_resolve_detect.ta' + +# Check data correctness +run_sql 'select count(*), sum(id) from dup_resolve_detect.ta where id < 100' +check_contains 'count(*): 10' +check_contains 'sum(id): 80' + +run_sql 'select count(*), sum(id) from dup_resolve_detect.ta where id > 100' +check_contains 'count(*): 16' +check_contains 'sum(id): 1896'