Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka(ticdc): sarama admin client fetch metadata by cache (#9511) #9550

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cdc/sink/mq/manager/kafka_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewKafkaTopicManager(
ctx context.Context,
admin kafka.ClusterAdminClient,
cfg *kafkaconfig.AutoCreateTopicConfig,
) (*kafkaTopicManager, error) {
) *kafkaTopicManager {
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
mgr := &kafkaTopicManager{
changefeedID: changefeedID,
Expand All @@ -74,7 +74,7 @@ func NewKafkaTopicManager(
// Background refresh metadata.
go mgr.backgroundRefreshMeta(ctx)

return mgr, nil
return mgr
}

// GetPartitionNum returns the number of partitions of the topic.
Expand Down
18 changes: 6 additions & 12 deletions cdc/sink/mq/manager/kafka_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ func TestPartitions(t *testing.T) {
ReplicationFactor: 1,
}

manager, err := NewKafkaTopicManager(context.TODO(), adminClient, cfg)
require.Nil(t, err)
manager := NewKafkaTopicManager(context.TODO(), adminClient, cfg)
defer manager.Close()
partitionsNum, err := manager.GetPartitionNum(
kafka.DefaultMockTopicName)
partitionsNum, err := manager.GetPartitionNum(kafka.DefaultMockTopicName)
require.Nil(t, err)
require.Equal(t, int32(3), partitionsNum)
}
Expand All @@ -59,8 +57,7 @@ func TestCreateTopic(t *testing.T) {
ReplicationFactor: 1,
}

manager, err := NewKafkaTopicManager(ctx, adminClient, cfg)
require.Nil(t, err)
manager := NewKafkaTopicManager(ctx, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName)
require.Nil(t, err)
Expand All @@ -75,8 +72,7 @@ func TestCreateTopic(t *testing.T) {

// Try to create a topic without auto create.
cfg.AutoCreate = false
manager, err = NewKafkaTopicManager(ctx, adminClient, cfg)
require.Nil(t, err)
manager = NewKafkaTopicManager(ctx, adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible("new-topic2")
require.Regexp(
Expand All @@ -92,8 +88,7 @@ func TestCreateTopic(t *testing.T) {
PartitionNum: 2,
ReplicationFactor: 4,
}
manager, err = NewKafkaTopicManager(ctx, adminClient, cfg)
require.Nil(t, err)
manager = NewKafkaTopicManager(ctx, adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible("new-topic-failed")
require.Regexp(
Expand All @@ -116,8 +111,7 @@ func TestCreateTopicWithDelay(t *testing.T) {
ReplicationFactor: 1,
}

manager, err := NewKafkaTopicManager(context.TODO(), adminClient, cfg)
require.Nil(t, err)
manager := NewKafkaTopicManager(context.TODO(), adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.createTopic("new_topic")
require.Nil(t, err)
Expand Down
10 changes: 1 addition & 9 deletions cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,15 +432,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

topicManager, err := manager.NewKafkaTopicManager(
ctx,
adminClient,
baseConfig.DeriveTopicConfig(),
)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

topicManager := manager.NewKafkaTopicManager(ctx, adminClient, baseConfig.DeriveTopicConfig())
if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
}
Expand Down
24 changes: 12 additions & 12 deletions cdc/sink/mq/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,21 +434,21 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
}
config.Version = version

// Producer fetch metadata from brokers frequently, if metadata cannot be
// refreshed easily, this would indicate the network condition between the
// Admin client would refresh metadata periodically,
// if metadata cannot be refreshed easily, this would indicate the network condition between the
// capture server and kafka broker is not good.
// In the scenario that cannot get response from Kafka server, this default
// setting can help to get response more quickly.
config.Metadata.Retry.Max = 1
config.Metadata.Retry.Backoff = 100 * time.Millisecond
// Set the timeout to 2 minutes to ensure that the underlying client does not retry for too long.
// If retrying to obtain the metadata fails, simply return the error and let sinkManager rebuild the sink.
config.Metadata.Retry.Max = 10
config.Metadata.Retry.Backoff = 200 * time.Millisecond
// This Timeout is useless if the `RefreshMetadata` time cost is less than it.
config.Metadata.Timeout = 1 * time.Minute
config.Metadata.Timeout = 2 * time.Minute

// Admin.Retry take effect on `ClusterAdmin` related operations,
// only `CreateTopic` for cdc now. set the `Timeout` to `1m` to make CI stable.
config.Admin.Retry.Max = 5
config.Admin.Retry.Backoff = 100 * time.Millisecond
config.Admin.Timeout = 1 * time.Minute
config.Admin.Retry.Max = 10
config.Admin.Retry.Backoff = 200 * time.Millisecond
// This timeout control the request timeout for each admin request.
// set it as the read timeout.
config.Admin.Timeout = 10 * time.Second

// Producer.Retry take effect when the producer try to send message to kafka
// brokers. If kafka cluster is healthy, just the default value should be enough.
Expand Down
10 changes: 1 addition & 9 deletions cdc/sinkv2/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,7 @@ func GetTopicManagerAndTryCreateTopic(
topicCfg *kafka.AutoCreateTopicConfig,
adminClient pkafka.ClusterAdminClient,
) (manager.TopicManager, error) {
topicManager, err := manager.NewKafkaTopicManager(
ctx,
adminClient,
topicCfg,
)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

topicManager := manager.NewKafkaTopicManager(ctx, adminClient, topicCfg)
if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
}
Expand Down
Loading