Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config(ticdc): Fix old value configuration check for maxwell protocol (#3747) #3782

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,26 @@ func (p *Protocol) FromString(protocol string) {
}
}

// String converts the Protocol enum type string to string.
func (p Protocol) String() string {
switch p {
case ProtocolDefault:
return "default"
case ProtocolCanal:
return "canal"
case ProtocolAvro:
return "avro"
case ProtocolMaxwell:
return "maxwell"
case ProtocolCanalJSON:
return "canal-json"
case ProtocolCraft:
return "craft"
default:
panic("unreachable")
}
}

// NewEventBatchEncoder returns a function of creating an EventBatchEncoder by protocol.
func NewEventBatchEncoder(p Protocol) func() EventBatchEncoder {
switch p {
Expand Down
8 changes: 5 additions & 3 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package sink

import (
"context"
"fmt"
"net/url"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -107,9 +108,10 @@ func newMqSink(
avroEncoder.SetTimeZone(util.TimezoneFromCtx(ctx))
return avroEncoder
}
} else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON) && !config.EnableOldValue {
log.Error("Old value is not enabled when using Canal protocol. Please update changefeed config")
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled"))
} else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !config.EnableOldValue {
log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+
"Please update changefeed config", protocol.String()))
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol.String())))
}

// pre-flight verification of encoder parameters
Expand Down
8 changes: 6 additions & 2 deletions pkg/cmd/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
// forceEnableOldValueProtocols specifies which protocols need to be forced to enable old value.
var forceEnableOldValueProtocols = []string{
"canal",
"canal-json",
"maxwell",
}

Expand Down Expand Up @@ -205,9 +206,12 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co
}

protocol := sinkURIParsed.Query().Get("protocol")
if protocol != "" {
cfg.Sink.Protocol = protocol
}
for _, fp := range forceEnableOldValueProtocols {
if protocol == fp {
log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol))
if cfg.Sink.Protocol == fp {
log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol))
cfg.EnableOldValue = true
break
}
Expand Down