Skip to content

Commit

Permalink
codec(cdc): fix encoder max-message-bytes (#4074)
Browse files Browse the repository at this point in the history
* fix encoder.

* add test.

* fix test.

* fix encoder SetParams.

* fix test in json.

* update craft test.

* fix craft test.

* Refine error.

* Update cdc/sink/codec/craft.go

Co-authored-by: amyangfei <amyangfei@gmail.com>

* Update cdc/sink/codec/craft.go

Co-authored-by: amyangfei <amyangfei@gmail.com>

* fix test in craft.

Co-authored-by: amyangfei <amyangfei@gmail.com>
  • Loading branch information
3AceShowHand and amyangfei authored Dec 26, 2021
1 parent 43a599d commit f097a12
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 45 deletions.
13 changes: 7 additions & 6 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,14 @@ func (e *CraftEventBatchEncoder) Reset() {
// SetParams reads relevant parameters for craft protocol
func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error {
var err error
maxMessageBytes, ok := params["max-message-bytes"]
if !ok {
return cerror.ErrSinkInvalidConfig.GenWithStack("max-message-bytes not found")
}

e.maxMessageBytes = config.DefaultMaxMessageBytes
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
e.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
e.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
}
if e.maxMessageBytes <= 0 || e.maxMessageBytes > math.MaxInt32 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageBytes))
Expand Down
14 changes: 7 additions & 7 deletions cdc/sink/codec/craft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *craftBatchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatc
func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewCraftEventBatchEncoder().(*CraftEventBatchEncoder)
err := encoder.SetParams(map[string]string{})
err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
Expand All @@ -158,21 +158,21 @@ func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.NotNil)

err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32), "max-batch-size": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": "-1"})
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32), "max-batch-size": "-1"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint16)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint16)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, int(math.MaxUint16))
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.NotNil)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.NotNil)
}

Expand Down Expand Up @@ -203,7 +203,7 @@ func (s *craftBatchSuite) TestMaxMessageBytes(c *check.C) {
func (s *craftBatchSuite) TestMaxBatchSize(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewCraftEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": "64"})
c.Check(err, check.IsNil)

testEvent := &model.RowChangedEvent{
Expand Down
18 changes: 10 additions & 8 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ type JSONEventBatchEncoder struct {
maxBatchSize int
}

// GetMaxMessageSize is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxMessageSize() int {
// GetMaxMessageBytes is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxMessageBytes() int {
return d.maxMessageBytes
}

Expand Down Expand Up @@ -615,12 +615,14 @@ func (d *JSONEventBatchEncoder) Reset() {
func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
var err error

d.maxMessageBytes = config.DefaultMaxMessageBytes
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
maxMessageBytes, ok := params["max-message-bytes"]
if !ok {
return cerror.ErrSinkInvalidConfig.Wrap(errors.New("max-message-bytes not found"))
}

d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
if d.maxMessageBytes <= 0 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes))
Expand Down
42 changes: 36 additions & 6 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package codec

import (
"context"
"math"
"sort"
"strconv"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -201,7 +203,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)
err := encoder.SetParams(map[string]string{})
err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
Expand All @@ -228,17 +230,43 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-batch-size": "-1"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
}

func (s *batchSuite) TestSetParams(c *check.C) {
defer testleak.AfterTest(c)

opts := make(map[string]string)
encoderBuilder := newJSONEventBatchEncoderBuilder(opts)
c.Assert(encoderBuilder, check.NotNil)
encoder, err := encoderBuilder.Build(context.Background())
c.Assert(encoder, check.IsNil)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*max-message-bytes not found.*",
)

opts["max-message-bytes"] = "1"
encoderBuilder = newJSONEventBatchEncoderBuilder(opts)
c.Assert(encoderBuilder, check.NotNil)
encoder, err = encoderBuilder.Build(context.Background())
c.Assert(err, check.IsNil)
c.Assert(encoder, check.NotNil)

jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
c.Assert(ok, check.IsTrue)
c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1)
}

func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
Expand Down Expand Up @@ -283,9 +311,11 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {

func (s *batchSuite) TestMaxBatchSize(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
c.Check(err, check.IsNil)
encoderBuilder := newJSONEventBatchEncoderBuilder(map[string]string{"max-message-bytes": "1048576", "max-batch-size": "64"})
c.Assert(encoderBuilder, check.NotNil)
encoder, err := encoderBuilder.Build(context.Background())
c.Assert(err, check.IsNil)
c.Assert(encoder, check.NotNil)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, errCh)
sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, opts, errCh)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {

c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 1048576)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 1048576)

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
Expand Down Expand Up @@ -234,7 +234,7 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 4194304)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)
}

func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) {
Expand Down
8 changes: 5 additions & 3 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ var (
)

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
Expand All @@ -280,7 +280,7 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, e
}
}()

if err := validateMaxMessageBytesAndCreateTopic(admin, topic, config, cfg); err != nil {
if err := validateMaxMessageBytesAndCreateTopic(admin, topic, config, cfg, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

Expand Down Expand Up @@ -345,7 +345,7 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (
return
}

func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config) error {
func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config, opts map[string]string) error {
topics, err := admin.ListTopics()
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
Expand All @@ -367,6 +367,7 @@ func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic
zap.Int("max-message-bytes", config.MaxMessageBytes))
saramaConfig.Producer.MaxMessageBytes = topicMaxMessageBytes
}
opts["max-message-bytes"] = strconv.Itoa(saramaConfig.Producer.MaxMessageBytes)

// no need to create the topic, but we would have to log user if they found enter wrong topic name later
if config.AutoCreate {
Expand Down Expand Up @@ -402,6 +403,7 @@ func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic
zap.Int("max-message-bytes", config.MaxMessageBytes))
saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes
}
opts["max-message-bytes"] = strconv.Itoa(saramaConfig.Producer.MaxMessageBytes)

// topic not exists yet, and user does not specify the `partition-num` in the sink uri.
if config.PartitionNum == 0 {
Expand Down
Loading

0 comments on commit f097a12

Please sign in to comment.