From e15f5b7d20a05581e4e6970759f76811ad16e4a8 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 18 May 2020 21:44:29 +0800 Subject: [PATCH] cmd: prevent ongoing requests being canceled by deadline exceeded (#579) Signed-off-by: Neil Shen --- cmd/client.go | 8 ++++---- cmd/client_capture.go | 3 +-- cmd/client_changefeed.go | 43 ++++++++++------------------------------ cmd/client_meta.go | 3 +-- cmd/client_processor.go | 6 ++---- cmd/client_tso.go | 3 +-- cmd/root.go | 19 ++++++++++++++++++ cmd/server.go | 20 +------------------ cmd/util.go | 4 ---- 9 files changed, 39 insertions(+), 70 deletions(-) diff --git a/cmd/client.go b/cmd/client.go index 6bb5a9c62ac..fae9c653042 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -1,6 +1,7 @@ package cmd import ( + "context" "fmt" "io" "os" @@ -51,7 +52,7 @@ var ( captureID string interval uint - defaultContextTimeoutDuration = 30 * time.Second + defaultContext context.Context ) // cf holds changefeed id, which is used for output only @@ -98,7 +99,7 @@ func newCliCommand() *cobra.Command { PersistentPreRunE: func(cmd *cobra.Command, args []string) error { etcdCli, err := clientv3.New(clientv3.Config{ Endpoints: []string{cliPdAddr}, - DialTimeout: defaultContextTimeoutDuration, + DialTimeout: 30 * time.Second, DialOptions: []grpc.DialOption{ grpc.WithBlock(), grpc.WithConnectParams(grpc.ConnectParams{ @@ -133,8 +134,7 @@ func newCliCommand() *cobra.Command { if err != nil { return errors.Annotate(err, "fail to open PD client") } - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext err = util.CheckClusterVersion(ctx, pdCli, cliPdAddr) if err != nil { return err diff --git a/cmd/client_capture.go b/cmd/client_capture.go index 1930e15c4ad..d4d1cf68a36 100644 --- a/cmd/client_capture.go +++ b/cmd/client_capture.go @@ -22,8 +22,7 @@ func newListCaptureCommand() *cobra.Command { Use: "list", Short: "List all captures in TiCDC cluster", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext captures, err := getAllCaptures(ctx) if err != nil { return err diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index aa47c3837e3..9e544bac821 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -1,12 +1,8 @@ package cmd import ( - "context" "fmt" - "os" - "os/signal" "strings" - "syscall" "time" "github.com/google/uuid" @@ -41,8 +37,7 @@ func newAdminChangefeedCommand() []*cobra.Command { Use: "pause", Short: "Pause a replicaiton task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext job := model.AdminJob{ CfID: changefeedID, Type: model.AdminStop, @@ -54,8 +49,7 @@ func newAdminChangefeedCommand() []*cobra.Command { Use: "resume", Short: "Resume a paused replicaiton task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext job := model.AdminJob{ CfID: changefeedID, Type: model.AdminResume, @@ -67,8 +61,7 @@ func newAdminChangefeedCommand() []*cobra.Command { Use: "remove", Short: "Remove a replicaiton task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext job := model.AdminJob{ CfID: changefeedID, Type: model.AdminRemove, @@ -90,8 +83,7 @@ func newListChangefeedCommand() *cobra.Command { Use: "list", Short: "List all replication tasks (changefeeds) in TiCDC cluster", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext _, raw, err := cdcEtcdCli.GetChangeFeeds(ctx) if err != nil { return err @@ -111,8 +103,7 @@ func newQueryChangefeedCommand() *cobra.Command { Use: "query", Short: "Query information and status of a replicaiton task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext info, err := cdcEtcdCli.GetChangeFeedInfo(ctx, changefeedID) if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists { return err @@ -152,8 +143,7 @@ func newCreateChangefeedCommand() *cobra.Command { Short: "Create a new replication task (changefeed)", Long: ``, RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext id := uuid.New().String() if startTs == 0 { ts, logical, err := pdCli.GetTS(ctx) @@ -269,28 +259,15 @@ func newStatisticsChangefeedCommand() *cobra.Command { Use: "statistics", Short: "Periodically check and output the status of a replicaiton task (changefeed)", RunE: func(cmd *cobra.Command, args []string) error { - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() + ctx := defaultContext tick := time.NewTicker(time.Duration(interval) * time.Second) lastTime := time.Now() var lastCount uint64 for { select { - case sig := <-sc: - switch sig { - case syscall.SIGTERM: - cancel() - os.Exit(0) - default: - cancel() - os.Exit(1) + case <-ctx.Done(): + if err := ctx.Err(); err != nil { + return err } case <-tick.C: now := time.Now() diff --git a/cmd/client_meta.go b/cmd/client_meta.go index e65ad026908..b8e732c7490 100644 --- a/cmd/client_meta.go +++ b/cmd/client_meta.go @@ -20,8 +20,7 @@ func newDeleteMetaCommand() *cobra.Command { Use: "delete", Short: "Delete all meta data in etcd, confirm that you know what this command will do and use it at your own risk", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext err := cdcEtcdCli.ClearAllCDCInfo(ctx) if err == nil { cmd.Println("already truncate all meta in etcd!") diff --git a/cmd/client_processor.go b/cmd/client_processor.go index da4a8548caf..3c93cf7ceda 100644 --- a/cmd/client_processor.go +++ b/cmd/client_processor.go @@ -24,8 +24,7 @@ func newListProcessorCommand() *cobra.Command { Use: "list", Short: "List all processors in TiCDC cluster", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext info, err := cdcEtcdCli.GetProcessors(ctx) if err != nil { return err @@ -41,8 +40,7 @@ func newQueryProcessorCommand() *cobra.Command { Use: "query", Short: "Query information and status of a sub replication task (processor)", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext _, status, err := cdcEtcdCli.GetTaskStatus(ctx, changefeedID, captureID) if err != nil && errors.Cause(err) != model.ErrTaskStatusNotExists { return err diff --git a/cmd/client_tso.go b/cmd/client_tso.go index a2937d0478e..5f5bd8b1b78 100644 --- a/cmd/client_tso.go +++ b/cmd/client_tso.go @@ -23,8 +23,7 @@ func newQueryTsoCommand() *cobra.Command { Use: "query", Short: "Get tso from PD", RunE: func(cmd *cobra.Command, args []string) error { - ctx, cancel := contextTimeout() - defer cancel() + ctx := defaultContext ts, logic, err := pdCli.GetTS(ctx) if err != nil { return err diff --git a/cmd/root.go b/cmd/root.go index dba3782bfa6..fd3b8f57d3f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,8 +1,11 @@ package cmd import ( + "context" "fmt" "os" + "os/signal" + "syscall" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -51,6 +54,22 @@ func initLog() error { // Execute runs the root command func Execute() { + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defaultContext = ctx + go func() { + sig := <-sc + log.Info("got signal to exit", zap.Stringer("signal", sig)) + cancel() + }() + if err := rootCmd.Execute(); err != nil { fmt.Println(err) os.Exit(1) diff --git a/cmd/server.go b/cmd/server.go index 7f14253365e..87a91c1db93 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -2,9 +2,6 @@ package cmd import ( "context" - "os" - "os/signal" - "syscall" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -60,22 +57,7 @@ func runEServer(cmd *cobra.Command, args []string) error { if err != nil { return errors.Annotate(err, "new server") } - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - sig := <-sc - log.Info("got signal to exit", zap.Stringer("signal", sig)) - cancel() - }() - - err = server.Run(ctx) + err = server.Run(defaultContext) if err != nil && errors.Cause(err) != context.Canceled { log.Error("run server", zap.String("error", errors.ErrorStack(err))) return errors.Annotate(err, "run server") diff --git a/cmd/util.go b/cmd/util.go index c90bd300602..97048dec546 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -23,10 +23,6 @@ import ( "go.etcd.io/etcd/clientv3/concurrency" ) -func contextTimeout() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), defaultContextTimeoutDuration) -} - func getAllCaptures(ctx context.Context) ([]*capture, error) { _, raw, err := cdcEtcdCli.GetCaptures(ctx) if err != nil {