Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: fix insert gc failed due to slow schema reload (#57742) #58334

Open
wants to merge 1 commit into
base: release-6.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ const (
"You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage."
)

const (
waitInfoSchemaReloadCheckInterval = 1 * time.Second
// a million tables should take a few minutes to load all DDL change, making 15 to make sure we don't exit early
waitInfoSchemaReloadTimeout = 15 * time.Minute
)

var (
StreamStart = "log start"
StreamStop = "log stop"
Expand Down Expand Up @@ -1303,6 +1309,21 @@ func restoreStream(
return errors.Annotate(err, "failed to restore kv files")
}

// failpoint to stop for a while after restoring kvs
// this is to mimic the scenario that restore takes long time and the lease in schemaInfo has expired and needs refresh
failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) {
if val.(bool) {
// not ideal to use sleep but not sure what's the better way right now
log.Info("sleep after restoring kv")
time.Sleep(2 * time.Second)
}
})

// make sure schema reload finishes before proceeding
if err = waitUntilSchemaReload(ctx, client); err != nil {
return errors.Trace(err)
}

if err = client.CleanUpKVFiles(ctx); err != nil {
return errors.Annotate(err, "failed to clean up")
}
Expand Down Expand Up @@ -1674,3 +1695,16 @@ func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig)

return nil
}

func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error {
log.Info("waiting for schema info finishes reloading")
reloadStart := time.Now()
conditionFunc := func() bool {
return !client.GetDomain().IsLeaseExpired()
}
if err := utils.WaitUntil(ctx, conditionFunc, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout); err != nil {
return errors.Annotate(err, "failed to wait until schema reload")
}
log.Info("reloading schema finished", zap.Duration("timeTaken", time.Since(reloadStart)))
return nil
}
4 changes: 4 additions & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ go_library(
"safe_point.go",
"schema.go",
"store_manager.go",
<<<<<<< HEAD
"suspend_importing.go",
=======
"wait.go",
>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742))
"worker.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/utils",
Expand Down
44 changes: 44 additions & 0 deletions br/pkg/utils/encryption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/util/encrypt"

Check failure on line 22 in br/pkg/utils/encryption.go

View workflow job for this annotation

GitHub Actions / integration-test (5.7.35)

no required module provides package github.com/pingcap/tidb/pkg/util/encrypt; to add it:

Check failure on line 22 in br/pkg/utils/encryption.go

View workflow job for this annotation

GitHub Actions / integration-test (8.0.22)

no required module provides package github.com/pingcap/tidb/pkg/util/encrypt; to add it:

Check failure on line 22 in br/pkg/utils/encryption.go

View workflow job for this annotation

GitHub Actions / integration-test (8.0.26)

no required module provides package github.com/pingcap/tidb/pkg/util/encrypt; to add it:
)

func Decrypt(content []byte, cipher *backuppb.CipherInfo, iv []byte) ([]byte, error) {
if len(content) == 0 || cipher == nil {
return content, nil
}

switch cipher.CipherType {
case encryptionpb.EncryptionMethod_PLAINTEXT:
return content, nil
case encryptionpb.EncryptionMethod_AES128_CTR,
encryptionpb.EncryptionMethod_AES192_CTR,
encryptionpb.EncryptionMethod_AES256_CTR:
return encrypt.AESDecryptWithCTR(content, cipher.CipherKey, iv)
default:
return content, errors.Annotatef(berrors.ErrInvalidArgument, "cipher type invalid %s", cipher.CipherType)
}
}

func IsEffectiveEncryptionMethod(method encryptionpb.EncryptionMethod) bool {
return method != encryptionpb.EncryptionMethod_UNKNOWN && method != encryptionpb.EncryptionMethod_PLAINTEXT
}
49 changes: 49 additions & 0 deletions br/pkg/utils/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"context"
"time"

"github.com/pingcap/errors"
)

func WaitUntil(ctx context.Context, condition func() bool, checkInterval, maxTimeout time.Duration) error {
// do a quick check before starting the ticker
if condition() {
return nil
}

timeoutCtx, cancel := context.WithTimeout(ctx, maxTimeout)
defer cancel()

ticker := time.NewTicker(checkInterval)
defer ticker.Stop()

for {
select {
case <-timeoutCtx.Done():
if ctx.Err() != nil {
return ctx.Err()
}
return errors.Errorf("waitUntil timed out after waiting for %v", maxTimeout)
case <-ticker.C:
if condition() {
return nil
}
}
}
}
51 changes: 51 additions & 0 deletions br/tests/br_pitr_long_running_schema_loading/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
. run_services
CUR=$(cd `dirname $0`; pwd)

TASK_NAME="pitr_long_running_schema_loading"
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
DB="$TEST_NAME"

restart_services

run_sql "CREATE SCHEMA $DB;"

# start the log backup
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"

run_sql "USE $DB; CREATE TABLE t1 (id INT PRIMARY KEY, value VARCHAR(255));"
run_sql "USE $DB; INSERT INTO t1 VALUES (1, 'before-backup-1'), (2, 'before-backup-2');"


# do a full backup
run_br --pd "$PD_ADDR" backup full -s "local://$TEST_DIR/$TASK_NAME/full"

