Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: cherry-pick capture-session-ttl config into release-4.0 #2169

Merged
merged 4 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewCapture(
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
}
sess, err := concurrency.NewSession(etcdCli,
concurrency.WithTTL(defaultCaptureSessionTTL))
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
Expand Down
6 changes: 1 addition & 5 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ import (
"golang.org/x/time/rate"
)

const (
defaultCaptureSessionTTL = 10
)

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
type Capture struct {
captureMu sync.Mutex
Expand Down Expand Up @@ -95,7 +91,7 @@ func (c *Capture) reset() error {
c.session.Close() //nolint:errcheck
}
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(defaultCaptureSessionTTL))
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
Expand Down
10 changes: 5 additions & 5 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s

func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) {
defer testleak.AfterTest(c)()
session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(defaultCaptureSessionTTL))
session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)
mockPDCli := &mockPDClient{}
mockOwner := Owner{
Expand Down Expand Up @@ -164,7 +164,7 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) {
},
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(defaultCaptureSessionTTL))
session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)
mockOwner := Owner{
session: session,
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) {
},
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(defaultCaptureSessionTTL))
session, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
Expand Down Expand Up @@ -299,7 +299,7 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) {
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(),
concurrency.WithTTL(defaultCaptureSessionTTL))
concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
Expand Down Expand Up @@ -393,7 +393,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(),
concurrency.WithTTL(defaultCaptureSessionTTL))
concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
Expand Down
2 changes: 0 additions & 2 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (

// DefaultCDCGCSafePointTTL is the default value of cdc gc safe-point ttl, specified in seconds.
DefaultCDCGCSafePointTTL = 24 * 60 * 60

defaultCaptureSessionTTL = 10
)

// Server is the capture server
Expand Down
4 changes: 4 additions & 0 deletions cmd/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) {
LogLevel: "debug",
GcTTL: 10,
TZ: "UTC",
CaptureSessionTTL: 10,
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
Expand Down Expand Up @@ -131,6 +132,7 @@ log-level = "warn"

gc-ttl = 500
tz = "US"
capture-session-ttl = 10

owner-flush-interval = "600ms"
processor-flush-interval = "600ms"
Expand All @@ -157,6 +159,7 @@ sort-dir = "/tmp/just_a_test"
LogLevel: "warn",
GcTTL: 500,
TZ: "US",
CaptureSessionTTL: 10,
OwnerFlushInterval: config.TomlDuration(600 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(600 * time.Millisecond),
Sorter: &config.SorterConfig{
Expand Down Expand Up @@ -206,6 +209,7 @@ cert-allowed-cn = ["dd","ee"]
LogLevel: "debug",
GcTTL: 10,
TZ: "UTC",
CaptureSessionTTL: 10,
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
Expand Down
25 changes: 19 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,17 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
type SecurityConfig = security.Credential

var defaultServerConfig = &ServerConfig{
Addr: "127.0.0.1:8300",
AdvertiseAddr: "",
LogFile: "",
LogLevel: "info",
GcTTL: 24 * 60 * 60, // 24H
TZ: "System",
Addr: "127.0.0.1:8300",
AdvertiseAddr: "",
LogFile: "",
LogLevel: "info",
GcTTL: 24 * 60 * 60, // 24H
TZ: "System",
// The default election-timeout in PD is 3s and minimum session TTL is 5s,
// which is calculated by `math.Ceil(3 * election-timeout / 2)`, we choose
// default capture session ttl to 10s to increase robust to PD jitter,
// however it will decrease RTO when single TiCDC node error happens.
CaptureSessionTTL: 10,
OwnerFlushInterval: TomlDuration(200 * time.Millisecond),
ProcessorFlushInterval: TomlDuration(100 * time.Millisecond),
Sorter: &SorterConfig{
Expand All @@ -174,6 +179,8 @@ type ServerConfig struct {
GcTTL int64 `toml:"gc-ttl" json:"gc-ttl"`
TZ string `toml:"tz" json:"tz"`

CaptureSessionTTL int `toml:"capture-session-ttl" json:"capture-session-ttl"`

OwnerFlushInterval TomlDuration `toml:"owner-flush-interval" json:"owner-flush-interval"`
ProcessorFlushInterval TomlDuration `toml:"processor-flush-interval" json:"processor-flush-interval"`

Expand Down Expand Up @@ -245,6 +252,12 @@ func (c *ServerConfig) ValidateAndAdjust() error {
return cerror.ErrInvalidServerOption.GenWithStack("empty GC TTL is not allowed")
}

// 5s is minimum lease ttl in etcd(PD)
if c.CaptureSessionTTL < 5 {
log.Warn("capture session ttl too small, set to default value 10s")
c.CaptureSessionTTL = 10
}

if c.Security != nil && c.Security.IsTLSEnabled() {
var err error
_, err = c.Security.ToTLSConfig()
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) {
conf.Sorter.ChunkSizeLimit = 999
b, err := conf.Marshal()
c.Assert(err, check.IsNil)
c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)
c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)
conf2 := new(ServerConfig)
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`))
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`))
c.Assert(err, check.IsNil)
c.Assert(conf2, check.DeepEquals, conf)
}
Expand Down