Skip to content

Commit

Permalink
cdc/sink: Kafka support user set configuration (#4512)
Browse files Browse the repository at this point in the history
close #4385
  • Loading branch information
3AceShowHand authored Feb 8, 2022
1 parent da0dda8 commit 801bb60
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
39 changes: 39 additions & 0 deletions cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type Config struct {
SaslScram *security.SaslScram
// control whether to create topic
AutoCreate bool

// Timeout for sarama `config.Net` configurations, default to `10s`
DialTimeout time.Duration
WriteTimeout time.Duration
ReadTimeout time.Duration
}

// NewConfig returns a default Kafka configuration
Expand All @@ -63,6 +68,9 @@ func NewConfig() *Config {
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
AutoCreate: true,
DialTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
ReadTimeout: 10 * time.Second,
}
}

Expand Down Expand Up @@ -201,6 +209,33 @@ func CompleteConfigsAndOpts(sinkURI *url.URL, producerConfig *Config, replicaCon
opts["enable-tidb-extension"] = s
}

s = params.Get("dial-timeout")
if s != "" {
a, err := time.ParseDuration(s)
if err != nil {
return err
}
producerConfig.DialTimeout = a
}

s = params.Get("write-timeout")
if s != "" {
a, err := time.ParseDuration(s)
if err != nil {
return err
}
producerConfig.WriteTimeout = a
}

s = params.Get("read-timeout")
if s != "" {
a, err := time.ParseDuration(s)
if err != nil {
return err
}
producerConfig.ReadTimeout = a
}

return nil
}

Expand Down Expand Up @@ -243,6 +278,10 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
// and https://github.com/pingcap/tiflow/issues/3352.
config.Metadata.Timeout = 1 * time.Minute

config.Net.DialTimeout = c.DialTimeout
config.Net.WriteTimeout = c.WriteTimeout
config.Net.ReadTimeout = c.ReadTimeout

config.Producer.Partitioner = sarama.NewManualPartitioner
config.Producer.MaxMessageBytes = c.MaxMessageBytes
config.Producer.Return.Successes = true
Expand Down
37 changes: 35 additions & 2 deletions cdc/sink/producer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"net/url"
"time"

"github.com/Shopify/sarama"
"github.com/pingcap/check"
Expand All @@ -35,7 +36,6 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
config.Version = "invalid"
_, err := newSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")

ctx = util.SetOwnerInCtx(ctx)
config.Version = "2.6.0"
config.ClientID = "^invalid$"
Expand Down Expand Up @@ -84,8 +84,41 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256"))
}

func (s *kafkaSuite) TestConfigTimeouts(c *check.C) {
defer testleak.AfterTest(c)()

cfg := NewConfig()
c.Assert(cfg.DialTimeout, check.Equals, 10*time.Second)
c.Assert(cfg.ReadTimeout, check.Equals, 10*time.Second)
c.Assert(cfg.WriteTimeout, check.Equals, 10*time.Second)

saramaConfig, err := newSaramaConfig(context.Background(), cfg)
c.Assert(err, check.IsNil)
c.Assert(saramaConfig.Net.DialTimeout, check.Equals, cfg.DialTimeout)
c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, cfg.WriteTimeout)
c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, cfg.ReadTimeout)

uri := "kafka://127.0.0.1:9092/kafka-test?dial-timeout=5s&read-timeout=1000ms" +
"&write-timeout=2m"
sinkURI, err := url.Parse(uri)
c.Assert(err, check.IsNil)
opts := make(map[string]string)
err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts)
c.Assert(err, check.IsNil)

c.Assert(cfg.DialTimeout, check.Equals, 5*time.Second)
c.Assert(cfg.ReadTimeout, check.Equals, 1000*time.Millisecond)
c.Assert(cfg.WriteTimeout, check.Equals, 2*time.Minute)

saramaConfig, err = newSaramaConfig(context.Background(), cfg)
c.Assert(err, check.IsNil)
c.Assert(saramaConfig.Net.DialTimeout, check.Equals, 5*time.Second)
c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, 1000*time.Millisecond)
c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, 2*time.Minute)
}

func (s *kafkaSuite) TestCompleteConfigByOpts(c *check.C) {
defer testleak.AfterTest(c)
defer testleak.AfterTest(c)()
cfg := NewConfig()

// Normal config.
Expand Down

0 comments on commit 801bb60

Please sign in to comment.