Skip to content

Commit

Permalink
redo(ticdc): add meta flush interval configuration (#9959)
Browse files Browse the repository at this point in the history
close #9960
  • Loading branch information
CharlesCheung96 committed Oct 31, 2023
1 parent 4febedd commit 963a471
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 51 deletions.
33 changes: 18 additions & 15 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
}
if c.Consistent != nil {
res.Consistent = &config.ConsistentConfig{
Level: c.Consistent.Level,
MaxLogSize: c.Consistent.MaxLogSize,
FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
Level: c.Consistent.Level,
MaxLogSize: c.Consistent.MaxLogSize,
FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
}
}
if c.Sink != nil {
Expand Down Expand Up @@ -455,11 +456,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Level: cloned.Consistent.Level,
MaxLogSize: cloned.Consistent.MaxLogSize,
FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
Level: cloned.Consistent.Level,
MaxLogSize: cloned.Consistent.MaxLogSize,
FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
}
}
if cloned.Mounter != nil {
Expand Down Expand Up @@ -627,11 +629,12 @@ type ColumnSelector struct {
// ConsistentConfig represents replication consistency config for a changefeed
// This is a duplicate of config.ConsistentConfig
type ConsistentConfig struct {
Level string `json:"level"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
Level string `json:"level,omitempty"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
Storage string `json:"storage,omitempty"`
UseFileBackend bool `json:"use_file_backend"`
}

// ChangefeedSchedulerConfig is per changefeed scheduler settings.
Expand Down
11 changes: 6 additions & 5 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ var defaultAPIConfig = &ReplicaConfig{
AdvanceTimeoutInSec: config.DefaultAdvanceTimeoutInSec,
},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
},
}

Expand Down
7 changes: 4 additions & 3 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,10 @@ func TestRemoveChangefeed(t *testing.T) {
info := ctx.ChangefeedVars().Info
dir := t.TempDir()
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
Expand Down
8 changes: 7 additions & 1 deletion cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@ func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaMan
changeFeedID: contextutil.ChangefeedIDFromCtx(ctx),
uuidGenerator: uuid.NewGenerator(),
enabled: true,
flushIntervalInMs: cfg.FlushIntervalInMs,
flushIntervalInMs: cfg.MetaFlushIntervalInMs,
}

if m.flushIntervalInMs < redo.MinFlushIntervalInMs {
log.Warn("redo flush interval is too small, use default value",
zap.Int64("interval", m.flushIntervalInMs))
m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
}

uri, err := storage.ParseRawURL(cfg.Storage)
Expand Down
27 changes: 15 additions & 12 deletions cdc/redo/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ func TestInitAndWriteMeta(t *testing.T) {

startTs := uint64(10)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
m, err := NewMetaManagerWithInit(ctx, cfg, startTs)
require.NoError(t, err)
Expand Down Expand Up @@ -136,10 +137,11 @@ func TestPreCleanupAndWriteMeta(t *testing.T) {

startTs := uint64(10)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
m, err := NewMetaManagerWithInit(ctx, cfg, startTs)
require.NoError(t, err)
Expand Down Expand Up @@ -267,10 +269,11 @@ func TestGCAndCleanup(t *testing.T) {

startTs := uint64(3)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
m, err := NewMetaManagerWithInit(ctx, cfg, startTs)
require.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
"level": "none",
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"storage": "",
"use-file-backend": false
}
Expand Down Expand Up @@ -211,6 +212,7 @@ const (
"level": "none",
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"storage": "",
"use-file-backend": false
}
Expand Down Expand Up @@ -271,6 +273,7 @@ const (
"level": "none",
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"storage": "",
"use-file-backend": false
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (

// ConsistentConfig represents replication consistency config for a changefeed.
type ConsistentConfig struct {
Level string `toml:"level" json:"level"`
MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"`
FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"`
Storage string `toml:"storage" json:"storage"`
UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"`
Level string `toml:"level" json:"level"`
MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"`
FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"`
MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"`
Storage string `toml:"storage" json:"storage"`
UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"`
}

// ValidateAndAdjust validates the consistency config and adjusts it if necessary.
Expand All @@ -50,6 +51,15 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
c.FlushIntervalInMs, redo.MinFlushIntervalInMs))
}

if c.MetaFlushIntervalInMs == 0 {
c.MetaFlushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
}
if c.MetaFlushIntervalInMs < redo.MinFlushIntervalInMs {
return cerror.ErrInvalidReplicaConfig.FastGenByArgs(
fmt.Sprintf("The consistent.meta-flush-interval:%d must be equal or greater than %d",
c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs))
}

uri, err := storage.ParseRawURL(c.Storage)
if err != nil {
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
Expand Down
11 changes: 6 additions & 5 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ var defaultReplicaConfig = &ReplicaConfig{
AdvanceTimeoutInSec: DefaultAdvanceTimeoutInSec,
},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: redo.DefaultMaxLogSize,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
Level: "none",
MaxLogSize: redo.DefaultMaxLogSize,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
},
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
FlushWarnDuration = time.Second * 20
// DefaultFlushIntervalInMs is the default flush interval for redo log.
DefaultFlushIntervalInMs = 2000
// DefaultMetaFlushIntervalInMs is the default flush interval for redo meta.
DefaultMetaFlushIntervalInMs = 200
// MinFlushIntervalInMs is the minimum flush interval for redo log.
MinFlushIntervalInMs = 50

Expand Down
11 changes: 6 additions & 5 deletions tests/integration_tests/api_v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,12 @@ type ColumnSelector struct {
// ConsistentConfig represents replication consistency config for a changefeed
// This is a duplicate of config.ConsistentConfig
type ConsistentConfig struct {
Level string `json:"level"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
Level string `json:"level"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
}

// ChangefeedSchedulerConfig is per changefeed scheduler settings.
Expand Down

0 comments on commit 963a471

Please sign in to comment.