sink: fix kafka max message size inaccurate issue. #3002

Merged
Commits (32 commits; changes shown from 25 commits)
6da0e30
borrow producerMessageOverhead from sarama.
3AceShowHand Oct 9, 2021
a7e633b
add some test.
3AceShowHand Oct 11, 2021
ad5c65e
Merge branch 'master' into fix-sink-kafka-max-message-size
3AceShowHand Oct 11, 2021
db9edc8
fix test.
3AceShowHand Oct 11, 2021
a84342e
remove max-message-byte option in producer, 512m by default.
3AceShowHand Oct 11, 2021
9c16b68
tiny fix.
3AceShowHand Oct 11, 2021
a509664
extract kafka config initialize.
3AceShowHand Oct 11, 2021
90a4cc0
use config pointer as parameter.
3AceShowHand Oct 11, 2021
a47c9cf
tiny fix.
3AceShowHand Oct 11, 2021
629a2b2
add a simple unit test for kafka config initialize.
3AceShowHand Oct 11, 2021
41cea04
add a unit test.
3AceShowHand Oct 11, 2021
9f50c1a
no need to have a mock broker.
3AceShowHand Oct 11, 2021
fc37d0a
Update cdc/sink/codec/interface.go
3AceShowHand Oct 11, 2021
540cfe9
tiny fix.
3AceShowHand Oct 11, 2021
deed55d
Merge branch 'fix-sink-kafka-max-message-size' of github.com:3AceShow…
3AceShowHand Oct 11, 2021
32e8bc4
fix case.
3AceShowHand Oct 11, 2021
721e3b0
fix test
3AceShowHand Oct 11, 2021
95c2c75
fix wrong producer message overhead.
3AceShowHand Oct 11, 2021
cde2e76
fix testcase.
3AceShowHand Oct 11, 2021
ff41bfc
Merge branch 'master' into fix-sink-kafka-max-message-size
3AceShowHand Oct 11, 2021
f1da7b7
add more logs about MQMessage Length.
3AceShowHand Oct 11, 2021
15e0786
Merge branch 'master' into fix-sink-kafka-max-message-size
3AceShowHand Oct 12, 2021
ff4a725
Merge branch 'master' into fix-sink-kafka-max-message-size
3AceShowHand Oct 13, 2021
64895a5
fix error in kafka test
3AceShowHand Oct 13, 2021
464a353
adjust log level for AppendRowChangedEvent
3AceShowHand Oct 13, 2021
7ff7894
Merge branch 'master' into fix-sink-kafka-max-message-size
3AceShowHand Oct 13, 2021
eb2e750
Merge branch 'master' into fix-sink-kafka-max-message-size
3AceShowHand Oct 13, 2021
c019bd1
Merge branch 'fix-sink-kafka-max-message-size' of github.com:3AceShow…
3AceShowHand Oct 13, 2021
a5ed564
rename to
3AceShowHand Oct 13, 2021
c27a5eb
Merge branch 'master' into fix-sink-kafka-max-message-size
3AceShowHand Oct 13, 2021
6647b69
Merge branch 'master' into fix-sink-kafka-max-message-size
ti-chi-bot Oct 14, 2021
88d1890
Merge branch 'master' into fix-sink-kafka-max-message-size
ti-chi-bot Oct 14, 2021
10 changes: 9 additions & 1 deletion cdc/sink/codec/interface.go
@@ -14,6 +14,7 @@
package codec

import (
"encoding/binary"
"strings"
"time"

@@ -63,9 +64,16 @@ type MQMessage struct {
Protocol Protocol // protocol
}

// maximumRecordOverhead is used to calculate a ProducerMessage's byteSize by the sarama kafka client.
// reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233
// For TiCDC, the minimum supported kafka version is `0.11.0.2`, which is treated as `version = 2` by the sarama producer.
const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1

// Length returns the expected size of the Kafka message.
// We don't append any `Headers` when sending the message, so the calculation ignores them.
// If the `ProducerMessage` `Headers` field is ever used, this method should be adjusted accordingly.
func (m *MQMessage) Length() int {
return len(m.Key) + len(m.Value)
return len(m.Key) + len(m.Value) + maximumRecordOverhead
}

// PhysicalTime returns physical time part of Ts in time.Time
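
For reference, a minimal standalone sketch of the size estimate above. The constant and the arithmetic are taken verbatim from this diff; the helper name expectedSize and the sample key/value are made up for illustration:

package main

import (
	"encoding/binary"
	"fmt"
)

// Same constant as in this diff: sarama's per-record overhead when the
// producer speaks the record-batch format (version = 2).
// 5*5 + 10 + 1 = 36 bytes.
const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1

// expectedSize mirrors MQMessage.Length(): key + value + record overhead,
// with Headers ignored because TiCDC does not set them.
func expectedSize(key, value []byte) int {
	return len(key) + len(value) + maximumRecordOverhead
}

func main() {
	fmt.Println(expectedSize([]byte("k"), []byte("v"))) // 1 + 1 + 36 = 38
}
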
2 changes: 1 addition & 1 deletion cdc/sink/codec/json.go
@@ -453,7 +453,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)

if message.Length() > d.maxKafkaMessageSize {
// `len(d.messageBuf) == 1` is implied
log.Warn("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
}
d.curBatchSize++
21 changes: 18 additions & 3 deletions cdc/sink/codec/json_test.go
@@ -241,22 +241,37 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}

for i := 0; i < 10000; i++ {
// make the producer's `max-message-bytes` smaller than the event; we should still send the message as best we can.
err := encoder.SetParams(map[string]string{"max-message-bytes": "1"})
c.Check(err, check.IsNil)
for i := 0; i < 100; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

// one message per batch, and each can be built, which means the producer will try to send each of them.
messages := encoder.Build()
c.Assert(len(messages), check.Equals, 100)

// make sure each batch's `Length` is not greater than `max-message-bytes`
err = encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

messages = encoder.Build()
for _, msg := range messages {
c.Assert(msg.Length(), check.LessEqual, 256)
}
92 changes: 3 additions & 89 deletions cdc/sink/mq.go
@@ -16,7 +16,6 @@ package sink
import (
"context"
"net/url"
"strconv"
"strings"
"sync/atomic"
"time"
@@ -399,99 +398,14 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
}

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
config := kafka.NewKafkaConfig()

scheme := strings.ToLower(sinkURI.Scheme)
if scheme != "kafka" && scheme != "kafka+ssl" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme)
}
s := sinkURI.Query().Get("partition-num")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.PartitionNum = int32(c)
}

s = sinkURI.Query().Get("replication-factor")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.ReplicationFactor = int16(c)
}

s = sinkURI.Query().Get("kafka-version")
if s != "" {
config.Version = s
}

s = sinkURI.Query().Get("max-message-bytes")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.MaxMessageBytes = c
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-batch-size"] = s
}

s = sinkURI.Query().Get("compression")
if s != "" {
config.Compression = s
}

config.ClientID = sinkURI.Query().Get("kafka-client-id")

s = sinkURI.Query().Get("protocol")
if s != "" {
replicaConfig.Sink.Protocol = s
}

s = sinkURI.Query().Get("ca")
if s != "" {
config.Credential.CAPath = s
}

s = sinkURI.Query().Get("cert")
if s != "" {
config.Credential.CertPath = s
}

s = sinkURI.Query().Get("key")
if s != "" {
config.Credential.KeyPath = s
}

s = sinkURI.Query().Get("sasl-user")
if s != "" {
config.SaslScram.SaslUser = s
}

s = sinkURI.Query().Get("sasl-password")
if s != "" {
config.SaslScram.SaslPassword = s
}

s = sinkURI.Query().Get("sasl-mechanism")
if s != "" {
config.SaslScram.SaslMechanism = s
}

s = sinkURI.Query().Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.TopicPreProcess = autoCreate
config := kafka.NewKafkaConfig()
if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
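
For context, every query parameter deleted here is still recognized; parsing now happens in one place, Config.Initialize (see cdc/sink/producer/kafka/kafka.go below). A hedged sketch of a sink URI carrying some of these parameters — the broker address, topic name, and all values are made up:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Example sink URI; each query key below is handled by Initialize.
	sinkURI, err := url.Parse("kafka://127.0.0.1:9092/my-topic" +
		"?partition-num=3&replication-factor=1&kafka-version=2.4.0" +
		"&max-message-bytes=1048576&compression=gzip")
	if err != nil {
		panic(err)
	}
	fmt.Println(sinkURI.Query().Get("max-message-bytes")) // "1048576"
}
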
110 changes: 105 additions & 5 deletions cdc/sink/producer/kafka/kafka.go
@@ -16,7 +16,9 @@ package kafka
import (
"context"
"fmt"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -27,6 +29,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/sink/codec"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/notify"
"github.com/pingcap/ticdc/pkg/security"
@@ -52,8 +55,8 @@ type Config struct {
}

// NewKafkaConfig returns a default Kafka configuration
func NewKafkaConfig() Config {
return Config{
func NewKafkaConfig() *Config {
return &Config{
Version: "2.4.0",
MaxMessageBytes: 512 * 1024 * 1024, // 512M
ReplicationFactor: 1,
@@ -64,6 +67,103 @@ func NewKafkaConfig() Config {
}
}

// Initialize fills the producer Config from the sink URI's query parameters and the replica config.
func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
s := sinkURI.Query().Get("partition-num")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.PartitionNum = int32(a)
}

s = sinkURI.Query().Get("replication-factor")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.ReplicationFactor = int16(a)
}

s = sinkURI.Query().Get("kafka-version")
if s != "" {
c.Version = s
}

s = sinkURI.Query().Get("max-message-bytes")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
// `MaxMessageBytes` defaults to 512 MB, but an even larger value may still be expected.
// TiCDC should do its best to send the message.
if a > c.MaxMessageBytes {
c.MaxMessageBytes = a
}
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-batch-size"] = s
}

s = sinkURI.Query().Get("compression")
if s != "" {
c.Compression = s
}

c.ClientID = sinkURI.Query().Get("kafka-client-id")

s = sinkURI.Query().Get("protocol")
if s != "" {
replicaConfig.Sink.Protocol = s
}

s = sinkURI.Query().Get("ca")
if s != "" {
c.Credential.CAPath = s
}

s = sinkURI.Query().Get("cert")
if s != "" {
c.Credential.CertPath = s
}

s = sinkURI.Query().Get("key")
if s != "" {
c.Credential.KeyPath = s
}

s = sinkURI.Query().Get("sasl-user")
if s != "" {
c.SaslScram.SaslUser = s
}

s = sinkURI.Query().Get("sasl-password")
if s != "" {
c.SaslScram.SaslPassword = s
}

s = sinkURI.Query().Get("sasl-mechanism")
if s != "" {
c.SaslScram.SaslMechanism = s
}

s = sinkURI.Query().Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
if err != nil {
return err
}
c.TopicPreProcess = autoCreate
}

return nil
}
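
Worth noting the max-message-bytes handling above: a value from the sink URI can only raise MaxMessageBytes above the 512 MB default set in NewKafkaConfig, never lower it. A minimal sketch of that clamp in isolation (the literal values are illustrative):

package main

import "fmt"

func main() {
	maxMessageBytes := 512 * 1024 * 1024 // default from NewKafkaConfig
	fromURI := 1024                      // e.g. "max-message-bytes=1024" in the URI
	if fromURI > maxMessageBytes {
		maxMessageBytes = fromURI
	}
	fmt.Println(maxMessageBytes) // still 536870912; smaller values are ignored
}
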

type kafkaSaramaProducer struct {
// clientLock is used to protect concurrent access of asyncClient and syncClient.
// Since we don't close these two clients (which have an input chan) from the
@@ -283,7 +383,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {

// kafkaTopicPreProcess gets the partition number from an existing topic; if the topic doesn't
// exist, it creates the topic automatically.
func kafkaTopicPreProcess(topic, address string, config Config, cfg *sarama.Config) (int32, error) {
func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) {
admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
@@ -334,7 +434,7 @@ func kafkaTopicPreProcess(topic, address string, config Config, cfg *sarama.Conf
var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
@@ -417,7 +517,7 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (
}

// NewSaramaConfig return the default config and set the according version and metrics
func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config := sarama.NewConfig()

version, err := sarama.ParseKafkaVersion(c.Version)