Skip to content

Commit

Permalink
redo(ticdc): add meta flush interval configuration (#9959) (#9972)
Browse files Browse the repository at this point in the history
close #9960
  • Loading branch information
ti-chi-bot authored Dec 4, 2023
1 parent 1fde95e commit fa92ec1
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 72 deletions.
33 changes: 18 additions & 15 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
}
if c.Consistent != nil {
res.Consistent = &config.ConsistentConfig{
Level: c.Consistent.Level,
MaxLogSize: c.Consistent.MaxLogSize,
FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
Level: c.Consistent.Level,
MaxLogSize: c.Consistent.MaxLogSize,
FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
}
}
if c.Sink != nil {
Expand Down Expand Up @@ -627,11 +628,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Level: cloned.Consistent.Level,
MaxLogSize: cloned.Consistent.MaxLogSize,
FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
Level: cloned.Consistent.Level,
MaxLogSize: cloned.Consistent.MaxLogSize,
FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
}
}
if cloned.Mounter != nil {
Expand Down Expand Up @@ -807,11 +809,12 @@ type ColumnSelector struct {
// ConsistentConfig represents replication consistency config for a changefeed
// This is a duplicate of config.ConsistentConfig
type ConsistentConfig struct {
Level string `json:"level"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
Level string `json:"level,omitempty"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
Storage string `json:"storage,omitempty"`
UseFileBackend bool `json:"use_file_backend"`
}

// ChangefeedSchedulerConfig is per changefeed scheduler settings.
Expand Down
11 changes: 6 additions & 5 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ var defaultAPIConfig = &ReplicaConfig{
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: config.GetDefaultReplicaConfig().
Expand Down
7 changes: 4 additions & 3 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,10 @@ func TestRemoveChangefeed(t *testing.T) {
info := ctx.ChangefeedVars().Info
dir := t.TempDir()
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
Expand Down
11 changes: 6 additions & 5 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ func newProcessor4Test(
tmpDir := t.TempDir()
redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID)
dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redoPkg.DefaultMetaFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
})
p.redo.r = dmlMgr
}
Expand Down
22 changes: 15 additions & 7 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type metaManager struct {
startTs model.Ts

lastFlushTime time.Time
flushIntervalInMs int64
cfg *config.ConsistentConfig
metricFlushLogDuration prometheus.Observer
}
Expand All @@ -94,12 +95,19 @@ func NewMetaManager(
}

m := &metaManager{
captureID: config.GetGlobalServerConfig().AdvertiseAddr,
changeFeedID: changefeedID,
uuidGenerator: uuid.NewGenerator(),
enabled: true,
cfg: cfg,
startTs: checkpoint,
captureID: config.GetGlobalServerConfig().AdvertiseAddr,
changeFeedID: changefeedID,
uuidGenerator: uuid.NewGenerator(),
enabled: true,
cfg: cfg,
startTs: checkpoint,
flushIntervalInMs: cfg.MetaFlushIntervalInMs,
}

if m.flushIntervalInMs < redo.MinFlushIntervalInMs {
log.Warn("redo flush interval is too small, use default value",
zap.Int64("interval", m.flushIntervalInMs))
m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
}

return m
Expand Down Expand Up @@ -158,7 +166,7 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error {
}
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
return m.bgFlushMeta(egCtx, m.cfg.FlushIntervalInMs)
return m.bgFlushMeta(egCtx, m.flushIntervalInMs)
})
eg.Go(func() error {
return m.bgGC(egCtx)
Expand Down
27 changes: 15 additions & 12 deletions cdc/redo/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ func TestInitAndWriteMeta(t *testing.T) {

startTs := uint64(10)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
m := NewMetaManager(changefeedID, cfg, startTs)

Expand Down Expand Up @@ -146,10 +147,11 @@ func TestPreCleanupAndWriteMeta(t *testing.T) {

startTs := uint64(10)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
m := NewMetaManager(changefeedID, cfg, startTs)

Expand Down Expand Up @@ -274,10 +276,11 @@ func TestGCAndCleanup(t *testing.T) {

startTs := uint64(3)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
m := NewMetaManager(changefeedID, cfg, startTs)

Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
"level": "none",
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"storage": "",
"use-file-backend": false
},
Expand Down Expand Up @@ -277,6 +278,7 @@ const (
"level": "none",
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"storage": "",
"use-file-backend": false
},
Expand Down Expand Up @@ -410,6 +412,7 @@ const (
"level": "none",
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"storage": "",
"use-file-backend": false
},
Expand Down
20 changes: 15 additions & 5 deletions pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (

// ConsistentConfig represents replication consistency config for a changefeed.
type ConsistentConfig struct {
Level string `toml:"level" json:"level"`
MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"`
FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"`
Storage string `toml:"storage" json:"storage"`
UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"`
Level string `toml:"level" json:"level"`
MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"`
FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"`
MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"`
Storage string `toml:"storage" json:"storage"`
UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"`
}

// ValidateAndAdjust validates the consistency config and adjusts it if necessary.
Expand All @@ -50,6 +51,15 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
c.FlushIntervalInMs, redo.MinFlushIntervalInMs))
}

if c.MetaFlushIntervalInMs == 0 {
c.MetaFlushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
}
if c.MetaFlushIntervalInMs < redo.MinFlushIntervalInMs {
return cerror.ErrInvalidReplicaConfig.FastGenByArgs(
fmt.Sprintf("The consistent.meta-flush-interval:%d must be equal or greater than %d",
c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs))
}

uri, err := storage.ParseRawURL(c.Storage)
if err != nil {
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
Expand Down
11 changes: 6 additions & 5 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ var defaultReplicaConfig = &ReplicaConfig{
AdvanceTimeoutInSec: util.AddressOf(DefaultAdvanceTimeoutInSec),
},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: redo.DefaultMaxLogSize,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
Level: "none",
MaxLogSize: redo.DefaultMaxLogSize,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: false,
Expand Down
2 changes: 2 additions & 0 deletions pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
FlushWarnDuration = time.Second * 20
// DefaultFlushIntervalInMs is the default flush interval for redo log.
DefaultFlushIntervalInMs = 2000
// DefaultMetaFlushIntervalInMs is the default flush interval for redo meta.
DefaultMetaFlushIntervalInMs = 200
// MinFlushIntervalInMs is the minimum flush interval for redo log.
MinFlushIntervalInMs = 50

Expand Down
22 changes: 12 additions & 10 deletions tests/integration_tests/api_v2/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ var customReplicaConfig = &ReplicaConfig{
EnablePartitionSeparator: true,
},
Consistent: &ConsistentConfig{
Level: "",
MaxLogSize: 65,
FlushIntervalInMs: 500,
Storage: "local://test",
UseFileBackend: true,
Level: "",
MaxLogSize: 65,
MetaFlushIntervalInMs: 201,
FlushIntervalInMs: 500,
Storage: "local://test",
UseFileBackend: true,
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: false,
Expand Down Expand Up @@ -133,11 +134,12 @@ var defaultReplicaConfig = &ReplicaConfig{
EnablePartitionSeparator: true,
},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: redo.DefaultMaxLogSize,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
Level: "none",
MaxLogSize: redo.DefaultMaxLogSize,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: false,
Expand Down
11 changes: 6 additions & 5 deletions tests/integration_tests/api_v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,12 @@ type ColumnSelector struct {
// ConsistentConfig represents replication consistency config for a changefeed
// This is a duplicate of config.ConsistentConfig
type ConsistentConfig struct {
Level string `json:"level"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
Level string `json:"level"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
}

// ChangefeedSchedulerConfig is per changefeed scheduler settings.
Expand Down

0 comments on commit fa92ec1

Please sign in to comment.