run_sql "USE $DB; INSERT INTO t1 VALUES (3, 'after-backup-1'), (4, 'after-backup-2');"
run_sql "USE $DB; DROP TABLE t1;"
run_sql "USE $DB; CREATE TABLE t2 (id INT PRIMARY KEY, data TEXT);"
run_sql "USE $DB; INSERT INTO t2 VALUES (1, 'new-table-data');"

echo "wait checkpoint advance"
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME

restart_services

export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/domain/mock-load-schema-long-time=return(true);github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending=return(true)"
run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full"
export GO_FAILPOINTS=""
74 changes: 74 additions & 0 deletions br/tests/run_group_br_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env bash

# This script split the integration tests into 9 groups to support parallel group tests execution.
# all the integration tests are located in br/tests directory. only the directories
# containing run.sh will be considered as valid br integration tests. the script will print the total case number

set -eo pipefail

# Step 1
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
group=$1
export COV_DIR="/tmp/group_cover"
rm -rf $COV_DIR
mkdir -p $COV_DIR

# Define groups
# Note: If new group is added, the group name must also be added to CI
# * https://github.com/PingCAP-QE/ci/blob/main/pipelines/pingcap/tidb/latest/pull_br_integration_test.groovy
# Each group of tests consumes as much time as possible, thus reducing CI waiting time.
# Putting multiple light tests together and heavy tests in a separate group.
declare -A groups
groups=(
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv br_tidb_placement_policy"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl br_tiflash"
["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other br_pitr_long_running_schema_loading"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index'
["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index'
["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption'
["G07"]='br_pitr'
["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom br_file_corruption br_tiflash_conflict'
)

# Get other cases not in groups, to avoid missing any case
others=()
for script in "$CUR"/*/run.sh; do
test_name="$(basename "$(dirname "$script")")"
if [[ $test_name != br* ]]; then
continue
fi
# shellcheck disable=SC2076
if [[ ! " ${groups[*]} " =~ " ${test_name} " ]]; then
others=("${others[@]} ${test_name}")
fi
done

# enable local encryption for all tests
ENABLE_ENCRYPTION=true
export ENABLE_ENCRYPTION

if [[ "$group" == "others" ]]; then
if [[ -z $others ]]; then
echo "All br integration test cases have been added to groups"
exit 0
fi
echo "Error: "$others" is not added to any group in br/tests/run_group_br_tests.sh"
exit 1
elif [[ " ${!groups[*]} " =~ " ${group} " ]]; then
test_names="${groups[${group}]}"
# Run test cases
if [[ -n $test_names ]]; then
echo ""
echo "Run cases: ${test_names}"
for case_name in $test_names; do
echo "Run cases: ${case_name}"
rm -rf /tmp/backup_restore_test
mkdir -p /tmp/backup_restore_test
TEST_NAME=${case_name} ${CUR}/run.sh
done
fi
else
echo "Error: invalid group name: ${group}"
exit 1
fi
31 changes: 31 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,19 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
logutil.BgLogger().Error("failed to load schema diff", zap.Error(err))
}

<<<<<<< HEAD:domain/domain.go
=======
// add failpoint to simulate long-running schema loading scenario
failpoint.Inject("mock-load-schema-long-time", func(val failpoint.Value) {
if val.(bool) {
// not ideal to use sleep, but not sure if there is a better way
logutil.BgLogger().Error("sleep before doing a full load")
time.Sleep(15 * time.Second)
}
})

// full load.
>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742)):pkg/domain/domain.go
schemas, err := do.fetchAllSchemasWithTables(m)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
Expand Down Expand Up @@ -1111,6 +1124,24 @@ func (do *Domain) Init(
return nil
}

<<<<<<< HEAD:domain/domain.go
=======
// GetSchemaLease return the schema lease.
func (do *Domain) GetSchemaLease() time.Duration {
return do.schemaLease
}

// IsLeaseExpired returns whether lease has expired
func (do *Domain) IsLeaseExpired() bool {
return do.SchemaValidator.IsLeaseExpired()
}

// InitInfo4Test init infosync for distributed execution test.
func (do *Domain) InitInfo4Test() {
infosync.MockGlobalServerInfoManagerEntry.Add(do.ddl.GetID(), do.ServerID)
}

>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742)):pkg/domain/domain.go
// SetOnClose used to set do.onClose func.
func (do *Domain) SetOnClose(onClose func()) {
do.onClose = onClose
Expand Down
6 changes: 6 additions & 0 deletions domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type SchemaValidator interface {
Reset()
// IsStarted indicates whether SchemaValidator is started.
IsStarted() bool
// IsLeaseExpired checks whether the current lease has expired
IsLeaseExpired() bool
}

type deltaSchemaInfo struct {
Expand Down Expand Up @@ -170,6 +172,10 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha
}
}

func (s *schemaValidator) IsLeaseExpired() bool {
return time.Now().After(s.latestSchemaExpire)
}

// isRelatedTablesChanged returns the result whether relatedTableIDs is changed
// from usedVer to the latest schema version.
// NOTE, this function should be called under lock!
Expand Down
Loading