sink/mq(ticdc): Add support for Confluent Cloud Kafka (#5553) (#5594)
close #5512
ti-chi-bot authored May 26, 2022
1 parent 379053b commit 39eea6a
Showing 10 changed files with 213 additions and 40 deletions.
11 changes: 6 additions & 5 deletions cdc/sink/mq/dispatcher/event_router.go
@@ -41,7 +41,6 @@ type partitionDispatchRule int

const (
partitionDispatchRuleDefault partitionDispatchRule = iota
- partitionDispatchRuleRowID
partitionDispatchRuleTS
partitionDispatchRuleTable
partitionDispatchRuleIndexValue
@@ -51,17 +50,19 @@ func (r *partitionDispatchRule) fromString(rule string) {
switch strings.ToLower(rule) {
case "default":
*r = partitionDispatchRuleDefault
- case "rowid":
- *r = partitionDispatchRuleRowID
case "ts":
*r = partitionDispatchRuleTS
case "table":
*r = partitionDispatchRuleTable
+ case "rowid":
+ *r = partitionDispatchRuleIndexValue
+ log.Warn("rowid is deprecated, please use index-value instead.")
case "index-value":
*r = partitionDispatchRuleIndexValue
default:
*r = partitionDispatchRuleDefault
- log.Warn("can't support dispatch rule, using default rule", zap.String("rule", rule))
+ log.Warn("the partition dispatch rule is not default/ts/table/index-value," +
+ " use the default rule instead.")
}
}

@@ -230,7 +231,7 @@ func getPartitionDispatcher(
)
rule.fromString(ruleConfig.PartitionRule)
switch rule {
- case partitionDispatchRuleRowID, partitionDispatchRuleIndexValue:
+ case partitionDispatchRuleIndexValue:
if enableOldValue {
log.Warn("This index-value distribution mode " +
"does not guarantee row-level orderliness when " +
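The net effect in event_router.go: "rowid" is kept only as a deprecated alias that now maps to the index-value rule. A minimal standalone sketch of the parsing behavior (parseRule and the constants below are illustrative stand-ins, not the dispatcher's unexported API):

package main

import (
	"fmt"
	"strings"
)

type partitionRule int

const (
	ruleDefault partitionRule = iota
	ruleTS
	ruleTable
	ruleIndexValue
)

// parseRule mirrors fromString above: "rowid" stays accepted for backward
// compatibility but is downgraded to the index-value rule with a warning.
func parseRule(s string) partitionRule {
	switch strings.ToLower(s) {
	case "default":
		return ruleDefault
	case "ts":
		return ruleTS
	case "table":
		return ruleTable
	case "rowid":
		fmt.Println("warning: rowid is deprecated, please use index-value instead")
		return ruleIndexValue
	case "index-value":
		return ruleIndexValue
	default:
		fmt.Printf("warning: unknown rule %q, using the default rule\n", s)
		return ruleDefault
	}
}

func main() {
	fmt.Println(parseRule("rowid") == parseRule("index-value")) // true
}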
90 changes: 71 additions & 19 deletions cdc/sink/mq/producer/kafka/config.go
@@ -15,6 +15,7 @@ package kafka

import (
"context"
"crypto/tls"
"net/url"
"strconv"
"strings"
@@ -42,6 +43,7 @@ type Config struct {
MaxMessageBytes int
Compression string
ClientID string
EnableTLS bool
Credential *security.Credential
SASL *security.SASL
// control whether to create topic
@@ -145,21 +147,6 @@ func (c *Config) Apply(sinkURI *url.URL) error {

c.ClientID = params.Get("kafka-client-id")

- s = params.Get("ca")
- if s != "" {
- c.Credential.CAPath = s
- }
-
- s = params.Get("cert")
- if s != "" {
- c.Credential.CertPath = s
- }
-
- s = params.Get("key")
- if s != "" {
- c.Credential.KeyPath = s
- }

s = params.Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
@@ -201,6 +188,60 @@ func (c *Config) Apply(sinkURI *url.URL) error {
return err
}

err = c.applyTLS(params)
if err != nil {
return err
}

return nil
}

func (c *Config) applyTLS(params url.Values) error {
s := params.Get("ca")
if s != "" {
c.Credential.CAPath = s
}

s = params.Get("cert")
if s != "" {
c.Credential.CertPath = s
}

s = params.Get("key")
if s != "" {
c.Credential.KeyPath = s
}

if c.Credential != nil && !c.Credential.IsEmpty() &&
!c.Credential.IsTLSEnabled() {
return cerror.WrapError(cerror.ErrKafkaInvalidConfig,
errors.New("ca, cert and key files should all be supplied"))
}

// if enable-tls is not set but credential files are supplied,
// TLS is enabled and the self-signed CA certificate is used.
// if enable-tls is set to true and credential files are not supplied,
// TLS is enabled and the trusted CA certificates of the OS are used.
// if enable-tls is set to false while credential files are supplied,
// an error is returned.
s = params.Get("enable-tls")
if s != "" {
enableTLS, err := strconv.ParseBool(s)
if err != nil {
return err
}

if c.Credential != nil && c.Credential.IsTLSEnabled() && !enableTLS {
return cerror.WrapError(cerror.ErrKafkaInvalidConfig,
errors.New("credential files are supplied, but 'enable-tls' is set to false"))
}
c.EnableTLS = enableTLS
} else {
if c.Credential != nil && c.Credential.IsTLSEnabled() {
c.EnableTLS = true
}
}

return nil
}

@@ -368,11 +409,22 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Producer.Compression = sarama.CompressionNone
}

- if c.Credential != nil && len(c.Credential.CAPath) != 0 {
+ if c.EnableTLS {
+ // for SSL encryption with a trusted CA certificate, we must populate the
+ // following two fields of config.Net.TLS
config.Net.TLS.Enable = true
- config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
- if err != nil {
- return nil, errors.Trace(err)
+ config.Net.TLS.Config = &tls.Config{
+ MinVersion: tls.VersionTLS12,
+ NextProtos: []string{"h2", "http/1.1"},
+ }
+
+ // for SSL encryption with a self-signed CA certificate, we reassign
+ // config.Net.TLS.Config using the supplied credential files.
+ if c.Credential != nil && c.Credential.IsTLSEnabled() {
+ config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
+ if err != nil {
+ return nil, errors.Trace(err)
+ }
}
}

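Taken together, applyTLS accepts two ways of enabling TLS and rejects one contradictory combination. A small sketch of the three sink-URI shapes it distinguishes (broker address, topic, and file paths are placeholders, not defaults):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	uris := []string{
		// 1. OS trust store + SASL/PLAIN, the Confluent Cloud shape:
		//    enable-tls=true with no ca/cert/key files.
		"kafka://broker:9092/topic?enable-tls=true&sasl-mechanism=plain&sasl-user=u&sasl-password=p",
		// 2. Self-signed CA: ca, cert and key all supplied, enable-tls omitted.
		"kafka://broker:9092/topic?ca=/path/ca.pem&cert=/path/cert.pem&key=/path/key.pem",
		// 3. Rejected by applyTLS: credential files supplied while
		//    enable-tls is explicitly false.
		"kafka://broker:9092/topic?enable-tls=false&ca=/path/ca.pem&cert=/path/cert.pem&key=/path/key.pem",
	}
	for _, raw := range uris {
		u, err := url.Parse(raw)
		if err != nil {
			panic(err)
		}
		q := u.Query()
		fmt.Printf("enable-tls=%q ca=%q cert=%q key=%q\n",
			q.Get("enable-tls"), q.Get("ca"), q.Get("cert"), q.Get("key"))
	}
}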
71 changes: 70 additions & 1 deletion cdc/sink/mq/producer/kafka/config_test.go
@@ -63,8 +63,11 @@ func TestNewSaramaConfig(t *testing.T) {
require.Equal(t, cc.expected, cfg.Producer.Compression)
}

+ config.EnableTLS = true
config.Credential = &security.Credential{
- CAPath: "/invalid/ca/path",
+ CAPath:   "/invalid/ca/path",
+ CertPath: "/invalid/cert/path",
+ KeyPath:  "/invalid/key/path",
}
_, err = NewSaramaConfig(ctx, config)
require.Regexp(t, ".*no such file or directory", errors.Cause(err))
@@ -495,6 +498,72 @@ func TestApplySASL(t *testing.T) {
}
}

func TestApplyTLS(t *testing.T) {
t.Parallel()

tests := []struct {
name string
URI string
tlsEnabled bool
expectedErr string
}{
{
name: "tls config with 'enable-tls' set to true",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-user=user&sasl-password=password&sasl-mechanism=plain&enable-tls=true",
tlsEnabled: true,
expectedErr: "",
},
{
name: "tls config with no 'enable-tls', and credential files are supplied",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-user=user&sasl-password=password&sasl-mechanism=plain" +
"&ca=/root/ca.file&cert=/root/cert.file&key=/root/key.file",
tlsEnabled: true,
expectedErr: "",
},
{
name: "tls config with no 'enable-tls', and credential files are not supplied",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-user=user&sasl-password=password&sasl-mechanism=plain",
tlsEnabled: false,
expectedErr: "",
},
{
name: "tls config with 'enable-tls' set to false, and credential files are supplied",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-user=user&sasl-password=password&sasl-mechanism=plain&enable-tls=false" +
"&ca=/root/ca&cert=/root/cert&key=/root/key",
tlsEnabled: false,
expectedErr: "credential files are supplied, but 'enable-tls' is set to false",
},
{
name: "tls config with 'enable-tls' set to true, and some of " +
"the credential files are not supplied ",
URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
"&sasl-user=user&sasl-password=password&sasl-mechanism=plain&enable-tls=true" +
"&ca=/root/ca&cert=/root/cert&",
tlsEnabled: false,
expectedErr: "ca, cert and key files should all be supplied",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
cfg := NewConfig()
sinkURI, err := url.Parse(test.URI)
require.Nil(t, err)
if test.expectedErr == "" {
require.Nil(t, cfg.applyTLS(sinkURI.Query()))
} else {
require.Regexp(t, test.expectedErr, cfg.applyTLS(sinkURI.Query()).Error())
}
}
require.Equal(t, test.tlsEnabled, cfg.EnableTLS)
})
}
}

func TestCompleteSaramaSASLConfig(t *testing.T) {
t.Parallel()

9 changes: 7 additions & 2 deletions cdc/sink/mq/producer/kafka/kafka.go
@@ -507,6 +507,10 @@ func validateMinInsyncReplicas(

minInsyncReplicasStr, exists, err := minInsyncReplicasConfigGetter()
if err != nil {
// `min.insync.replicas` is invisible to us in Confluent Cloud Kafka.
if cerror.ErrKafkaBrokerConfigNotFound.Equal(err) {
return nil
}
return err
}
minInsyncReplicas, err := strconv.Atoi(minInsyncReplicasStr)
@@ -551,8 +555,9 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s
}

if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName {
return "", errors.New(fmt.Sprintf(
"cannot find the `%s` from the broker's configuration", brokerConfigName))
log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", brokerConfigName)
}

return configEntries[0].Value, nil
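The error-classification pattern used above — a normalized error value that callers probe with Equal instead of string matching — can be sketched in isolation. This assumes github.com/pingcap/errors, which the repository already uses; the names below are illustrative:

package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// A normalized error value, mirroring ErrKafkaBrokerConfigNotFound.
var errConfigNotFound = errors.Normalize(
	"kafka broker config item not found",
	errors.RFCCodeText("CDC:ErrKafkaBrokerConfigNotFound"),
)

// lookupBrokerConfig pretends the broker did not return the requested
// entry, as happens for `min.insync.replicas` on Confluent Cloud.
func lookupBrokerConfig(name string) (string, error) {
	return "", errConfigNotFound.GenWithStack(
		"cannot find the `%s` from the broker's configuration", name)
}

func main() {
	_, err := lookupBrokerConfig("min.insync.replicas")
	// Callers tolerate exactly this error class and fail on anything else.
	if errConfigNotFound.Equal(err) {
		fmt.Println("config invisible on this cluster, skipping validation")
		return
	}
	if err != nil {
		panic(err)
	}
}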
19 changes: 12 additions & 7 deletions cdc/sink/mq/producer/kafka/kafka_test.go
@@ -328,23 +328,28 @@ func TestAdjustConfigMinInsyncReplicas(t *testing.T) {
)

// topic not exist, and `min.insync.replicas` not found in broker's configuration
- adminClient.DropBrokerConfig()
+ adminClient.DropBrokerConfig(kafka.MinInsyncReplicasConfigName)
+ topicName := "no-topic-no-min-insync-replicas"
err = AdjustConfig(adminClient, config, saramaConfig, "no-topic-no-min-insync-replicas")
- require.Regexp(t, ".*cannot find the `min.insync.replicas` from the broker's configuration",
- errors.Cause(err))
+ require.Nil(t, err)
+ err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{ReplicationFactor: 1}, false)
+ require.Regexp(t, ".*kafka server: Request parameters do not satisfy the configured policy.",
+ err.Error())

// Report an error if the replication-factor is less than min.insync.replicas
// when the topic does exist.
saramaConfig, err = NewSaramaConfig(context.Background(), config)
require.Nil(t, err)

// topic exist, but `min.insync.replicas` not found in topic and broker configuration
topicName := "topic-no-config-entry"
err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{}, false)
topicName = "topic-no-config-entry"
err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{
ReplicationFactor: 3,
NumPartitions: 3,
}, false)
require.Nil(t, err)
err = AdjustConfig(adminClient, config, saramaConfig, topicName)
require.Regexp(t, ".*cannot find the `min.insync.replicas` from the broker's configuration",
errors.Cause(err))
require.Nil(t, err)

// topic found, and have `min.insync.replicas`, but set to 2, larger than `replication-factor`.
adminClient.SetMinInsyncReplicas("2")
5 changes: 5 additions & 0 deletions errors.toml
@@ -421,6 +421,11 @@ error = '''
kafka async send message failed
'''

["CDC:ErrKafkaBrokerConfigNotFound"]
error = '''
kafka broker config item not found
'''

["CDC:ErrKafkaCreateTopic"]
error = '''
kafka create topic failed
4 changes: 4 additions & 0 deletions pkg/errors/errors.go
@@ -244,6 +244,10 @@ var (
"invalid topic expression",
errors.RFCCodeText("CDC:ErrKafkaTopicExprInvalid"),
)
ErrKafkaBrokerConfigNotFound = errors.Normalize(
"kafka broker config item not found",
errors.RFCCodeText("CDC:ErrKafkaBrokerConfigNotFound"),
)
ErrPulsarNewProducer = errors.Normalize(
"new pulsar producer",
errors.RFCCodeText("CDC:ErrPulsarNewProducer"),
29 changes: 27 additions & 2 deletions pkg/kafka/cluster_admin_client_mock_impl.go
Expand Up @@ -26,6 +26,8 @@ const (
DefaultMockPartitionNum = 3
// defaultMockControllerID specifies the default mock controller ID.
defaultMockControllerID = 1
// topic replication factor must be 3 for Confluent Cloud Kafka.
defaultReplicationFactor = 3
)

const (
@@ -110,6 +112,20 @@ func (c *ClusterAdminClientMockImpl) DescribeConfig(resource sarama.ConfigResour

// CreateTopic adds topic into map.
func (c *ClusterAdminClientMockImpl) CreateTopic(topic string, detail *sarama.TopicDetail, _ bool) error {
minInsyncReplicaConfigFound := false

for _, config := range c.brokerConfigs {
if config.Name == MinInsyncReplicasConfigName {
minInsyncReplicaConfigFound = true
}
}
// For Confluent Cloud, `min.insync.replicas` is invisible and the replication factor must be 3.
// Otherwise, ErrPolicyViolation is expected to be returned.
if !minInsyncReplicaConfigFound &&
detail.ReplicationFactor != defaultReplicationFactor {
return sarama.ErrPolicyViolation
}

c.topics[topic] = *detail
return nil
}
@@ -148,6 +164,15 @@ func (c *ClusterAdminClientMockImpl) GetTopicMaxMessageBytes() int {
}

// DropBrokerConfig removes a broker-level configuration item for test purposes.
- func (c *ClusterAdminClientMockImpl) DropBrokerConfig() {
- c.brokerConfigs = c.brokerConfigs[:0]
+ func (c *ClusterAdminClientMockImpl) DropBrokerConfig(configName string) {
+ targetIdx := -1
+ for i, config := range c.brokerConfigs {
+ if config.Name == configName {
+ targetIdx = i
+ }
+ }
+
+ if targetIdx >= 0 {
+ c.brokerConfigs = append(c.brokerConfigs[:targetIdx], c.brokerConfigs[targetIdx+1:]...)
+ }
}
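A sketch of how a test can drive the Confluent-like policy in this mock. It assumes the package's existing NewClusterAdminClientMockImpl constructor and the import paths in use at the time of this commit:

package kafka_test

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/pingcap/tiflow/pkg/kafka"
	"github.com/stretchr/testify/require"
)

func TestMockConfluentLikePolicy(t *testing.T) {
	adminClient := kafka.NewClusterAdminClientMockImpl()

	// Make `min.insync.replicas` invisible, as it is on Confluent Cloud.
	adminClient.DropBrokerConfig(kafka.MinInsyncReplicasConfigName)

	// With the config invisible, a replication factor other than 3 is refused.
	err := adminClient.CreateTopic("t", &sarama.TopicDetail{ReplicationFactor: 1}, false)
	require.Equal(t, sarama.ErrPolicyViolation, err)

	// Replication factor 3 satisfies the Confluent Cloud requirement.
	err = adminClient.CreateTopic("t", &sarama.TopicDetail{ReplicationFactor: 3}, false)
	require.NoError(t, err)
}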
