diff --git a/cdc/capture.go b/cdc/capture.go index 35cefdc187e..e0be0c7466d 100644 --- a/cdc/capture.go +++ b/cdc/capture.go @@ -108,7 +108,7 @@ func NewCapture( return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client") } sess, err := concurrency.NewSession(etcdCli, - concurrency.WithTTL(defaultCaptureSessionTTL)) + concurrency.WithTTL(conf.CaptureSessionTTL)) if err != nil { return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session") } diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 01e1eafc514..41d9e5fec8b 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -41,10 +41,6 @@ import ( "golang.org/x/time/rate" ) -const ( - defaultCaptureSessionTTL = 10 -) - // Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it. type Capture struct { captureMu sync.Mutex @@ -95,7 +91,7 @@ func (c *Capture) reset() error { c.session.Close() //nolint:errcheck } sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(), - concurrency.WithTTL(defaultCaptureSessionTTL)) + concurrency.WithTTL(conf.CaptureSessionTTL)) if err != nil { return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session") } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 02168fcacab..25f86f4af07 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -123,7 +123,7 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) { defer testleak.AfterTest(c)() - session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(defaultCaptureSessionTTL)) + session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL)) c.Assert(err, check.IsNil) mockPDCli := &mockPDClient{} mockOwner := Owner{ @@ -164,7 +164,7 @@ func (s 
*ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) { }, } - session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(defaultCaptureSessionTTL)) + session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL)) c.Assert(err, check.IsNil) mockOwner := Owner{ session: session, @@ -217,7 +217,7 @@ func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) { }, } - session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(defaultCaptureSessionTTL)) + session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL)) c.Assert(err, check.IsNil) mockOwner := Owner{ @@ -299,7 +299,7 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { } session, err := concurrency.NewSession(s.client.Client.Unwrap(), - concurrency.WithTTL(defaultCaptureSessionTTL)) + concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL)) c.Assert(err, check.IsNil) mockOwner := Owner{ @@ -393,7 +393,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) { } session, err := concurrency.NewSession(s.client.Client.Unwrap(), - concurrency.WithTTL(defaultCaptureSessionTTL)) + concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL)) c.Assert(err, check.IsNil) mockOwner := Owner{ diff --git a/cdc/server.go b/cdc/server.go index d08356bbde5..9c2a85bf99b 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -50,8 +50,6 @@ const ( // DefaultCDCGCSafePointTTL is the default value of cdc gc safe-point ttl, specified in seconds. 
DefaultCDCGCSafePointTTL = 24 * 60 * 60 - - defaultCaptureSessionTTL = 10 ) // Server is the capture server diff --git a/cmd/server_test.go b/cmd/server_test.go index ef0bc440cc9..c79565af172 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -101,6 +101,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { LogLevel: "debug", GcTTL: 10, TZ: "UTC", + CaptureSessionTTL: 10, OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond), ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond), Sorter: &config.SorterConfig{ @@ -131,6 +132,7 @@ log-level = "warn" gc-ttl = 500 tz = "US" +capture-session-ttl = 10 owner-flush-interval = "600ms" processor-flush-interval = "600ms" @@ -157,6 +159,7 @@ sort-dir = "/tmp/just_a_test" LogLevel: "warn", GcTTL: 500, TZ: "US", + CaptureSessionTTL: 10, OwnerFlushInterval: config.TomlDuration(600 * time.Millisecond), ProcessorFlushInterval: config.TomlDuration(600 * time.Millisecond), Sorter: &config.SorterConfig{ @@ -206,6 +209,7 @@ cert-allowed-cn = ["dd","ee"] LogLevel: "debug", GcTTL: 10, TZ: "UTC", + CaptureSessionTTL: 10, OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond), ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond), Sorter: &config.SorterConfig{ diff --git a/pkg/config/config.go b/pkg/config/config.go index 33836bec4fb..e937519768c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -143,12 +143,17 @@ func GetDefaultReplicaConfig() *ReplicaConfig { type SecurityConfig = security.Credential var defaultServerConfig = &ServerConfig{ - Addr: "127.0.0.1:8300", - AdvertiseAddr: "", - LogFile: "", - LogLevel: "info", - GcTTL: 24 * 60 * 60, // 24H - TZ: "System", + Addr: "127.0.0.1:8300", + AdvertiseAddr: "", + LogFile: "", + LogLevel: "info", + GcTTL: 24 * 60 * 60, // 24H + TZ: "System", + // The default election-timeout in PD is 3s and minimum session TTL is 5s, + // which is calculated by `math.Ceil(3 * election-timeout / 2)`, we choose + // 
default capture session ttl to 10s to increase robustness against PD jitter, + // however it will lengthen RTO when a single TiCDC node fails. + CaptureSessionTTL: 10, OwnerFlushInterval: TomlDuration(200 * time.Millisecond), ProcessorFlushInterval: TomlDuration(100 * time.Millisecond), Sorter: &SorterConfig{ @@ -174,6 +179,8 @@ type ServerConfig struct { GcTTL int64 `toml:"gc-ttl" json:"gc-ttl"` TZ string `toml:"tz" json:"tz"` + CaptureSessionTTL int `toml:"capture-session-ttl" json:"capture-session-ttl"` + OwnerFlushInterval TomlDuration `toml:"owner-flush-interval" json:"owner-flush-interval"` ProcessorFlushInterval TomlDuration `toml:"processor-flush-interval" json:"processor-flush-interval"` @@ -245,6 +252,12 @@ func (c *ServerConfig) ValidateAndAdjust() error { return cerror.ErrInvalidServerOption.GenWithStack("empty GC TTL is not allowed") } + // 5s is minimum lease ttl in etcd(PD) + if c.CaptureSessionTTL < 5 { + log.Warn("capture session ttl too small, set to default value 10s") + c.CaptureSessionTTL = 10 + } + if c.Security != nil && c.Security.IsTLSEnabled() { var err error _, err = c.Security.ToTLSConfig() diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 01467a32e57..d429e825a8d 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) { conf.Sorter.ChunkSizeLimit = 999 b, err := conf.Marshal() c.Assert(err, check.IsNil) - c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`) + c.Assert(b, check.Equals, 
`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`) conf2 := new(ServerConfig) - err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)) + err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)) c.Assert(err, check.IsNil) c.Assert(conf2, check.DeepEquals, conf) }