sink: fix kafka max message size inaccurate issue. (#3002) (#3049)
ti-chi-bot authored Oct 14, 2021
1 parent 5de69d5 commit 1553069
Showing 6 changed files with 197 additions and 110 deletions.
10 changes: 9 additions & 1 deletion cdc/sink/codec/interface.go
@@ -14,6 +14,7 @@
package codec

import (
"encoding/binary"
"strings"
"time"

@@ -63,9 +64,16 @@ type MQMessage struct {
Protocol Protocol // protocol
}

// maximumRecordOverhead is used to calculate the ProducerMessage's byteSize by the sarama kafka client.
// reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233
// For TiCDC, the minimum supported kafka version is `0.11.0.2`, which the sarama producer treats as `version = 2`.
const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1

// Length returns the expected size of the Kafka message
// We don't append any `Headers` when sending the message, so the calculations related to them are ignored.
// If the `ProducerMessage` `Headers` field is ever used, this method should be adjusted accordingly.
func (m *MQMessage) Length() int {
return len(m.Key) + len(m.Value)
return len(m.Key) + len(m.Value) + maximumRecordOverhead
}

// PhysicalTime returns physical time part of Ts in time.Time
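For reference, a quick worked check (not part of this commit) of what that overhead constant evaluates to, using only the standard-library constants it is built from:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Same formula as maximumRecordOverhead above: five varint32-encoded
	// record fields, one varint64-encoded field, plus one attributes byte.
	overhead := 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
	fmt.Println(overhead) // 5*5 + 10 + 1 = 36 bytes per record
}

So Length() now reports 36 bytes more than len(Key)+len(Value), which is closer to what sarama counts against the producer-side size limit.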
2 changes: 1 addition & 1 deletion cdc/sink/codec/json.go
@@ -453,7 +453,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)

if message.Length() > d.maxKafkaMessageSize {
// `len(d.messageBuf) == 1` is implied
log.Warn("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
}
d.curBatchSize++
21 changes: 18 additions & 3 deletions cdc/sink/codec/json_test.go
@@ -241,22 +241,37 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}

for i := 0; i < 10000; i++ {
// force the producer's `max-message-bytes` to be smaller than the event, but the event should still be sent as best effort.
err := encoder.SetParams(map[string]string{"max-message-bytes": "1"})
c.Check(err, check.IsNil)
for i := 0; i < 100; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

// one message per batch, and each one can be built, which means the producer will try to send it.
messages := encoder.Build()
c.Assert(len(messages), check.Equals, 100)

// make sure each batch's `Length` is not greater than `max-message-bytes`
err = encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

messages = encoder.Build()
for _, msg := range messages {
c.Assert(msg.Length(), check.LessEqual, 256)
}
92 changes: 3 additions & 89 deletions cdc/sink/mq.go
@@ -16,7 +16,6 @@ package sink
import (
"context"
"net/url"
"strconv"
"strings"
"sync/atomic"
"time"
@@ -399,99 +398,14 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
}

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
config := kafka.NewKafkaConfig()

scheme := strings.ToLower(sinkURI.Scheme)
if scheme != "kafka" && scheme != "kafka+ssl" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme)
}
s := sinkURI.Query().Get("partition-num")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.PartitionNum = int32(c)
}

s = sinkURI.Query().Get("replication-factor")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.ReplicationFactor = int16(c)
}

s = sinkURI.Query().Get("kafka-version")
if s != "" {
config.Version = s
}

s = sinkURI.Query().Get("max-message-bytes")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.MaxMessageBytes = c
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-batch-size"] = s
}

s = sinkURI.Query().Get("compression")
if s != "" {
config.Compression = s
}

config.ClientID = sinkURI.Query().Get("kafka-client-id")

s = sinkURI.Query().Get("protocol")
if s != "" {
replicaConfig.Sink.Protocol = s
}

s = sinkURI.Query().Get("ca")
if s != "" {
config.Credential.CAPath = s
}

s = sinkURI.Query().Get("cert")
if s != "" {
config.Credential.CertPath = s
}

s = sinkURI.Query().Get("key")
if s != "" {
config.Credential.KeyPath = s
}

s = sinkURI.Query().Get("sasl-user")
if s != "" {
config.SaslScram.SaslUser = s
}

s = sinkURI.Query().Get("sasl-password")
if s != "" {
config.SaslScram.SaslPassword = s
}

s = sinkURI.Query().Get("sasl-mechanism")
if s != "" {
config.SaslScram.SaslMechanism = s
}

s = sinkURI.Query().Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.TopicPreProcess = autoCreate
config := kafka.NewConfig()
if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
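For orientation, a minimal sketch (not part of the commit) of the new call pattern that replaces the inline URI parsing removed above; the helper name is hypothetical, while the types, functions, and import paths follow the files in this diff:

package sink

import (
	"net/url"

	"github.com/pingcap/ticdc/cdc/sink/producer/kafka"
	"github.com/pingcap/ticdc/pkg/config"
	cerror "github.com/pingcap/ticdc/pkg/errors"
)

// buildKafkaConfig sketches the two-step flow now used by newKafkaSaramaSink:
// NewConfig supplies the defaults, Initialize applies the sink-URI query
// parameters and fills opts and replicaConfig in place.
func buildKafkaConfig(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) (*kafka.Config, error) {
	cfg := kafka.NewConfig()
	if err := cfg.Initialize(sinkURI, replicaConfig, opts); err != nil {
		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
	}
	return cfg, nil
}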
113 changes: 107 additions & 6 deletions cdc/sink/producer/kafka/kafka.go
@@ -16,7 +16,9 @@ package kafka
import (
"context"
"fmt"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -27,6 +29,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/sink/codec"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/notify"
"github.com/pingcap/ticdc/pkg/security"
@@ -51,9 +54,9 @@ type Config struct {
TopicPreProcess bool
}

// NewKafkaConfig returns a default Kafka configuration
func NewKafkaConfig() Config {
return Config{
// NewConfig returns a default Kafka configuration
func NewConfig() *Config {
return &Config{
Version: "2.4.0",
MaxMessageBytes: 512 * 1024 * 1024, // 512M
ReplicationFactor: 1,
@@ -64,6 +67,104 @@ func NewKafkaConfig() Config {
}
}

// Initialize the kafka configuration
func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
s := sinkURI.Query().Get("partition-num")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.PartitionNum = int32(a)
}

s = sinkURI.Query().Get("replication-factor")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.ReplicationFactor = int16(a)
}

s = sinkURI.Query().Get("kafka-version")
if s != "" {
c.Version = s
}

s = sinkURI.Query().Get("max-message-bytes")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
// `MaxMessageBytes` is set to `512 MB` by default, but a larger value may still be expected.
// TiCDC should try its best to send the message.
if a > c.MaxMessageBytes {
c.MaxMessageBytes = a
}
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-batch-size"] = s
}

s = sinkURI.Query().Get("compression")
if s != "" {
c.Compression = s
}

c.ClientID = sinkURI.Query().Get("kafka-client-id")

s = sinkURI.Query().Get("protocol")
if s != "" {
replicaConfig.Sink.Protocol = s
}

s = sinkURI.Query().Get("ca")
if s != "" {
c.Credential.CAPath = s
}

s = sinkURI.Query().Get("cert")
if s != "" {
c.Credential.CertPath = s
}

s = sinkURI.Query().Get("key")
if s != "" {
c.Credential.KeyPath = s
}

s = sinkURI.Query().Get("sasl-user")
if s != "" {
c.SaslScram.SaslUser = s
}

s = sinkURI.Query().Get("sasl-password")
if s != "" {
c.SaslScram.SaslPassword = s
}

s = sinkURI.Query().Get("sasl-mechanism")
if s != "" {
c.SaslScram.SaslMechanism = s
}

s = sinkURI.Query().Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
if err != nil {
return err
}
c.TopicPreProcess = autoCreate
}

return nil
}

type kafkaSaramaProducer struct {
// clientLock is used to protect concurrent access of asyncClient and syncClient.
// Since we don't close these two clients (which have an input chan) from the
@@ -283,7 +384,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {

// kafkaTopicPreProcess gets the partition number from an existing topic; if the topic doesn't
// exist, it creates it automatically.
func kafkaTopicPreProcess(topic, address string, config Config, cfg *sarama.Config) (int32, error) {
func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) {
admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
@@ -334,7 +435,7 @@ func kafkaTopicPreProcess(topic, address string, config Config, cfg *sarama.Conf
var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
@@ -417,7 +518,7 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (
}

// newSaramaConfig returns the default config and sets the corresponding version and metrics
func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config := sarama.NewConfig()

version, err := sarama.ParseKafkaVersion(c.Version)
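One behavioral detail of Initialize worth noting: a max-message-bytes value in the sink URI only raises MaxMessageBytes above the 512 MB default, it never lowers it, while the raw string is still handed to the encoder through opts. A hedged sketch of that behavior, written as if inside the kafka package (so net/url and pkg/config are already imported there); the function name and URI values are made up for illustration:

// exampleInitialize is illustrative only, not part of this commit.
func exampleInitialize(replicaConfig *config.ReplicaConfig) (*Config, map[string]string, error) {
	sinkURI, err := url.Parse("kafka://127.0.0.1:9092/cdc-test?kafka-version=2.4.0&max-message-bytes=4194304")
	if err != nil {
		return nil, nil, err
	}
	opts := make(map[string]string)
	cfg := NewConfig() // MaxMessageBytes defaults to 512 * 1024 * 1024
	if err := cfg.Initialize(sinkURI, replicaConfig, opts); err != nil {
		return nil, nil, err
	}
	// cfg.MaxMessageBytes is still 512 MB, because 4194304 (4 MB) is not
	// larger than the default; opts["max-message-bytes"] == "4194304" is
	// what the JSON encoder later consumes (via SetParams, as in the test
	// above) and enforces per message through MQMessage.Length().
	return cfg, opts, nil
}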