Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

encoder (ticdc): simple protocol send bootstrap event periodically #10366

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,13 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*c.Sink.AdvanceTimeoutInSec)
}

if c.Sink.SendBootstrapIntervalInSec != nil {
res.Sink.SendBootstrapIntervalInSec = util.AddressOf(*c.Sink.SendBootstrapIntervalInSec)
}

if c.Sink.SendBootstrapInMsgCount != nil {
res.Sink.SendBootstrapInMsgCount = util.AddressOf(*c.Sink.SendBootstrapInMsgCount)
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -776,6 +783,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
if cloned.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec)
}

if cloned.Sink.SendBootstrapIntervalInSec != nil {
res.Sink.SendBootstrapIntervalInSec = util.AddressOf(*cloned.Sink.SendBootstrapIntervalInSec)
}

if cloned.Sink.SendBootstrapInMsgCount != nil {
res.Sink.SendBootstrapInMsgCount = util.AddressOf(*cloned.Sink.SendBootstrapInMsgCount)
}
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Expand Down Expand Up @@ -950,6 +965,8 @@ type SinkConfig struct {
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"`
SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"`
SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down
2 changes: 2 additions & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var defaultAPIConfig = &ReplicaConfig{
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewKafkaDMLSink(
metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient)
dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh)
concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)
encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, concurrency, changefeedID)
s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager,
eventRouter, trans, encoderGroup, protocol, scheme, errCh)
log.Info("DML sink producer created",
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
txn.Callback()
continue
}

rowCallback := toRowCallback(txn.Callback, uint64(len(txn.Event.Rows)))
for _, row := range txn.Event.Rows {
topic := s.alive.eventRouter.GetTopicForRowChange(row)
Expand Down Expand Up @@ -197,7 +196,10 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
// So it is safe to send the event to an unbounded channel here.
s.alive.worker.msgChan.In() <- mqEvent{
key: codec.TopicPartitionKey{
Topic: topic, Partition: index, PartitionKey: key,
Topic: topic,
Partition: index,
PartitionKey: key,
TotalPartition: partitionNum,
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
},
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: row,
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/pulsar_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewPulsarDMLSink(
}

concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)
encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, concurrency, changefeedID)

s := newDMLSink(ctx, changefeedID, p, nil, topicManager,
eventRouter, trans, encoderGroup, protocol, scheme, errCh)
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/dmlsink/mq/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func (w *worker) sendMessages(ctx context.Context) error {
if err = future.Ready(ctx); err != nil {
return errors.Trace(err)
}

asddongmen marked this conversation as resolved.
Show resolved Hide resolved
for _, message := range future.Messages {
start := time.Now()
if err = w.statistics.RecordBatchExecution(func() (int, int64, error) {
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/dmlsink/mq/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func newBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlproduc
require.NoError(t, err)
encoderConcurrency := 4
statistics := metrics.NewStatistics(ctx, id, sink.RowSink)
encoderGroup := codec.NewEncoderGroup(encoderBuilder, encoderConcurrency, id)
cfg := config.GetDefaultReplicaConfig()
encoderGroup := codec.NewEncoderGroup(cfg.Sink, encoderBuilder, encoderConcurrency, id)
return newWorker(id, config.ProtocolOpen, p, encoderGroup, statistics), p
}

Expand All @@ -58,7 +59,8 @@ func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlpro
require.NoError(t, err)
encoderConcurrency := 4
statistics := metrics.NewStatistics(ctx, id, sink.RowSink)
encoderGroup := codec.NewEncoderGroup(encoderBuilder, encoderConcurrency, id)
cfg := config.GetDefaultReplicaConfig()
encoderGroup := codec.NewEncoderGroup(cfg.Sink, encoderBuilder, encoderConcurrency, id)
return newWorker(id, config.ProtocolOpen, p, encoderGroup, statistics), p
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
ContentCompatible: util.AddressOf(false),
Protocol: util.AddressOf("open-protocol"),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
}, cfg.Sink)
}

Expand Down Expand Up @@ -249,6 +251,8 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
}, cfg.Sink)
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ const (
"large-message-handle-compression": "",
"claim-check-storage-uri": ""
},
"advance-timeout-in-sec": 150
"advance-timeout-in-sec": 150,
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -318,7 +320,9 @@ const (
"file-size": 1024,
"output-column-id":false
},
"advance-timeout-in-sec": 150
"advance-timeout-in-sec": 150,
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -479,7 +483,9 @@ const (
"file-size": 1024,
"output-column-id":false
},
"advance-timeout-in-sec": 150
"advance-timeout-in-sec": 150,
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000
},
"consistent": {
"level": "none",
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ var defaultReplicaConfig = &ReplicaConfig{
ContentCompatible: util.AddressOf(false),
TiDBSourceID: 1,
AdvanceTimeoutInSec: util.AddressOf(DefaultAdvanceTimeoutInSec),
SendBootstrapIntervalInSec: util.AddressOf(DefaultSendBootstrapIntervalInSec),
SendBootstrapInMsgCount: util.AddressOf(DefaultSendBootstrapInMsgCount),
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ const (

// DefaultEncoderGroupConcurrency is the default concurrency of encoder group.
DefaultEncoderGroupConcurrency = 32

// DefaultSendBootstrapIntervalInSec is the default interval, in seconds, between bootstrap messages.
DefaultSendBootstrapIntervalInSec = int64(120)
// DefaultSendBootstrapInMsgCount is the default number of messages after which a bootstrap message is sent.
DefaultSendBootstrapInMsgCount = int32(10000)
)

// AtomicityLevel represents the atomicity level of a changefeed.
Expand Down Expand Up @@ -168,6 +173,12 @@ type SinkConfig struct {

// Debezium only. Whether schema should be excluded in the output.
DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"`

// Simple Protocol only config.
// SendBootstrapIntervalInSec is the interval, in seconds, at which bootstrap messages are sent.
SendBootstrapIntervalInSec *int64 `toml:"send-bootstrap-interval-in-sec" json:"send-bootstrap-interval-in-sec,omitempty"`
// SendBootstrapInMsgCount is the number of messages after which a bootstrap message is sent.
SendBootstrapInMsgCount *int32 `toml:"send-bootstrap-in-msg-count" json:"send-bootstrap-in-msg-count,omitempty"`
}

// MaskSensitiveData masks sensitive data in SinkConfig
Expand Down
6 changes: 6 additions & 0 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func TestChangefeedStateUpdate(t *testing.T) {
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: config.GetDefaultReplicaConfig().Sink.ContentCompatible,
SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec,
SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount,
},
Consistent: config.GetDefaultReplicaConfig().Consistent,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Expand Down Expand Up @@ -191,6 +193,8 @@ func TestChangefeedStateUpdate(t *testing.T) {
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: config.GetDefaultReplicaConfig().Sink.ContentCompatible,
SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec,
SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount,
},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Expand Down Expand Up @@ -261,6 +265,8 @@ func TestChangefeedStateUpdate(t *testing.T) {
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: config.GetDefaultReplicaConfig().Sink.ContentCompatible,
SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec,
SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount,
},
Consistent: config.GetDefaultReplicaConfig().Consistent,
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Expand Down
Loading
Loading