Skip to content

Commit

Permalink
redo(ticdc): add meta flush interval configuration (#9959) (#9973)
Browse files Browse the repository at this point in the history
close #9960, close #9998
  • Loading branch information
ti-chi-bot authored Nov 7, 2023
1 parent 87785f1 commit fc2d31d
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 79 deletions.
33 changes: 18 additions & 15 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
}
if c.Consistent != nil {
res.Consistent = &config.ConsistentConfig{
Level: c.Consistent.Level,
MaxLogSize: c.Consistent.MaxLogSize,
FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
Level: c.Consistent.Level,
MaxLogSize: c.Consistent.MaxLogSize,
FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
}
}
if c.Sink != nil {
Expand Down Expand Up @@ -741,11 +742,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Level: cloned.Consistent.Level,
MaxLogSize: cloned.Consistent.MaxLogSize,
FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
Level: cloned.Consistent.Level,
MaxLogSize: cloned.Consistent.MaxLogSize,
FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
}
}
if cloned.Mounter != nil {
Expand Down Expand Up @@ -933,11 +935,12 @@ type ColumnSelector struct {
// ConsistentConfig represents replication consistency config for a changefeed
// This is a duplicate of config.ConsistentConfig
type ConsistentConfig struct {
Level string `json:"level,omitempty"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
Storage string `json:"storage,omitempty"`
UseFileBackend bool `json:"use_file_backend"`
Level string `json:"level,omitempty"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
Storage string `json:"storage,omitempty"`
UseFileBackend bool `json:"use_file_backend"`
}

// ChangefeedSchedulerConfig is per changefeed scheduler settings.
Expand Down
11 changes: 6 additions & 5 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ var defaultAPIConfig = &ReplicaConfig{
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: config.GetDefaultReplicaConfig().
Expand Down
7 changes: 4 additions & 3 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,10 @@ func TestRemoveChangefeed(t *testing.T) {
dir := t.TempDir()
info.SinkURI = "mysql://"
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
Expand Down
11 changes: 6 additions & 5 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,12 @@ func newProcessor4Test(
tmpDir := t.TempDir()
redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID)
dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redoPkg.DefaultMetaFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
})
p.redo.r = dmlMgr
}
Expand Down
17 changes: 12 additions & 5 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,15 @@ func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error {
return err
}
m.writer = w
return m.bgUpdateLog(ctx)
return m.bgUpdateLog(ctx, m.getFlushDuration())
}

func (m *logManager) getFlushDuration() time.Duration {
flushIntervalInMs := m.cfg.FlushIntervalInMs
if m.cfg.LogType == redo.RedoDDLLogFileType {
flushIntervalInMs = m.cfg.MetaFlushIntervalInMs
}
return time.Duration(flushIntervalInMs) * time.Millisecond
}

// WaitForReady implements pkg/util.Runnable.
Expand Down Expand Up @@ -483,15 +491,14 @@ func (m *logManager) onResolvedTsMsg(span tablepb.Span, resolvedTs model.Ts) {
}
}

func (m *logManager) bgUpdateLog(ctx context.Context) error {
func (m *logManager) bgUpdateLog(ctx context.Context, flushDuration time.Duration) error {
m.releaseMemoryCbs = make([]func(), 0, 1024)
flushIntervalInMs := m.cfg.FlushIntervalInMs
ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond)
ticker := time.NewTicker(flushDuration)
defer ticker.Stop()
log.Info("redo manager bgUpdateLog is running",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Int64("flushIntervalInMs", flushIntervalInMs))
zap.Duration("flushIntervalInMs", flushDuration))

