codec(cdc): fix encoder max-message-bytes (#4074) #4077

Merged
Commits (46 total; diff shown from 43)
0187fdd  This is an automated cherry-pick of #3192 (3AceShowHand, Nov 12, 2021)
e1a50d6  fix conflicts. (3AceShowHand, Nov 23, 2021)
806ceae  This is an automated cherry-pick of #4036 (3AceShowHand, Dec 24, 2021)
f51e1a9  remove craft. (3AceShowHand, Dec 24, 2021)
1996bd6  remove useless files. (3AceShowHand, Dec 25, 2021)
9dd01b8  fix conflicts in json. (3AceShowHand, Dec 25, 2021)
2913f7b  test(ticdc): separate statistics for unit tests and integration tests… (ti-chi-bot, Nov 23, 2021)
9f3f8cf  ticdc/metrics: fix processor checkpoint-ts lag expr (#3543) (#3559) (ti-chi-bot, Nov 23, 2021)
6fad381  test: add new ci collation without old value test (#3114) (#3368) (ti-chi-bot, Nov 30, 2021)
3136a20  processor,sink(cdc): let sink report resolved ts and do not skip buff… (ti-chi-bot, Nov 30, 2021)
d928a2f  integration_tests(ticdc): move integration tests into integration_tes… (ti-chi-bot, Dec 2, 2021)
50545de  sink(ticdc): Set config.Metadata.Timeout correctly (#3665) (#3671) (ti-chi-bot, Dec 7, 2021)
47b2b2d  *: fix changefeed checkpoint lag negative value error (#3013) (#3534) (ti-chi-bot, Dec 8, 2021)
666ac9e  ticdc.rules: fix alertmanager rules (#3422) (#3595) (ti-chi-bot, Dec 8, 2021)
643fb1b  Fix handling of JSON columns (#3643) (#3652) (ti-chi-bot, Dec 8, 2021)
974aaa1  workerpool: limit the rate to output deadlock warning (#3775) (#3798) (ti-chi-bot, Dec 10, 2021)
e5155ed  ticdc/alert: add no owner alert rule (#3809) (#3835) (ti-chi-bot, Dec 14, 2021)
daf1adb  pkg,cdc: do not use log package (#3902) (#3943) (ti-chi-bot, Dec 17, 2021)
ab1661e  *: rename repo from pingcap/ticdc to pingcap/tiflow (#3956) (amyangfei, Dec 18, 2021)
f7a8919  tz (ticdc): fix timezone error (#3887) (#3909) (ti-chi-bot, Dec 20, 2021)
92b5021  kvclient(ticdc): fix kvclient takes too long time to recover (#3612) … (ti-chi-bot, Dec 20, 2021)
fea07be  etcd_worker: batch etcd patch (#3277) (#3392) (ti-chi-bot, Dec 20, 2021)
b5178e6  Clean old owner and old processor in release 5.2 branch (#4019) (#4023) (ti-chi-bot, Dec 23, 2021)
4ef3deb  tests(ticdc): set up the sync diff output directory correctly (#3725)… (ti-chi-bot, Dec 24, 2021)
af3c44f  owner,scheduler(cdc): fix nil pointer panic in owner scheduler (#2980… (ti-chi-bot, Dec 24, 2021)
f51db06  config(ticdc): Fix old value configuration check for maxwell protocol… (ti-chi-bot, Dec 24, 2021)
8b5360c  This is an automated cherry-pick of #4036 (3AceShowHand, Dec 24, 2021)
f35ae50  remove craft. (3AceShowHand, Dec 24, 2021)
9da3fb7  remove useless files. (3AceShowHand, Dec 25, 2021)
380feac  fix conflicts in json. (3AceShowHand, Dec 25, 2021)
e3b5105  fix conflict. (3AceShowHand, Dec 25, 2021)
803c407  remove config. (3AceShowHand, Dec 25, 2021)
eb37041  fix mq. (3AceShowHand, Dec 25, 2021)
78dfd21  fix mq. (3AceShowHand, Dec 25, 2021)
3901bdd  fix mq. (3AceShowHand, Dec 25, 2021)
afb5392  This is an automated cherry-pick of #4074 (3AceShowHand, Dec 26, 2021)
4f5c54a  remove craft. (3AceShowHand, Dec 27, 2021)
dbebb49  first round fix. (3AceShowHand, Dec 27, 2021)
3fd2d96  fix conflict. (3AceShowHand, Dec 27, 2021)
580ddf4  Fix kafka. (3AceShowHand, Dec 27, 2021)
965e17d  fix kafka conflict. (3AceShowHand, Dec 27, 2021)
98c7eb8  fix json. (3AceShowHand, Dec 27, 2021)
a2dd3a9  fix json test. (3AceShowHand, Dec 27, 2021)
6e1a3b2  Update cdc/sink/codec/json.go (3AceShowHand, Dec 27, 2021)
4bf83dc  Update cdc/sink/codec/json.go (3AceShowHand, Dec 27, 2021)
bd8a6e1  fix failpoint path. (3AceShowHand, Dec 27, 2021)
42 changes: 21 additions & 21 deletions cdc/sink/codec/json.go
@@ -35,8 +35,6 @@ import (
const (
// BatchVersion1 represents the version of batch format
BatchVersion1 uint64 = 1
// DefaultMaxMessageBytes sets the default value for max-message-bytes
DefaultMaxMessageBytes int = 1 * 1024 * 1024 // 1M
// DefaultMaxBatchSize sets the default value for max-batch-size
DefaultMaxBatchSize int = 16
)
@@ -317,13 +315,13 @@ type JSONEventBatchEncoder struct {
messageBuf []*MQMessage
curBatchSize int
// configs
maxKafkaMessageSize int
maxBatchSize int
maxMessageBytes int
maxBatchSize int
}

// GetMaxKafkaMessageSize is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxKafkaMessageSize() int {
return d.maxKafkaMessageSize
// GetMaxMessageBytes is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxMessageBytes() int {
return d.maxMessageBytes
}

// GetMaxBatchSize is only for unit testing.
@@ -402,15 +400,15 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
// for a single message longer than max-message-size, do not send it.
// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
if length > d.maxKafkaMessageSize {
if length > d.maxMessageBytes {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxKafkaMessageSize), zap.Int("length", length), zap.Any("table", e.Table))
zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
}

if len(d.messageBuf) == 0 ||
d.curBatchSize >= d.maxBatchSize ||
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {

versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)
@@ -428,10 +426,10 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table

if message.Length() > d.maxKafkaMessageSize {
if message.Length() > d.maxMessageBytes {
// `len(d.messageBuf) == 1` is implied
log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
}
d.curBatchSize++
}
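
The 16 + 8 in the size check above mirrors the open-protocol wire format: each batch payload starts with an 8-byte big-endian version header, and every key/value pair carries two 8-byte big-endian length prefixes (maximumRecordOverhead accounts for the per-record Kafka framing on top of that, following sarama). A minimal sketch of the layout, assuming only encoding/binary from the standard library; frameMessage is an illustrative helper, not part of the codec:

func frameMessage(version uint64, key, value []byte) []byte {
	buf := make([]byte, 0, 8+8+len(key)+8+len(value))
	tmp := make([]byte, 8)
	binary.BigEndian.PutUint64(tmp, version) // versionHead: 8 bytes
	buf = append(buf, tmp...)
	binary.BigEndian.PutUint64(tmp, uint64(len(key))) // keyLenByte: 8 bytes
	buf = append(buf, tmp...)
	buf = append(buf, key...)
	binary.BigEndian.PutUint64(tmp, uint64(len(value))) // valueLenByte: 8 bytes
	buf = append(buf, tmp...)
	buf = append(buf, value...)
	return buf
}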
@@ -549,17 +547,19 @@ func (d *JSONEventBatchEncoder) Reset() {
// SetParams reads relevant parameters for Open Protocol
func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
var err error
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrKafkaInvalidConfig.Wrap(err)
}
} else {
d.maxKafkaMessageSize = DefaultMaxMessageBytes

maxMessageBytes, ok := params["max-message-bytes"]
if !ok {
return cerror.ErrKafkaInvalidConfig.GenWithStack("max-message-bytes not found")
}

d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrKafkaInvalidConfig.Wrap(err)
}

if d.maxKafkaMessageSize <= 0 {
return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxKafkaMessageSize))
if d.maxMessageBytes <= 0 {
return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes))
}

if maxBatchSize, ok := params["max-batch-size"]; ok {
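Net effect of the SetParams change: the encoder no longer falls back to a hard-coded 1 MiB default; max-message-bytes must be supplied by the caller (the shared default now lives in pkg/config, as the updated tests below show), and the mq sink forwards the Kafka producer's limit so encoder and producer enforce the same bound. A minimal sketch of the new contract, using only this file's exported names plus the standard errors package; exampleSetParams is illustrative:

func exampleSetParams() error {
	encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)

	// Omitting max-message-bytes is now a hard error, not a silent default.
	if err := encoder.SetParams(map[string]string{}); err == nil {
		return errors.New("expected ErrKafkaInvalidConfig: max-message-bytes not found")
	}

	// Pass the producer's limit through (4194304, the value the tests use).
	if err := encoder.SetParams(map[string]string{"max-message-bytes": "4194304"}); err != nil {
		return err
	}
	_ = encoder.GetMaxMessageBytes() // == 4194304
	return nil
}
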
53 changes: 41 additions & 12 deletions cdc/sink/codec/json_test.go
@@ -19,8 +19,10 @@ import (
"testing"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
)

@@ -224,10 +226,10 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)
err := encoder.SetParams(map[string]string{})
err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")
@@ -238,28 +240,50 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32)

err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxUint32)
c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxUint32)

err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": "-1"})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": "-1"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
}

func (s *batchSuite) TestSetParams(c *check.C) {
defer testleak.AfterTest(c)()

opts := make(map[string]string)
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(opts)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*max-message-bytes not found.*",
)

opts["max-message-bytes"] = "1"
encoder = NewJSONEventBatchEncoder()
err = encoder.SetParams(opts)
c.Assert(err, check.IsNil)
c.Assert(encoder, check.NotNil)
jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
c.Assert(ok, check.IsTrue)
c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1)
}

func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
@@ -307,8 +331,13 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
func (s *batchSuite) TestMaxBatchSize(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
c.Check(err, check.IsNil)
err := encoder.SetParams(map[string]string{"max-message-bytes": "1048576", "max-batch-size": "64"})
c.Assert(encoder, check.NotNil)
c.Assert(err, check.IsNil)

jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
c.Assert(ok, check.IsTrue)
c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1048576)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
10 changes: 7 additions & 3 deletions cdc/sink/mq.go
@@ -413,7 +413,11 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
return r == '/'
})
producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh)
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
if err != nil {
return nil, errors.Trace(err)
}
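
For context on the new guard: the topic name is the path component of the sink URI, so a URI with an empty path (e.g. kafka://127.0.0.1:9092/) previously reached the producer before failing. A small sketch of the extraction, assuming only the standard library (net/url, strings, errors); topicFromSinkURI is an illustrative helper:

func topicFromSinkURI(raw string) (string, error) {
	sinkURI, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	// Same trimming as above: drop the slashes around the path.
	topic := strings.Trim(sinkURI.Path, "/")
	if topic == "" {
		// The real code returns cerror.ErrKafkaInvalidConfig here.
		return "", errors.New("no topic is specified in sink-uri")
	}
	return topic, nil
}

For example, topicFromSinkURI("kafka://127.0.0.1:9092/test-topic") returns "test-topic".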
@@ -443,8 +447,8 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if s != "" {
opts["max-batch-size"] = s
}
// For now, it's a place holder. Avro format have to make connection to Schema Registery,
// and it may needs credential.
// For now, it's a placeholder. Avro format have to make connection to Schema Registry,
// and it may need credential.
credential := &security.Credential{}
sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh)
if err != nil {
16 changes: 14 additions & 2 deletions cdc/sink/mq_test.go
@@ -62,13 +62,19 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

encoder := sink.newEncoder()
c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
@@ -159,6 +165,12 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

@@ -216,5 +228,5 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
encoder := sink.newEncoder()
c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)
}
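
Both Kafka tests enable the SkipTopicAutoCreate failpoint so no real broker admin call is made. A sketch of the producer-side guard this implies, assuming github.com/pingcap/failpoint and sarama's ClusterAdmin; createTopicUnlessSkipped is an illustrative stand-in for the real producer code:

func createTopicUnlessSkipped(admin sarama.ClusterAdmin, topic string, detail *sarama.TopicDetail) error {
	skip := false
	// Fired by failpoint.Enable(".../SkipTopicAutoCreate", "return(true)") in the tests.
	failpoint.Inject("SkipTopicAutoCreate", func() {
		skip = true
	})
	if skip {
		return nil
	}
	return admin.CreateTopic(topic, detail, false)
}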