From c93676b8337c800f5ae755e0d9859cfada3d0793 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Thu, 14 Oct 2021 11:13:26 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #3002 Signed-off-by: ti-chi-bot --- cdc/sink/codec/interface.go | 10 +- cdc/sink/codec/json.go | 2 +- cdc/sink/codec/json_test.go | 21 ++- cdc/sink/mq.go | 92 +------------ cdc/sink/producer/kafka/kafka.go | 112 +++++++++++++++- cdc/sink/producer/kafka/kafka_test.go | 180 +++++++++++++++++++++++++- 6 files changed, 310 insertions(+), 107 deletions(-) diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index 7e2ccd31745..b8394cda29c 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -14,6 +14,7 @@ package codec import ( + "encoding/binary" "strings" "time" @@ -63,9 +64,16 @@ type MQMessage struct { Protocol Protocol // protocol } +// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. +// reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233 +// for TiCDC, minimum supported kafka version is `0.11.0.2`, which will be treated as `version = 2` by sarama producer. +const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 + // Length returns the expected size of the Kafka message +// We didn't append any `Headers` when send the message, so ignore the calculations related to it. +// If `ProducerMessage` Headers fields used, this method should also adjust. func (m *MQMessage) Length() int { - return len(m.Key) + len(m.Value) + return len(m.Key) + len(m.Value) + maximumRecordOverhead } // PhysicalTime returns physical time part of Ts in time.Time diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index edacbd7878f..16a36da13a8 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -427,7 +427,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) if message.Length() > d.maxKafkaMessageSize { // `len(d.messageBuf) == 1` is implied - log.Warn("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.", + log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.", zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize)) } d.curBatchSize++ diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 98d56739a8e..95b6bbb561b 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -265,8 +265,6 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) { func (s *batchSuite) TestMaxMessageBytes(c *check.C) { defer testleak.AfterTest(c)() encoder := NewJSONEventBatchEncoder() - err := encoder.SetParams(map[string]string{"max-message-bytes": "256"}) - c.Check(err, check.IsNil) testEvent := &model.RowChangedEvent{ CommitTs: 1, @@ -274,13 +272,30 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, } - for i := 0; i < 10000; i++ { + // make producer's `max-message-bytes` must less than event, but we should still send it as possible. + err := encoder.SetParams(map[string]string{"max-message-bytes": "1"}) + c.Check(err, check.IsNil) + for i := 0; i < 100; i++ { r, err := encoder.AppendRowChangedEvent(testEvent) c.Check(r, check.Equals, EncoderNoOperation) c.Check(err, check.IsNil) } + // one message per batch, and can be build, which means the producer will try to send it. messages := encoder.Build() + c.Assert(len(messages), check.Equals, 100) + + // make sure each batch's `Length` not greater than `max-message-bytes` + err = encoder.SetParams(map[string]string{"max-message-bytes": "256"}) + c.Check(err, check.IsNil) + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendRowChangedEvent(testEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages = encoder.Build() for _, msg := range messages { c.Assert(msg.Length(), check.LessEqual, 256) } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index ef3239365cd..a2a47f7fa66 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -16,7 +16,6 @@ package sink import ( "context" "net/url" - "strconv" "strings" "sync/atomic" "time" @@ -399,99 +398,14 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, } func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { - config := kafka.NewKafkaConfig() - scheme := strings.ToLower(sinkURI.Scheme) if scheme != "kafka" && scheme != "kafka+ssl" { return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme) } - s := sinkURI.Query().Get("partition-num") - if s != "" { - c, err := strconv.Atoi(s) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } - config.PartitionNum = int32(c) - } - - s = sinkURI.Query().Get("replication-factor") - if s != "" { - c, err := strconv.Atoi(s) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } - config.ReplicationFactor = int16(c) - } - - s = sinkURI.Query().Get("kafka-version") - if s != "" { - config.Version = s - } - - s = sinkURI.Query().Get("max-message-bytes") - if s != "" { - c, err := strconv.Atoi(s) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } - config.MaxMessageBytes = c - opts["max-message-bytes"] = s - } - - s = sinkURI.Query().Get("max-batch-size") - if s != "" { - opts["max-batch-size"] = s - } - - s = sinkURI.Query().Get("compression") - if s != "" { - config.Compression = s - } - config.ClientID = sinkURI.Query().Get("kafka-client-id") - - s = sinkURI.Query().Get("protocol") - if s != "" { - replicaConfig.Sink.Protocol = s - } - - s = sinkURI.Query().Get("ca") - if s != "" { - config.Credential.CAPath = s - } - - s = sinkURI.Query().Get("cert") - if s != "" { - config.Credential.CertPath = s - } - - s = sinkURI.Query().Get("key") - if s != "" { - config.Credential.KeyPath = s - } - - s = sinkURI.Query().Get("sasl-user") - if s != "" { - config.SaslScram.SaslUser = s - } - - s = sinkURI.Query().Get("sasl-password") - if s != "" { - config.SaslScram.SaslPassword = s - } - - s = sinkURI.Query().Get("sasl-mechanism") - if s != "" { - config.SaslScram.SaslMechanism = s - } - - s = sinkURI.Query().Get("auto-create-topic") - if s != "" { - autoCreate, err := strconv.ParseBool(s) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } - config.TopicPreProcess = autoCreate + config := kafka.NewConfig() + if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool { diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index d810e770a8d..8520c631abb 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -16,7 +16,9 @@ package kafka import ( "context" "fmt" + "net/url" "regexp" + "strconv" "strings" "sync" "sync/atomic" @@ -27,6 +29,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/sink/codec" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/notify" "github.com/pingcap/ticdc/pkg/security" @@ -51,9 +54,9 @@ type Config struct { TopicPreProcess bool } -// NewKafkaConfig returns a default Kafka configuration -func NewKafkaConfig() Config { - return Config{ +// NewConfig returns a default Kafka configuration +func NewConfig() *Config { + return &Config{ Version: "2.4.0", MaxMessageBytes: 512 * 1024 * 1024, // 512M ReplicationFactor: 1, @@ -64,6 +67,103 @@ func NewKafkaConfig() Config { } } +func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error { + s := sinkURI.Query().Get("partition-num") + if s != "" { + a, err := strconv.Atoi(s) + if err != nil { + return err + } + c.PartitionNum = int32(a) + } + + s = sinkURI.Query().Get("replication-factor") + if s != "" { + a, err := strconv.Atoi(s) + if err != nil { + return err + } + c.ReplicationFactor = int16(a) + } + + s = sinkURI.Query().Get("kafka-version") + if s != "" { + c.Version = s + } + + s = sinkURI.Query().Get("max-message-bytes") + if s != "" { + a, err := strconv.Atoi(s) + if err != nil { + return err + } + // `MaxMessageBytes` is set to `512 mb` by default, but it's still possible that a larger value expected. + // TiCDC should send the message at best. + if a > c.MaxMessageBytes { + c.MaxMessageBytes = a + } + opts["max-message-bytes"] = s + } + + s = sinkURI.Query().Get("max-batch-size") + if s != "" { + opts["max-batch-size"] = s + } + + s = sinkURI.Query().Get("compression") + if s != "" { + c.Compression = s + } + + c.ClientID = sinkURI.Query().Get("kafka-client-id") + + s = sinkURI.Query().Get("protocol") + if s != "" { + replicaConfig.Sink.Protocol = s + } + + s = sinkURI.Query().Get("ca") + if s != "" { + c.Credential.CAPath = s + } + + s = sinkURI.Query().Get("cert") + if s != "" { + c.Credential.CertPath = s + } + + s = sinkURI.Query().Get("key") + if s != "" { + c.Credential.KeyPath = s + } + + s = sinkURI.Query().Get("sasl-user") + if s != "" { + c.SaslScram.SaslUser = s + } + + s = sinkURI.Query().Get("sasl-password") + if s != "" { + c.SaslScram.SaslPassword = s + } + + s = sinkURI.Query().Get("sasl-mechanism") + if s != "" { + c.SaslScram.SaslMechanism = s + } + + s = sinkURI.Query().Get("auto-create-topic") + if s != "" { + autoCreate, err := strconv.ParseBool(s) + if err != nil { + return err + } + c.TopicPreProcess = autoCreate + } + + return nil +} + type kafkaSaramaProducer struct { // clientLock is used to protect concurrent access of asyncClient and syncClient. // Since we don't close these two clients (which have an input chan) from the @@ -267,7 +367,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { // kafkaTopicPreProcess gets partition number from existing topic, if topic doesn't // exit, creates it automatically. -func kafkaTopicPreProcess(topic, address string, config Config, cfg *sarama.Config) (int32, error) { +func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) { admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg) if err != nil { return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) @@ -318,7 +418,7 @@ func kafkaTopicPreProcess(topic, address string, config Config, cfg *sarama.Conf var newSaramaConfigImpl = newSaramaConfig // NewKafkaSaramaProducer creates a kafka sarama producer -func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config Config, errCh chan error) (*kafkaSaramaProducer, error) { +func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) { log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) cfg, err := newSaramaConfigImpl(ctx, config) if err != nil { @@ -400,7 +500,7 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) ( } // NewSaramaConfig return the default config and set the according version and metrics -func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) { +func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { config := sarama.NewConfig() version, err := sarama.ParseKafkaVersion(c.Version) diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index d54d31c5676..0feeff429c0 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -15,6 +15,9 @@ package kafka import ( "context" + "fmt" + "net/url" + "strconv" "sync" "testing" "time" @@ -23,6 +26,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/ticdc/cdc/sink/codec" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/security" "github.com/pingcap/ticdc/pkg/util" @@ -62,6 +66,51 @@ func (s *kafkaSuite) TestClientID(c *check.C) { } } +func (s *kafkaSuite) TestInitializeConfig(c *check.C) { + defer testleak.AfterTest(c) + cfg := NewConfig() + + uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" + + "&max-message-bytes=%s&partition-num=1&replication-factor=3" + + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" + maxMessageSize := "4194304" + uri := fmt.Sprintf(uriTemplate, maxMessageSize) + + sinkURI, err := url.Parse(uri) + c.Assert(err, check.IsNil) + + replicaConfig := config.GetDefaultReplicaConfig() + + opts := make(map[string]string) + err = cfg.Initialize(sinkURI, replicaConfig, opts) + c.Assert(err, check.IsNil) + + c.Assert(cfg.PartitionNum, check.Equals, int32(1)) + c.Assert(cfg.ReplicationFactor, check.Equals, int16(3)) + c.Assert(cfg.Version, check.Equals, "2.6.0") + c.Assert(cfg.MaxMessageBytes, check.Equals, 512*1024*1024) + + expectedOpts := map[string]string{ + "max-message-bytes": maxMessageSize, + "max-batch-size": "5", + } + for k, v := range opts { + c.Assert(v, check.Equals, expectedOpts[k]) + } + + a := 512*1024*1024 + 1 + maxMessageSize = strconv.Itoa(a) + uri = fmt.Sprintf(uriTemplate, maxMessageSize) + + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + + err = cfg.Initialize(sinkURI, replicaConfig, opts) + c.Assert(err, check.IsNil) + + c.Assert(cfg.MaxMessageBytes, check.Equals, a) +} + func (s *kafkaSuite) TestSaramaProducer(c *check.C) { defer testleak.AfterTest(c)() ctx, cancel := context.WithCancel(context.Background()) @@ -86,7 +135,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { } errCh := make(chan error, 1) - config := NewKafkaConfig() + config := NewConfig() // Because the sarama mock broker is not compatible with version larger than 1.0.0 // We use a smaller version in the following producer tests. // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447 @@ -95,7 +144,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { config.TopicPreProcess = false newSaramaConfigImplBak := newSaramaConfigImpl - newSaramaConfigImpl = func(ctx context.Context, config Config) (*sarama.Config, error) { + newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { cfg, err := newSaramaConfigImplBak(ctx, config) c.Assert(err, check.IsNil) cfg.Producer.Flush.MaxMessages = 1 @@ -205,7 +254,7 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) { "DescribeConfigsRequest": sarama.NewMockDescribeConfigsResponse(c), }) - config := NewKafkaConfig() + config := NewConfig() config.PartitionNum = int32(0) cfg, err := newSaramaConfigImpl(ctx, config) c.Assert(err, check.IsNil) @@ -238,7 +287,7 @@ func (s *kafkaSuite) TestTopicPreProcessCreate(c *check.C) { }) defer broker.Close() - config := NewKafkaConfig() + config := NewConfig() config.PartitionNum = int32(0) cfg, err := newSaramaConfigImpl(ctx, config) c.Assert(err, check.IsNil) @@ -250,7 +299,7 @@ func (s *kafkaSuite) TestTopicPreProcessCreate(c *check.C) { func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { defer testleak.AfterTest(c)() ctx := context.Background() - config := NewKafkaConfig() + config := NewConfig() config.Version = "invalid" _, err := newSaramaConfigImpl(ctx, config) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") @@ -286,7 +335,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { _, err = newSaramaConfigImpl(ctx, config) c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory") - saslConfig := NewKafkaConfig() + saslConfig := NewConfig() saslConfig.Version = "2.6.0" saslConfig.ClientID = "test-sasl-scram" saslConfig.SaslScram = &security.SaslScram{ @@ -307,7 +356,7 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { defer testleak.AfterTest(c)() ctx := context.Background() errCh := make(chan error, 1) - config := NewKafkaConfig() + config := NewConfig() config.Version = "invalid" _, err := NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") @@ -317,3 +366,120 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { _, err = NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh) c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) } +<<<<<<< HEAD +======= + +func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { + defer testleak.AfterTest(c)() + topic := "unit_test_4" + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + leader := sarama.NewMockBroker(c, 2) + defer leader.Close() + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + leader.Returns(metadataResponse) + leader.Returns(metadataResponse) + + config := NewConfig() + // Because the sarama mock broker is not compatible with version larger than 1.0.0 + // We use a smaller version in the following producer tests. + // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447 + config.Version = "0.9.0.0" + config.PartitionNum = int32(2) + config.TopicPreProcess = false + + newSaramaConfigImplBak := newSaramaConfigImpl + newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { + cfg, err := newSaramaConfigImplBak(ctx, config) + c.Assert(err, check.IsNil) + cfg.Producer.Flush.MaxMessages = 1 + cfg.Producer.Retry.Max = 2 + cfg.Producer.MaxMessageBytes = 8 + return cfg, err + } + defer func() { + newSaramaConfigImpl = newSaramaConfigImplBak + }() + + errCh := make(chan error, 1) + producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh) + defer func() { + err := producer.Close() + c.Assert(err, check.IsNil) + }() + + c.Assert(err, check.IsNil) + c.Assert(producer, check.NotNil) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 20; i++ { + err = producer.SendMessage(ctx, &codec.MQMessage{ + Key: []byte("test-key-1"), + Value: []byte("test-value"), + }, int32(0)) + c.Assert(err, check.IsNil) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-ctx.Done(): + c.Fatal("TestProducerSendMessageFailed timed out") + case err := <-errCh: + c.Assert(err, check.ErrorMatches, ".*too large.*") + } + }() + + wg.Wait() +} + +func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { + defer testleak.AfterTest(c)() + topic := "unit_test_4" + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + leader := sarama.NewMockBroker(c, 2) + defer leader.Close() + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + leader.Returns(metadataResponse) + leader.Returns(metadataResponse) + + config := NewConfig() + // Because the sarama mock broker is not compatible with version larger than 1.0.0 + // We use a smaller version in the following producer tests. + // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447 + config.Version = "0.9.0.0" + config.PartitionNum = int32(2) + config.TopicPreProcess = false + + errCh := make(chan error, 1) + producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh) + defer func() { + err := producer.Close() + c.Assert(err, check.IsNil) + }() + + c.Assert(err, check.IsNil) + c.Assert(producer, check.NotNil) + + err = producer.Close() + c.Assert(err, check.IsNil) + + err = producer.Close() + c.Assert(err, check.IsNil) +} +>>>>>>> b7c783f2f (sink: fix kafka max message size inaccurate issue. (#3002)) From 1d1bf8e3410a9764c68285f9c9f8098efa4001f9 Mon Sep 17 00:00:00 2001 From: Ling Jin Date: Thu, 14 Oct 2021 17:59:57 +0800 Subject: [PATCH 2/2] add comment about kafka config initializa. --- cdc/sink/producer/kafka/kafka.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index 8520c631abb..892cbe90ace 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -67,6 +67,7 @@ func NewConfig() *Config { } } +// Initialize the kafka configuration func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error { s := sinkURI.Query().Get("partition-num") if s != "" {