var err error
// logErrCh is used to retrieve errors from log flushing goroutines.
Expand Down
42 changes: 23 additions & 19 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ func TestLogManagerInProcessor(t *testing.T) {
testWriteDMLs := func(storage string, useFileBackend bool) {
ctx, cancel := context.WithCancel(ctx)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
Expand Down Expand Up @@ -221,11 +222,12 @@ func TestLogManagerInOwner(t *testing.T) {
testWriteDDLs := func(storage string, useFileBackend bool) {
ctx, cancel := context.WithCancel(ctx)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
startTs := model.Ts(10)
ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs)
Expand Down Expand Up @@ -266,10 +268,11 @@ func TestLogManagerError(t *testing.T) {
defer cancel()

cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: "blackhole-invalid://",
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: "blackhole-invalid://",
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
Expand Down Expand Up @@ -317,11 +320,12 @@ func BenchmarkFileWriter(b *testing.B) {
func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
ctx, cancel := context.WithCancel(context.Background())
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
Expand Down
27 changes: 15 additions & 12 deletions cdc/redo/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ func TestInitAndWriteMeta(t *testing.T) {

startTs := uint64(10)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
m := NewMetaManager(changefeedID, cfg, startTs)

Expand Down Expand Up @@ -150,10 +151,11 @@ func TestPreCleanupAndWriteMeta(t *testing.T) {

startTs := uint64(10)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
m := NewMetaManager(changefeedID, cfg, startTs)

Expand Down Expand Up @@ -280,10 +282,11 @@ func TestGCAndCleanup(t *testing.T) {

startTs := uint64(3)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
m := NewMetaManager(changefeedID, cfg, startTs)

Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
"level": "none",
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"storage": "",
"use-file-backend": false
},
Expand Down Expand Up @@ -294,6 +295,7 @@ const (
"level": "none",
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"storage": "",
"use-file-backend": false
},
Expand Down Expand Up @@ -441,6 +443,7 @@ const (
"level": "none",
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"storage": "",
"use-file-backend": false
},
Expand Down
20 changes: 15 additions & 5 deletions pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (

// ConsistentConfig represents replication consistency config for a changefeed.
type ConsistentConfig struct {
Level string `toml:"level" json:"level"`
MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"`
FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"`
Storage string `toml:"storage" json:"storage"`
UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"`
Level string `toml:"level" json:"level"`
MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"`
FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"`
MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"`
Storage string `toml:"storage" json:"storage"`
UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"`
}

// ValidateAndAdjust validates the consistency config and adjusts it if necessary.
Expand All @@ -50,6 +51,15 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
c.FlushIntervalInMs, redo.MinFlushIntervalInMs))
}

if c.MetaFlushIntervalInMs == 0 {
c.MetaFlushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
}
if c.MetaFlushIntervalInMs < redo.MinFlushIntervalInMs {
return cerror.ErrInvalidReplicaConfig.FastGenByArgs(
fmt.Sprintf("The consistent.meta-flush-interval:%d must be equal or greater than %d",
c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs))
}

uri, err := storage.ParseRawURL(c.Storage)
if err != nil {
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
Expand Down
11 changes: 6 additions & 5 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,12 @@ var defaultReplicaConfig = &ReplicaConfig{
AdvanceTimeoutInSec: util.AddressOf(DefaultAdvanceTimeoutInSec),
},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: redo.DefaultMaxLogSize,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
Level: "none",
MaxLogSize: redo.DefaultMaxLogSize,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
Storage: "",
UseFileBackend: false,
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: false,
Expand Down
2 changes: 2 additions & 0 deletions pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
FlushWarnDuration = time.Second * 20
// DefaultFlushIntervalInMs is the default flush interval for redo log.
DefaultFlushIntervalInMs = 2000
// DefaultMetaFlushIntervalInMs is the default flush interval for redo meta.
DefaultMetaFlushIntervalInMs = 200
// MinFlushIntervalInMs is the minimum flush interval for redo log.
MinFlushIntervalInMs = 50

Expand Down
11 changes: 6 additions & 5 deletions tests/integration_tests/api_v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,12 @@ type ColumnSelector struct {
// ConsistentConfig represents replication consistency config for a changefeed
// This is a duplicate of config.ConsistentConfig
type ConsistentConfig struct {
Level string `json:"level"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
Level string `json:"level"`
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
}

// ChangefeedSchedulerConfig is per changefeed scheduler settings.
Expand Down

0 comments on commit fc2d31d

Please sign in to comment.