diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index d97c5965119..7ddb9f3c51b 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -177,6 +177,26 @@ func (p *Protocol) FromString(protocol string) { } } +// String converts the Protocol enum type string to string. +func (p Protocol) String() string { + switch p { + case ProtocolDefault: + return "default" + case ProtocolCanal: + return "canal" + case ProtocolAvro: + return "avro" + case ProtocolMaxwell: + return "maxwell" + case ProtocolCanalJSON: + return "canal-json" + case ProtocolCraft: + return "craft" + default: + panic("unreachable") + } +} + type EncoderBuilder interface { Build(ctx context.Context) (EventBatchEncoder, error) } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index df5128ea42f..db1b78d1b02 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -15,6 +15,7 @@ package sink import ( "context" + "fmt" "net/url" "strings" "sync/atomic" @@ -68,9 +69,10 @@ func newMqSink( ) (*mqSink, error) { var protocol codec.Protocol protocol.FromString(config.Sink.Protocol) - if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON) && !config.EnableOldValue { - log.Error("Old value is not enabled when using Canal protocol. Please update changefeed config") - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled")) + if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !config.EnableOldValue { + log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+ + "Please update changefeed config", protocol.String())) + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol.String()))) } encoderBuilder, err := codec.NewEventBatchEncoderBuilder(protocol, credential, opts) diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index 47f4bac173a..ddf4a04229b 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -46,6 +46,7 @@ import ( // forceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. var forceEnableOldValueProtocols = []string{ "canal", + "canal-json", "maxwell", } @@ -205,9 +206,12 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co } protocol := sinkURIParsed.Query().Get("protocol") + if protocol != "" { + cfg.Sink.Protocol = protocol + } for _, fp := range forceEnableOldValueProtocols { - if protocol == fp { - log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol)) + if cfg.Sink.Protocol == fp { + log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol)) cfg.EnableOldValue = true break }