From 4c17d1d161204f3e70677467854929894ce5909a Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 29 Dec 2020 17:58:04 +0800 Subject: [PATCH 1/2] fix max-batch-size --- cdc/sink/mq.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index b8fb52e1b8b..809419d1af3 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -424,7 +424,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi s = sinkURI.Query().Get("max-batch-size") if s != "" { - opts["max-message-bytes"] = s + opts["max-batch-size"] = s } s = sinkURI.Query().Get("compression") From c6c794fa2b79f7506ef28fb551a5bef2243a762f Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 29 Dec 2020 18:08:15 +0800 Subject: [PATCH 2/2] fix max-batch-size for Pulsar --- cdc/sink/mq.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 809419d1af3..203f994c5d8 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -494,7 +494,7 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, s = sinkURI.Query().Get("max-batch-size") if s != "" { - opts["max-message-bytes"] = s + opts["max-batch-size"] = s } // For now, it's a place holder. Avro format have to make connection to Schema Registery, // and it may needs credential.