Skip to content

Commit

Permalink
fix test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 8, 2021
1 parent 1796153 commit 7a24e98
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
12 changes: 9 additions & 3 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,11 @@ func topicPreProcess(config *Config, saramaConfig *sarama.Config) error {
return nil
}

var newSaramaConfigImpl = newSaramaConfig
var (
newSaramaConfigImpl = newSaramaConfig
// when use kafka broker version less than "1.0.0", we would have to disable topicPreProcess
enableTopicPreProcess = true
)

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
Expand All @@ -495,8 +499,10 @@ func NewKafkaSaramaProducer(ctx context.Context, config *Config, errCh chan erro
return nil, err
}

if err := topicPreProcess(config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
if enableTopicPreProcess {
if err := topicPreProcess(config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
}

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
Expand Down
12 changes: 4 additions & 8 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/Shopify/sarama"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/ticdc/cdc/sink/codec"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
Expand Down Expand Up @@ -146,7 +145,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")
config.TopicName = topic

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/workaround4NewClusterAdmin", ""), check.IsNil)
enableTopicPreProcess = false

newSaramaConfigImplBak := newSaramaConfigImpl
newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
Expand All @@ -157,7 +156,6 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
}
defer func() {
newSaramaConfigImpl = newSaramaConfigImplBak
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/workaround4NewClusterAdmin")
}()

producer, err := NewKafkaSaramaProducer(ctx, config, errCh)
Expand Down Expand Up @@ -437,6 +435,8 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")
config.TopicName = topic

enableTopicPreProcess = false

newSaramaConfigImplBak := newSaramaConfigImpl
newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
cfg, err := newSaramaConfigImplBak(ctx, config)
Expand All @@ -450,12 +450,9 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
newSaramaConfigImpl = newSaramaConfigImplBak
}()

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/workaround4NewClusterAdmin", ""), check.IsNil)

errCh := make(chan error, 1)
producer, err := NewKafkaSaramaProducer(ctx, config, errCh)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/workaround4NewClusterAdmin")
err := producer.Close()
c.Assert(err, check.IsNil)
}()
Expand Down Expand Up @@ -506,7 +503,6 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
leader.Returns(metadataResponse)
leader.Returns(metadataResponse)

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/workaround4NewClusterAdmin", ""), check.IsNil)
config := NewConfig()
// Because the sarama mock broker is not compatible with version larger than 1.0.0
// We use a smaller version in the following producer tests.
Expand All @@ -517,10 +513,10 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")
config.TopicName = topic

enableTopicPreProcess = false
errCh := make(chan error, 1)
producer, err := NewKafkaSaramaProducer(ctx, config, errCh)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/workaround4NewClusterAdmin")
err := producer.Close()
c.Assert(err, check.IsNil)
}()
Expand Down

0 comments on commit 7a24e98

Please sign in to comment.