Skip to content

Commit

Permalink
cmd: prevent ongoing requests being canceled by deadline exceeded (pi…
Browse files Browse the repository at this point in the history
…ngcap#579)

Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus authored May 18, 2020
1 parent 6098dfb commit e15f5b7
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 70 deletions.
8 changes: 4 additions & 4 deletions cmd/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -51,7 +52,7 @@ var (
captureID string
interval uint

defaultContextTimeoutDuration = 30 * time.Second
defaultContext context.Context
)

// cf holds changefeed id, which is used for output only
Expand Down Expand Up @@ -98,7 +99,7 @@ func newCliCommand() *cobra.Command {
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{cliPdAddr},
DialTimeout: defaultContextTimeoutDuration,
DialTimeout: 30 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Expand Down Expand Up @@ -133,8 +134,7 @@ func newCliCommand() *cobra.Command {
if err != nil {
return errors.Annotate(err, "fail to open PD client")
}
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
err = util.CheckClusterVersion(ctx, pdCli, cliPdAddr)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions cmd/client_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ func newListCaptureCommand() *cobra.Command {
Use: "list",
Short: "List all captures in TiCDC cluster",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
captures, err := getAllCaptures(ctx)
if err != nil {
return err
Expand Down
43 changes: 10 additions & 33 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -41,8 +37,7 @@ func newAdminChangefeedCommand() []*cobra.Command {
Use: "pause",
Short: "Pause a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminStop,
Expand All @@ -54,8 +49,7 @@ func newAdminChangefeedCommand() []*cobra.Command {
Use: "resume",
Short: "Resume a paused replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminResume,
Expand All @@ -67,8 +61,7 @@ func newAdminChangefeedCommand() []*cobra.Command {
Use: "remove",
Short: "Remove a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminRemove,
Expand All @@ -90,8 +83,7 @@ func newListChangefeedCommand() *cobra.Command {
Use: "list",
Short: "List all replication tasks (changefeeds) in TiCDC cluster",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
_, raw, err := cdcEtcdCli.GetChangeFeeds(ctx)
if err != nil {
return err
Expand All @@ -111,8 +103,7 @@ func newQueryChangefeedCommand() *cobra.Command {
Use: "query",
Short: "Query information and status of a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
info, err := cdcEtcdCli.GetChangeFeedInfo(ctx, changefeedID)
if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists {
return err
Expand Down Expand Up @@ -152,8 +143,7 @@ func newCreateChangefeedCommand() *cobra.Command {
Short: "Create a new replication task (changefeed)",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
id := uuid.New().String()
if startTs == 0 {
ts, logical, err := pdCli.GetTS(ctx)
Expand Down Expand Up @@ -269,28 +259,15 @@ func newStatisticsChangefeedCommand() *cobra.Command {
Use: "statistics",
Short: "Periodically check and output the status of a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctx := defaultContext
tick := time.NewTicker(time.Duration(interval) * time.Second)
lastTime := time.Now()
var lastCount uint64
for {
select {
case sig := <-sc:
switch sig {
case syscall.SIGTERM:
cancel()
os.Exit(0)
default:
cancel()
os.Exit(1)
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return err
}
case <-tick.C:
now := time.Now()
Expand Down
3 changes: 1 addition & 2 deletions cmd/client_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ func newDeleteMetaCommand() *cobra.Command {
Use: "delete",
Short: "Delete all meta data in etcd, confirm that you know what this command will do and use it at your own risk",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
err := cdcEtcdCli.ClearAllCDCInfo(ctx)
if err == nil {
cmd.Println("already truncate all meta in etcd!")
Expand Down
6 changes: 2 additions & 4 deletions cmd/client_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ func newListProcessorCommand() *cobra.Command {
Use: "list",
Short: "List all processors in TiCDC cluster",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
info, err := cdcEtcdCli.GetProcessors(ctx)
if err != nil {
return err
Expand All @@ -41,8 +40,7 @@ func newQueryProcessorCommand() *cobra.Command {
Use: "query",
Short: "Query information and status of a sub replication task (processor)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
_, status, err := cdcEtcdCli.GetTaskStatus(ctx, changefeedID, captureID)
if err != nil && errors.Cause(err) != model.ErrTaskStatusNotExists {
return err
Expand Down
3 changes: 1 addition & 2 deletions cmd/client_tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ func newQueryTsoCommand() *cobra.Command {
Use: "query",
Short: "Get tso from PD",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
ts, logic, err := pdCli.GetTS(ctx)
if err != nil {
return err
Expand Down
19 changes: 19 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -51,6 +54,22 @@ func initLog() error {

// Execute runs the root command
func Execute() {
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defaultContext = ctx
go func() {
sig := <-sc
log.Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
}()

if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
20 changes: 1 addition & 19 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package cmd

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -60,22 +57,7 @@ func runEServer(cmd *cobra.Command, args []string) error {
if err != nil {
return errors.Annotate(err, "new server")
}

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.Background())
go func() {
sig := <-sc
log.Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
}()

err = server.Run(ctx)
err = server.Run(defaultContext)
if err != nil && errors.Cause(err) != context.Canceled {
log.Error("run server", zap.String("error", errors.ErrorStack(err)))
return errors.Annotate(err, "run server")
Expand Down
4 changes: 0 additions & 4 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
)

func contextTimeout() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultContextTimeoutDuration)
}

func getAllCaptures(ctx context.Context) ([]*capture, error) {
_, raw, err := cdcEtcdCli.GetCaptures(ctx)
if err != nil {
Expand Down

0 comments on commit e15f5b7

Please sign in to comment.