Skip to content

Commit

Permalink
br/operator: fix adapt env for snapshot backup stuck when encountered…
Browse files Browse the repository at this point in the history
… error (pingcap#52607)

close pingcap#52846
  • Loading branch information
YuJuncen committed May 6, 2024
1 parent 3a8b526 commit a382435
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 12 deletions.
1 change: 1 addition & 0 deletions br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
brpb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -452,6 +453,9 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
// PrepareConnections prepares the connections for each store.
// This will pause the admin commands for each store.
func (p *Preparer) PrepareConnections(ctx context.Context) error {
failpoint.Inject("PrepareConnectionsErr", func() {
failpoint.Return(errors.New("mock PrepareConnectionsErr"))
})
log.Info("Preparing connections to stores.")
stores, err := p.env.GetAllLiveStores(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//br/pkg/task",
"//br/pkg/utils",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
21 changes: 9 additions & 12 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ package operator
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"os"
"runtime/debug"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
preparesnap "github.com/pingcap/tidb/br/pkg/backup/prepare_snap"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand Down Expand Up @@ -139,12 +137,19 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
cx.run(func() error { return pauseGCKeeper(cx) })
cx.run(func() error {
log.Info("Pause scheduler waiting all connections established.")
<-initChan
select {
case <-initChan:
case <-cx.Done():
return cx.Err()
}
log.Info("Pause scheduler noticed connections established.")
return pauseSchedulerKeeper(cx)
})
cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) })
go func() {
failpoint.Inject("SkipReadyHint", func() {
failpoint.Return()
})
cx.rdGrp.Wait()
if cfg.OnAllReady != nil {
cfg.OnAllReady()
Expand Down Expand Up @@ -192,14 +197,6 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectio
return nil
}

func getCallerName() string {
name, err := os.Hostname()
if err != nil {
name = fmt.Sprintf("UNKNOWN-%d", rand.Int63())
}
return fmt.Sprintf("operator@%sT%d#%d", name, time.Now().Unix(), os.Getpid())
}

func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) {
// Note: should we remove the service safepoint as soon as this exits?
sp := utils.BRServiceSafePoint{
Expand Down
31 changes: 31 additions & 0 deletions tests/realtikvtest/brietest/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/br/pkg/task"
Expand Down Expand Up @@ -224,3 +225,33 @@ func TestOperator(t *testing.T) {
verifySchedulerNotStopped(req, cfg)
verifyGCNotStopped(req, cfg)
}

func TestFailure(t *testing.T) {
req := require.New(t)
req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr", "return()"))
// Make goleak happy.
req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint", "return()"))
defer func() {
req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr"))
req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint"))
}()

cfg := operator.PauseGcConfig{
Config: task.Config{
PD: []string{"127.0.0.1:2379"},
},
TTL: 5 * time.Minute,
SafePoint: oracle.GoTimeToTS(time.Now()),
}

verifyGCNotStopped(req, cfg)
verifySchedulerNotStopped(req, cfg)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg)
require.Error(t, err)

verifyGCNotStopped(req, cfg)
verifySchedulerNotStopped(req, cfg)
}

0 comments on commit a382435

Please sign in to comment.