From 925543871c5d555b27ae2aa7e9139734872f3ecb Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 14 Sep 2023 11:26:30 +0800 Subject: [PATCH 01/13] initial vesrion Signed-off-by: hillium --- br/cmd/br/main.go | 2 +- br/cmd/br/operator.go | 19 +++-- br/pkg/errors/errors.go | 2 + br/pkg/task/operator/cmd.go | 101 ++++++++++++++++++++++---- br/pkg/task/operator/config.go | 2 +- br/pkg/utils/deny_lightning.go | 129 +++++++++++++++++++++++++++++++++ go.mod | 2 + go.sum | 18 +++-- 8 files changed, 243 insertions(+), 32 deletions(-) create mode 100644 br/pkg/utils/deny_lightning.go diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index d70d9425e0653..5eca340f1e622 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -49,7 +49,7 @@ func main() { NewBackupCommand(), NewRestoreCommand(), NewStreamCommand(), - newOpeartorCommand(), + newOperatorCommand(), ) // Outputs cmd.Print to stdout. rootCmd.SetOut(os.Stdout) diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go index 57ed59b224d06..2a6d80aa12ffa 100644 --- a/br/cmd/br/operator.go +++ b/br/cmd/br/operator.go @@ -11,7 +11,7 @@ import ( "github.com/spf13/cobra" ) -func newOpeartorCommand() *cobra.Command { +func newOperatorCommand() *cobra.Command { cmd := &cobra.Command{ Use: "operator ", Short: "utilities for operators like tidb-operator.", @@ -26,14 +26,19 @@ func newOpeartorCommand() *cobra.Command { }, Hidden: true, } - cmd.AddCommand(newPauseGcCommand()) + cmd.AddCommand(newPrepareForSnapshotBackupCommand( + "pause-gc-and-schedulers", + "(Will be replaced with `prepare-for-snapshot-backup`) pause gc, schedulers and importing until the program exits.")) + cmd.AddCommand(newPrepareForSnapshotBackupCommand( + "prepare-for-snapshot-backup", + "pause gc, schedulers and importing until the program exits, for snapshot backup.")) return cmd } -func newPauseGcCommand() *cobra.Command { +func newPrepareForSnapshotBackupCommand(use string, short string) *cobra.Command { cmd := &cobra.Command{ - Use: "pause-gc-and-schedulers", - Short: "pause gc and schedulers to the ts until the program exits.", + Use: use, + Short: short, Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { cfg := operator.PauseGcConfig{} @@ -41,9 +46,9 @@ func newPauseGcCommand() *cobra.Command { return err } ctx := GetDefaultContext() - return operator.PauseGCAndScheduler(ctx, &cfg) + return operator.AdaptEnvForSnapshotBackup(ctx, &cfg) }, } - operator.DefineFlagsForPauseGcConfig(cmd.Flags()) + operator.DefineFlagsForPrepareSnapBackup(cmd.Flags()) return cmd } diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index bf9478a2a3e9c..1c18f0b45fe6e 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -111,4 +111,6 @@ var ( ErrKVDownloadFailed = errors.Normalize("download sst failed", errors.RFCCodeText("BR:KV:ErrKVDownloadFailed")) // ErrKVIngestFailed indicates a generic, retryable ingest error. ErrKVIngestFailed = errors.Normalize("ingest sst failed", errors.RFCCodeText("BR:KV:ErrKVIngestFailed")) + + ErrPossibleInconsistency = errors.Normalize("the cluster state might be inconsistent", errors.RFCCodeText("BR:KV:ErrPossibleInconsistency")) ) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 95c922b1c19cf..d6ddd7dd4b90b 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -6,8 +6,10 @@ import ( "context" "crypto/tls" "strings" + "sync" "time" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/pdutil" @@ -15,6 +17,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/keepalive" ) func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) { @@ -35,47 +38,115 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) } func cleanUpWith(f func(ctx context.Context)) { + cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil }) +} + +func cleanUpWithErr(f func(ctx context.Context) error) error { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - f(ctx) + return f(ctx) +} + +type AdaptEnvForSnapshotBackupContext struct { + context.Context + + pdMgr *pdutil.PdController + kvMgr *utils.StoreManager + cfg PauseGcConfig + + rdGrp sync.WaitGroup + runGrp *errgroup.Group } -// PauseGCAndScheduler blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config. +func (ctx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) { + logutil.CL(ctx).Info("Stage ready.", append(notes, zap.String("component", name))...) + ctx.rdGrp.Done() +} + +func hintAllReady() { + // Hacking: some version of operators using the follow two logs to check whether we are ready... + log.Info("Schedulers are paused.") + log.Info("GC is paused.") + log.Info("All ready.") +} + +// AdaptEnvForSnapshotBackup blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config. // This function will block until the context being canceled. -func PauseGCAndScheduler(ctx context.Context, cfg *PauseGcConfig) error { +func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { mgr, err := dialPD(ctx, &cfg.Config) if err != nil { - return err + return errors.Annotate(err, "failed to dial PD") } - + tconf, err := cfg.TLS.ToTLSConfig() + if err != nil { + return errors.Annotate(err, "invalid tls config") + } + kvMgr := utils.NewStoreManager(mgr.GetPDClient(), keepalive.ClientParameters{ + Time: time.Duration(cfg.Config.GRPCKeepaliveTime) * time.Second, + Timeout: time.Duration(cfg.Config.GRPCKeepaliveTimeout) * time.Second, + }, tconf) eg, ectx := errgroup.WithContext(ctx) + cx := &AdaptEnvForSnapshotBackupContext{ + Context: ectx, + pdMgr: mgr, + kvMgr: kvMgr, + cfg: *cfg, + rdGrp: sync.WaitGroup{}, + runGrp: eg, + } + cx.rdGrp.Add(3) - eg.Go(func() error { return pauseGCKeeper(ectx, cfg, mgr) }) - eg.Go(func() error { return pauseSchedulerKeeper(ectx, mgr) }) + eg.Go(func() error { return pauseGCKeeper(cx) }) + eg.Go(func() error { return pauseSchedulerKeeper(cx) }) + goPauseImporting(cx) + cx.rdGrp.Wait() + hintAllReady() return eg.Wait() } -func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdController) error { +func goPauseImporting(ctx *AdaptEnvForSnapshotBackupContext) error { + denyLightning := utils.NewDenyImporting("prepare_for_snapshot_backup", ctx.kvMgr) + if _, err := denyLightning.DenyAllStore(ctx, ctx.cfg.TTL); err != nil { + return errors.Trace(err) + } + ctx.ReadyL("pause_lightning") + ctx.runGrp.Go(func() error { + err := denyLightning.Keeper(ctx, ctx.cfg.TTL) + if errors.Cause(err) != context.Canceled { + log.Warn("keeper encounters error.", logutil.ShortError(err)) + } + return cleanUpWithErr(func(ctx context.Context) error { + res, err := denyLightning.AllowAllStores(ctx) + if err != nil { + return err + } + return denyLightning.ConsistentWithPrev(res) + }) + }) + return nil +} + +func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error { // Note: should we remove the service safepoint as soon as this exits? sp := utils.BRServiceSafePoint{ ID: utils.MakeSafePointID(), - TTL: int64(cfg.TTL.Seconds()), - BackupTS: cfg.SafePoint, + TTL: int64(ctx.cfg.TTL.Seconds()), + BackupTS: ctx.cfg.SafePoint, } if sp.BackupTS == 0 { - rts, err := ctl.GetMinResolvedTS(ctx) + rts, err := ctx.pdMgr.GetMinResolvedTS(ctx) if err != nil { return err } log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts)) sp.BackupTS = rts } - err := utils.StartServiceSafePointKeeper(ctx, ctl.GetPDClient(), sp) + err := utils.StartServiceSafePointKeeper(ctx, ctx.pdMgr.GetPDClient(), sp) if err != nil { return err } - log.Info("GC is paused.", zap.Object("safepoint", sp)) + ctx.ReadyL("pause_gc", zap.Object("safepoint", sp)) // Note: in fact we can directly return here. // But the name `keeper` implies once the function exits, // the GC should be resume, so let's block here. @@ -83,8 +154,8 @@ func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdContro return nil } -func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error { - undo, err := ctl.RemoveAllPDSchedulers(ctx) +func pauseSchedulerKeeper(ctx *AdaptEnvForSnapshotBackupContext) error { + undo, err := ctx.pdMgr.RemoveAllPDSchedulers(ctx) if undo != nil { defer cleanUpWith(func(ctx context.Context) { if err := undo(ctx); err != nil { diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index eb7e12a49af56..998fdc64d961e 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -16,7 +16,7 @@ type PauseGcConfig struct { TTL time.Duration `json:"ttl" yaml:"ttl"` } -func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) { +func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) { _ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.") _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.") } diff --git a/br/pkg/utils/deny_lightning.go b/br/pkg/utils/deny_lightning.go new file mode 100644 index 0000000000000..1500dd5b90eb5 --- /dev/null +++ b/br/pkg/utils/deny_lightning.go @@ -0,0 +1,129 @@ +package utils + +import ( + "context" + "time" + + "github.com/pingcap/errors" + berrors "github.com/pingcap/tidb/br/pkg/errors" + + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/metapb" + "google.golang.org/grpc" +) + +const ( + DenyLightningUpdateFrequency = 5 +) + +func (mgr *StoreManager) GetAllStores(ctx context.Context) ([]*metapb.Store, error) { + return mgr.PDClient().GetAllStores(ctx) +} + +func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uint64) (DenyLightningClient, error) { + var cli import_sstpb.ImportSSTClient + err := mgr.WithConn(ctx, storeID, func(cc *grpc.ClientConn) { + cli = import_sstpb.NewImportSSTClient(cc) + }) + if err != nil { + return nil, err + } + return cli, nil +} + +type DenyImportingEnv interface { + GetAllStores(ctx context.Context) ([]*metapb.Store, error) + GetDenyLightningClient(ctx context.Context, storeID uint64) (DenyLightningClient, error) +} + +type DenyLightningClient interface { + // Temporarily disable ingest / download / write for data listeners don't support catching import data. + DenyImportRPC(ctx context.Context, in *import_sstpb.DenyImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.DenyImportRPCResponse, error) +} + +type DenyImporting struct { + env DenyImportingEnv + name string +} + +func NewDenyImporting(name string, env DenyImportingEnv) *DenyImporting { + return &DenyImporting{ + env: env, + name: name, + } +} + +// DenyAllStore tries to deny all current stores' lightning execution for the period of time. +// Returns a map mapping store ID to whether they are already denied to import tasks. +func (d *DenyImporting) DenyAllStore(ctx context.Context, dur time.Duration) (map[uint64]bool, error) { + return d.forEachStores(ctx, func() *import_sstpb.DenyImportRPCRequest { + return &import_sstpb.DenyImportRPCRequest{ + ShouldDenyImports: true, + DurationSecs: uint64(dur.Seconds()), + Caller: d.name, + } + }) +} + +func (d *DenyImporting) AllowAllStores(ctx context.Context) (map[uint64]bool, error) { + return d.forEachStores(ctx, func() *import_sstpb.DenyImportRPCRequest { + return &import_sstpb.DenyImportRPCRequest{ + ShouldDenyImports: false, + Caller: d.name, + } + }) +} + +// forEachStores send the request to each stores reachable. +// Returns a map mapping store ID to whether they are already denied to import tasks. +func (d *DenyImporting) forEachStores(ctx context.Context, makeReq func() *import_sstpb.DenyImportRPCRequest) (map[uint64]bool, error) { + stores, err := d.env.GetAllStores(ctx) + if err != nil { + return nil, errors.Annotate(err, "failed to get all stores") + } + + result := map[uint64]bool{} + for _, store := range stores { + cli, err := d.env.GetDenyLightningClient(ctx, store.Id) + if err != nil { + return nil, errors.Annotatef(err, "failed to get client for store %d", store.Id) + } + req := makeReq() + resp, err := cli.DenyImportRPC(ctx, req) + if err != nil { + return nil, errors.Annotatef(err, "failed to deny lightning rpc for store %d", store.Id) + } + result[store.Id] = resp.AlreadyDeniedImports + } + return result, nil +} + +// HasKeptDenying checks whether a result returned by `DenyAllStores` is able to keep the consistency with last request. +// i.e. Whether the store has some holes of pausing the import requests. +func (d *DenyImporting) ConsistentWithPrev(result map[uint64]bool) error { + for storeId, denied := range result { + if !denied { + return errors.Annotatef(berrors.ErrPossibleInconsistency, "failed to keep importing to store %d being denied, the state might be inconsistency", storeId) + } + } + return nil +} + +func (d *DenyImporting) Keeper(ctx context.Context, ttl time.Duration) error { + t := time.NewTicker(ttl / DenyLightningUpdateFrequency) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + res, err := d.DenyAllStore(ctx, ttl) + if err != nil { + return err + } + if err := d.ConsistentWithPrev(res); err != nil { + return err + } + } + } +} diff --git a/go.mod b/go.mod index d5dfd86ba5063..b4c445abb8e5d 100644 --- a/go.mod +++ b/go.mod @@ -309,3 +309,5 @@ replace ( github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117 github.com/pingcap/tidb/parser => ./parser ) + +replace github.com/pingcap/kvproto => github.com/yujuncen/kvproto v0.0.0-20230913085421-7b5c641805b9 diff --git a/go.sum b/go.sum index e39ed4ac4f712..8237c1585dbd0 100644 --- a/go.sum +++ b/go.sum @@ -189,6 +189,10 @@ github.com/cloudfoundry/gosigar v1.3.6 h1:gIc08FbB3QPb+nAQhINIK/qhf5REKkY0FTGgRG github.com/cloudfoundry/gosigar v1.3.6/go.mod h1:lNWstu5g5gw59O09Y+wsMNFzBSnU8a0u+Sfx4dq360E= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292/go.mod h1:qRiX68mZX1lGBkTWyp3CLcenw9I94W2dLeRvMzcn9N4= github.com/cockroachdb/cockroach v0.0.0-20170608034007-84bc9597164f/go.mod h1:xeT/CQ0qZHangbYbWShlCGAx31aV4AjGswDUjhKS6HQ= @@ -278,6 +282,7 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE= @@ -360,7 +365,6 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -391,7 +395,6 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= -github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -820,9 +823,6 @@ github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g= github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z24= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130 h1:qbLm5cOdCWxZ0mt6SaN2aXI+KFekbPqURd6YkNI+XRI= -github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY= @@ -1069,6 +1069,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yujuncen/kvproto v0.0.0-20230913085421-7b5c641805b9 h1:HV+9VKLvBkMULH3IHklXBiH/a5qzcz78p01JvOQPcd4= +github.com/yujuncen/kvproto v0.0.0-20230913085421-7b5c641805b9/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= @@ -1234,7 +1236,6 @@ golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1364,6 +1365,7 @@ golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1538,7 +1540,6 @@ google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6 google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -1572,7 +1573,6 @@ google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= -google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1594,6 +1594,7 @@ google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -1608,6 +1609,7 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= From 4ee25c86a1cac18a9383896b7b6f807b7b7bd48e Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 14 Sep 2023 12:24:05 +0800 Subject: [PATCH 02/13] fix tls config Signed-off-by: hillium --- br/pkg/task/operator/cmd.go | 19 ++++++++++++------- br/pkg/utils/store_manager.go | 1 + 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index d6ddd7dd4b90b..2d6536fb545dc 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -77,9 +77,12 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { if err != nil { return errors.Annotate(err, "failed to dial PD") } - tconf, err := cfg.TLS.ToTLSConfig() - if err != nil { - return errors.Annotate(err, "invalid tls config") + var tconf *tls.Config + if cfg.TLS.IsEnabled() { + tconf, err = cfg.TLS.ToTLSConfig() + if err != nil { + return errors.Annotate(err, "invalid tls config") + } } kvMgr := utils.NewStoreManager(mgr.GetPDClient(), keepalive.ClientParameters{ Time: time.Duration(cfg.Config.GRPCKeepaliveTime) * time.Second, @@ -98,9 +101,11 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { eg.Go(func() error { return pauseGCKeeper(cx) }) eg.Go(func() error { return pauseSchedulerKeeper(cx) }) - goPauseImporting(cx) - cx.rdGrp.Wait() - hintAllReady() + eg.Go(func() error { return goPauseImporting(cx) }) + go func() { + cx.rdGrp.Wait() + hintAllReady() + }() return eg.Wait() } @@ -166,7 +171,7 @@ func pauseSchedulerKeeper(ctx *AdaptEnvForSnapshotBackupContext) error { if err != nil { return err } - log.Info("Schedulers are paused.") + ctx.ReadyL("pause_scheduler") // Wait until the context canceled. // So we can properly do the clean up work. <-ctx.Done() diff --git a/br/pkg/utils/store_manager.go b/br/pkg/utils/store_manager.go index 8a89e49022806..430d1394b0037 100644 --- a/br/pkg/utils/store_manager.go +++ b/br/pkg/utils/store_manager.go @@ -148,6 +148,7 @@ func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64) if addr == "" { addr = store.GetAddress() } + log.Info("StoreManager: dialing to store.", zap.String("address", addr), zap.Uint64("store-id", storeID)) conn, err := grpc.DialContext( ctx, addr, From 8eaffd6b1929883056944235542b22d76c9ccd1b Mon Sep 17 00:00:00 2001 From: hillium Date: Sat, 16 Sep 2023 11:20:45 +0800 Subject: [PATCH 03/13] added some test cases Signed-off-by: hillium --- br/pkg/task/backup_ebs.go | 18 ++ br/pkg/task/operator/cmd.go | 2 +- .../{deny_lightning.go => deny_importing.go} | 17 +- br/pkg/utils/deny_importing_test.go | 209 ++++++++++++++++++ 4 files changed, 241 insertions(+), 5 deletions(-) rename br/pkg/utils/{deny_lightning.go => deny_importing.go} (87%) create mode 100644 br/pkg/utils/deny_importing_test.go diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go index e84fb66188e49..59247caef5c93 100644 --- a/br/pkg/task/backup_ebs.go +++ b/br/pkg/task/backup_ebs.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/conn/util" "github.com/pingcap/tidb/br/pkg/glue" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/storage" @@ -144,6 +145,16 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { if e != nil { return errors.Trace(err) } + denyLightning := utils.NewDenyImporting("backup_ebs_command", mgr.StoreManager) + _, err := denyLightning.DenyAllStores(ctx, utils.DefaultBRGCSafePointTTL) + if err != nil { + return errors.Annotate(err, "lightning from running") + } + go func() { + if err := denyLightning.Keeper(ctx, utils.DefaultBRGCSafePointTTL); err != nil { + log.Warn("cannot keep deny importing, the backup archive may not be useable if there were importing.", logutil.ShortError(err)) + } + }() defer func() { if ctx.Err() != nil { log.Warn("context canceled, doing clean work with background context") @@ -155,6 +166,13 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { if restoreE := restoreFunc(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } + res, err := denyLightning.AllowAllStores(ctx) + if err != nil { + log.Warn("failed to restore importing, you may need to wait until you are able to start importing", zap.Duration("wait_for", utils.DefaultBRGCSafePointTTL)) + } + if err := denyLightning.ConsistentWithPrev(res); err != nil { + log.Warn("lightning hasn't been denied during restoring, the backup archive may not be usable.", logutil.ShortError(err)) + } }() } diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 2d6536fb545dc..e40f464bd8995 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -112,7 +112,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { func goPauseImporting(ctx *AdaptEnvForSnapshotBackupContext) error { denyLightning := utils.NewDenyImporting("prepare_for_snapshot_backup", ctx.kvMgr) - if _, err := denyLightning.DenyAllStore(ctx, ctx.cfg.TTL); err != nil { + if _, err := denyLightning.DenyAllStores(ctx, ctx.cfg.TTL); err != nil { return errors.Trace(err) } ctx.ReadyL("pause_lightning") diff --git a/br/pkg/utils/deny_lightning.go b/br/pkg/utils/deny_importing.go similarity index 87% rename from br/pkg/utils/deny_lightning.go rename to br/pkg/utils/deny_importing.go index 1500dd5b90eb5..b5382e14a9d04 100644 --- a/br/pkg/utils/deny_lightning.go +++ b/br/pkg/utils/deny_importing.go @@ -6,6 +6,7 @@ import ( "github.com/pingcap/errors" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" @@ -53,9 +54,9 @@ func NewDenyImporting(name string, env DenyImportingEnv) *DenyImporting { } } -// DenyAllStore tries to deny all current stores' lightning execution for the period of time. +// DenyAllStores tries to deny all current stores' lightning execution for the period of time. // Returns a map mapping store ID to whether they are already denied to import tasks. -func (d *DenyImporting) DenyAllStore(ctx context.Context, dur time.Duration) (map[uint64]bool, error) { +func (d *DenyImporting) DenyAllStores(ctx context.Context, dur time.Duration) (map[uint64]bool, error) { return d.forEachStores(ctx, func() *import_sstpb.DenyImportRPCRequest { return &import_sstpb.DenyImportRPCRequest{ ShouldDenyImports: true, @@ -110,6 +111,7 @@ func (d *DenyImporting) ConsistentWithPrev(result map[uint64]bool) error { } func (d *DenyImporting) Keeper(ctx context.Context, ttl time.Duration) error { + lastSuccess := time.Now() t := time.NewTicker(ttl / DenyLightningUpdateFrequency) defer t.Stop() for { @@ -117,13 +119,20 @@ func (d *DenyImporting) Keeper(ctx context.Context, ttl time.Duration) error { case <-ctx.Done(): return ctx.Err() case <-t.C: - res, err := d.DenyAllStore(ctx, ttl) + res, err := d.DenyAllStores(ctx, ttl) if err != nil { - return err + if time.Since(lastSuccess) < ttl { + logutil.CL(ctx).Warn("Failed to send deny one of the stores.", logutil.ShortError(err)) + continue + } else { + return err + } } if err := d.ConsistentWithPrev(res); err != nil { return err } + + lastSuccess = time.Now() } } } diff --git a/br/pkg/utils/deny_importing_test.go b/br/pkg/utils/deny_importing_test.go new file mode 100644 index 0000000000000..e3a285ad28528 --- /dev/null +++ b/br/pkg/utils/deny_importing_test.go @@ -0,0 +1,209 @@ +package utils_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type ImportTargetStore struct { + mu sync.Mutex + Id uint64 + LastSuccessDenyCall time.Time + DenyImportFor time.Duration + DeniedImport bool + + ErrGen func() error +} + +type ImportTargetStores struct { + mu sync.Mutex + items map[uint64]*ImportTargetStore +} + +func initWithIDs(ids []int) *ImportTargetStores { + ss := &ImportTargetStores{ + items: map[uint64]*ImportTargetStore{}, + } + for _, id := range ids { + store := new(ImportTargetStore) + store.Id = uint64(id) + ss.items[uint64(id)] = store + } + return ss +} + +func (s *ImportTargetStores) GetAllStores(ctx context.Context) ([]*metapb.Store, error) { + s.mu.Lock() + defer s.mu.Unlock() + + stores := make([]*metapb.Store, 0, len(s.items)) + for _, store := range s.items { + stores = append(stores, &metapb.Store{Id: store.Id}) + } + return stores, nil +} + +func (s *ImportTargetStores) GetDenyLightningClient(ctx context.Context, storeID uint64) (utils.DenyLightningClient, error) { + s.mu.Lock() + defer s.mu.Unlock() + + store, ok := s.items[storeID] + if !ok { + return nil, errors.Trace(fmt.Errorf("store %d not found", storeID)) + } + + return store, nil +} + +// Temporarily disable ingest / download / write for data listeners don't support catching import data. +func (s *ImportTargetStore) DenyImportRPC(ctx context.Context, in *import_sstpb.DenyImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.DenyImportRPCResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.ErrGen != nil { + if err := s.ErrGen(); err != nil { + return nil, s.ErrGen() + } + } + + denied := s.DeniedImport + if in.ShouldDenyImports { + s.DeniedImport = true + s.DenyImportFor = time.Duration(in.DurationSecs) * time.Second + s.LastSuccessDenyCall = time.Now() + } else { + s.DeniedImport = false + } + return &import_sstpb.DenyImportRPCResponse{ + AlreadyDeniedImports: denied, + }, nil +} + +func (s *ImportTargetStores) assertAllStoresDenied(t *testing.T) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, store := range s.items { + func() { + store.mu.Lock() + defer store.mu.Unlock() + + require.True(t, store.DeniedImport, "ID = %d", store.Id) + require.Less(t, time.Since(store.LastSuccessDenyCall), store.DenyImportFor, "ID = %d", store.Id) + }() + } +} + +func TestBasic(t *testing.T) { + req := require.New(t) + + ss := initWithIDs([]int{1, 4, 5}) + deny := utils.NewDenyImporting(t.Name(), ss) + + ctx := context.Background() + res, err := deny.DenyAllStores(ctx, 10*time.Second) + req.NoError(err) + req.Error(deny.ConsistentWithPrev(res)) + for id, inner := range ss.items { + req.True(inner.DeniedImport, "at %d", id) + req.Equal(inner.DenyImportFor, 10*time.Second, "at %d", id) + } + + res, err = deny.DenyAllStores(ctx, 10*time.Second) + req.NoError(err) + req.NoError(deny.ConsistentWithPrev(res)) + + res, err = deny.AllowAllStores(ctx) + req.NoError(err) + req.NoError(deny.ConsistentWithPrev(res)) +} + +func TestKeeperError(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + ss := initWithIDs([]int{1, 4, 5}) + deny := utils.NewDenyImporting(t.Name(), ss) + ttl := time.Second + + now := time.Now() + triggeredErr := uint32(0) + _, err := deny.DenyAllStores(ctx, ttl) + req.NoError(err) + + ss.items[4].ErrGen = func() error { + if time.Since(now) > 600*time.Millisecond { + return nil + } + triggeredErr += 1 + return status.Error(codes.Unavailable, "the store is slacking.") + } + + cx, cancel := context.WithCancel(ctx) + + wg := new(errgroup.Group) + wg.Go(func() error { return deny.Keeper(cx, ttl) }) + time.Sleep(ttl) + cancel() + req.ErrorIs(wg.Wait(), context.Canceled) + req.Positive(triggeredErr) +} + +func TestKeeperErrorExit(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + ss := initWithIDs([]int{1, 4, 5}) + deny := utils.NewDenyImporting(t.Name(), ss) + ttl := time.Second + + triggeredErr := uint32(0) + _, err := deny.DenyAllStores(ctx, ttl) + req.NoError(err) + + ss.items[4].ErrGen = func() error { + triggeredErr += 1 + return status.Error(codes.Unavailable, "the store is slacking.") + } + + wg := new(errgroup.Group) + wg.Go(func() error { return deny.Keeper(ctx, ttl) }) + time.Sleep(ttl) + req.Error(wg.Wait()) + req.Positive(triggeredErr) +} + +func TestKeeperCalled(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + ss := initWithIDs([]int{1, 4, 5}) + deny := utils.NewDenyImporting(t.Name(), ss) + ttl := 1 * time.Second + + _, err := deny.DenyAllStores(ctx, ttl) + req.NoError(err) + + cx, cancel := context.WithCancel(ctx) + wg := new(errgroup.Group) + wg.Go(func() error { return deny.Keeper(cx, ttl) }) + for i := 0; i < 20; i++ { + ss.assertAllStoresDenied(t) + time.Sleep(ttl / 10) + } + cancel() + req.ErrorIs(wg.Wait(), context.Canceled) +} From fc4d563479ae714347b6231735eb123f25fc845e Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 19 Sep 2023 17:55:35 +0800 Subject: [PATCH 04/13] make bazel_prepare Signed-off-by: hillium --- DEPS.bzl | 12 ++++++------ br/pkg/storage/BUILD.bazel | 3 ++- br/pkg/task/operator/BUILD.bazel | 2 ++ br/pkg/utils/BUILD.bazel | 9 ++++++++- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 15e84d288b60d..37e7561dc1d93 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5754,13 +5754,13 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sha256 = "543243cdee14bbbe601a9201aff9a27c05f13233f8a4f0e4f35b01c32a164520", - strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20230904082117-ecdbf1f8c130", + sha256 = "4af4342540977423986b6627c9efb98477c8a4746cf0c62d63251f6830ce3e3a", + strip_prefix = "github.com/yujuncen/kvproto@v0.0.0-20230913085421-7b5c641805b9", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230904082117-ecdbf1f8c130.zip", - "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230904082117-ecdbf1f8c130.zip", - "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230904082117-ecdbf1f8c130.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230904082117-ecdbf1f8c130.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230913085421-7b5c641805b9.zip", + "http://ats.apps.svc/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230913085421-7b5c641805b9.zip", + "https://cache.hawkingrei.com/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230913085421-7b5c641805b9.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230913085421-7b5c641805b9.zip", ], ) go_repository( diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 616638d9f8fe8..e269956c3d4dc 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -58,6 +58,7 @@ go_library( "@com_google_cloud_go_storage//:storage", "@org_golang_google_api//iterator", "@org_golang_google_api//option", + "@org_golang_google_api//transport/http", "@org_golang_x_oauth2//google", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", @@ -80,7 +81,7 @@ go_test( ], embed = [":storage"], flaky = True, - shard_count = 48, + shard_count = 49, deps = [ "//br/pkg/mock", "@com_github_aws_aws_sdk_go//aws", diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel index a291d68df5b12..5ce85cbd1313f 100644 --- a/br/pkg/task/operator/BUILD.bazel +++ b/br/pkg/task/operator/BUILD.bazel @@ -13,8 +13,10 @@ go_library( "//br/pkg/pdutil", "//br/pkg/task", "//br/pkg/utils", + "@com_github_pingcap_errors//:errors", "@com_github_pingcap_log//:log", "@com_github_spf13_pflag//:pflag", + "@org_golang_google_grpc//keepalive", "@org_golang_x_sync//errgroup", "@org_uber_go_zap//:zap", ], diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 0299cebfca64f..2226abcef08a8 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "backoff.go", "cdc.go", "db.go", + "deny_importing.go", "dyn_pprof_other.go", "dyn_pprof_unix.go", "env.go", @@ -46,6 +47,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//oracle", @@ -75,6 +77,7 @@ go_test( "backoff_test.go", "cdc_test.go", "db_test.go", + "deny_importing_test.go", "env_test.go", "json_test.go", "key_test.go", @@ -90,7 +93,7 @@ go_test( ], embed = [":utils"], flaky = True, - shard_count = 30, + shard_count = 34, deps = [ "//br/pkg/errors", "//br/pkg/metautil", @@ -111,13 +114,17 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_tests_v3//integration", + "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", + "@org_golang_x_sync//errgroup", "@org_uber_go_goleak//:goleak", "@org_uber_go_multierr//:multierr", ], From 133a75f0e49cb6506008350f6cd981e7b3efe7af Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 20 Sep 2023 18:06:37 +0800 Subject: [PATCH 05/13] rename RPC Signed-off-by: hillium --- DEPS.bzl | 12 ++++---- br/pkg/storage/BUILD.bazel | 3 +- br/pkg/task/backup_ebs.go | 2 +- br/pkg/task/operator/cmd.go | 2 +- br/pkg/utils/deny_importing.go | 44 ++++++++++++++--------------- br/pkg/utils/deny_importing_test.go | 22 +++++++-------- go.mod | 2 +- go.sum | 4 +-- 8 files changed, 45 insertions(+), 46 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 37e7561dc1d93..21f0bf1c2b9e4 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5754,13 +5754,13 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sha256 = "4af4342540977423986b6627c9efb98477c8a4746cf0c62d63251f6830ce3e3a", - strip_prefix = "github.com/yujuncen/kvproto@v0.0.0-20230913085421-7b5c641805b9", + sha256 = "5f527b38c9d4e9799dabae9e6edd068d258a3eda94c6dad7e2134bcdf5918886", + strip_prefix = "github.com/yujuncen/kvproto@v0.0.0-20230920093617-240e56b2e11a", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230913085421-7b5c641805b9.zip", - "http://ats.apps.svc/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230913085421-7b5c641805b9.zip", - "https://cache.hawkingrei.com/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230913085421-7b5c641805b9.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230913085421-7b5c641805b9.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230920093617-240e56b2e11a.zip", + "http://ats.apps.svc/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230920093617-240e56b2e11a.zip", + "https://cache.hawkingrei.com/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230920093617-240e56b2e11a.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230920093617-240e56b2e11a.zip", ], ) go_repository( diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index e269956c3d4dc..616638d9f8fe8 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -58,7 +58,6 @@ go_library( "@com_google_cloud_go_storage//:storage", "@org_golang_google_api//iterator", "@org_golang_google_api//option", - "@org_golang_google_api//transport/http", "@org_golang_x_oauth2//google", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", @@ -81,7 +80,7 @@ go_test( ], embed = [":storage"], flaky = True, - shard_count = 49, + shard_count = 48, deps = [ "//br/pkg/mock", "@com_github_aws_aws_sdk_go//aws", diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go index 59247caef5c93..6b467fc6f817d 100644 --- a/br/pkg/task/backup_ebs.go +++ b/br/pkg/task/backup_ebs.go @@ -145,7 +145,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { if e != nil { return errors.Trace(err) } - denyLightning := utils.NewDenyImporting("backup_ebs_command", mgr.StoreManager) + denyLightning := utils.NewSuspendImporting("backup_ebs_command", mgr.StoreManager) _, err := denyLightning.DenyAllStores(ctx, utils.DefaultBRGCSafePointTTL) if err != nil { return errors.Annotate(err, "lightning from running") diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index e40f464bd8995..872dfdc9f195d 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -111,7 +111,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { } func goPauseImporting(ctx *AdaptEnvForSnapshotBackupContext) error { - denyLightning := utils.NewDenyImporting("prepare_for_snapshot_backup", ctx.kvMgr) + denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", ctx.kvMgr) if _, err := denyLightning.DenyAllStores(ctx, ctx.cfg.TTL); err != nil { return errors.Trace(err) } diff --git a/br/pkg/utils/deny_importing.go b/br/pkg/utils/deny_importing.go index b5382e14a9d04..fc31017a1c406 100644 --- a/br/pkg/utils/deny_importing.go +++ b/br/pkg/utils/deny_importing.go @@ -32,23 +32,23 @@ func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uin return cli, nil } -type DenyImportingEnv interface { +type SuspendImportingEnv interface { GetAllStores(ctx context.Context) ([]*metapb.Store, error) GetDenyLightningClient(ctx context.Context, storeID uint64) (DenyLightningClient, error) } type DenyLightningClient interface { // Temporarily disable ingest / download / write for data listeners don't support catching import data. - DenyImportRPC(ctx context.Context, in *import_sstpb.DenyImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.DenyImportRPCResponse, error) + SuspendImportRPC(ctx context.Context, in *import_sstpb.SuspendImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.SuspendImportRPCResponse, error) } -type DenyImporting struct { - env DenyImportingEnv +type SuspendImporting struct { + env SuspendImportingEnv name string } -func NewDenyImporting(name string, env DenyImportingEnv) *DenyImporting { - return &DenyImporting{ +func NewSuspendImporting(name string, env SuspendImportingEnv) *SuspendImporting { + return &SuspendImporting{ env: env, name: name, } @@ -56,28 +56,28 @@ func NewDenyImporting(name string, env DenyImportingEnv) *DenyImporting { // DenyAllStores tries to deny all current stores' lightning execution for the period of time. // Returns a map mapping store ID to whether they are already denied to import tasks. -func (d *DenyImporting) DenyAllStores(ctx context.Context, dur time.Duration) (map[uint64]bool, error) { - return d.forEachStores(ctx, func() *import_sstpb.DenyImportRPCRequest { - return &import_sstpb.DenyImportRPCRequest{ - ShouldDenyImports: true, - DurationSecs: uint64(dur.Seconds()), - Caller: d.name, +func (d *SuspendImporting) DenyAllStores(ctx context.Context, dur time.Duration) (map[uint64]bool, error) { + return d.forEachStores(ctx, func() *import_sstpb.SuspendImportRPCRequest { + return &import_sstpb.SuspendImportRPCRequest{ + ShouldSuspendImports: true, + DurationInSecs: uint64(dur.Seconds()), + Caller: d.name, } }) } -func (d *DenyImporting) AllowAllStores(ctx context.Context) (map[uint64]bool, error) { - return d.forEachStores(ctx, func() *import_sstpb.DenyImportRPCRequest { - return &import_sstpb.DenyImportRPCRequest{ - ShouldDenyImports: false, - Caller: d.name, +func (d *SuspendImporting) AllowAllStores(ctx context.Context) (map[uint64]bool, error) { + return d.forEachStores(ctx, func() *import_sstpb.SuspendImportRPCRequest { + return &import_sstpb.SuspendImportRPCRequest{ + ShouldSuspendImports: false, + Caller: d.name, } }) } // forEachStores send the request to each stores reachable. // Returns a map mapping store ID to whether they are already denied to import tasks. -func (d *DenyImporting) forEachStores(ctx context.Context, makeReq func() *import_sstpb.DenyImportRPCRequest) (map[uint64]bool, error) { +func (d *SuspendImporting) forEachStores(ctx context.Context, makeReq func() *import_sstpb.SuspendImportRPCRequest) (map[uint64]bool, error) { stores, err := d.env.GetAllStores(ctx) if err != nil { return nil, errors.Annotate(err, "failed to get all stores") @@ -90,18 +90,18 @@ func (d *DenyImporting) forEachStores(ctx context.Context, makeReq func() *impor return nil, errors.Annotatef(err, "failed to get client for store %d", store.Id) } req := makeReq() - resp, err := cli.DenyImportRPC(ctx, req) + resp, err := cli.SuspendImportRPC(ctx, req) if err != nil { return nil, errors.Annotatef(err, "failed to deny lightning rpc for store %d", store.Id) } - result[store.Id] = resp.AlreadyDeniedImports + result[store.Id] = resp.AlreadySuspended } return result, nil } // HasKeptDenying checks whether a result returned by `DenyAllStores` is able to keep the consistency with last request. // i.e. Whether the store has some holes of pausing the import requests. -func (d *DenyImporting) ConsistentWithPrev(result map[uint64]bool) error { +func (d *SuspendImporting) ConsistentWithPrev(result map[uint64]bool) error { for storeId, denied := range result { if !denied { return errors.Annotatef(berrors.ErrPossibleInconsistency, "failed to keep importing to store %d being denied, the state might be inconsistency", storeId) @@ -110,7 +110,7 @@ func (d *DenyImporting) ConsistentWithPrev(result map[uint64]bool) error { return nil } -func (d *DenyImporting) Keeper(ctx context.Context, ttl time.Duration) error { +func (d *SuspendImporting) Keeper(ctx context.Context, ttl time.Duration) error { lastSuccess := time.Now() t := time.NewTicker(ttl / DenyLightningUpdateFrequency) defer t.Stop() diff --git a/br/pkg/utils/deny_importing_test.go b/br/pkg/utils/deny_importing_test.go index e3a285ad28528..cdb5c365e6bb0 100644 --- a/br/pkg/utils/deny_importing_test.go +++ b/br/pkg/utils/deny_importing_test.go @@ -22,7 +22,7 @@ type ImportTargetStore struct { mu sync.Mutex Id uint64 LastSuccessDenyCall time.Time - DenyImportFor time.Duration + SuspendImportFor time.Duration DeniedImport bool ErrGen func() error @@ -69,7 +69,7 @@ func (s *ImportTargetStores) GetDenyLightningClient(ctx context.Context, storeID } // Temporarily disable ingest / download / write for data listeners don't support catching import data. -func (s *ImportTargetStore) DenyImportRPC(ctx context.Context, in *import_sstpb.DenyImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.DenyImportRPCResponse, error) { +func (s *ImportTargetStore) SuspendImportRPC(ctx context.Context, in *import_sstpb.SuspendImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.SuspendImportRPCResponse, error) { s.mu.Lock() defer s.mu.Unlock() @@ -80,14 +80,14 @@ func (s *ImportTargetStore) DenyImportRPC(ctx context.Context, in *import_sstpb. } denied := s.DeniedImport - if in.ShouldDenyImports { + if in.ShouldSuspendImports { s.DeniedImport = true - s.DenyImportFor = time.Duration(in.DurationSecs) * time.Second + s.SuspendImportFor = time.Duration(in.DurationSecs) * time.Second s.LastSuccessDenyCall = time.Now() } else { s.DeniedImport = false } - return &import_sstpb.DenyImportRPCResponse{ + return &import_sstpb.SuspendImportRPCResponse{ AlreadyDeniedImports: denied, }, nil } @@ -102,7 +102,7 @@ func (s *ImportTargetStores) assertAllStoresDenied(t *testing.T) { defer store.mu.Unlock() require.True(t, store.DeniedImport, "ID = %d", store.Id) - require.Less(t, time.Since(store.LastSuccessDenyCall), store.DenyImportFor, "ID = %d", store.Id) + require.Less(t, time.Since(store.LastSuccessDenyCall), store.SuspendImportFor, "ID = %d", store.Id) }() } } @@ -111,7 +111,7 @@ func TestBasic(t *testing.T) { req := require.New(t) ss := initWithIDs([]int{1, 4, 5}) - deny := utils.NewDenyImporting(t.Name(), ss) + deny := utils.NewSuspendImporting(t.Name(), ss) ctx := context.Background() res, err := deny.DenyAllStores(ctx, 10*time.Second) @@ -119,7 +119,7 @@ func TestBasic(t *testing.T) { req.Error(deny.ConsistentWithPrev(res)) for id, inner := range ss.items { req.True(inner.DeniedImport, "at %d", id) - req.Equal(inner.DenyImportFor, 10*time.Second, "at %d", id) + req.Equal(inner.SuspendImportFor, 10*time.Second, "at %d", id) } res, err = deny.DenyAllStores(ctx, 10*time.Second) @@ -136,7 +136,7 @@ func TestKeeperError(t *testing.T) { ctx := context.Background() ss := initWithIDs([]int{1, 4, 5}) - deny := utils.NewDenyImporting(t.Name(), ss) + deny := utils.NewSuspendImporting(t.Name(), ss) ttl := time.Second now := time.Now() @@ -167,7 +167,7 @@ func TestKeeperErrorExit(t *testing.T) { ctx := context.Background() ss := initWithIDs([]int{1, 4, 5}) - deny := utils.NewDenyImporting(t.Name(), ss) + deny := utils.NewSuspendImporting(t.Name(), ss) ttl := time.Second triggeredErr := uint32(0) @@ -191,7 +191,7 @@ func TestKeeperCalled(t *testing.T) { ctx := context.Background() ss := initWithIDs([]int{1, 4, 5}) - deny := utils.NewDenyImporting(t.Name(), ss) + deny := utils.NewSuspendImporting(t.Name(), ss) ttl := 1 * time.Second _, err := deny.DenyAllStores(ctx, ttl) diff --git a/go.mod b/go.mod index b4c445abb8e5d..ba7d4d831a00d 100644 --- a/go.mod +++ b/go.mod @@ -310,4 +310,4 @@ replace ( github.com/pingcap/tidb/parser => ./parser ) -replace github.com/pingcap/kvproto => github.com/yujuncen/kvproto v0.0.0-20230913085421-7b5c641805b9 +replace github.com/pingcap/kvproto => github.com/yujuncen/kvproto v0.0.0-20230920093617-240e56b2e11a diff --git a/go.sum b/go.sum index 8237c1585dbd0..62fbf1bae6f5a 100644 --- a/go.sum +++ b/go.sum @@ -1069,8 +1069,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -github.com/yujuncen/kvproto v0.0.0-20230913085421-7b5c641805b9 h1:HV+9VKLvBkMULH3IHklXBiH/a5qzcz78p01JvOQPcd4= -github.com/yujuncen/kvproto v0.0.0-20230913085421-7b5c641805b9/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/yujuncen/kvproto v0.0.0-20230920093617-240e56b2e11a h1:hX0Ohl1aJZsCmHuLJYGUSVvGaCq5CNMUMmxIyrrzytI= +github.com/yujuncen/kvproto v0.0.0-20230920093617-240e56b2e11a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= From 4f8cd5274e446ee78da3f6aa8247c939899b5a56 Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 21 Sep 2023 15:13:14 +0800 Subject: [PATCH 06/13] fix build and test Signed-off-by: hillium --- br/pkg/task/operator/cmd.go | 40 +++++++++++-------- ...deny_importing.go => suspend_importing.go} | 6 +-- ...ting_test.go => suspend_importing_test.go} | 18 ++++----- 3 files changed, 36 insertions(+), 28 deletions(-) rename br/pkg/utils/{deny_importing.go => suspend_importing.go} (95%) rename br/pkg/utils/{deny_importing_test.go => suspend_importing_test.go} (92%) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 872dfdc9f195d..bde46eef717ac 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -37,12 +37,12 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) return mgr, nil } -func cleanUpWith(f func(ctx context.Context)) { - cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil }) +func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWith(f func(ctx context.Context)) { + cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil }) } -func cleanUpWithErr(f func(ctx context.Context) error) error { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) +func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithErr(f func(ctx context.Context) error) error { + ctx, cancel := context.WithTimeout(context.Background(), cx.cfg.TTL) defer cancel() return f(ctx) } @@ -110,23 +110,31 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { return eg.Wait() } -func goPauseImporting(ctx *AdaptEnvForSnapshotBackupContext) error { - denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", ctx.kvMgr) - if _, err := denyLightning.DenyAllStores(ctx, ctx.cfg.TTL); err != nil { +func goPauseImporting(cx *AdaptEnvForSnapshotBackupContext) error { + denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", cx.kvMgr) + if _, err := denyLightning.DenyAllStores(cx, cx.cfg.TTL); err != nil { return errors.Trace(err) } - ctx.ReadyL("pause_lightning") - ctx.runGrp.Go(func() error { - err := denyLightning.Keeper(ctx, ctx.cfg.TTL) + cx.ReadyL("pause_lightning") + cx.runGrp.Go(func() error { + err := denyLightning.Keeper(cx, cx.cfg.TTL) if errors.Cause(err) != context.Canceled { log.Warn("keeper encounters error.", logutil.ShortError(err)) } - return cleanUpWithErr(func(ctx context.Context) error { - res, err := denyLightning.AllowAllStores(ctx) - if err != nil { - return err + return cx.cleanUpWithErr(func(ctx context.Context) error { + for { + if ctx.Err() != nil { + return errors.Annotate(ctx.Err(), "cleaning up timed out") + } + res, err := denyLightning.AllowAllStores(ctx) + if err != nil { + log.Warn("Failed to restore lightning, will retry.", logutil.ShortError(err)) + // Retry for 10 times. + time.Sleep(cx.cfg.TTL / 10) + continue + } + return denyLightning.ConsistentWithPrev(res) } - return denyLightning.ConsistentWithPrev(res) }) }) return nil @@ -162,7 +170,7 @@ func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error { func pauseSchedulerKeeper(ctx *AdaptEnvForSnapshotBackupContext) error { undo, err := ctx.pdMgr.RemoveAllPDSchedulers(ctx) if undo != nil { - defer cleanUpWith(func(ctx context.Context) { + defer ctx.cleanUpWith(func(ctx context.Context) { if err := undo(ctx); err != nil { log.Warn("failed to restore pd scheduler.", logutil.ShortError(err)) } diff --git a/br/pkg/utils/deny_importing.go b/br/pkg/utils/suspend_importing.go similarity index 95% rename from br/pkg/utils/deny_importing.go rename to br/pkg/utils/suspend_importing.go index fc31017a1c406..8e4adb0fa0dd7 100644 --- a/br/pkg/utils/deny_importing.go +++ b/br/pkg/utils/suspend_importing.go @@ -21,7 +21,7 @@ func (mgr *StoreManager) GetAllStores(ctx context.Context) ([]*metapb.Store, err return mgr.PDClient().GetAllStores(ctx) } -func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uint64) (DenyLightningClient, error) { +func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendLightningClient, error) { var cli import_sstpb.ImportSSTClient err := mgr.WithConn(ctx, storeID, func(cc *grpc.ClientConn) { cli = import_sstpb.NewImportSSTClient(cc) @@ -34,10 +34,10 @@ func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uin type SuspendImportingEnv interface { GetAllStores(ctx context.Context) ([]*metapb.Store, error) - GetDenyLightningClient(ctx context.Context, storeID uint64) (DenyLightningClient, error) + GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendLightningClient, error) } -type DenyLightningClient interface { +type SuspendLightningClient interface { // Temporarily disable ingest / download / write for data listeners don't support catching import data. SuspendImportRPC(ctx context.Context, in *import_sstpb.SuspendImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.SuspendImportRPCResponse, error) } diff --git a/br/pkg/utils/deny_importing_test.go b/br/pkg/utils/suspend_importing_test.go similarity index 92% rename from br/pkg/utils/deny_importing_test.go rename to br/pkg/utils/suspend_importing_test.go index cdb5c365e6bb0..e0e72f907cc44 100644 --- a/br/pkg/utils/deny_importing_test.go +++ b/br/pkg/utils/suspend_importing_test.go @@ -23,7 +23,7 @@ type ImportTargetStore struct { Id uint64 LastSuccessDenyCall time.Time SuspendImportFor time.Duration - DeniedImport bool + SuspendedImport bool ErrGen func() error } @@ -56,7 +56,7 @@ func (s *ImportTargetStores) GetAllStores(ctx context.Context) ([]*metapb.Store, return stores, nil } -func (s *ImportTargetStores) GetDenyLightningClient(ctx context.Context, storeID uint64) (utils.DenyLightningClient, error) { +func (s *ImportTargetStores) GetDenyLightningClient(ctx context.Context, storeID uint64) (utils.SuspendLightningClient, error) { s.mu.Lock() defer s.mu.Unlock() @@ -79,16 +79,16 @@ func (s *ImportTargetStore) SuspendImportRPC(ctx context.Context, in *import_sst } } - denied := s.DeniedImport + suspended := s.SuspendedImport if in.ShouldSuspendImports { - s.DeniedImport = true - s.SuspendImportFor = time.Duration(in.DurationSecs) * time.Second + s.SuspendedImport = true + s.SuspendImportFor = time.Duration(in.DurationInSecs) * time.Second s.LastSuccessDenyCall = time.Now() } else { - s.DeniedImport = false + s.SuspendedImport = false } return &import_sstpb.SuspendImportRPCResponse{ - AlreadyDeniedImports: denied, + AlreadySuspended: suspended, }, nil } @@ -101,7 +101,7 @@ func (s *ImportTargetStores) assertAllStoresDenied(t *testing.T) { store.mu.Lock() defer store.mu.Unlock() - require.True(t, store.DeniedImport, "ID = %d", store.Id) + require.True(t, store.SuspendedImport, "ID = %d", store.Id) require.Less(t, time.Since(store.LastSuccessDenyCall), store.SuspendImportFor, "ID = %d", store.Id) }() } @@ -118,7 +118,7 @@ func TestBasic(t *testing.T) { req.NoError(err) req.Error(deny.ConsistentWithPrev(res)) for id, inner := range ss.items { - req.True(inner.DeniedImport, "at %d", id) + req.True(inner.SuspendedImport, "at %d", id) req.Equal(inner.SuspendImportFor, 10*time.Second, "at %d", id) } From 836f83e54ee820fa8d20aac52dd8a06ca73ea6a0 Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 21 Sep 2023 15:24:24 +0800 Subject: [PATCH 07/13] make bazel_prepare Signed-off-by: hillium --- br/pkg/utils/BUILD.bazel | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 2226abcef08a8..62c2ee8ffcb8e 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -6,7 +6,6 @@ go_library( "backoff.go", "cdc.go", "db.go", - "deny_importing.go", "dyn_pprof_other.go", "dyn_pprof_unix.go", "env.go", @@ -23,6 +22,7 @@ go_library( "schema.go", "sensitive.go", "store_manager.go", + "suspend_importing.go", "worker.go", ], importpath = "github.com/pingcap/tidb/br/pkg/utils", @@ -77,7 +77,6 @@ go_test( "backoff_test.go", "cdc_test.go", "db_test.go", - "deny_importing_test.go", "env_test.go", "json_test.go", "key_test.go", @@ -90,6 +89,7 @@ go_test( "safe_point_test.go", "schema_test.go", "sensitive_test.go", + "suspend_importing_test.go", ], embed = [":utils"], flaky = True, From f3f351cd5a42ed0899d281300b27e2fe50049107 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 22 Sep 2023 14:33:09 +0800 Subject: [PATCH 08/13] address comments Signed-off-by: hillium --- br/pkg/task/backup_ebs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go index 6b467fc6f817d..e459cfcb60bff 100644 --- a/br/pkg/task/backup_ebs.go +++ b/br/pkg/task/backup_ebs.go @@ -171,7 +171,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { log.Warn("failed to restore importing, you may need to wait until you are able to start importing", zap.Duration("wait_for", utils.DefaultBRGCSafePointTTL)) } if err := denyLightning.ConsistentWithPrev(res); err != nil { - log.Warn("lightning hasn't been denied during restoring, the backup archive may not be usable.", logutil.ShortError(err)) + log.Warn("lightning hasn't been denied, the backup archive may not be usable.", logutil.ShortError(err)) } }() } From b253365cf0946e2fdd2b5e44c9846aa58208f3b9 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 26 Sep 2023 11:23:41 +0800 Subject: [PATCH 09/13] back to master Signed-off-by: hillium --- DEPS.bzl | 12 ++++++------ go.mod | 4 +--- go.sum | 18 ++++++++---------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 20d6ca97d9536..fd34d7fea2521 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5832,13 +5832,13 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sha256 = "5f527b38c9d4e9799dabae9e6edd068d258a3eda94c6dad7e2134bcdf5918886", - strip_prefix = "github.com/yujuncen/kvproto@v0.0.0-20230920093617-240e56b2e11a", + sha256 = "f4b1d302bbbb5075222ff4ed7d5f7f8807d920aad1a5a7d4c31fdf2233d8568c", + strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20230925123611-87bebcc0d071", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230920093617-240e56b2e11a.zip", - "http://ats.apps.svc/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230920093617-240e56b2e11a.zip", - "https://cache.hawkingrei.com/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230920093617-240e56b2e11a.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/yujuncen/kvproto/com_github_yujuncen_kvproto-v0.0.0-20230920093617-240e56b2e11a.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230925123611-87bebcc0d071.zip", + "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230925123611-87bebcc0d071.zip", + "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230925123611-87bebcc0d071.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20230925123611-87bebcc0d071.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 5db36705b27d2..17b5ed5cedd85 100644 --- a/go.mod +++ b/go.mod @@ -80,7 +80,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/fn v1.0.0 - github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130 + github.com/pingcap/kvproto v0.0.0-20230925123611-87bebcc0d071 github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -313,5 +313,3 @@ replace ( github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117 github.com/pingcap/tidb/parser => ./parser ) - -replace github.com/pingcap/kvproto => github.com/yujuncen/kvproto v0.0.0-20230920093617-240e56b2e11a diff --git a/go.sum b/go.sum index 6ba91db6495e2..56c319c0be276 100644 --- a/go.sum +++ b/go.sum @@ -189,10 +189,6 @@ github.com/cloudfoundry/gosigar v1.3.6 h1:gIc08FbB3QPb+nAQhINIK/qhf5REKkY0FTGgRG github.com/cloudfoundry/gosigar v1.3.6/go.mod h1:lNWstu5g5gw59O09Y+wsMNFzBSnU8a0u+Sfx4dq360E= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= -github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= -github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292/go.mod h1:qRiX68mZX1lGBkTWyp3CLcenw9I94W2dLeRvMzcn9N4= github.com/cockroachdb/cockroach v0.0.0-20170608034007-84bc9597164f/go.mod h1:xeT/CQ0qZHangbYbWShlCGAx31aV4AjGswDUjhKS6HQ= @@ -286,7 +282,6 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= -github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE= @@ -369,6 +364,7 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -399,6 +395,7 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -835,6 +832,9 @@ github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g= github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z24= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= +github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20230925123611-87bebcc0d071 h1:giqmIJSWHs+jhHfd+rth8CXWR18KAtqJu4imY1YdA6o= +github.com/pingcap/kvproto v0.0.0-20230925123611-87bebcc0d071/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY= @@ -1081,8 +1081,6 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -github.com/yujuncen/kvproto v0.0.0-20230920093617-240e56b2e11a h1:hX0Ohl1aJZsCmHuLJYGUSVvGaCq5CNMUMmxIyrrzytI= -github.com/yujuncen/kvproto v0.0.0-20230920093617-240e56b2e11a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= @@ -1248,6 +1246,7 @@ golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1377,7 +1376,6 @@ golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1552,6 +1550,7 @@ google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6 google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -1585,6 +1584,7 @@ google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1606,7 +1606,6 @@ google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= -google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -1621,7 +1620,6 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= From b53c6642387f9e082a5a73b748f56e393cca5830 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 26 Sep 2023 17:15:00 +0800 Subject: [PATCH 10/13] fix build Signed-off-by: hillium --- br/pkg/task/operator/cmd.go | 12 ++++++------ br/pkg/utils/suspend_importing.go | 8 +++----- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index bde46eef717ac..57d2fb5e81dd3 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -38,7 +38,7 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) } func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWith(f func(ctx context.Context)) { - cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil }) + _ = cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil }) } func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithErr(f func(ctx context.Context) error) error { @@ -58,9 +58,9 @@ type AdaptEnvForSnapshotBackupContext struct { runGrp *errgroup.Group } -func (ctx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) { - logutil.CL(ctx).Info("Stage ready.", append(notes, zap.String("component", name))...) - ctx.rdGrp.Done() +func (cx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) { + logutil.CL(cx).Info("Stage ready.", append(notes, zap.String("component", name))...) + cx.rdGrp.Done() } func hintAllReady() { @@ -85,8 +85,8 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { } } kvMgr := utils.NewStoreManager(mgr.GetPDClient(), keepalive.ClientParameters{ - Time: time.Duration(cfg.Config.GRPCKeepaliveTime) * time.Second, - Timeout: time.Duration(cfg.Config.GRPCKeepaliveTimeout) * time.Second, + Time: cfg.Config.GRPCKeepaliveTime, + Timeout: cfg.Config.GRPCKeepaliveTimeout, }, tconf) eg, ectx := errgroup.WithContext(ctx) cx := &AdaptEnvForSnapshotBackupContext{ diff --git a/br/pkg/utils/suspend_importing.go b/br/pkg/utils/suspend_importing.go index 8e4adb0fa0dd7..342e3be73298a 100644 --- a/br/pkg/utils/suspend_importing.go +++ b/br/pkg/utils/suspend_importing.go @@ -5,11 +5,10 @@ import ( "time" "github.com/pingcap/errors" - berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" + berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" "google.golang.org/grpc" ) @@ -124,9 +123,8 @@ func (d *SuspendImporting) Keeper(ctx context.Context, ttl time.Duration) error if time.Since(lastSuccess) < ttl { logutil.CL(ctx).Warn("Failed to send deny one of the stores.", logutil.ShortError(err)) continue - } else { - return err } + return err } if err := d.ConsistentWithPrev(res); err != nil { return err From 0c3b3739196fab242db3e5af845fe9e7079869c4 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 26 Sep 2023 17:48:32 +0800 Subject: [PATCH 11/13] generate error.toml Signed-off-by: hillium --- errors.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/errors.toml b/errors.toml index 8d37b77363cbc..ce5692b8e9e61 100644 --- a/errors.toml +++ b/errors.toml @@ -146,6 +146,11 @@ error = ''' storage is not tikv ''' +["BR:KV:ErrPossibleInconsistency"] +error = ''' +the cluster state might be inconsistent +''' + ["BR:PD:ErrPDBatchScanRegion"] error = ''' batch scan region From 2118fd0db9c216f542289b65aaa97058dafcf6a4 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 27 Sep 2023 19:23:30 +0800 Subject: [PATCH 12/13] address comments Signed-off-by: hillium --- br/pkg/task/operator/cmd.go | 12 ++++++------ br/pkg/utils/suspend_importing.go | 16 ++++++++++++---- br/pkg/utils/suspend_importing_test.go | 2 +- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 57d2fb5e81dd3..909d18911c8d0 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -90,7 +90,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { }, tconf) eg, ectx := errgroup.WithContext(ctx) cx := &AdaptEnvForSnapshotBackupContext{ - Context: ectx, + Context: logutil.ContextWithField(ectx, zap.String("tag", "br_operator")), pdMgr: mgr, kvMgr: kvMgr, cfg: *cfg, @@ -101,7 +101,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { eg.Go(func() error { return pauseGCKeeper(cx) }) eg.Go(func() error { return pauseSchedulerKeeper(cx) }) - eg.Go(func() error { return goPauseImporting(cx) }) + eg.Go(func() error { return pauseImporting(cx) }) go func() { cx.rdGrp.Wait() hintAllReady() @@ -110,7 +110,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { return eg.Wait() } -func goPauseImporting(cx *AdaptEnvForSnapshotBackupContext) error { +func pauseImporting(cx *AdaptEnvForSnapshotBackupContext) error { denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", cx.kvMgr) if _, err := denyLightning.DenyAllStores(cx, cx.cfg.TTL); err != nil { return errors.Trace(err) @@ -119,7 +119,7 @@ func goPauseImporting(cx *AdaptEnvForSnapshotBackupContext) error { cx.runGrp.Go(func() error { err := denyLightning.Keeper(cx, cx.cfg.TTL) if errors.Cause(err) != context.Canceled { - log.Warn("keeper encounters error.", logutil.ShortError(err)) + logutil.CL(cx).Warn("keeper encounters error.", logutil.ShortError(err)) } return cx.cleanUpWithErr(func(ctx context.Context) error { for { @@ -128,7 +128,7 @@ func goPauseImporting(cx *AdaptEnvForSnapshotBackupContext) error { } res, err := denyLightning.AllowAllStores(ctx) if err != nil { - log.Warn("Failed to restore lightning, will retry.", logutil.ShortError(err)) + logutil.CL(ctx).Warn("Failed to restore lightning, will retry.", logutil.ShortError(err)) // Retry for 10 times. time.Sleep(cx.cfg.TTL / 10) continue @@ -152,7 +152,7 @@ func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error { if err != nil { return err } - log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts)) + logutil.CL(ctx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts)) sp.BackupTS = rts } err := utils.StartServiceSafePointKeeper(ctx, ctx.pdMgr.GetPDClient(), sp) diff --git a/br/pkg/utils/suspend_importing.go b/br/pkg/utils/suspend_importing.go index 342e3be73298a..c2df70229c525 100644 --- a/br/pkg/utils/suspend_importing.go +++ b/br/pkg/utils/suspend_importing.go @@ -9,6 +9,9 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/util/engine" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -17,10 +20,10 @@ const ( ) func (mgr *StoreManager) GetAllStores(ctx context.Context) ([]*metapb.Store, error) { - return mgr.PDClient().GetAllStores(ctx) + return mgr.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone()) } -func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendLightningClient, error) { +func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendImportingClient, error) { var cli import_sstpb.ImportSSTClient err := mgr.WithConn(ctx, storeID, func(cc *grpc.ClientConn) { cli = import_sstpb.NewImportSSTClient(cc) @@ -33,10 +36,10 @@ func (mgr *StoreManager) GetDenyLightningClient(ctx context.Context, storeID uin type SuspendImportingEnv interface { GetAllStores(ctx context.Context) ([]*metapb.Store, error) - GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendLightningClient, error) + GetDenyLightningClient(ctx context.Context, storeID uint64) (SuspendImportingClient, error) } -type SuspendLightningClient interface { +type SuspendImportingClient interface { // Temporarily disable ingest / download / write for data listeners don't support catching import data. SuspendImportRPC(ctx context.Context, in *import_sstpb.SuspendImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.SuspendImportRPCResponse, error) } @@ -84,6 +87,11 @@ func (d *SuspendImporting) forEachStores(ctx context.Context, makeReq func() *im result := map[uint64]bool{} for _, store := range stores { + logutil.CL(ctx).Info("Handling store.", zap.Stringer("store", store)) + if engine.IsTiFlash(store) { + logutil.CL(ctx).Info("Store is tiflash, skipping.", zap.Stringer("store", store)) + continue + } cli, err := d.env.GetDenyLightningClient(ctx, store.Id) if err != nil { return nil, errors.Annotatef(err, "failed to get client for store %d", store.Id) diff --git a/br/pkg/utils/suspend_importing_test.go b/br/pkg/utils/suspend_importing_test.go index e0e72f907cc44..8ee04af072048 100644 --- a/br/pkg/utils/suspend_importing_test.go +++ b/br/pkg/utils/suspend_importing_test.go @@ -56,7 +56,7 @@ func (s *ImportTargetStores) GetAllStores(ctx context.Context) ([]*metapb.Store, return stores, nil } -func (s *ImportTargetStores) GetDenyLightningClient(ctx context.Context, storeID uint64) (utils.SuspendLightningClient, error) { +func (s *ImportTargetStores) GetDenyLightningClient(ctx context.Context, storeID uint64) (utils.SuspendImportingClient, error) { s.mu.Lock() defer s.mu.Unlock() From 74b375416b04d8835a3ceca512ba606eb3b58358 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 27 Sep 2023 19:36:42 +0800 Subject: [PATCH 13/13] make bazel_prepare Signed-off-by: hillium --- br/pkg/utils/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 62c2ee8ffcb8e..00158fa746da6 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -39,6 +39,7 @@ go_library( "//parser/types", "//sessionctx", "//util", + "//util/engine", "//util/sqlexec", "@com_github_cheggaaa_pb_v3//:pb", "@com_github_cznic_mathutil//:mathutil",