Skip to content

Commit

Permalink
filter(ticdc): change case-sensitive parameter default value to false (
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored Nov 9, 2023
1 parent 1b62946 commit c8eed98
Show file tree
Hide file tree
Showing 23 changed files with 166 additions and 54 deletions.
6 changes: 6 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
MaxLogSize: c.Consistent.MaxLogSize,
FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs,
EncodingWorkerNum: c.Consistent.EncodingWorkerNum,
FlushWorkerNum: c.Consistent.FlushWorkerNum,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
}
Expand Down Expand Up @@ -460,6 +462,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
MaxLogSize: cloned.Consistent.MaxLogSize,
FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs,
EncodingWorkerNum: c.Consistent.EncodingWorkerNum,
FlushWorkerNum: c.Consistent.FlushWorkerNum,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
}
Expand Down Expand Up @@ -633,6 +637,8 @@ type ConsistentConfig struct {
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
MetaFlushIntervalInMs int64 `json:"meta_flush_interval"`
EncodingWorkerNum int `json:"encoding_worker_num"`
FlushWorkerNum int `json:"flush_worker_num"`
Storage string `json:"storage,omitempty"`
UseFileBackend bool `json:"use_file_backend"`
}
Expand Down
4 changes: 3 additions & 1 deletion cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// note: this is the published API default value; do not change it
var defaultAPIConfig = &ReplicaConfig{
MemoryQuota: config.DefaultChangefeedMemoryQuota,
CaseSensitive: true,
CaseSensitive: false,
EnableOldValue: true,
CheckGCSafePoint: true,
EnableSyncPoint: false,
Expand Down Expand Up @@ -60,6 +60,8 @@ var defaultAPIConfig = &ReplicaConfig{
MaxLogSize: 64,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
EncodingWorkerNum: redo.DefaultEncodingWorkerNum,
FlushWorkerNum: redo.DefaultFlushWorkerNum,
Storage: "",
UseFileBackend: false,
},
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestVerifyAndComplete(t *testing.T) {
StartTs: 417257993615179777,
Config: &config.ReplicaConfig{
MemoryQuota: 1073741824,
CaseSensitive: true,
CaseSensitive: false,
EnableOldValue: true,
CheckGCSafePoint: true,
SyncPointInterval: time.Minute * 10,
Expand Down
2 changes: 2 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,8 @@ func TestRemoveChangefeed(t *testing.T) {
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
EncodingWorkerNum: redo.DefaultEncodingWorkerNum,
FlushWorkerNum: redo.DefaultFlushWorkerNum,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
Expand Down
5 changes: 4 additions & 1 deletion cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,10 @@ func (m *logManager) bgUpdateLog(ctx context.Context, flushDuration time.Duratio
log.Info("redo manager bgUpdateLog is running",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Duration("flushIntervalInMs", flushDuration))
zap.Duration("flushIntervalInMs", flushDuration),
zap.Int64("maxLogSize", m.cfg.MaxLogSize),
zap.Int("encoderWorkerNum", m.cfg.EncodingWorkerNum),
zap.Int("flushWorkerNum", m.cfg.FlushWorkerNum))

var err error
// logErrCh is used to retrieve errors from log flushing goroutines.
Expand Down
2 changes: 2 additions & 0 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
EncodingWorkerNum: redo.DefaultEncodingWorkerNum,
FlushWorkerNum: redo.DefaultFlushWorkerNum,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, cfg)
Expand Down
6 changes: 6 additions & 0 deletions cdc/redo/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func TestInitAndWriteMeta(t *testing.T) {
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
EncodingWorkerNum: redo.DefaultEncodingWorkerNum,
FlushWorkerNum: redo.DefaultFlushWorkerNum,
}
m, err := NewMetaManagerWithInit(ctx, cfg, startTs)
require.NoError(t, err)
Expand Down Expand Up @@ -142,6 +144,8 @@ func TestPreCleanupAndWriteMeta(t *testing.T) {
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
EncodingWorkerNum: redo.DefaultEncodingWorkerNum,
FlushWorkerNum: redo.DefaultFlushWorkerNum,
}
m, err := NewMetaManagerWithInit(ctx, cfg, startTs)
require.NoError(t, err)
Expand Down Expand Up @@ -274,6 +278,8 @@ func TestGCAndCleanup(t *testing.T) {
Storage: uri.String(),
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
EncodingWorkerNum: redo.DefaultEncodingWorkerNum,
FlushWorkerNum: redo.DefaultFlushWorkerNum,
}
m, err := NewMetaManagerWithInit(ctx, cfg, startTs)
require.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions cdc/redo/writer/memory/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -106,15 +107,15 @@ type encodingWorkerGroup struct {

func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup {
if workerNum <= 0 {
workerNum = defaultEncodingWorkerNum
workerNum = redo.DefaultEncodingWorkerNum
}
inputChs := make([]chan *polymorphicRedoEvent, workerNum)
for i := 0; i < workerNum; i++ {
inputChs[i] = make(chan *polymorphicRedoEvent, defaultEncodingInputChanSize)
inputChs[i] = make(chan *polymorphicRedoEvent, redo.DefaultEncodingInputChanSize)
}
return &encodingWorkerGroup{
inputChs: inputChs,
outputCh: make(chan *polymorphicRedoEvent, defaultEncodingOutputChanSize),
outputCh: make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
workerNum: workerNum,
closed: make(chan struct{}),
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/redo/writer/memory/file_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newFileWorkerGroup(
opts ...writer.Option,
) *fileWorkerGroup {
if workerNum <= 0 {
workerNum = defaultFlushWorkerNum
workerNum = redo.DefaultFlushWorkerNum
}

op := &writer.LogWriterOptions{}
Expand Down
5 changes: 0 additions & 5 deletions cdc/redo/writer/memory/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,5 @@ import (
)

func TestMain(m *testing.M) {
bak := defaultEncodingInputChanSize
defer func() {
defaultEncodingInputChanSize = bak
}()
defaultEncodingInputChanSize = 0
leakutil.SetUpLeakTest(m)
}
13 changes: 2 additions & 11 deletions cdc/redo/writer/memory/mem_log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@ import (
"golang.org/x/sync/errgroup"
)

var (
defaultEncodingWorkerNum = 16
defaultEncodingInputChanSize = 128
defaultEncodingOutputChanSize = 2048
// Maximum allocated memory is flushWorkerNum*maxLogSize, which is
// `8*64MB = 512MB` by default.
defaultFlushWorkerNum = 8
)

var _ writer.RedoLogWriter = (*memoryLogWriter)(nil)

type memoryLogWriter struct {
Expand Down Expand Up @@ -71,11 +62,11 @@ func NewLogWriter(
cancel: lwCancel,
}

lw.encodeWorkers = newEncodingWorkerGroup(defaultEncodingWorkerNum)
lw.encodeWorkers = newEncodingWorkerGroup(cfg.EncodingWorkerNum)
eg.Go(func() error {
return lw.encodeWorkers.Run(lwCtx)
})
lw.fileWorkers = newFileWorkerGroup(cfg, defaultFlushWorkerNum, extStorage, opts...)
lw.fileWorkers = newFileWorkerGroup(cfg, cfg.FlushWorkerNum, extStorage, opts...)
eg.Go(func() error {
return lw.fileWorkers.Run(lwCtx, lw.encodeWorkers.outputCh)
})
Expand Down
10 changes: 5 additions & 5 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type dmlSink struct {
alive struct {
sync.RWMutex
// msgCh is a channel to hold eventFragment.
msgCh chan eventFragment
msgCh *chann.DrainableChann[eventFragment]
isDead bool
}

Expand Down Expand Up @@ -140,15 +140,15 @@ func NewCloudStorageSink(
cancel: wgCancel,
dead: make(chan struct{}),
}
s.alive.msgCh = make(chan eventFragment, defaultChannelSize)
s.alive.msgCh = chann.NewDrainableChann[eventFragment]()

encodedCh := make(chan eventFragment, defaultChannelSize)
workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount)

// create a group of encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
encoder := encoderBuilder.Build()
s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh, encodedCh)
s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh.Out(), encodedCh)
}
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
Expand All @@ -168,7 +168,7 @@ func NewCloudStorageSink(

s.alive.Lock()
s.alive.isDead = true
close(s.alive.msgCh)
s.alive.msgCh.CloseAndDrain()
s.alive.Unlock()
close(s.dead)

Expand Down Expand Up @@ -234,7 +234,7 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single

s.statistics.ObserveRows(txn.Event.Rows...)
// emit a TxnCallbackableEvent coupled with a sequence number starting from one.
s.alive.msgCh <- eventFragment{
s.alive.msgCh.In() <- eventFragment{
seqNumber: seq,
versionedTable: tbl,
event: txn,
Expand Down
6 changes: 6 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,9 +1755,15 @@ var doc = `{
"v2.ConsistentConfig": {
"type": "object",
"properties": {
"encoding_worker_num": {
"type": "integer"
},
"flush_interval": {
"type": "integer"
},
"flush_worker_num": {
"type": "integer"
},
"level": {
"type": "string"
},
Expand Down
6 changes: 6 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1736,9 +1736,15 @@
"v2.ConsistentConfig": {
"type": "object",
"properties": {
"encoding_worker_num": {
"type": "integer"
},
"flush_interval": {
"type": "integer"
},
"flush_worker_num": {
"type": "integer"
},
"level": {
"type": "string"
},
Expand Down
4 changes: 4 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,12 @@ definitions:
type: object
v2.ConsistentConfig:
properties:
encoding_worker_num:
type: integer
flush_interval:
type: integer
flush_worker_num:
type: integer
level:
type: string
max_log_size:
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const (
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"encoding-worker-num": 32,
"flush-worker-num": 8,
"storage": "",
"use-file-backend": false
}
Expand Down Expand Up @@ -213,6 +215,8 @@ const (
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"encoding-worker-num": 32,
"flush-worker-num": 8,
"storage": "",
"use-file-backend": false
}
Expand Down Expand Up @@ -274,6 +278,8 @@ const (
"max-log-size": 64,
"flush-interval": 2000,
"meta-flush-interval": 200,
"encoding-worker-num": 32,
"flush-worker-num": 8,
"storage": "",
"use-file-backend": false
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type ConsistentConfig struct {
MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"`
FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"`
MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"`
EncodingWorkerNum int `toml:"encoding-worker-num" json:"encoding-worker-num"`
FlushWorkerNum int `toml:"flush-worker-num" json:"flush-worker-num"`
Storage string `toml:"storage" json:"storage"`
UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"`
}
Expand Down Expand Up @@ -60,6 +62,13 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs))
}

if c.EncodingWorkerNum == 0 {
c.EncodingWorkerNum = redo.DefaultEncodingWorkerNum
}
if c.FlushWorkerNum == 0 {
c.FlushWorkerNum = redo.DefaultFlushWorkerNum
}

uri, err := storage.ParseRawURL(c.Storage)
if err != nil {
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
Expand Down
4 changes: 3 additions & 1 deletion pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (

var defaultReplicaConfig = &ReplicaConfig{
MemoryQuota: DefaultChangefeedMemoryQuota,
CaseSensitive: true,
CaseSensitive: false,
EnableOldValue: true,
CheckGCSafePoint: true,
EnableSyncPoint: false,
Expand Down Expand Up @@ -69,6 +69,8 @@ var defaultReplicaConfig = &ReplicaConfig{
MaxLogSize: redo.DefaultMaxLogSize,
FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
EncodingWorkerNum: redo.DefaultEncodingWorkerNum,
FlushWorkerNum: redo.DefaultFlushWorkerNum,
Storage: "",
UseFileBackend: false,
},
Expand Down
11 changes: 11 additions & 0 deletions pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ const (
// MinFlushIntervalInMs is the minimum flush interval for redo log.
MinFlushIntervalInMs = 50

// DefaultEncodingWorkerNum is the default number of encoding workers.
DefaultEncodingWorkerNum = 32
// DefaultEncodingInputChanSize is the default size of input channel for encoding worker.
DefaultEncodingInputChanSize = 128
// DefaultEncodingOutputChanSize is the default size of output channel for encoding worker.
DefaultEncodingOutputChanSize = 2048
// DefaultFlushWorkerNum is the default number of flush workers.
// Maximum allocated memory is flushWorkerNum*maxLogSize, which is
// `8*64MB = 512MB` by default.
DefaultFlushWorkerNum = 8

// DefaultFileMode is the default file mode used when operating on files
DefaultFileMode = 0o644
// DefaultDirMode is the default mode used when operating on directories
Expand Down
Loading

0 comments on commit c8eed98

Please sign in to comment.