From 801bb607615e42be6be7770a302d692bca037834 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 8 Feb 2022 16:41:35 +0800 Subject: [PATCH] cdc/sink: Kafka support user set configuration (#4512) close pingcap/tiflow#4385 --- cdc/sink/producer/kafka/config.go | 39 ++++++++++++++++++++++++++ cdc/sink/producer/kafka/config_test.go | 37 ++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/cdc/sink/producer/kafka/config.go b/cdc/sink/producer/kafka/config.go index 4f9bee3af36..4f2bd6bfce7 100644 --- a/cdc/sink/producer/kafka/config.go +++ b/cdc/sink/producer/kafka/config.go @@ -50,6 +50,11 @@ type Config struct { SaslScram *security.SaslScram // control whether to create topic AutoCreate bool + + // Timeout for sarama `config.Net` configurations, default to `10s` + DialTimeout time.Duration + WriteTimeout time.Duration + ReadTimeout time.Duration } // NewConfig returns a default Kafka configuration @@ -63,6 +68,9 @@ func NewConfig() *Config { Credential: &security.Credential{}, SaslScram: &security.SaslScram{}, AutoCreate: true, + DialTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, } } @@ -201,6 +209,33 @@ func CompleteConfigsAndOpts(sinkURI *url.URL, producerConfig *Config, replicaCon opts["enable-tidb-extension"] = s } + s = params.Get("dial-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + producerConfig.DialTimeout = a + } + + s = params.Get("write-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + producerConfig.WriteTimeout = a + } + + s = params.Get("read-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + producerConfig.ReadTimeout = a + } + return nil } @@ -243,6 +278,10 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { // and https://github.com/pingcap/tiflow/issues/3352. config.Metadata.Timeout = 1 * time.Minute + config.Net.DialTimeout = c.DialTimeout + config.Net.WriteTimeout = c.WriteTimeout + config.Net.ReadTimeout = c.ReadTimeout + config.Producer.Partitioner = sarama.NewManualPartitioner config.Producer.MaxMessageBytes = c.MaxMessageBytes config.Producer.Return.Successes = true diff --git a/cdc/sink/producer/kafka/config_test.go b/cdc/sink/producer/kafka/config_test.go index d34ef822879..2a10aaf4ab5 100644 --- a/cdc/sink/producer/kafka/config_test.go +++ b/cdc/sink/producer/kafka/config_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "net/url" + "time" "github.com/Shopify/sarama" "github.com/pingcap/check" @@ -35,7 +36,6 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { config.Version = "invalid" _, err := newSaramaConfigImpl(ctx, config) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") - ctx = util.SetOwnerInCtx(ctx) config.Version = "2.6.0" config.ClientID = "^invalid$" @@ -84,8 +84,41 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256")) } +func (s *kafkaSuite) TestConfigTimeouts(c *check.C) { + defer testleak.AfterTest(c)() + + cfg := NewConfig() + c.Assert(cfg.DialTimeout, check.Equals, 10*time.Second) + c.Assert(cfg.ReadTimeout, check.Equals, 10*time.Second) + c.Assert(cfg.WriteTimeout, check.Equals, 10*time.Second) + + saramaConfig, err := newSaramaConfig(context.Background(), cfg) + c.Assert(err, check.IsNil) + c.Assert(saramaConfig.Net.DialTimeout, check.Equals, cfg.DialTimeout) + c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, cfg.WriteTimeout) + c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, cfg.ReadTimeout) + + uri := "kafka://127.0.0.1:9092/kafka-test?dial-timeout=5s&read-timeout=1000ms" + + "&write-timeout=2m" + sinkURI, err := url.Parse(uri) + c.Assert(err, check.IsNil) + opts := make(map[string]string) + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(err, check.IsNil) + + c.Assert(cfg.DialTimeout, check.Equals, 5*time.Second) + c.Assert(cfg.ReadTimeout, check.Equals, 1000*time.Millisecond) + c.Assert(cfg.WriteTimeout, check.Equals, 2*time.Minute) + + saramaConfig, err = newSaramaConfig(context.Background(), cfg) + c.Assert(err, check.IsNil) + c.Assert(saramaConfig.Net.DialTimeout, check.Equals, 5*time.Second) + c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, 1000*time.Millisecond) + c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, 2*time.Minute) +} + func (s *kafkaSuite) TestCompleteConfigByOpts(c *check.C) { - defer testleak.AfterTest(c) + defer testleak.AfterTest(c)() cfg := NewConfig() // Normal config.