From 78b49e80fdc1f5eba8e01bdb6053803d395d1401 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 21 May 2018 16:06:55 +0800 Subject: [PATCH] sessionctx/binloginfo: add a timeout for writing binlog (#6588) --- config/config.go | 4 ++++ config/config.toml.example | 3 +++ sessionctx/binloginfo/binloginfo.go | 15 ++++++++++++++- tidb-server/main.go | 1 + 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index efe2d18fc755a..5b1d7c46712ec 100644 --- a/config/config.go +++ b/config/config.go @@ -223,6 +223,7 @@ type TiKVClient struct { // Binlog is the config for binlog. type Binlog struct { BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"` + WriteTimeout string `toml:"write-timeout" json:"write-timeout"` // If IgnoreError is true, when writting binlog meets error, TiDB would // ignore the error. IgnoreError bool `toml:"ignore-error" json:"ignore-error"` @@ -296,6 +297,9 @@ var defaultConf = Config{ GrpcConnectionCount: 16, CommitTimeout: "41s", }, + Binlog: Binlog{ + WriteTimeout: "15s", + }, } var globalConf = defaultConf diff --git a/config/config.toml.example b/config/config.toml.example index 2f6f1dc5102be..e1db9f7686b21 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -220,6 +220,9 @@ commit-timeout = "41s" # Socket file to write binlog. binlog-socket = "" +# WriteTimeout specifies how long it will wait for writing binlog to pump. +write-timeout = "15s" + # If IgnoreError is true, when writting binlog meets error, TiDB would stop writting binlog, # but still provide service. ignore-error = false \ No newline at end of file diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index cfbc38b7e7a64..5f70cc99f749d 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -35,6 +35,8 @@ func init() { grpc.EnableTracing = false } +var binlogWriteTimeout = 15 * time.Second + // pumpClient is the gRPC client to write binlog, it is opened on server start and never close, // shared by all sessions. var pumpClient binlog.PumpClient @@ -54,6 +56,15 @@ func GetPumpClient() binlog.PumpClient { return client } +// SetGRPCTimeout sets grpc timeout for writing binlog. +func SetGRPCTimeout(timeout time.Duration) { + if timeout < 300*time.Millisecond { + log.Warnf("set binlog grpc timeout %s ignored, use default value %s", timeout, binlogWriteTimeout) + return // Avoid invalid value + } + binlogWriteTimeout = timeout +} + // SetPumpClient sets the pump client instance. func SetPumpClient(client binlog.PumpClient) { pumpClientLock.Lock() @@ -109,7 +120,9 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error { // Retry many times because we may raise CRITICAL error here. for i := 0; i < 20; i++ { var resp *binlog.WriteBinlogResp - resp, err = info.Client.WriteBinlog(context.Background(), req) + ctx, cancel := context.WithTimeout(context.Background(), binlogWriteTimeout) + resp, err = info.Client.WriteBinlog(ctx, req) + cancel() if err == nil && resp.Errmsg != "" { err = errors.New(resp.Errmsg) } diff --git a/tidb-server/main.go b/tidb-server/main.go index f777b2dfef146..770ac7257124d 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -176,6 +176,7 @@ func setupBinlogClient() { if cfg.Binlog.IgnoreError { binloginfo.SetIgnoreError(true) } + binloginfo.SetGRPCTimeout(parseDuration(cfg.Binlog.WriteTimeout)) binloginfo.SetPumpClient(binlog.NewPumpClient(clientConn)) log.Infof("created binlog client at %s, ignore error %v", cfg.Binlog.BinlogSocket, cfg.Binlog.IgnoreError) }