From 822a79f6e2e2619d00df4d6b70572cf5064b4ba3 Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Tue, 22 Nov 2022 22:43:58 +0800 Subject: [PATCH] cli(ticdc): cli support mounter worker-num parameter (#7681) close pingcap/tiflow#7680 --- cdc/api/v2/model.go | 16 ++++++++++++++++ cdc/api/v2/model_test.go | 1 + 2 files changed, 17 insertions(+) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index b81887d94c8..ec32508805e 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -98,6 +98,7 @@ type ReplicaConfig struct { SyncPointInterval time.Duration `json:"sync_point_interval"` SyncPointRetention time.Duration `json:"sync_point_retention"` Filter *FilterConfig `json:"filter"` + Mounter *MounterConfig `json:"mounter"` Sink *SinkConfig `json:"sink"` Consistent *ConsistentConfig `json:"consistent"` } @@ -200,6 +201,11 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig { EnablePartitionSeparator: c.Sink.EnablePartitionSeparator, } } + if c.Mounter != nil { + res.Mounter = &config.MounterConfig{ + WorkerNum: c.Mounter.WorkerNum, + } + } return res } @@ -305,6 +311,11 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { Storage: cloned.Consistent.Storage, } } + if cloned.Mounter != nil { + res.Mounter = &MounterConfig{ + WorkerNum: cloned.Mounter.WorkerNum, + } + } return res } @@ -339,6 +350,11 @@ type FilterConfig struct { EventFilters []EventFilterRule `json:"event_filters"` } +// MounterConfig represents mounter config for a changefeed +type MounterConfig struct { + WorkerNum int `json:"worker_num"` +} + // EventFilterRule is used by sql event filter and expression filter type EventFilterRule struct { Matcher []string `json:"matcher"` diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 2396b9ca280..d354303eaf4 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -80,6 +80,7 @@ func TestToAPIReplicaConfig(t *testing.T) { IgnoreDeleteValueExpr: "age > 20", }}, } + cfg.Mounter = &config.MounterConfig{WorkerNum: 11} cfg2 := ToAPIReplicaConfig(cfg).ToInternalReplicaConfig() require.Equal(t, "", cfg2.Sink.DispatchRules[0].DispatcherRule) cfg.Sink.DispatchRules[0].DispatcherRule = ""