Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,14 @@ metadata:
The default is none.
example: '"gzip"'
default: "none"
- name: consumerGroupRebalanceStrategy
type: string
required: false
description: |
The strategy to use for consumer group rebalancing.
example: '"sticky"'
default: '"range"'
allowedValues:
- "range"
- "sticky"
- "roundrobin"
20 changes: 20 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
config.Consumer.Fetch.Default = meta.consumerFetchDefault
config.Consumer.Group.Heartbeat.Interval = meta.HeartbeatInterval
config.Consumer.Group.Session.Timeout = meta.SessionTimeout
k.initConsumerGroupRebalanceStrategy(config, metadata)
config.ChannelBufferSize = meta.channelBufferSize

config.Producer.Compression = meta.internalCompression
Expand Down Expand Up @@ -260,6 +261,25 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
return nil
}

func (k *Kafka) initConsumerGroupRebalanceStrategy(config *sarama.Config, metadata map[string]string) {
consumerGroupRebalanceStrategy, ok := kitmd.GetMetadataProperty(metadata, "consumerGroupRebalanceStrategy")
if !ok {
consumerGroupRebalanceStrategy = consumerGroupRebalanceStrategyRange
}
switch strings.ToLower(consumerGroupRebalanceStrategy) {
case consumerGroupRebalanceStrategySticky:
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
case consumerGroupRebalanceStrategyRoundRobin:
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
case consumerGroupRebalanceStrategyRange:
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
default:
k.logger.Warnf("Invalid consumer group rebalance strategy: %s. Using default strategy: '%s'", consumerGroupRebalanceStrategy, consumerGroupRebalanceStrategyRange)
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
}
k.logger.Infof("Consumer group rebalance strategy set to '%s'", config.Consumer.Group.Rebalance.GroupStrategies[0].Name())
}

func (k *Kafka) ValidateAWS(metadata map[string]string) (*awsAuth.DeprecatedKafkaIAM, error) {
const defaultSessionName = "DaprDefaultSession"
// This is needed as we remove the aws prefixed fields to use the builtin AWS profile fields instead.
Expand Down
86 changes: 86 additions & 0 deletions common/component/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,89 @@ func TestValidateAWS(t *testing.T) {
})
}
}

func TestInitConsumerGroupRebalanceStrategy(t *testing.T) {
tests := []struct {
name string
metadata map[string]string
expectedStrategy string
}{
{
name: "missing consumerGroupRebalanceStrategy property defaults to Range",
metadata: map[string]string{},
expectedStrategy: "range",
},
{
name: "empty consumerGroupRebalanceStrategy property defaults to Range",
metadata: map[string]string{
"consumerGroupRebalanceStrategy": "",
},
expectedStrategy: "range",
},
{
name: "valid sticky strategy",
metadata: map[string]string{
"consumerGroupRebalanceStrategy": "sticky",
},
expectedStrategy: "sticky",
},
{
name: "valid roundrobin strategy",
metadata: map[string]string{
"consumerGroupRebalanceStrategy": "roundrobin",
},
expectedStrategy: "roundrobin",
},
{
name: "valid range strategy",
metadata: map[string]string{
"consumerGroupRebalanceStrategy": "range",
},
expectedStrategy: "range",
},
{
name: "case insensitive strategy",
metadata: map[string]string{
"consumerGroupRebalanceStrategy": "sTickY",
},
expectedStrategy: "sticky",
},
{
name: "case insensitive strategy default",
metadata: map[string]string{
"consumerGroupRebalanceStrategy": "Range",
},
expectedStrategy: "range",
},
{
name: "invalid strategy defaults to Range with warning",
metadata: map[string]string{
"consumerGroupRebalanceStrategy": "invalid",
},
expectedStrategy: "range",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create Kafka instance with logger
k := &Kafka{
logger: logger.NewLogger("kafka_test"),
}

// Create sarama config
config := sarama.NewConfig()

// Call the method under test
k.initConsumerGroupRebalanceStrategy(config, tt.metadata)

// Verify the strategy was set correctly
require.Len(t, config.Consumer.Group.Rebalance.GroupStrategies, 1, "Expected exactly one rebalance strategy")

assert.Equal(t, tt.expectedStrategy, config.Consumer.Group.Rebalance.GroupStrategies[0].Name())

// Note: Warning verification would require a more sophisticated mock
// For now, we just verify the strategy is set correctly
})
}
}
54 changes: 29 additions & 25 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,32 @@ import (
)

const (
key = "partitionKey"
keyMetadataKey = "__key"
timestampMetadataKey = "__timestamp"
offsetMetadataKey = "__offset"
partitionMetadataKey = "__partition"
topicMetadataKey = "__topic"
skipVerify = "skipVerify"
caCert = "caCert"
certificateAuthType = "certificate"
clientCert = "clientCert"
clientKey = "clientKey"
consumeRetryInterval = "consumeRetryInterval"
authType = "authType"
passwordAuthType = "password"
oidcAuthType = "oidc"
mtlsAuthType = "mtls"
awsIAMAuthType = "awsiam"
noAuthType = "none"
consumerFetchMin = "consumerFetchMin"
consumerFetchDefault = "consumerFetchDefault"
channelBufferSize = "channelBufferSize"
valueSchemaType = "valueSchemaType"
compression = "compression"
key = "partitionKey"
keyMetadataKey = "__key"
timestampMetadataKey = "__timestamp"
offsetMetadataKey = "__offset"
partitionMetadataKey = "__partition"
topicMetadataKey = "__topic"
skipVerify = "skipVerify"
caCert = "caCert"
certificateAuthType = "certificate"
clientCert = "clientCert"
clientKey = "clientKey"
consumeRetryInterval = "consumeRetryInterval"
authType = "authType"
passwordAuthType = "password"
oidcAuthType = "oidc"
mtlsAuthType = "mtls"
awsIAMAuthType = "awsiam"
noAuthType = "none"
consumerFetchMin = "consumerFetchMin"
consumerFetchDefault = "consumerFetchDefault"
channelBufferSize = "channelBufferSize"
valueSchemaType = "valueSchemaType"
compression = "compression"
consumerGroupRebalanceStrategyRange = "range"
consumerGroupRebalanceStrategySticky = "sticky"
consumerGroupRebalanceStrategyRoundRobin = "roundrobin"

// Kafka client config default values.
// Refresh interval < keep alive time so that way connection can be kept alive indefinitely if desired.
Expand Down Expand Up @@ -99,8 +102,9 @@ type KafkaMetadata struct {

channelBufferSize int `mapstructure:"-"`

consumerFetchMin int32 `mapstructure:"-"`
consumerFetchDefault int32 `mapstructure:"-"`
consumerFetchMin int32 `mapstructure:"-"`
consumerFetchDefault int32 `mapstructure:"-"`
ConsumerGroupRebalanceStrategy string `mapstructure:"consumerGroupRebalanceStrategy"`

// configs for kafka producer
Compression string `mapstructure:"compression"`
Expand Down
11 changes: 11 additions & 0 deletions pubsub/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,14 @@ metadata:
The default is none.
example: '"gzip"'
default: "none"
- name: consumerGroupRebalanceStrategy
type: string
required: false
description: |
The strategy to use for consumer group rebalancing.
example: '"sticky"'
default: '"range"'
allowedValues:
- "range"
- "sticky"
- "roundrobin"
Loading