Skip to content

Commit

Permalink
config: cherry-pick capture-session-ttl config into release-4.0 (#2169)
Browse files Browse the repository at this point in the history
* config: cherry-pick capture-session-ttl config into release-4.0

This is part of #1540

* fix unit test

* fix server config marshal test.

Co-authored-by: Ling Jin <JinLingChristopher@users.noreply.github.com>
Co-authored-by: JinlingChristopher <jinl1037@hotmail.com>
  • Loading branch information
3 people authored Jun 30, 2021
1 parent 5b5bc79 commit 7713b97
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewCapture(
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
}
sess, err := concurrency.NewSession(etcdCli,
concurrency.WithTTL(defaultCaptureSessionTTL))
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
Expand Down
6 changes: 1 addition & 5 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ import (
"golang.org/x/time/rate"
)

const (
defaultCaptureSessionTTL = 10
)

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
type Capture struct {
captureMu sync.Mutex
Expand Down Expand Up @@ -95,7 +91,7 @@ func (c *Capture) reset() error {
c.session.Close() //nolint:errcheck
}
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(defaultCaptureSessionTTL))
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
Expand Down
10 changes: 5 additions & 5 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s

func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) {
defer testleak.AfterTest(c)()
session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(defaultCaptureSessionTTL))
session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)
mockPDCli := &mockPDClient{}
mockOwner := Owner{
Expand Down Expand Up @@ -164,7 +164,7 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) {
},
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(defaultCaptureSessionTTL))
session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)
mockOwner := Owner{
session: session,
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) {
},
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(defaultCaptureSessionTTL))
session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
Expand Down Expand Up @@ -299,7 +299,7 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) {
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(),
concurrency.WithTTL(defaultCaptureSessionTTL))
concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
Expand Down Expand Up @@ -393,7 +393,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(),
concurrency.WithTTL(defaultCaptureSessionTTL))
concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
Expand Down
2 changes: 0 additions & 2 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (

// DefaultCDCGCSafePointTTL is the default value of cdc gc safe-point ttl, specified in seconds.
DefaultCDCGCSafePointTTL = 24 * 60 * 60

defaultCaptureSessionTTL = 10
)

// Server is the capture server
Expand Down
4 changes: 4 additions & 0 deletions cmd/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) {
LogLevel: "debug",
GcTTL: 10,
TZ: "UTC",
CaptureSessionTTL: 10,
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
Expand Down Expand Up @@ -131,6 +132,7 @@ log-level = "warn"
gc-ttl = 500
tz = "US"
capture-session-ttl = 10
owner-flush-interval = "600ms"
processor-flush-interval = "600ms"
Expand All @@ -157,6 +159,7 @@ sort-dir = "/tmp/just_a_test"
LogLevel: "warn",
GcTTL: 500,
TZ: "US",
CaptureSessionTTL: 10,
OwnerFlushInterval: config.TomlDuration(600 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(600 * time.Millisecond),
Sorter: &config.SorterConfig{
Expand Down Expand Up @@ -206,6 +209,7 @@ cert-allowed-cn = ["dd","ee"]
LogLevel: "debug",
GcTTL: 10,
TZ: "UTC",
CaptureSessionTTL: 10,
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
Expand Down
25 changes: 19 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,17 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
type SecurityConfig = security.Credential

var defaultServerConfig = &ServerConfig{
Addr: "127.0.0.1:8300",
AdvertiseAddr: "",
LogFile: "",
LogLevel: "info",
GcTTL: 24 * 60 * 60, // 24H
TZ: "System",
Addr: "127.0.0.1:8300",
AdvertiseAddr: "",
LogFile: "",
LogLevel: "info",
GcTTL: 24 * 60 * 60, // 24H
TZ: "System",
// The default election-timeout in PD is 3s and minimum session TTL is 5s,
// which is calculated by `math.Ceil(3 * election-timeout / 2)`, we choose
// default capture session ttl to 10s to increase robust to PD jitter,
// however it will decrease RTO when single TiCDC node error happens.
CaptureSessionTTL: 10,
OwnerFlushInterval: TomlDuration(200 * time.Millisecond),
ProcessorFlushInterval: TomlDuration(100 * time.Millisecond),
Sorter: &SorterConfig{
Expand All @@ -174,6 +179,8 @@ type ServerConfig struct {
GcTTL int64 `toml:"gc-ttl" json:"gc-ttl"`
TZ string `toml:"tz" json:"tz"`

CaptureSessionTTL int `toml:"capture-session-ttl" json:"capture-session-ttl"`

OwnerFlushInterval TomlDuration `toml:"owner-flush-interval" json:"owner-flush-interval"`
ProcessorFlushInterval TomlDuration `toml:"processor-flush-interval" json:"processor-flush-interval"`

Expand Down Expand Up @@ -245,6 +252,12 @@ func (c *ServerConfig) ValidateAndAdjust() error {
return cerror.ErrInvalidServerOption.GenWithStack("empty GC TTL is not allowed")
}

// 5s is minimum lease ttl in etcd(PD)
if c.CaptureSessionTTL < 5 {
log.Warn("capture session ttl too small, set to default value 10s")
c.CaptureSessionTTL = 10
}

if c.Security != nil && c.Security.IsTLSEnabled() {
var err error
_, err = c.Security.ToTLSConfig()
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) {
conf.Sorter.ChunkSizeLimit = 999
b, err := conf.Marshal()
c.Assert(err, check.IsNil)
c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)
c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)
conf2 := new(ServerConfig)
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`))
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`))
c.Assert(err, check.IsNil)
c.Assert(conf2, check.DeepEquals, conf)
}
Expand Down

0 comments on commit 7713b97

Please sign in to comment.