diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 8f5f14753eac3..f3b9ec49cb990 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -77,6 +77,12 @@ const (
 		"You may check the metadata and continue by wait other task finish or manually delete the lock file " +
 		truncateLockPath + " at the external storage."
 )
+const (
+	waitInfoSchemaReloadCheckInterval = 1 * time.Second
+	// loading all DDL changes for a million tables can take a few minutes, so use 15 minutes to make sure we don't exit early
+	waitInfoSchemaReloadTimeout = 15 * time.Minute
+)
+
 var (
 	StreamStart = "log start"
 	StreamStop  = "log stop"
@@ -1479,6 +1485,21 @@ func restoreStream(
 		return errors.Annotate(err, "failed to restore kv files")
 	}
 
+	// failpoint to pause for a while after restoring kv files
+	// this mimics the scenario where the restore takes a long time and the lease in schemaInfo has expired and needs a refresh
+	failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) {
+		if val.(bool) {
+			// not ideal to use sleep, but not sure of a better way right now
+			log.Info("sleep after restoring kv")
+			time.Sleep(2 * time.Second)
+		}
+	})
+
+	// make sure schema reload finishes before proceeding
+	if err = waitUntilSchemaReload(ctx, client); err != nil {
+		return errors.Trace(err)
+	}
+
 	if err = client.CleanUpKVFiles(ctx); err != nil {
 		return errors.Annotate(err, "failed to clean up")
 	}
@@ -1870,3 +1891,16 @@ func checkPiTRTaskInfo(
 
 	return checkInfo, nil
 }
+
+func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error {
+	log.Info("waiting for schema info to finish reloading")
+	reloadStart := time.Now()
+	conditionFunc := func() bool {
+		return !client.GetDomain().IsLeaseExpired()
+	}
+	if err := utils.WaitUntil(ctx, conditionFunc, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout); err != nil {
+		return errors.Annotate(err, "failed to wait for schema reload")
+	}
+	log.Info("schema reload finished", zap.Duration("timeTaken", time.Since(reloadStart)))
+	return nil
+}
diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel
index fa18a8317b234..39e08f06d1c5e 100644
--- a/br/pkg/utils/BUILD.bazel
+++ b/br/pkg/utils/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
         "safe_point.go",
         "schema.go",
         "store_manager.go",
+        "wait.go",
         "worker.go",
     ],
     importpath = "github.com/pingcap/tidb/br/pkg/utils",
diff --git a/br/pkg/utils/encryption.go b/br/pkg/utils/encryption.go
index 9a81f01b2e787..9262c93a4e69d 100644
--- a/br/pkg/utils/encryption.go
+++ b/br/pkg/utils/encryption.go
@@ -1,3 +1,17 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package utils
 
 import (
diff --git a/br/pkg/utils/wait.go b/br/pkg/utils/wait.go
new file mode 100644
index 0000000000000..5e5616eafb9eb
--- /dev/null
+++ b/br/pkg/utils/wait.go
@@ -0,0 +1,49 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+	"context"
+	"time"
+
+	"github.com/pingcap/errors"
+)
+
+func WaitUntil(ctx context.Context, condition func() bool, checkInterval, maxTimeout time.Duration) error {
+	// do a quick check before starting the ticker
+	if condition() {
+		return nil
+	}
+
+	timeoutCtx, cancel := context.WithTimeout(ctx, maxTimeout)
+	defer cancel()
+
+	ticker := time.NewTicker(checkInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-timeoutCtx.Done():
+			if ctx.Err() != nil {
+				return ctx.Err()
+			}
+			return errors.Errorf("waitUntil timed out after waiting for %v", maxTimeout)
+		case <-ticker.C:
+			if condition() {
+				return nil
+			}
+		}
+	}
+}
diff --git a/br/tests/br_pitr_long_running_schema_loading/run.sh b/br/tests/br_pitr_long_running_schema_loading/run.sh
new file mode 100644
index 0000000000000..e6f32c08bcdce
--- /dev/null
+++ b/br/tests/br_pitr_long_running_schema_loading/run.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+#
+# Copyright 2024 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+. run_services
+CUR=$(cd `dirname $0`; pwd)
+
+TASK_NAME="pitr_long_running_schema_loading"
+res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
+DB="$TEST_NAME"
+
+restart_services
+
+run_sql "CREATE SCHEMA $DB;"
+
+# start the log backup
+run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"
+
+run_sql "USE $DB; CREATE TABLE t1 (id INT PRIMARY KEY, value VARCHAR(255));"
+run_sql "USE $DB; INSERT INTO t1 VALUES (1, 'before-backup-1'), (2, 'before-backup-2');"
+
+
+# do a full backup
+run_br --pd "$PD_ADDR" backup full -s "local://$TEST_DIR/$TASK_NAME/full"
+
+run_sql "USE $DB; INSERT INTO t1 VALUES (3, 'after-backup-1'), (4, 'after-backup-2');"
+run_sql "USE $DB; DROP TABLE t1;"
+run_sql "USE $DB; CREATE TABLE t2 (id INT PRIMARY KEY, data TEXT);"
+run_sql "USE $DB; INSERT INTO t2 VALUES (1, 'new-table-data');"
+
+echo "wait checkpoint advance"
+. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME
+
+restart_services
+
+export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/domain/mock-load-schema-long-time=return(true);github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending=return(true)"
+run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full"
+export GO_FAILPOINTS=""
diff --git a/br/tests/run_group_br_tests.sh b/br/tests/run_group_br_tests.sh
index 5064127142204..fdb9dd6d6ea6b 100755
--- a/br/tests/run_group_br_tests.sh
+++ b/br/tests/run_group_br_tests.sh
@@ -22,7 +22,7 @@ declare -A groups
 groups=(
 	["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv br_tidb_placement_policy"
 	["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl br_tiflash"
-	["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other"
+	["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other br_pitr_long_running_schema_loading"
 	["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index'
 	["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
 	["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index'
diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go
index 3bc01ac16928b..7afa66f0c0b1b 100644
--- a/pkg/domain/domain.go
+++ b/pkg/domain/domain.go
@@ -356,6 +356,16 @@ func (do *Domain) loadInfoSchema(startTS uint64, isSnapshot bool) (infoschema.In
 		// We can fall back to full load, don't need to return the error.
 		logutil.BgLogger().Error("failed to load schema diff", zap.Error(err))
 	}
+
+	// add failpoint to simulate a long-running schema loading scenario
+	failpoint.Inject("mock-load-schema-long-time", func(val failpoint.Value) {
+		if val.(bool) {
+			// not ideal to use sleep, but not sure if there is a better way
+			logutil.BgLogger().Error("sleep before doing a full load")
+			time.Sleep(15 * time.Second)
+		}
+	})
+
 	// full load.
 	schemas, err := do.fetchAllSchemasWithTables(m)
 	if err != nil {
@@ -1513,6 +1523,11 @@ func (do *Domain) GetSchemaLease() time.Duration {
 	return do.schemaLease
 }
 
+// IsLeaseExpired returns whether the schema lease has expired
+func (do *Domain) IsLeaseExpired() bool {
+	return do.SchemaValidator.IsLeaseExpired()
+}
+
 // InitInfo4Test init infosync for distributed execution test.
 func (do *Domain) InitInfo4Test() {
 	infosync.MockGlobalServerInfoManagerEntry.Add(do.ddl.GetID(), do.ServerID)
diff --git a/pkg/domain/schema_validator.go b/pkg/domain/schema_validator.go
index 6cf2480bc0ab2..3853478115953 100644
--- a/pkg/domain/schema_validator.go
+++ b/pkg/domain/schema_validator.go
@@ -57,6 +57,8 @@ type SchemaValidator interface {
 	Reset()
 	// IsStarted indicates whether SchemaValidator is started.
 	IsStarted() bool
+	// IsLeaseExpired checks whether the current lease has expired.
+	IsLeaseExpired() bool
 }
 
 type deltaSchemaInfo struct {
@@ -169,6 +171,10 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha
 	}
 }
 
+func (s *schemaValidator) IsLeaseExpired() bool {
+	return time.Now().After(s.latestSchemaExpire)
+}
+
 // isRelatedTablesChanged returns the result whether relatedTableIDs is changed
 // from usedVer to the latest schema version.
 // NOTE, this function should be called under lock!
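
For reference, a minimal usage sketch of the new utils.WaitUntil helper introduced by this patch. It is not part of the diff; the 3-second condition and the 1s/10s intervals are illustrative assumptions, and it assumes a module that can import github.com/pingcap/tidb/br/pkg/utils.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/pingcap/tidb/br/pkg/utils"
)

func main() {
	ctx := context.Background()
	start := time.Now()
	// illustrative condition: flips to true after ~3 seconds, standing in for "schema lease is valid again"
	condition := func() bool { return time.Since(start) > 3*time.Second }

	// poll every second, give up after 10 seconds
	if err := utils.WaitUntil(ctx, condition, 1*time.Second, 10*time.Second); err != nil {
		fmt.Println("wait failed:", err)
		return
	}
	fmt.Println("condition met after", time.Since(start))
}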