From 034da6ee94d90ddf9023863156f3b117af32e78f Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 25 Jul 2023 14:12:03 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #45441 Signed-off-by: ti-chi-bot --- Makefile | 1 + br/pkg/task/restore_txn.go | 2 +- br/tests/br_txn/BUILD.bazel | 21 +++ br/tests/br_txn/client.go | 268 ++++++++++++++++++++++++++++++++++++ br/tests/br_txn/run.sh | 136 ++++++++++++++++++ br/tests/run_group.sh | 76 ++++++++++ 6 files changed, 503 insertions(+), 1 deletion(-) create mode 100644 br/tests/br_txn/BUILD.bazel create mode 100644 br/tests/br_txn/client.go create mode 100755 br/tests/br_txn/run.sh create mode 100755 br/tests/run_group.sh diff --git a/Makefile b/Makefile index 06bf3f9d564d8..e00542be65752 100644 --- a/Makefile +++ b/Makefile @@ -327,6 +327,7 @@ build_for_br_integration_test: $(GOBUILD) $(RACE_FLAG) -o bin/gc br/tests/br_z_gc_safepoint/*.go && \ $(GOBUILD) $(RACE_FLAG) -o bin/oauth br/tests/br_gcs/*.go && \ $(GOBUILD) $(RACE_FLAG) -o bin/rawkv br/tests/br_rawkv/*.go && \ + $(GOBUILD) $(RACE_FLAG) -o bin/txnkv br/tests/br_txn/*.go && \ $(GOBUILD) $(RACE_FLAG) -o bin/parquet_gen br/tests/lightning_checkpoint_parquet/*.go \ ) || (make failpoint-disable && exit 1) @make failpoint-disable diff --git a/br/pkg/task/restore_txn.go b/br/pkg/task/restore_txn.go index 40ab734cbcd52..7fe38c2bb7f32 100644 --- a/br/pkg/task/restore_txn.go +++ b/br/pkg/task/restore_txn.go @@ -87,7 +87,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) !cfg.LogProgress) // RawKV restore does not need to rewrite keys. - err = restore.SplitRanges(ctx, client, ranges, nil, updateCh, true) + err = restore.SplitRanges(ctx, client, ranges, nil, updateCh, false) if err != nil { return errors.Trace(err) } diff --git a/br/tests/br_txn/BUILD.bazel b/br/tests/br_txn/BUILD.bazel new file mode 100644 index 0000000000000..911f746718a6e --- /dev/null +++ b/br/tests/br_txn/BUILD.bazel @@ -0,0 +1,21 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "br_txn_lib", + srcs = ["client.go"], + importpath = "github.com/pingcap/tidb/br/tests/br_txn", + visibility = ["//visibility:private"], + deps = [ + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//config", + "@com_github_tikv_client_go_v2//txnkv", + "@org_uber_go_zap//:zap", + ], +) + +go_binary( + name = "br_txn", + embed = [":br_txn_lib"], + visibility = ["//visibility:public"], +) diff --git a/br/tests/br_txn/client.go b/br/tests/br_txn/client.go new file mode 100644 index 0000000000000..6f0bf414e207a --- /dev/null +++ b/br/tests/br_txn/client.go @@ -0,0 +1,268 @@ +package main + +import ( + "bytes" + "context" + "encoding/hex" + "flag" + "fmt" + "hash/crc64" + "math/rand" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/client-go/v2/config" + "github.com/tikv/client-go/v2/txnkv" + "go.uber.org/zap" +) + +var ( + ca = flag.String("ca", "", "CA certificate path for TLS connection") + cert = flag.String("cert", "", "certificate path for TLS connection") + key = flag.String("key", "", "private key path for TLS connection") + pdAddr = flag.String("pd", "127.0.0.1:2379", "Address of PD") + runMode = flag.String("mode", "", "Mode. One of 'rand-gen', 'checksum', 'scan', 'delete'") + startKeyStr = flag.String("start-key", "", "Start key in hex") + endKeyStr = flag.String("end-key", "", "End key in hex") + keyMaxLen = flag.Int("key-max-len", 32, "Max length of keys for rand-gen mode") + concurrency = flag.Int("concurrency", 32, "Concurrency to run rand-gen") + duration = flag.Int("duration", 10, "duration(second) of rand-gen") +) + +func createClient(addr string) (*txnkv.Client, error) { + if *ca != "" { + conf := config.GetGlobalConfig() + conf.Security.ClusterSSLCA = *ca + conf.Security.ClusterSSLCert = *cert + conf.Security.ClusterSSLKey = *key + config.StoreGlobalConfig(conf) + } + + cli, err := txnkv.NewClient([]string{addr}) + return cli, errors.Trace(err) +} + +func main() { + flag.Parse() + + startKey := []byte(*startKeyStr) + endKey := []byte(*endKeyStr) + if len(endKey) == 0 { + log.Panic("Empty endKey is not supported yet") + } + + if *runMode == "test-rand-key" { + testRandKey(startKey, endKey, *keyMaxLen) + return + } + + client, err := createClient(*pdAddr) + if err != nil { + log.Panic("Failed to create client", zap.String("pd", *pdAddr), zap.Error(err)) + } + + switch *runMode { + case "rand-gen": + err = randGenWithDuration(client, startKey, endKey, *keyMaxLen, *concurrency, *duration) + case "checksum": + err = checksum(client, startKey, endKey) + case "delete": + err = deleteRange(client, startKey, endKey) + } + + if err != nil { + log.Panic("Error", zap.Error(err)) + } +} + +func randGenWithDuration(client *txnkv.Client, startKey, endKey []byte, + maxLen int, concurrency int, duration int) error { + var err error + ok := make(chan struct{}) + go func() { + err = randGen(client, startKey, endKey, maxLen, concurrency) + ok <- struct{}{} + }() + select { + case <-time.After(time.Second * time.Duration(duration)): + case <-ok: + } + return errors.Trace(err) +} + +func randGen(client *txnkv.Client, startKey, endKey []byte, maxLen int, concurrency int) error { + log.Info("Start rand-gen", zap.Int("maxlen", maxLen), + zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey))) + log.Info("Rand-gen will keep running. Please Ctrl+C to stop manually.") + + // Cannot generate shorter key than commonPrefix + commonPrefixLen := 0 + for ; commonPrefixLen < len(startKey) && commonPrefixLen < len(endKey) && + startKey[commonPrefixLen] == endKey[commonPrefixLen]; commonPrefixLen++ { + continue + } + + if maxLen < commonPrefixLen { + return errors.Errorf("maxLen (%v) < commonPrefixLen (%v)", maxLen, commonPrefixLen) + } + + const batchSize = 32 + + errCh := make(chan error, concurrency) + for i := maxLen; i <= maxLen+concurrency; i++ { + go func(i int) { + for { + txn, err := client.Begin() + if err != nil { + errCh <- errors.Trace(err) + } + for j := 0; j < batchSize; j++ { + key := randKey(startKey, endKey, i) + // append index to avoid write conflict + key = appendIndex(key, i) + value := randValue() + err = txn.Set(key, value) + if err != nil { + errCh <- errors.Trace(err) + } + } + err = txn.Commit(context.TODO()) + if err != nil { + errCh <- errors.Trace(err) + } + } + }(i) + } + + err := <-errCh + if err != nil { + return errors.Trace(err) + } + + return nil +} + +func testRandKey(startKey, endKey []byte, maxLen int) { + for { + k := randKey(startKey, endKey, maxLen) + if bytes.Compare(k, startKey) < 0 || bytes.Compare(k, endKey) >= 0 { + panic(hex.EncodeToString(k)) + } + } +} + +//nolint:gosec +func randKey(startKey, endKey []byte, maxLen int) []byte { +Retry: + for { // Regenerate on fail + result := make([]byte, 0, maxLen) + + upperUnbounded := false + lowerUnbounded := false + + for i := 0; i < maxLen; i++ { + upperBound := 256 + if !upperUnbounded { + if i >= len(endKey) { + // The generated key is the same as endKey which is invalid. Regenerate it. + continue Retry + } + upperBound = int(endKey[i]) + 1 + } + + lowerBound := 0 + if !lowerUnbounded { + if i >= len(startKey) { + lowerUnbounded = true + } else { + lowerBound = int(startKey[i]) + } + } + + if lowerUnbounded { + if rand.Intn(257) == 0 { + return result + } + } + + value := rand.Intn(upperBound - lowerBound) + value += lowerBound + + if value < upperBound-1 { + upperUnbounded = true + } + if value > lowerBound { + lowerUnbounded = true + } + + result = append(result, uint8(value)) + } + + return result + } +} + +//nolint:gosec +func appendIndex(key []byte, i int) []byte { + return append(key, uint8(i)) +} + +//nolint:gosec +func randValue() []byte { + result := make([]byte, 0, 512) + for i := 0; i < 512; i++ { + value := rand.Intn(257) + if value == 256 { + if i > 0 { + return result + } + value-- + } + result = append(result, uint8(value)) + } + return result +} + +func checksum(client *txnkv.Client, startKey, endKey []byte) error { + log.Info("Start checkcum on range", + zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey))) + + txn, err := client.Begin() + if err != nil { + return errors.Trace(err) + } + iter, err := txn.Iter(startKey, endKey) + if err != nil { + return errors.Trace(err) + } + + digest := crc64.New(crc64.MakeTable(crc64.ECMA)) + + var res uint64 + + for iter.Valid() { + err := iter.Next() + if err != nil { + return errors.Trace(err) + } + if len(iter.Key()) == 0 { + break + } + _, _ = digest.Write(iter.Key()) + _, _ = digest.Write(iter.Value()) + res ^= digest.Sum64() + } + _ = txn.Commit(context.TODO()) + + log.Info("Checksum result", zap.Uint64("checksum", res)) + fmt.Printf("Checksum result: %016x\n", res) + return nil +} + +func deleteRange(client *txnkv.Client, startKey, endKey []byte) error { + log.Info("Start delete data in range", + zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey))) + _, err := client.DeleteRange(context.TODO(), startKey, endKey, *concurrency) + return err +} diff --git a/br/tests/br_txn/run.sh b/br/tests/br_txn/run.sh new file mode 100755 index 0000000000000..8b15f78764af4 --- /dev/null +++ b/br/tests/br_txn/run.sh @@ -0,0 +1,136 @@ +#!/bin/sh +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +# restart service without tiflash +source $( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/../_utils/run_services +start_services --no-tiflash + +BACKUP_DIR=$TEST_DIR/"txn_backup" +BACKUP_FULL=$TEST_DIR/"txnkv-full" + +checksum() { + bin/txnkv --pd $PD_ADDR \ + --ca "$TEST_DIR/certs/ca.pem" \ + --cert "$TEST_DIR/certs/br.pem" \ + --key "$TEST_DIR/certs/br.key" \ + --mode checksum --start-key $1 --end-key $2 | grep result | tail -n 1 | awk '{print $3}' +} + +fail_and_exit() { + echo "TEST: [$TEST_NAME] failed!" + exit 1 +} + +clean() { + bin/txnkv --pd $PD_ADDR \ + --ca "$TEST_DIR/certs/ca.pem" \ + --cert "$TEST_DIR/certs/br.pem" \ + --key "$TEST_DIR/certs/br.key" \ + --mode delete --start-key $1 --end-key $2 +} + +test_full_txnkv_encryption() { + check_range_start="hello" + check_range_end="world" + + rm -rf $BACKUP_FULL + + checksum_full=$(checksum $check_range_start $check_range_end) + # backup current state of key-values + run_br --pd $PD_ADDR backup txn -s "local://$BACKUP_FULL" --crypter.method "aes128-ctr" --crypter.key "0123456789abcdef0123456789abcdef" + + clean $check_range_start $check_range_end + # Ensure the data is deleted + checksum_new=$(checksum $check_range_start $check_range_end) + if [ "$checksum_new" == "$checksum_full" ];then + echo "failed to delete data in range in encryption" + fail_and_exit + fi + + run_br --pd $PD_ADDR restore txn -s "local://$BACKUP_FULL" --crypter.method "aes128-ctr" --crypter.key "0123456789abcdef0123456789abcdef" + checksum_new=$(checksum $check_range_start $check_range_end) + if [ "$checksum_new" != "$checksum_full" ];then + echo "failed to restore" + fail_and_exit + fi +} + +run_test() { + if [ -z "$1" ];then + echo "run test" + else + export GO_FAILPOINTS="$1" + echo "run test with failpoints: $GO_FAILPOINTS" + fi + + rm -rf $BACKUP_DIR + clean "hello" "world" + + # generate txn kv randomly in range[start-key, end-key) in 10s + bin/txnkv --pd $PD_ADDR \ + --ca "$TEST_DIR/certs/ca.pem" \ + --cert "$TEST_DIR/certs/br.pem" \ + --key "$TEST_DIR/certs/br.key" \ + --mode rand-gen --start-key "hello" --end-key "world" --duration 10 + + checksum_ori=$(checksum "hello" "world") + + # backup txnkv + echo "backup start..." + run_br --pd $PD_ADDR backup txn -s "local://$BACKUP_DIR" + + # delete data in range[start-key, end-key) + clean "hello" "world" + # Ensure the data is deleted + checksum_new=$(checksum "hello" "world") + + if [ "$checksum_new" != "$checksum_empty" ];then + echo "failed to delete data in range after backup" + fail_and_exit + fi + + # restore rawkv + echo "restore start..." + run_br --pd $PD_ADDR restore txn -s "local://$BACKUP_DIR" + + checksum_new=$(checksum "hello" "world") + + if [ "$checksum_new" != "$checksum_ori" ];then + echo "checksum failed after restore" + fail_and_exit + fi + + test_full_txnkv_encryption + + # delete data in range[start-key, end-key) + clean "hello" "world" + # Ensure the data is deleted + checksum_new=$(checksum "hello" "world") + + if [ "$checksum_new" != "$checksum_empty" ];then + echo "failed to delete data in range" + fail_and_exit + fi + + export GO_FAILPOINTS="" +} + +# delete data in range[start-key, end-key) +clean "hello" "world" +checksum_empty=$(checksum "hello" "world") +run_test "" diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh new file mode 100755 index 0000000000000..21d13c2fc18ec --- /dev/null +++ b/br/tests/run_group.sh @@ -0,0 +1,76 @@ +#!/usr/bin/env bash + +# This script split the integration tests into 16 groups to support parallel group tests execution. +# all the integration tests are located in br/tests directory. only the directories +# containing run.sh will be considered as integration tests. the script will print the total # # # number + +set -eo pipefail + +# Step 1 +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +group=$1 + + +# Define groups +# Note: If new group is added, the group name must also be added to CI +# * https://github.com/PingCAP-QE/ci/blob/main/pipelines/pingcap/tidb/latest/pull_br_integration_test.groovy +# Each group of tests consumes as much time as possible, thus reducing CI waiting time. +# Putting multiple light tests together and heavy tests in a separate group. +declare -A groups +groups=( + ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable" + ["G01"]="br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full" + ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history" + ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' + ["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index' + ["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' + ["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn' + ["G07"]='br_clustered_index br_crypter br_table_partition br_tidb_placement_policy br_tiflash br_tikv_outage' + ["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint lightning_add_index lightning_alter_random lightning_auto_columns' + ["G09"]='lightning_auto_random_default lightning_bom_file lightning_character_sets lightning_check_partial_imported lightning_checkpoint lightning_checkpoint_chunks lightning_checkpoint_columns lightning_checkpoint_dirty_tableid' + ["G10"]='lightning_checkpoint_engines lightning_checkpoint_engines_order lightning_checkpoint_error_destroy lightning_checkpoint_parquet lightning_checkpoint_timestamp lightning_checksum_mismatch lightning_cmdline_override lightning_column_permutation lightning_common_handle' + ["G11"]='lightning_compress lightning_concurrent-restore lightning_config_max_error lightning_config_skip_csv_header lightning_csv lightning_default-columns lightning_disable_scheduler_by_key_range lightning_disk_quota lightning_distributed_import' + ["G12"]='lightning_drop_other_tables_halfway lightning_duplicate_detection lightning_duplicate_detection_new lightning_duplicate_resolution lightning_duplicate_resolution_incremental lightning_error_summary lightning_examples lightning_exotic_filenames lightning_extend_routes lightning_fail_fast' + ["G13"]='lightning_fail_fast_on_nonretry_err lightning_file_routing lightning_foreign_key lightning_gcs lightning_generated_columns lightning_ignore_columns lightning_import_compress lightning_incremental lightning_issue_282' + ["G14"]='lightning_issue_40657 lightning_issue_410 lightning_issue_519 lightning_local_backend lightning_max_incr lightning_max_random lightning_multi_valued_index lightning_new_collation lightning_no_schema' + ["G15"]='lightning_parquet lightning_partition_incremental lightning_partitioned-table lightning_record_network lightning_reload_cert lightning_restore lightning_routes lightning_routes_panic lightning_row-format-v2 lightning_s3' + ["G16"]='lightning_shard_rowid lightning_source_linkfile lightning_sqlmode lightning_tidb_duplicate_data lightning_tidb_rowid lightning_tiflash lightning_tikv_multi_rocksdb lightning_too_many_columns lightning_tool_135' + ["G17"]='lightning_tool_1420 lightning_tool_1472 lightning_tool_241 lightning_ttl lightning_unused_config_keys lightning_various_types lightning_view lightning_write_batch lightning_write_limit' +) + +# Get other cases not in groups, to avoid missing any case +others=() +for script in "$CUR"/*/run.sh; do + test_name="$(basename "$(dirname "$script")")" + # shellcheck disable=SC2076 + if [[ ! " ${groups[*]} " =~ " ${test_name} " ]]; then + others=("${others[@]} ${test_name}") + fi +done + +if [[ "$group" == "others" ]]; then + if [[ -z $others ]]; then + echo "All br&lightning integration test cases have been added to groups" + exit 0 + fi + echo "Error: "$others" is not added to any group in br/tests/run_group.sh" + exit 1 +elif [[ " ${!groups[*]} " =~ " ${group} " ]]; then + test_names="${groups[${group}]}" + # Run test cases + if [[ -n $test_names ]]; then + echo "" + echo "Run cases: ${test_names}" + for case_name in $test_names; do + echo "Run cases: ${case_name}" + rm -rf /tmp/backup_restore_test + mkdir -p /tmp/backup_restore_test + rm -rf cover + mkdir cover + TEST_NAME=${case_name} ${CUR}/run.sh + done + fi +else + echo "Error: invalid group name: ${group}" + exit 1 +fi From 1dd420fcca03884ce3d5a0d3b91a300c425bf824 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 10 Oct 2023 16:53:07 +0800 Subject: [PATCH 2/2] Update run_group.sh --- br/tests/run_group.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh index f806ae7d09296..ca66c8d5013ce 100755 --- a/br/tests/run_group.sh +++ b/br/tests/run_group.sh @@ -24,7 +24,7 @@ groups=( ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable" ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full" ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history" - ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' + ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' ["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index' ["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' ["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn'