
redo(ticdc): add meta flush interval configuration (#9959) (#9971)
close #9960
ti-chi-bot authored Nov 2, 2023
1 parent 4febedd commit 258d469
Showing 17 changed files with 135 additions and 81 deletions.
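The new consistent.meta-flush-interval option controls how often the redo meta (the changefeed's persisted checkpoint and resolved ts) is flushed to external storage, separately from the data-log flush-interval. As a minimal sketch, a changefeed TOML config using it could look as follows; the values are illustrative and the storage URI is hypothetical, and per the validation added in pkg/config/consistent.go below, 0 falls back to the default (200 ms) while any non-zero value under redo.MinFlushIntervalInMs is rejected:

[consistent]
level = "eventual"
max-log-size = 64
flush-interval = 2000
meta-flush-interval = 200
storage = "s3://redo-bucket/changefeed-1"
use-file-backend = false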
33 changes: 18 additions & 15 deletions cdc/api/v2/model.go
@@ -255,11 +255,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
 	}
 	if c.Consistent != nil {
 		res.Consistent = &config.ConsistentConfig{
-			Level:             c.Consistent.Level,
-			MaxLogSize:        c.Consistent.MaxLogSize,
-			FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
-			Storage:           c.Consistent.Storage,
-			UseFileBackend:    c.Consistent.UseFileBackend,
+			Level:                 c.Consistent.Level,
+			MaxLogSize:            c.Consistent.MaxLogSize,
+			FlushIntervalInMs:     c.Consistent.FlushIntervalInMs,
+			MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs,
+			Storage:               c.Consistent.Storage,
+			UseFileBackend:        c.Consistent.UseFileBackend,
 		}
 	}
 	if c.Sink != nil {
@@ -455,11 +456,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
 	}
 	if cloned.Consistent != nil {
 		res.Consistent = &ConsistentConfig{
-			Level:             cloned.Consistent.Level,
-			MaxLogSize:        cloned.Consistent.MaxLogSize,
-			FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
-			Storage:           cloned.Consistent.Storage,
-			UseFileBackend:    cloned.Consistent.UseFileBackend,
+			Level:                 cloned.Consistent.Level,
+			MaxLogSize:            cloned.Consistent.MaxLogSize,
+			FlushIntervalInMs:     cloned.Consistent.FlushIntervalInMs,
+			MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs,
+			Storage:               cloned.Consistent.Storage,
+			UseFileBackend:        cloned.Consistent.UseFileBackend,
 		}
 	}
 	if cloned.Mounter != nil {
@@ -627,11 +629,12 @@ type ColumnSelector struct {
 // ConsistentConfig represents replication consistency config for a changefeed
 // This is a duplicate of config.ConsistentConfig
 type ConsistentConfig struct {
-	Level             string `json:"level"`
-	MaxLogSize        int64  `json:"max_log_size"`
-	FlushIntervalInMs int64  `json:"flush_interval"`
-	Storage           string `json:"storage"`
-	UseFileBackend    bool   `json:"use_file_backend"`
+	Level                 string `json:"level,omitempty"`
+	MaxLogSize            int64  `json:"max_log_size"`
+	FlushIntervalInMs     int64  `json:"flush_interval"`
+	MetaFlushIntervalInMs int64  `json:"meta_flush_interval"`
+	Storage               string `json:"storage,omitempty"`
+	UseFileBackend        bool   `json:"use_file_backend"`
 }
 
 // ChangefeedSchedulerConfig is per changefeed scheduler settings.
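For reference, this is the shape the v2 API serializes for the consistent section of a changefeed's replica config once the new field is present; a rough sketch with illustrative values and a hypothetical storage URI:

"consistent": {
  "level": "eventual",
  "max_log_size": 64,
  "flush_interval": 2000,
  "meta_flush_interval": 200,
  "storage": "s3://redo-bucket/changefeed-1",
  "use_file_backend": false
}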
11 changes: 6 additions & 5 deletions cdc/api/v2/model_test.go
@@ -56,11 +56,12 @@ var defaultAPIConfig = &ReplicaConfig{
 		AdvanceTimeoutInSec: config.DefaultAdvanceTimeoutInSec,
 	},
 	Consistent: &ConsistentConfig{
-		Level:             "none",
-		MaxLogSize:        64,
-		FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
-		Storage:           "",
-		UseFileBackend:    false,
+		Level:                 "none",
+		MaxLogSize:            64,
+		FlushIntervalInMs:     redo.DefaultFlushIntervalInMs,
+		MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
+		Storage:               "",
+		UseFileBackend:        false,
 	},
 }
Expand Down
7 changes: 4 additions & 3 deletions cdc/owner/changefeed_test.go
@@ -504,9 +504,10 @@ func TestRemoveChangefeed(t *testing.T) {
 	info := ctx.ChangefeedVars().Info
 	dir := t.TempDir()
 	info.Config.Consistent = &config.ConsistentConfig{
-		Level:             "eventual",
-		Storage:           filepath.Join("nfs://", dir),
-		FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
+		Level:                 "eventual",
+		Storage:               filepath.Join("nfs://", dir),
+		FlushIntervalInMs:     redo.DefaultFlushIntervalInMs,
+		MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
 	}
 	ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
 		ID: ctx.ChangefeedVars().ID,
8 changes: 7 additions & 1 deletion cdc/redo/meta_manager.go
@@ -120,7 +120,13 @@ func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaMan
 		changeFeedID:      contextutil.ChangefeedIDFromCtx(ctx),
 		uuidGenerator:     uuid.NewGenerator(),
 		enabled:           true,
-		flushIntervalInMs: cfg.FlushIntervalInMs,
+		flushIntervalInMs: cfg.MetaFlushIntervalInMs,
 	}
 
+	if m.flushIntervalInMs < redo.MinFlushIntervalInMs {
+		log.Warn("redo flush interval is too small, use default value",
+			zap.Int64("interval", m.flushIntervalInMs))
+		m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
+	}
+
 	uri, err := storage.ParseRawURL(cfg.Storage)
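Note the constructor clamps instead of rejecting: unlike ValidateAndAdjust in pkg/config/consistent.go below, NewMetaManager only warns and substitutes the default when the configured interval is under the floor. A rough usage sketch of that fallback path; the storage path is hypothetical and the two-value return is assumed from the truncated signature in the hunk header:

	cfg := &config.ConsistentConfig{
		Level:                 "eventual",
		Storage:               "nfs:///tmp/redo", // hypothetical
		FlushIntervalInMs:     redo.DefaultFlushIntervalInMs,
		MetaFlushIntervalInMs: 1, // below redo.MinFlushIntervalInMs
	}
	// Per the hunk above: logs "redo flush interval is too small, use default value"
	// and proceeds with redo.DefaultMetaFlushIntervalInMs.
	m, err := NewMetaManager(ctx, cfg)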
27 changes: 15 additions & 12 deletions cdc/redo/meta_manager_test.go
@@ -72,10 +72,11 @@ func TestInitAndWriteMeta(t *testing.T) {
 
 	startTs := uint64(10)
 	cfg := &config.ConsistentConfig{
-		Level:             string(redo.ConsistentLevelEventual),
-		MaxLogSize:        redo.DefaultMaxLogSize,
-		Storage:           uri.String(),
-		FlushIntervalInMs: redo.MinFlushIntervalInMs,
+		Level:                 string(redo.ConsistentLevelEventual),
+		MaxLogSize:            redo.DefaultMaxLogSize,
+		Storage:               uri.String(),
+		FlushIntervalInMs:     redo.MinFlushIntervalInMs,
+		MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
 	}
 	m, err := NewMetaManagerWithInit(ctx, cfg, startTs)
 	require.NoError(t, err)
@@ -136,10 +137,11 @@ func TestPreCleanupAndWriteMeta(t *testing.T) {
 
 	startTs := uint64(10)
 	cfg := &config.ConsistentConfig{
-		Level:             string(redo.ConsistentLevelEventual),
-		MaxLogSize:        redo.DefaultMaxLogSize,
-		Storage:           uri.String(),
-		FlushIntervalInMs: redo.MinFlushIntervalInMs,
+		Level:                 string(redo.ConsistentLevelEventual),
+		MaxLogSize:            redo.DefaultMaxLogSize,
+		Storage:               uri.String(),
+		FlushIntervalInMs:     redo.MinFlushIntervalInMs,
+		MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
 	}
 	m, err := NewMetaManagerWithInit(ctx, cfg, startTs)
 	require.NoError(t, err)
@@ -267,10 +269,11 @@ func TestGCAndCleanup(t *testing.T) {
 
 	startTs := uint64(3)
 	cfg := &config.ConsistentConfig{
-		Level:             string(redo.ConsistentLevelEventual),
-		MaxLogSize:        redo.DefaultMaxLogSize,
-		Storage:           uri.String(),
-		FlushIntervalInMs: redo.MinFlushIntervalInMs,
+		Level:                 string(redo.ConsistentLevelEventual),
+		MaxLogSize:            redo.DefaultMaxLogSize,
+		Storage:               uri.String(),
+		FlushIntervalInMs:     redo.MinFlushIntervalInMs,
+		MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
 	}
 	m, err := NewMetaManagerWithInit(ctx, cfg, startTs)
 	require.NoError(t, err)
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/agent/table.go
@@ -74,7 +74,7 @@ func (t *table) getTableStatus(collectStat bool) tablepb.TableStatus {
 
 func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message {
 	if status.Checkpoint.ResolvedTs < status.Checkpoint.CheckpointTs {
-		log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
+		log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
 			zap.Any("tableStatus", status),
 			zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
 			zap.Any("resolved", status.Checkpoint.ResolvedTs))
@@ -99,7 +99,7 @@ func newRemoveTableResponseMessage(status tablepb.TableStatus) *schedulepb.Messa
 		// Advance resolved ts to checkpoint ts if table is removed.
 		status.Checkpoint.ResolvedTs = status.Checkpoint.CheckpointTs
 	} else {
-		log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
+		log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
 			zap.Any("tableStatus", status),
 			zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
 			zap.Any("resolved", status.Checkpoint.ResolvedTs))
34 changes: 11 additions & 23 deletions cdc/scheduler/internal/v3/replication/replication_set.go
@@ -162,9 +162,7 @@ func NewReplicationSet(
 		return nil, r.inconsistentError(table, captureID,
 			"schedulerv3: table id inconsistent")
 	}
-	if err := r.updateCheckpointAndStats(table.Checkpoint, table.Stats); err != nil {
-		return nil, errors.Trace(err)
-	}
+	r.updateCheckpointAndStats(table.Checkpoint, table.Stats)
 
 	switch table.State {
 	case tablepb.TableStateReplicating:
@@ -485,8 +483,8 @@ func (r *ReplicationSet) pollOnPrepare(
 		}
 	case tablepb.TableStateReplicating:
 		if r.Primary == captureID {
-			err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
-			return nil, false, errors.Trace(err)
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
+			return nil, false, nil
 		}
 	case tablepb.TableStateStopping, tablepb.TableStateStopped:
 		if r.Primary == captureID {
@@ -587,9 +585,7 @@ func (r *ReplicationSet) pollOnCommit(
 
 	case tablepb.TableStateStopped, tablepb.TableStateAbsent:
 		if r.Primary == captureID {
-			if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
-				return nil, false, errors.Trace(err)
-			}
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
 			original := r.Primary
 			r.clearPrimary()
 			if !r.hasRole(RoleSecondary) {
@@ -653,9 +649,7 @@ func (r *ReplicationSet) pollOnCommit(
 
 	case tablepb.TableStateReplicating:
 		if r.Primary == captureID {
-			if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
-				return nil, false, errors.Trace(err)
-			}
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
 			if r.hasRole(RoleSecondary) {
 				// Original primary is not stopped, ask for stopping.
 				return &schedulepb.Message{
@@ -688,8 +682,8 @@ func (r *ReplicationSet) pollOnCommit(
 
 	case tablepb.TableStateStopping:
 		if r.Primary == captureID && r.hasRole(RoleSecondary) {
-			err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
-			return nil, false, errors.Trace(err)
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
+			return nil, false, nil
 		} else if r.isInRole(captureID, RoleUndetermined) {
 			log.Info("schedulerv3: capture is stopping during Commit",
 				zap.Stringer("tableState", input),
@@ -714,8 +708,8 @@ func (r *ReplicationSet) pollOnReplicating(
 	switch input.State {
 	case tablepb.TableStateReplicating:
 		if r.Primary == captureID {
-			err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
-			return nil, false, errors.Trace(err)
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
+			return nil, false, nil
 		}
 		return nil, false, r.multiplePrimaryError(
 			input, captureID, "schedulerv3: multiple primary")
@@ -726,10 +720,7 @@ func (r *ReplicationSet) pollOnReplicating(
 	case tablepb.TableStateStopping:
 	case tablepb.TableStateStopped:
 		if r.Primary == captureID {
-			if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
-				return nil, false, errors.Trace(err)
-			}
-
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
 			// Primary is stopped, but we still has secondary.
 			// Clear primary and promote secondary when it's prepared.
 			log.Info("schedulerv3: primary is stopped during Replicating",
@@ -919,7 +910,7 @@ func (r *ReplicationSet) handleCaptureShutdown(
 
 func (r *ReplicationSet) updateCheckpointAndStats(
 	checkpoint tablepb.Checkpoint, stats tablepb.Stats,
-) error {
+) {
 	if checkpoint.ResolvedTs < checkpoint.CheckpointTs {
 		log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
 			zap.Any("replicationSet", r),
@@ -941,11 +932,8 @@
 			zap.Any("replicationSet", r),
 			zap.Any("checkpointTs", r.Checkpoint.CheckpointTs),
 			zap.Any("resolvedTs", r.Checkpoint.ResolvedTs))
-		return errors.ErrInvalidCheckpointTs.GenWithStackByArgs(r.Checkpoint.CheckpointTs,
-			r.Checkpoint.ResolvedTs)
 	}
 	r.Stats = stats
-	return nil
 }
 
 // SetHeap is a max-heap, it implements heap.Interface.
26 changes: 26 additions & 0 deletions cdc/scheduler/internal/v3/replication/replication_set_test.go
@@ -1426,3 +1426,29 @@ func TestReplicationSetHeap_MinK(t *testing.T) {
 	require.Equal(t, expectedTables, tables)
 	require.Equal(t, 0, h.Len())
 }
+
+func TestUpdateCheckpointAndStats(t *testing.T) {
+	cases := []struct {
+		checkpoint tablepb.Checkpoint
+		stats      tablepb.Stats
+	}{
+		{
+			checkpoint: tablepb.Checkpoint{
+				CheckpointTs: 1,
+				ResolvedTs:   2,
+			},
+			stats: tablepb.Stats{},
+		},
+		{
+			checkpoint: tablepb.Checkpoint{
+				CheckpointTs: 2,
+				ResolvedTs:   1,
+			},
+			stats: tablepb.Stats{},
+		},
+	}
+	r := &ReplicationSet{}
+	for _, c := range cases {
+		r.updateCheckpointAndStats(c.checkpoint, c.stats)
+	}
+}
3 changes: 3 additions & 0 deletions docs/swagger/docs.go
@@ -1764,6 +1764,9 @@ var doc = `{
                 "max_log_size": {
                     "type": "integer"
                 },
+                "meta_flush_interval": {
+                    "type": "integer"
+                },
                 "storage": {
                     "type": "string"
                 },
3 changes: 3 additions & 0 deletions docs/swagger/swagger.json
@@ -1745,6 +1745,9 @@
         "max_log_size": {
           "type": "integer"
         },
+        "meta_flush_interval": {
+          "type": "integer"
+        },
         "storage": {
           "type": "string"
         },
2 changes: 2 additions & 0 deletions docs/swagger/swagger.yaml
@@ -392,6 +392,8 @@ definitions:
         type: string
       max_log_size:
         type: integer
+      meta_flush_interval:
+        type: integer
      storage:
         type: string
      use_file_backend:
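With the schema published, the field can be set through the v2 Open API. A hedged example; the endpoint, port, and payload shape follow the usual TiCDC v2 changefeed-creation call and are assumptions here, not part of this diff:

curl -X POST http://127.0.0.1:8300/api/v2/changefeeds \
  -H "Content-Type: application/json" \
  -d '{
    "changefeed_id": "redo-test",
    "sink_uri": "blackhole://",
    "replica_config": {
      "consistent": {
        "level": "eventual",
        "storage": "file:///tmp/redo",
        "flush_interval": 2000,
        "meta_flush_interval": 200
      }
    }
  }'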
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go
@@ -60,6 +60,7 @@ const (
 			"level": "none",
 			"max-log-size": 64,
 			"flush-interval": 2000,
+			"meta-flush-interval": 200,
 			"storage": "",
 			"use-file-backend": false
 		}
@@ -211,6 +212,7 @@ const (
 			"level": "none",
 			"max-log-size": 64,
 			"flush-interval": 2000,
+			"meta-flush-interval": 200,
 			"storage": "",
 			"use-file-backend": false
 		}
@@ -271,6 +273,7 @@ const (
 			"level": "none",
 			"max-log-size": 64,
 			"flush-interval": 2000,
+			"meta-flush-interval": 200,
 			"storage": "",
 			"use-file-backend": false
 		}
20 changes: 15 additions & 5 deletions pkg/config/consistent.go
@@ -24,11 +24,12 @@ import (
 
 // ConsistentConfig represents replication consistency config for a changefeed.
 type ConsistentConfig struct {
-	Level             string `toml:"level" json:"level"`
-	MaxLogSize        int64  `toml:"max-log-size" json:"max-log-size"`
-	FlushIntervalInMs int64  `toml:"flush-interval" json:"flush-interval"`
-	Storage           string `toml:"storage" json:"storage"`
-	UseFileBackend    bool   `toml:"use-file-backend" json:"use-file-backend"`
+	Level                 string `toml:"level" json:"level"`
+	MaxLogSize            int64  `toml:"max-log-size" json:"max-log-size"`
+	FlushIntervalInMs     int64  `toml:"flush-interval" json:"flush-interval"`
+	MetaFlushIntervalInMs int64  `toml:"meta-flush-interval" json:"meta-flush-interval"`
+	Storage               string `toml:"storage" json:"storage"`
+	UseFileBackend        bool   `toml:"use-file-backend" json:"use-file-backend"`
 }
 
 // ValidateAndAdjust validates the consistency config and adjusts it if necessary.
@@ -50,6 +51,15 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
 			c.FlushIntervalInMs, redo.MinFlushIntervalInMs))
 	}
 
+	if c.MetaFlushIntervalInMs == 0 {
+		c.MetaFlushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
+	}
+	if c.MetaFlushIntervalInMs < redo.MinFlushIntervalInMs {
+		return cerror.ErrInvalidReplicaConfig.FastGenByArgs(
+			fmt.Sprintf("The consistent.meta-flush-interval:%d must be equal or greater than %d",
+				c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs))
+	}
+
 	uri, err := storage.ParseRawURL(c.Storage)
 	if err != nil {
 		return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
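Taken together: a zero value means "use the default", while an explicit sub-minimum value is a hard error rather than a silent adjustment. A minimal sketch of both paths, assuming a writable local storage URI (not code from this commit):

	cfg := &config.ConsistentConfig{Level: "eventual", Storage: "file:///tmp/redo"}
	// Fills MetaFlushIntervalInMs with redo.DefaultMetaFlushIntervalInMs
	// (200 in the test fixtures above), provided the storage URI validates.
	err := cfg.ValidateAndAdjust()

	cfg.MetaFlushIntervalInMs = 1 // below redo.MinFlushIntervalInMs
	err = cfg.ValidateAndAdjust()
	// Expected: ErrInvalidReplicaConfig with message
	// "The consistent.meta-flush-interval:1 must be equal or greater than ..."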
