Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink: fix kafka max message size inaccurate issue. (#3002) #3047

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package codec

import (
"encoding/binary"
"strings"
"time"

Expand Down Expand Up @@ -63,9 +64,16 @@ type MQMessage struct {
Protocol Protocol // protocol
}

// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client.
// reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233
// for TiCDC, minimum supported kafka version is `0.11.0.2`, which will be treated as `version = 2` by sarama producer.
const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1

// Length returns the expected size of the Kafka message
// We didn't append any `Headers` when send the message, so ignore the calculations related to it.
// If `ProducerMessage` Headers fields used, this method should also adjust.
func (m *MQMessage) Length() int {
return len(m.Key) + len(m.Value)
return len(m.Key) + len(m.Value) + maximumRecordOverhead
}

// PhysicalTime returns physical time part of Ts in time.Time
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)

if message.Length() > d.maxKafkaMessageSize {
// `len(d.messageBuf) == 1` is implied
log.Warn("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
}
d.curBatchSize++
Expand Down
21 changes: 18 additions & 3 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,22 +265,37 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}

for i := 0; i < 10000; i++ {
// make producer's `max-message-bytes` must less than event, but we should still send it as possible.
err := encoder.SetParams(map[string]string{"max-message-bytes": "1"})
c.Check(err, check.IsNil)
for i := 0; i < 100; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

// one message per batch, and can be build, which means the producer will try to send it.
messages := encoder.Build()
c.Assert(len(messages), check.Equals, 100)

// make sure each batch's `Length` not greater than `max-message-bytes`
err = encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

messages = encoder.Build()
for _, msg := range messages {
c.Assert(msg.Length(), check.LessEqual, 256)
}
Expand Down
92 changes: 3 additions & 89 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package sink
import (
"context"
"net/url"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -399,99 +398,14 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
}

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
config := kafka.NewKafkaConfig()

scheme := strings.ToLower(sinkURI.Scheme)
if scheme != "kafka" && scheme != "kafka+ssl" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme)
}
s := sinkURI.Query().Get("partition-num")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.PartitionNum = int32(c)
}

s = sinkURI.Query().Get("replication-factor")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.ReplicationFactor = int16(c)
}

s = sinkURI.Query().Get("kafka-version")
if s != "" {
config.Version = s
}

s = sinkURI.Query().Get("max-message-bytes")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.MaxMessageBytes = c
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-batch-size"] = s
}

s = sinkURI.Query().Get("compression")
if s != "" {
config.Compression = s
}

config.ClientID = sinkURI.Query().Get("kafka-client-id")

s = sinkURI.Query().Get("protocol")
if s != "" {
replicaConfig.Sink.Protocol = s
}

s = sinkURI.Query().Get("ca")
if s != "" {
config.Credential.CAPath = s
}

s = sinkURI.Query().Get("cert")
if s != "" {
config.Credential.CertPath = s
}

s = sinkURI.Query().Get("key")
if s != "" {
config.Credential.KeyPath = s
}

s = sinkURI.Query().Get("sasl-user")
if s != "" {
config.SaslScram.SaslUser = s
}

s = sinkURI.Query().Get("sasl-password")
if s != "" {
config.SaslScram.SaslPassword = s
}

s = sinkURI.Query().Get("sasl-mechanism")
if s != "" {
config.SaslScram.SaslMechanism = s
}

s = sinkURI.Query().Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.TopicPreProcess = autoCreate
config := kafka.NewConfig()
if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
Expand Down
113 changes: 107 additions & 6 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package kafka
import (
"context"
"fmt"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/sink/codec"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/notify"
"github.com/pingcap/ticdc/pkg/security"
Expand All @@ -51,9 +54,9 @@ type Config struct {
TopicPreProcess bool
}

// NewKafkaConfig returns a default Kafka configuration
func NewKafkaConfig() Config {
return Config{
// NewConfig returns a default Kafka configuration
func NewConfig() *Config {
return &Config{
Version: "2.4.0",
MaxMessageBytes: 512 * 1024 * 1024, // 512M
ReplicationFactor: 1,
Expand All @@ -64,6 +67,104 @@ func NewKafkaConfig() Config {
}
}

// Initialize the kafka configuration
func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
s := sinkURI.Query().Get("partition-num")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.PartitionNum = int32(a)
}

s = sinkURI.Query().Get("replication-factor")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.ReplicationFactor = int16(a)
}

s = sinkURI.Query().Get("kafka-version")
if s != "" {
c.Version = s
}

s = sinkURI.Query().Get("max-message-bytes")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
// `MaxMessageBytes` is set to `512 mb` by default, but it's still possible that a larger value expected.
// TiCDC should send the message at best.
if a > c.MaxMessageBytes {
c.MaxMessageBytes = a
}
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-batch-size"] = s
}

s = sinkURI.Query().Get("compression")
if s != "" {
c.Compression = s
}

c.ClientID = sinkURI.Query().Get("kafka-client-id")

s = sinkURI.Query().Get("protocol")
if s != "" {
replicaConfig.Sink.Protocol = s
}

s = sinkURI.Query().Get("ca")
if s != "" {
c.Credential.CAPath = s
}

s = sinkURI.Query().Get("cert")
if s != "" {
c.Credential.CertPath = s
}

s = sinkURI.Query().Get("key")
if s != "" {
c.Credential.KeyPath = s
}

s = sinkURI.Query().Get("sasl-user")
if s != "" {
c.SaslScram.SaslUser = s
}

s = sinkURI.Query().Get("sasl-password")
if s != "" {
c.SaslScram.SaslPassword = s
}

s = sinkURI.Query().Get("sasl-mechanism")
if s != "" {
c.SaslScram.SaslMechanism = s
}

s = sinkURI.Query().Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
if err != nil {
return err
}
c.TopicPreProcess = autoCreate
}

return nil
}

type kafkaSaramaProducer struct {
// clientLock is used to protect concurrent access of asyncClient and syncClient.
// Since we don't close these two clients (which have an input chan) from the
Expand Down Expand Up @@ -283,7 +384,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {

// kafkaTopicPreProcess gets partition number from existing topic, if topic doesn't
// exit, creates it automatically.
func kafkaTopicPreProcess(topic, address string, config Config, cfg *sarama.Config) (int32, error) {
func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) {
admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
Expand Down Expand Up @@ -334,7 +435,7 @@ func kafkaTopicPreProcess(topic, address string, config Config, cfg *sarama.Conf
var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
Expand Down Expand Up @@ -417,7 +518,7 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (
}

// NewSaramaConfig return the default config and set the according version and metrics
func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config := sarama.NewConfig()

version, err := sarama.ParseKafkaVersion(c.Version)
Expand Down
Loading