
Commit

This is an automated cherry-pick of pingcap#10014
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
3AceShowHand authored and ti-chi-bot committed Nov 22, 2023
1 parent 04685c5 commit 880af81
Showing 37 changed files with 2,420 additions and 154 deletions.
76 changes: 76 additions & 0 deletions cdc/api/v2/model.go
@@ -384,6 +384,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
}

res.Sink = &config.SinkConfig{
<<<<<<< HEAD
DispatchRules: dispatchRules,
Protocol: c.Sink.Protocol,
CSVConfig: csvConfig,
@@ -401,6 +402,31 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
MySQLConfig: mysqlConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: c.Sink.SafeMode,
=======
DispatchRules: dispatchRules,
Protocol: c.Sink.Protocol,
CSVConfig: csvConfig,
ColumnSelectors: columnSelectors,
SchemaRegistry: c.Sink.SchemaRegistry,
EncoderConcurrency: c.Sink.EncoderConcurrency,
Terminator: c.Sink.Terminator,
DateSeparator: c.Sink.DateSeparator,
EnablePartitionSeparator: c.Sink.EnablePartitionSeparator,
FileIndexWidth: c.Sink.FileIndexWidth,
EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: c.Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: c.Sink.ContentCompatible,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
PulsarConfig: pulsarConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: c.Sink.SafeMode,
}

if c.Sink.TxnAtomicity != nil {
res.Sink.TxnAtomicity = util.AddressOf(config.AtomicityLevel(*c.Sink.TxnAtomicity))
>>>>>>> 4a3762cdc5 (codec(ticdc): canal-json support compatible content by output detailed mysql type information (#10014))
}
if c.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*c.Sink.AdvanceTimeoutInSec)
@@ -603,6 +629,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
}

res.Sink = &SinkConfig{
<<<<<<< HEAD
Protocol: cloned.Sink.Protocol,
SchemaRegistry: cloned.Sink.SchemaRegistry,
DispatchRules: dispatchRules,
Expand All @@ -620,6 +647,31 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
MySQLConfig: mysqlConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: cloned.Sink.SafeMode,
=======
Protocol: cloned.Sink.Protocol,
SchemaRegistry: cloned.Sink.SchemaRegistry,
DispatchRules: dispatchRules,
CSVConfig: csvConfig,
ColumnSelectors: columnSelectors,
EncoderConcurrency: cloned.Sink.EncoderConcurrency,
Terminator: cloned.Sink.Terminator,
DateSeparator: cloned.Sink.DateSeparator,
EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator,
FileIndexWidth: cloned.Sink.FileIndexWidth,
EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: cloned.Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: cloned.Sink.ContentCompatible,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
PulsarConfig: pulsarConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: cloned.Sink.SafeMode,
}

if cloned.Sink.TxnAtomicity != nil {
res.Sink.TxnAtomicity = util.AddressOf(string(*cloned.Sink.TxnAtomicity))
>>>>>>> 4a3762cdc5 (codec(ticdc): canal-json support compatible content by output detailed mysql type information (#10014))
}
if cloned.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec)
@@ -759,6 +811,7 @@ type Table struct {
// SinkConfig represents sink config for a changefeed
// This is a duplicate of config.SinkConfig
type SinkConfig struct {
<<<<<<< HEAD
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
@@ -777,6 +830,29 @@ type SinkConfig struct {
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"`
=======
Protocol *string `json:"protocol,omitempty"`
SchemaRegistry *string `json:"schema_registry,omitempty"`
CSVConfig *CSVConfig `json:"csv,omitempty"`
DispatchRules []*DispatchRule `json:"dispatchers,omitempty"`
ColumnSelectors []*ColumnSelector `json:"column_selectors,omitempty"`
TxnAtomicity *string `json:"transaction_atomicity,omitempty"`
EncoderConcurrency *int `json:"encoder_concurrency,omitempty"`
Terminator *string `json:"terminator,omitempty"`
DateSeparator *string `json:"date_separator,omitempty"`
EnablePartitionSeparator *bool `json:"enable_partition_separator,omitempty"`
FileIndexWidth *int `json:"file_index_width,omitempty"`
EnableKafkaSinkV2 *bool `json:"enable_kafka_sink_v2,omitempty"`
OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns,omitempty"`
DeleteOnlyOutputHandleKeyColumns *bool `json:"delete_only_output_handle_key_columns"`
ContentCompatible *bool `json:"content_compatible"`
SafeMode *bool `json:"safe_mode,omitempty"`
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
PulsarConfig *PulsarConfig `json:"pulsar_config,omitempty"`
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"`
>>>>>>> 4a3762cdc5 (codec(ticdc): canal-json support compatible content by output detailed mysql type information (#10014))
}

// CSVConfig denotes the csv config
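
Note on this hunk: the incoming side of the conflict turns every optional field of the v2 SinkConfig into a pointer with an `omitempty` JSON tag, so options a user never set are omitted from API payloads instead of being serialized as zero values. The sketch below is a minimal illustration of that effect; the struct and field subset are illustrative stand-ins, not the real cdc/api/v2 types.

package main

import (
	"encoding/json"
	"fmt"
)

// Value fields always serialize, even when unset.
type sinkConfigByValue struct {
	Protocol           string `json:"protocol"`
	EncoderConcurrency int    `json:"encoder_concurrency"`
}

// Pointer fields with omitempty disappear from the payload when unset.
type sinkConfigByPointer struct {
	Protocol           *string `json:"protocol,omitempty"`
	EncoderConcurrency *int    `json:"encoder_concurrency,omitempty"`
}

func main() {
	v, _ := json.Marshal(sinkConfigByValue{})
	p, _ := json.Marshal(sinkConfigByPointer{})
	fmt.Println(string(v)) // {"protocol":"","encoder_concurrency":0}
	fmt.Println(string(p)) // {}
}

Resolving the conflict therefore involves not only the new ContentCompatible and PulsarConfig fields but also this value-to-pointer migration of the existing ones.
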
12 changes: 12 additions & 0 deletions cdc/api/v2/model_test.go
@@ -50,12 +50,24 @@ var defaultAPIConfig = &ReplicaConfig{
NullString: config.NULL,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
<<<<<<< HEAD
EncoderConcurrency: 16,
Terminator: config.CRLF,
DateSeparator: config.DateSeparatorDay.String(),
EnablePartitionSeparator: true,
EnableKafkaSinkV2: false,
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
=======
EncoderConcurrency: util.AddressOf(config.DefaultEncoderGroupConcurrency),
Terminator: util.AddressOf(config.CRLF),
DateSeparator: util.AddressOf(config.DateSeparatorDay.String()),
EnablePartitionSeparator: util.AddressOf(true),
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
>>>>>>> 4a3762cdc5 (codec(ticdc): canal-json support compatible content by output detailed mysql type information (#10014))
},
Consistent: &ConsistentConfig{
Level: "none",
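
The incoming defaults wrap literal values in util.AddressOf because the pointer-typed fields above cannot take the address of a literal directly. A plausible sketch of such a helper is shown below, assuming a generic implementation; the real pkg/util version may differ in detail.

package util

// AddressOf returns a pointer to a copy of v. It lets callers write
// util.AddressOf(uint(150)) where &uint(150) would not compile.
func AddressOf[T any](v T) *T {
	return &v
}

With a helper like this, the pointer-valued defaults read naturally, e.g. AdvanceTimeoutInSec: util.AddressOf(uint(150)).
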
75 changes: 75 additions & 0 deletions cdc/model/changefeed.go
@@ -336,6 +336,81 @@ func (info *ChangeFeedInfo) VerifyAndComplete() {
if info.Config.ChangefeedErrorStuckDuration == nil {
info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration
}
<<<<<<< HEAD
=======
if info.Config.SQLMode == "" {
info.Config.SQLMode = defaultConfig.SQLMode
}
info.RmUnusedFields()
}

// RmUnusedFields removes unnecessary fields based on the downstream type and
// the protocol. Since we utilize a common changefeed configuration template,
// certain fields may not be utilized for certain protocols.
func (info *ChangeFeedInfo) RmUnusedFields() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
log.Warn(
"failed to parse the sink uri",
zap.Error(err),
zap.Any("sinkUri", info.SinkURI),
)
return
}
if !sink.IsMQScheme(uri.Scheme) {
info.rmMQOnlyFields()
} else {
// remove schema registry for MQ downstream with
// protocol other than avro
if util.GetOrZero(info.Config.Sink.Protocol) != config.ProtocolAvro.String() {
info.Config.Sink.SchemaRegistry = nil
}
}

if !sink.IsStorageScheme(uri.Scheme) {
info.rmStorageOnlyFields()
}

if !sink.IsMySQLCompatibleScheme(uri.Scheme) {
info.rmDBOnlyFields()
} else {
// remove fields only being used by MQ and Storage downstream
info.Config.Sink.Protocol = nil
info.Config.Sink.Terminator = nil
}
}

func (info *ChangeFeedInfo) rmMQOnlyFields() {
log.Info("since the downstream is not a MQ, remove MQ only fields",
zap.String("namespace", info.Namespace),
zap.String("changefeed", info.ID))
info.Config.Sink.DispatchRules = nil
info.Config.Sink.SchemaRegistry = nil
info.Config.Sink.EncoderConcurrency = nil
info.Config.Sink.EnableKafkaSinkV2 = nil
info.Config.Sink.OnlyOutputUpdatedColumns = nil
info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil
info.Config.Sink.ContentCompatible = nil
info.Config.Sink.KafkaConfig = nil
}

func (info *ChangeFeedInfo) rmStorageOnlyFields() {
info.Config.Sink.CSVConfig = nil
info.Config.Sink.DateSeparator = nil
info.Config.Sink.EnablePartitionSeparator = nil
info.Config.Sink.FileIndexWidth = nil
info.Config.Sink.CloudStorageConfig = nil
}

func (info *ChangeFeedInfo) rmDBOnlyFields() {
info.Config.EnableSyncPoint = nil
info.Config.BDRMode = nil
info.Config.SyncPointInterval = nil
info.Config.SyncPointRetention = nil
info.Config.Consistent = nil
info.Config.Sink.SafeMode = nil
info.Config.Sink.MySQLConfig = nil
>>>>>>> 4a3762cdc5 (codec(ticdc): canal-json support compatible content by output detailed mysql type information (#10014))
}

// FixIncompatible fixes incompatible changefeed meta info.
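
RmUnusedFields keys its pruning off the sink URI scheme: MQ-only options (dispatchers, Kafka settings, encoder knobs) are cleared unless the downstream is an MQ, storage-only options unless it is a storage sink, and DB-only options unless it is MySQL-compatible, in which case Protocol and Terminator are dropped instead. The following self-contained illustration traces that decision table for a MySQL sink URI; isMQ, isStorage and isMySQL are stand-ins for sink.IsMQScheme, sink.IsStorageScheme and sink.IsMySQLCompatibleScheme and recognize only a few example schemes.

package main

import (
	"fmt"
	"net/url"
)

func isMQ(scheme string) bool      { return scheme == "kafka" || scheme == "pulsar" }
func isStorage(scheme string) bool { return scheme == "s3" || scheme == "file" }
func isMySQL(scheme string) bool   { return scheme == "mysql" || scheme == "tidb" }

func main() {
	uri, err := url.Parse("mysql://root@127.0.0.1:3306/")
	if err != nil {
		panic(err)
	}
	// Mirrors the branches taken in RmUnusedFields for a MySQL downstream.
	fmt.Println("drop MQ-only fields:", !isMQ(uri.Scheme))           // true
	fmt.Println("drop storage-only fields:", !isStorage(uri.Scheme)) // true
	fmt.Println("drop Protocol/Terminator:", isMySQL(uri.Scheme))    // true
}
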
3 changes: 3 additions & 0 deletions cdc/model/schema_storage.go
@@ -212,6 +212,9 @@ func (ti *TableInfo) initColumnsFlag() {
if mysql.HasUnsignedFlag(colInfo.GetFlag()) {
flag.SetIsUnsigned()
}
if mysql.HasZerofillFlag(colInfo.GetFlag()) {
flag.SetZeroFill()
}
ti.ColumnsFlag[colInfo.ID] = flag
}

12 changes: 12 additions & 0 deletions cdc/model/sink.go
@@ -81,8 +81,20 @@ const (
NullableFlag
// UnsignedFlag means the column stores an unsigned integer
UnsignedFlag
// ZerofillFlag means the column is zerofill
ZerofillFlag
)

// SetZeroFill sets ZerofillFlag
func (b *ColumnFlagType) SetZeroFill() {
(*util.Flag)(b).Add(util.Flag(ZerofillFlag))
}

// IsZerofill shows whether ZerofillFlag is set
func (b *ColumnFlagType) IsZerofill() bool {
return (*util.Flag)(b).HasAll(util.Flag(ZerofillFlag))
}

// SetIsBinary sets BinaryFlag
func (b *ColumnFlagType) SetIsBinary() {
(*util.Flag)(b).Add(util.Flag(BinaryFlag))
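
ColumnFlagType is a bit set backed by util.Flag, so SetZeroFill sets a single bit and IsZerofill tests it. The standalone sketch below shows the same pattern with illustrative Add/HasAll helpers and a trimmed constant list; it is not the exact tiflow definition.

package main

import "fmt"

type Flag uint64

// Add sets the given bits; HasAll reports whether all of them are set.
func (f *Flag) Add(flags Flag)        { *f |= flags }
func (f Flag) HasAll(flags Flag) bool { return f&flags == flags }

const (
	BinaryFlag Flag = 1 << iota
	NullableFlag
	UnsignedFlag
	ZerofillFlag // the bit introduced by this change
)

func main() {
	var col Flag
	col.Add(UnsignedFlag)
	col.Add(ZerofillFlag)
	fmt.Println(col.HasAll(ZerofillFlag))                // true
	fmt.Println(col.HasAll(UnsignedFlag | ZerofillFlag)) // true
	fmt.Println(col.HasAll(BinaryFlag))                  // false
}
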
6 changes: 5 additions & 1 deletion cdc/sink/dmlsink/mq/mq_dml_sink_test.go
@@ -20,6 +20,9 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
@@ -83,7 +86,8 @@ func TestWriteEvents(t *testing.T) {
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
}

events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000)
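
The test fixtures now pair each model.Column with a rowcodec.ColInfo carrying a full FieldType, since emitting the compatible canal-json content requires the detailed MySQL type information rather than just the numeric type code. A minimal sketch of the resulting event shape, reusing the import paths added in this file; the helper name and field values are illustrative only.

package example

import (
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/parser/types"
	"github.com/pingcap/tidb/util/rowcodec"
	"github.com/pingcap/tiflow/cdc/model"
)

// varcharRow builds a one-column row event whose ColInfos entry supplies the
// detailed MySQL type alongside the column value.
func varcharRow() *model.RowChangedEvent {
	return &model.RowChangedEvent{
		CommitTs: 1,
		Table:    &model.TableName{Schema: "a", Table: "b"},
		Columns:  []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
		ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
	}
}

The worker_test.go changes below apply the same fixture shape, importing the FieldType constructor from github.com/pingcap/tidb/types instead.
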
33 changes: 23 additions & 10 deletions cdc/sink/dmlsink/mq/worker_test.go
@@ -19,6 +19,9 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
@@ -72,7 +75,8 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
}
tableStatus := state.TableSinkSinking

@@ -343,7 +347,8 @@ func TestBatchEncode_SendMessages(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
@@ -355,7 +360,8 @@
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
@@ -367,7 +373,8 @@
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
@@ -379,7 +386,8 @@
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aa", Table: "bb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
@@ -391,7 +399,8 @@
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
@@ -403,7 +412,8 @@
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
@@ -483,7 +493,8 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &replicatingStatus,
@@ -495,7 +506,8 @@
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &replicatingStatus,
@@ -507,7 +519,8 @@
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &stoppedStatus,
(The remaining changed files of the 37 are not shown here.)
