Skip to content

Commit

Permalink
pitr: prevent from restore point to cluster running log backup (#40871)…
Browse files Browse the repository at this point in the history
… (#41215)

ref #40797
  • Loading branch information
ti-chi-bot authored Feb 13, 2023
1 parent b3967f8 commit 1491101
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 6 deletions.
3 changes: 1 addition & 2 deletions br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,9 @@ func (push *pushDown) pushBackup(
if len(errMsg) <= 0 {
errMsg = errPb.Msg
}
return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s %s",
return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s",
store.GetId(),
redact.String(store.GetAddress()),
req.StorageBackend.String(),
errMsg,
)
}
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,17 @@ func IsStreamRestore(cmdName string) bool {

// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
if err := checkTaskExists(c, cfg); err != nil {
return errors.Annotate(err, "failed to check task exits")
}

if IsStreamRestore(cmdName) {
return RunStreamRestore(c, g, cmdName, cfg)
}
return runRestore(c, g, cmdName, cfg)
}

func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
cfg.Adjust()
defer summary.Summary(cmdName)
ctx, cancel := context.WithCancel(c)
Expand Down
34 changes: 30 additions & 4 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,8 +862,8 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
return nil
}

func checkConfigForStatus(cfg *StreamConfig) error {
if len(cfg.PD) == 0 {
func checkConfigForStatus(pd []string) error {
if len(pd) == 0 {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the command needs access to PD, please specify `-u` or `--pd`")
}
Expand Down Expand Up @@ -913,7 +913,7 @@ func RunStreamStatus(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

if err := checkConfigForStatus(cfg); err != nil {
if err := checkConfigForStatus(cfg.PD); err != nil {
return err
}
ctl, err := makeStatusController(ctx, cfg, g)
Expand Down Expand Up @@ -1028,6 +1028,32 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
return nil
}

// checkTaskExists checks whether there is a log backup task running.
// If so, return an error.
func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error {
if err := checkConfigForStatus(cfg.PD); err != nil {
return err
}
etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
if err != nil {
return err
}
cli := streamhelper.NewMetaDataClient(etcdCLI)
defer func() {
if err := cli.Close(); err != nil {
log.Error("failed to close the etcd client", zap.Error(err))
}
}()
tasks, err := cli.GetAllTasks(ctx)
if err != nil {
return err
}
if len(tasks) > 0 {
return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name)
}
return nil
}

// RunStreamRestore restores stream log.
func RunStreamRestore(
c context.Context,
Expand Down Expand Up @@ -1089,7 +1115,7 @@ func RunStreamRestore(
logStorage := cfg.Config.Storage
cfg.Config.Storage = cfg.FullBackupStorage
// TiFlash replica is restored to down-stream on 'pitr' currently.
if err = RunRestore(ctx, g, FullRestoreCmd, cfg); err != nil {
if err = runRestore(ctx, g, FullRestoreCmd, cfg); err != nil {
return errors.Trace(err)
}
cfg.Config.Storage = logStorage
Expand Down
56 changes: 56 additions & 0 deletions br/tests/br_restore_log_task_enable/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/bin/sh
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux
DB="$TEST_NAME"
TABLE="usertable"

# start log task
run_br log start --task-name 1234 -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR

run_sql "CREATE DATABASE $DB;"
run_sql "CREATE TABLE $DB.$TABLE (id int);"
run_sql "INSERT INTO $DB.$TABLE VALUES (1), (2), (3);"

# backup full
run_br backup full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR

# clean db
run_sql "DROP DATABASE $DB;"

# restore full (should be failed)
run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1

# restore point (should be failed)
run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1

# pause log task
run_br log pause --task-name 1234 --pd $PD_ADDR

# restore full (should be failed)
run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1

# restore point (should be failed)
run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1

# stop log task
run_br log stop --task-name 1234 --pd $PD_ADDR

# restore full (should be success)
run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR

# clean db
run_sql "DROP DATABASE $DB"

0 comments on commit 1491101

Please sign in to comment.