sink(ticdc): set max-message-bytes default to 10m (#4036) #4060

Merged
34 changes: 16 additions & 18 deletions cdc/sink/codec/json.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/log"
 	timodel "github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"go.uber.org/zap"

@@ -35,8 +36,6 @@ import (
 const (
 	// BatchVersion1 represents the version of batch format
 	BatchVersion1 uint64 = 1
-	// DefaultMaxMessageBytes sets the default value for max-message-bytes
-	DefaultMaxMessageBytes int = 1 * 1024 * 1024 // 1M
 	// DefaultMaxBatchSize sets the default value for max-batch-size
 	DefaultMaxBatchSize int = 16
 )
@@ -317,13 +316,13 @@ type JSONEventBatchEncoder struct {
 	messageBuf   []*MQMessage
 	curBatchSize int
 	// configs
-	maxKafkaMessageSize int
-	maxBatchSize        int
+	maxMessageBytes int
+	maxBatchSize    int
 }

-// GetMaxKafkaMessageSize is only for unit testing.
-func (d *JSONEventBatchEncoder) GetMaxKafkaMessageSize() int {
-	return d.maxKafkaMessageSize
+// GetMaxMessageSize is only for unit testing.
+func (d *JSONEventBatchEncoder) GetMaxMessageSize() int {
+	return d.maxMessageBytes
 }

 // GetMaxBatchSize is only for unit testing.
@@ -402,15 +401,15 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
 	// for single message that longer than max-message-size, do not send it.
 	// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
 	length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
-	if length > d.maxKafkaMessageSize {
+	if length > d.maxMessageBytes {
 		log.Warn("Single message too large",
-			zap.Int("max-message-size", d.maxKafkaMessageSize), zap.Int("length", length), zap.Any("table", e.Table))
+			zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
 		return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
 	}

 	if len(d.messageBuf) == 0 ||
 		d.curBatchSize >= d.maxBatchSize ||
-		d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {
+		d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {

 		versionHead := make([]byte, 8)
 		binary.BigEndian.PutUint64(versionHead, BatchVersion1)
@@ -429,10 +428,10 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
 		message.Table = &e.Table.Table
 		message.IncRowsCount()

-		if message.Length() > d.maxKafkaMessageSize {
+		if message.Length() > d.maxMessageBytes {
 			// `len(d.messageBuf) == 1` is implied
 			log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
-				zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
+				zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
 		}
 		d.curBatchSize++
 	}
@@ -550,17 +549,16 @@ func (d *JSONEventBatchEncoder) Reset() {
 // SetParams reads relevant parameters for Open Protocol
 func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
 	var err error

+	d.maxMessageBytes = config.DefaultMaxMessageBytes
 	if maxMessageBytes, ok := params["max-message-bytes"]; ok {
-		d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes)
+		d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
 		if err != nil {
 			return cerror.ErrKafkaInvalidConfig.Wrap(err)
 		}
-	} else {
-		d.maxKafkaMessageSize = DefaultMaxMessageBytes
 	}

-	if d.maxKafkaMessageSize <= 0 {
-		return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxKafkaMessageSize))
+	if d.maxMessageBytes <= 0 {
+		return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes))
 	}

 	if maxBatchSize, ok := params["max-batch-size"]; ok {
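The SetParams change above replaces an if/else fallback with a "default first, override second, validate last" order, which is what lets the new config.DefaultMaxMessageBytes apply uniformly. A minimal standalone sketch of that pattern, not TiCDC code: resolveMaxMessageBytes and the local defaultMaxMessageBytes constant are illustrative stand-ins, and the 10 MiB value is assumed from the PR title, since config.DefaultMaxMessageBytes itself is defined outside this diff.

package main

import (
	"errors"
	"fmt"
	"strconv"
)

// defaultMaxMessageBytes stands in for config.DefaultMaxMessageBytes,
// assumed to be 10 MiB per the PR title.
const defaultMaxMessageBytes = 10 * 1024 * 1024

// resolveMaxMessageBytes mirrors the order of operations in the new
// SetParams: start from the default, overwrite it only when the
// parameter is present, then validate the final value.
func resolveMaxMessageBytes(params map[string]string) (int, error) {
	maxMessageBytes := defaultMaxMessageBytes
	if v, ok := params["max-message-bytes"]; ok {
		n, err := strconv.Atoi(v)
		if err != nil {
			return 0, fmt.Errorf("invalid max-message-bytes %q: %w", v, err)
		}
		maxMessageBytes = n
	}
	if maxMessageBytes <= 0 {
		return 0, errors.New("max-message-bytes must be positive")
	}
	return maxMessageBytes, nil
}

func main() {
	fmt.Println(resolveMaxMessageBytes(nil))                                          // 10485760 <nil>
	fmt.Println(resolveMaxMessageBytes(map[string]string{"max-message-bytes": "42"})) // 42 <nil>
}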
11 changes: 6 additions & 5 deletions cdc/sink/codec/json_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/pingcap/check"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/util/testleak"
 )

@@ -227,7 +228,7 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
 	err := encoder.SetParams(map[string]string{})
 	c.Assert(err, check.IsNil)
 	c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
-	c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
+	c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

 	err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
 	c.Assert(err, check.ErrorMatches, ".*invalid.*")
@@ -238,12 +239,12 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
 	err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
 	c.Assert(err, check.IsNil)
 	c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
-	c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxInt32)
+	c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32)

 	err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
 	c.Assert(err, check.IsNil)
 	c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
-	c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxUint32)
+	c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxUint32)

 	err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
 	c.Assert(err, check.ErrorMatches, ".*invalid.*")
@@ -254,12 +255,12 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
 	err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
 	c.Assert(err, check.IsNil)
 	c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
-	c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
+	c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

 	err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
 	c.Assert(err, check.IsNil)
 	c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
-	c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
+	c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
 }

 func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
4 changes: 1 addition & 3 deletions cdc/sink/mq.go
@@ -410,9 +410,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
 		return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
 	}

-	var protocol codec.Protocol
-	protocol.FromString(replicaConfig.Sink.Protocol)
-	producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, protocol, config, errCh)
+	producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, errCh)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
4 changes: 2 additions & 2 deletions cdc/sink/mq_test.go
@@ -74,7 +74,7 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
 	encoder := sink.newEncoder()
 	c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
 	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
-	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
+	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 4194304)

 	// mock kafka broker processes 1 row changed event
 	leader.Returns(prodSuccess)
@@ -228,5 +228,5 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
 	encoder := sink.newEncoder()
 	c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
 	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
-	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
+	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 4194304)
 }
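The 4194304 asserted in both tests is 4 MiB, carried into the encoder through the sink URI's query parameters. The exact URI these tests build is not shown in this diff, so the following is only a hedged illustration of where such parameters live:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Hypothetical sink URI: the tests above expect max-batch-size=1 and
	// max-message-bytes=4194304, so a matching URI would carry both.
	sinkURI, err := url.Parse("kafka://127.0.0.1:9092/topic?protocol=open-protocol&max-message-bytes=4194304&max-batch-size=1")
	if err != nil {
		panic(err)
	}
	fmt.Println(sinkURI.Query().Get("max-message-bytes")) // 4194304
}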
62 changes: 30 additions & 32 deletions cdc/sink/producer/kafka/kafka.go
@@ -387,7 +387,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
 		}
 	}

-func topicPreProcess(topic string, protocol codec.Protocol, config *Config, saramaConfig *sarama.Config) error {
+func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error {
 	// FIXME: find a way to remove this failpoint for workload the unit test
 	failpoint.Inject("SkipTopicAutoCreate", func() {
 		failpoint.Return(nil)
@@ -410,20 +410,18 @@ func topicPreProcess(topic string, config *Config, sara
 	info, created := topics[topic]
 	// once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid.
 	if created {
-		// make sure that topic's `max.message.bytes` is not less than given `max-message-bytes`
-		// else the producer will send message that too large to make topic reject, then changefeed would error.
-		// only the default `open protocol` and `craft protocol` use `max-message-bytes`, so check this for them.
-		if protocol == codec.ProtocolDefault {
-			topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
-			if err != nil {
-				return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
-			}
-			if topicMaxMessageBytes < config.MaxMessageBytes {
-				return cerror.ErrKafkaInvalidConfig.GenWithStack(
-					"topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d)."+
-						"Please make sure `max-message-bytes` not greater than topic `max.message.bytes`",
-					topicMaxMessageBytes, config.MaxMessageBytes)
-			}
+		// make sure that the producer's `MaxMessageBytes` is not larger than the topic's `max.message.bytes`
+		topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
+		if err != nil {
+			return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+		}
+
+		if topicMaxMessageBytes < config.MaxMessageBytes {
+			log.Warn("topic's `max.message.bytes` is less than the user-set `max-message-bytes`, "+
+				"using topic's `max.message.bytes` to initialize the Kafka producer",
+				zap.Int("max.message.bytes", topicMaxMessageBytes),
+				zap.Int("max-message-bytes", config.MaxMessageBytes))
+			saramaConfig.Producer.MaxMessageBytes = topicMaxMessageBytes
 		}

 		// no need to create the topic, but we would have to log user if they found enter wrong topic name later
@@ -443,22 +441,22 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara
 		return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found")
 	}

-	// when try to create the topic, we don't know how to set the `max.message.bytes` for the topic.
-	// Kafka would create the topic with broker's `message.max.bytes`,
-	// we have to make sure it's not greater than `max-message-bytes` for the default open protocol.
-	if protocol == codec.ProtocolDefault {
-		brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
-		if err != nil {
-			log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
-			return errors.Trace(err)
-		}
+	brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
+	if err != nil {
+		log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
+		return errors.Trace(err)
+	}

-		if brokerMessageMaxBytes < config.MaxMessageBytes {
-			return cerror.ErrKafkaInvalidConfig.GenWithStack(
-				"broker's message.max.bytes(%d) less than max-message-bytes(%d)"+
-					"Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`",
-				brokerMessageMaxBytes, config.MaxMessageBytes)
-		}
+	// when creating the topic, `max.message.bytes` is decided by the broker:
+	// it uses the broker's `message.max.bytes` to set the topic's `max.message.bytes`.
+	// TiCDC needs to make sure that the producer's `MaxMessageBytes` is not larger
+	// than the broker's `message.max.bytes`.
+	if brokerMessageMaxBytes < config.MaxMessageBytes {
+		log.Warn("broker's `message.max.bytes` is less than the user-set `max-message-bytes`, "+
+			"using broker's `message.max.bytes` to initialize the Kafka producer",
+			zap.Int("message.max.bytes", brokerMessageMaxBytes),
+			zap.Int("max-message-bytes", config.MaxMessageBytes))
+		saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes
 	}

 	// topic not created yet, and user does not specify the `partition-num` in the sink uri.
@@ -487,14 +485,14 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara
 var newSaramaConfigImpl = newSaramaConfig

 // NewKafkaSaramaProducer creates a kafka sarama producer
-func NewKafkaSaramaProducer(ctx context.Context, topic string, protocol codec.Protocol, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
+func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
 	log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
 	cfg, err := newSaramaConfigImpl(ctx, config)
 	if err != nil {
 		return nil, err
 	}

-	if err := topicPreProcess(topic, protocol, config, cfg); err != nil {
+	if err := topicPreProcess(topic, config, cfg); err != nil {
 		return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
 	}
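The behavioral change in topicPreProcess is the core of this PR: previously TiCDC rejected the changefeed when max-message-bytes exceeded the server-side limit, and it only performed the check for the default open protocol; now the check runs for every protocol, and the producer's MaxMessageBytes is lowered to the server-side value with a warning instead of an error. A minimal sketch of that reconciliation rule, where decideProducerMaxMessageBytes is a hypothetical helper and not code from the PR:

package main

import "fmt"

// decideProducerMaxMessageBytes sketches the new rule: for an existing
// topic, the topic's max.message.bytes is the cap; for a topic about to
// be auto-created, the broker's message.max.bytes is the cap, because
// Kafka creates the topic with the broker-level limit. The smaller of
// the cap and the user-set max-message-bytes wins.
func decideProducerMaxMessageBytes(topicExists bool, topicMax, brokerMax, userMax int) int {
	limit := brokerMax
	if topicExists {
		limit = topicMax
	}
	if limit < userMax {
		// corresponds to saramaConfig.Producer.MaxMessageBytes = limit plus a log.Warn
		return limit
	}
	return userMax
}

func main() {
	// user asks for 10 MiB, but the existing topic only allows 1 MiB: clamp to 1 MiB
	fmt.Println(decideProducerMaxMessageBytes(true, 1<<20, 8<<20, 10<<20)) // 1048576
}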
11 changes: 6 additions & 5 deletions cdc/sink/producer/kafka/kafka_test.go
@@ -152,7 +152,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
 		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
 	}()

-	producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
+	producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
 	c.Assert(err, check.IsNil)
 	c.Assert(producer.GetPartitionNum(), check.Equals, int32(2))
 	for i := 0; i < 100; i++ {
@@ -275,11 +275,12 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {

 	cfg, err := newSaramaConfigImpl(ctx, config)
 	c.Assert(err, check.IsNil)
+	c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes)

 	config.BrokerEndpoints = []string{""}
 	cfg.Metadata.Retry.Max = 1

-	err = topicPreProcess(topic, codec.ProtocolDefault, config, cfg)
+	err = topicPreProcess(topic, config, cfg)
 	c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers)
 }

@@ -348,7 +349,7 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
 	config.BrokerEndpoints = []string{"127.0.0.1:1111"}
 	topic := "topic"
 	c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
-	_, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
+	_, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
 	c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")

 	_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
@@ -394,7 +395,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
 	}()

 	errCh := make(chan error, 1)
-	producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
+	producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
 	defer func() {
 		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
 		err := producer.Close()
@@ -459,7 +460,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
 	c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)

 	errCh := make(chan error, 1)
-	producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
+	producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
 	defer func() {
 		err := producer.Close()
 		c.Assert(err, check.IsNil)
5 changes: 5 additions & 0 deletions errors.toml
@@ -431,6 +431,11 @@ error = '''
 locate region by id
 '''

+["CDC:ErrMQSinkUnknownProtocol"]
+error = '''
+unknown '%s' protocol for Message Queue sink
+'''
+
 ["CDC:ErrMarshalFailed"]
 error = '''
 marshal failed
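errors.toml is generated from TiCDC's Go error definitions, and the new entry corresponds to an error raised when the sink URI names a protocol the MQ sink does not recognize. The declaration itself is not part of this diff; the sketch below shows how such an error is typically defined with the pingcap/errors package, and the declaration site and surrounding details are assumptions.

package cerror

import "github.com/pingcap/errors"

// ErrMQSinkUnknownProtocol is assumed to be declared roughly like this,
// using the pingcap/errors helpers for RFC-coded errors. Callers would
// raise it with ErrMQSinkUnknownProtocol.GenWithStackByArgs(protocolName).
var ErrMQSinkUnknownProtocol = errors.Normalize(
	"unknown '%s' protocol for Message Queue sink",
	errors.RFCCodeText("CDC:ErrMQSinkUnknownProtocol"),
)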