From c9d276fd0db314a24a644f899d45ac30b76632dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Sun, 7 May 2023 15:44:56 +0800 Subject: [PATCH] pkg/sink(ticdc): iterate all Kafka configs to support KOP (#8893) close pingcap/tiflow#8892 --- pkg/sink/kafka/admin.go | 21 +++++++++++++-------- pkg/sink/kafka/v2/admin.go | 20 ++++++++++++-------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go index 2cfe7e33291..c5d7533ca35 100644 --- a/pkg/sink/kafka/admin.go +++ b/pkg/sink/kafka/admin.go @@ -179,16 +179,21 @@ func (a *saramaAdminClient) GetBrokerConfig( return "", err } - if len(configEntries) == 0 || configEntries[0].Name != configName { - log.Warn("Kafka config item not found", - zap.String("namespace", a.changefeed.Namespace), - zap.String("changefeed", a.changefeed.ID), - zap.String("configName", configName)) - return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( - "cannot find the `%s` from the broker's configuration", configName) + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. + for _, entry := range configEntries { + if entry.Name == configName { + return entry.Value, nil + } } - return configEntries[0].Value, nil + log.Warn("Kafka config item not found", + zap.String("namespace", a.changefeed.Namespace), + zap.String("changefeed", a.changefeed.ID), + zap.String("configName", configName)) + return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( + "cannot find the `%s` from the broker's configuration", configName) } func (a *saramaAdminClient) GetTopicsPartitions(_ context.Context) (map[string]int32, error) { diff --git a/pkg/sink/kafka/v2/admin.go b/pkg/sink/kafka/v2/admin.go index c1c3a66d3e6..247adeb4f4f 100644 --- a/pkg/sink/kafka/v2/admin.go +++ b/pkg/sink/kafka/v2/admin.go @@ -96,21 +96,25 @@ func (a *admin) GetBrokerConfig(ctx context.Context, configName string) (string, } if len(resp.Resources) == 0 || len(resp.Resources[0].ConfigEntries) == 0 { - log.Warn("kafka config item not found", + log.Warn("Kafka config item not found", zap.String("configName", configName)) return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack( "cannot find the `%s` from the broker's configuration", configName) } - entry := resp.Resources[0].ConfigEntries[0] - if entry.ConfigName != configName { - log.Warn("kafka config item not found", - zap.String("configName", configName)) - return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack( - "cannot find the `%s` from the broker's configuration", configName) + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. + for _, entry := range resp.Resources[0].ConfigEntries { + if entry.ConfigName == configName { + return entry.ConfigValue, nil + } } - return entry.ConfigValue, nil + log.Warn("Kafka config item not found", + zap.String("configName", configName)) + return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack( + "cannot find the `%s` from the broker's configuration", configName) } func (a *admin) GetTopicsPartitions(ctx context.Context) (map[string]int32, error) {