Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#52607
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
YuJuncen authored and ti-chi-bot committed Apr 29, 2024
1 parent bf84e23 commit 1cfc58e
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 11 deletions.
1 change: 1 addition & 0 deletions br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
brpb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -414,7 +415,16 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
p.inflightReqs[region.GetMeta().Id] = *region.GetMeta()
}

<<<<<<< HEAD
func (p *Preparer) prepareConnections(ctx context.Context) error {
=======
// PrepareConnections prepares the connections for each store.
// This will pause the admin commands for each store.
func (p *Preparer) PrepareConnections(ctx context.Context) error {
failpoint.Inject("PrepareConnectionsErr", func() {
failpoint.Return(errors.New("mock PrepareConnectionsErr"))
})
>>>>>>> 2969b9e5767 (br/operator: fix adapt env for snapshot backup stuck when encountered error (#52607))
log.Info("Preparing connections to stores.")
stores, err := p.env.GetAllLiveStores(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//br/pkg/task",
"//br/pkg/utils",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
29 changes: 18 additions & 11 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ package operator
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"os"
"runtime/debug"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
preparesnap "github.com/pingcap/tidb/br/pkg/backup/prepare_snap"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand Down Expand Up @@ -136,9 +134,26 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
defer cx.Close()

cx.run(func() error { return pauseGCKeeper(cx) })
<<<<<<< HEAD
cx.run(func() error { return pauseSchedulerKeeper(cx) })
cx.run(func() error { return pauseAdminAndWaitApply(cx) })
=======
cx.run(func() error {
log.Info("Pause scheduler waiting all connections established.")
select {
case <-initChan:
case <-cx.Done():
return cx.Err()
}
log.Info("Pause scheduler noticed connections established.")
return pauseSchedulerKeeper(cx)
})
cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) })
>>>>>>> 2969b9e5767 (br/operator: fix adapt env for snapshot backup stuck when encountered error (#52607))
go func() {
failpoint.Inject("SkipReadyHint", func() {
failpoint.Return()
})
cx.rdGrp.Wait()
if cfg.OnAllReady != nil {
cfg.OnAllReady()
Expand Down Expand Up @@ -182,14 +197,6 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error {
return nil
}

func getCallerName() string {
name, err := os.Hostname()
if err != nil {
name = fmt.Sprintf("UNKNOWN-%d", rand.Int63())
}
return fmt.Sprintf("operator@%sT%d#%d", name, time.Now().Unix(), os.Getpid())
}

func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) {
// Note: should we remove the service safepoint as soon as this exits?
sp := utils.BRServiceSafePoint{
Expand Down
31 changes: 31 additions & 0 deletions tests/realtikvtest/brietest/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/br/pkg/task"
Expand Down Expand Up @@ -224,3 +225,33 @@ func TestOperator(t *testing.T) {
verifySchedulerNotStopped(req, cfg)
verifyGCNotStopped(req, cfg)
}

func TestFailure(t *testing.T) {
req := require.New(t)
req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr", "return()"))
// Make goleak happy.
req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint", "return()"))
defer func() {
req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr"))
req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint"))
}()

cfg := operator.PauseGcConfig{
Config: task.Config{
PD: []string{"127.0.0.1:2379"},
},
TTL: 5 * time.Minute,
SafePoint: oracle.GoTimeToTS(time.Now()),
}

verifyGCNotStopped(req, cfg)
verifySchedulerNotStopped(req, cfg)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg)
require.Error(t, err)

verifyGCNotStopped(req, cfg)
verifySchedulerNotStopped(req, cfg)
}

0 comments on commit 1cfc58e

Please sign in to comment.