Skip to content

Commit

Permalink
Merge branch 'master' into add-log-rate-limit-for-table-actro
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 22, 2022
2 parents 97db214 + 5bf54f2 commit d8fff7a
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 4 deletions.
4 changes: 2 additions & 2 deletions cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func NewKafkaDDLSink(
// otherwise the adminClient will never be closed and lead to a goroutine leak.
defer func() {
if err != nil {
if err = adminClient.Close(); err != nil {
if closeErr := adminClient.Close(); closeErr != nil {
log.Error("Close admin client failed in kafka "+
"DDL sink", zap.Error(err))
"DDL sink", zap.Error(closeErr))
}
}
}()
Expand Down
25 changes: 25 additions & 0 deletions cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@ func initBroker(t *testing.T, partitionNum int) (*sarama.MockBroker, string) {
return leader, topic
}

func TestNewKafkaDDLSinkFailed(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

leader, topic := initBroker(t, kafka.DefaultMockPartitionNum)
defer leader.Close()
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=avro"
uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)

sinkURI, err := url.Parse(uri)
require.Nil(t, err)
replicaConfig := config.GetDefaultReplicaConfig()
require.Nil(t, replicaConfig.ValidateAndAdjust(sinkURI))

s, err := NewKafkaDDLSink(ctx, sinkURI, replicaConfig,
kafka.NewMockAdminClient, ddlproducer.NewMockDDLProducer)
require.ErrorContains(t, err, "Avro protocol requires parameter \"schema-registry\"",
"should report error when protocol is avro but schema-registry is not set")
require.Nil(t, s)
}

func TestWriteDDLEventToAllPartitions(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions cdc/sinkv2/eventsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func NewKafkaDMLSink(
// otherwise the adminClient will never be closed and lead to a goroutine leak.
defer func() {
if err != nil {
if err = adminClient.Close(); err != nil {
if closeErr := adminClient.Close(); closeErr != nil {
log.Error("Close admin client failed in kafka "+
"DML sink", zap.Error(err))
"DML sink", zap.Error(closeErr))
}
}
}()
Expand Down
26 changes: 26 additions & 0 deletions cdc/sinkv2/eventsink/mq/mq_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,32 @@ func initBroker(t *testing.T, partitionNum int) (*sarama.MockBroker, string) {
return leader, topic
}

func TestNewKafkaDMLSinkFailed(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

leader, topic := initBroker(t, kafka.DefaultMockPartitionNum)
defer leader.Close()
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=avro"
uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)

sinkURI, err := url.Parse(uri)
require.Nil(t, err)
replicaConfig := config.GetDefaultReplicaConfig()
require.Nil(t, replicaConfig.ValidateAndAdjust(sinkURI))
errCh := make(chan error, 1)

s, err := NewKafkaDMLSink(ctx, sinkURI, replicaConfig, errCh,
kafka.NewMockAdminClient, dmlproducer.NewDMLMockProducer)
require.ErrorContains(t, err, "Avro protocol requires parameter \"schema-registry\"",
"should report error when protocol is avro but schema-registry is not set")
require.Nil(t, s)
}

func TestWriteEvents(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit d8fff7a

Please sign in to comment.