diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 8a93330e5..3701dbd7f 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -130,7 +130,7 @@ type ConsumerConfig struct { IsRegexPattern bool `json:"isRegexPattern,omitempty"` SchemaProperties map[string]string `json:"schemaProperties,omitempty"` ConsumerProperties map[string]string `json:"consumerProperties,omitempty"` - ReceiverQueueSize int32 `json:"receiverQueueSize,omitempty"` + ReceiverQueueSize *int32 `json:"receiverQueueSize,omitempty"` CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty"` } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index ae93d7354..0ef02279d 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -42,6 +42,11 @@ func (in *ConsumerConfig) DeepCopyInto(out *ConsumerConfig) { (*out)[key] = val } } + if in.ReceiverQueueSize != nil { + in, out := &in.ReceiverQueueSize, &out.ReceiverQueueSize + *out = new(int32) + **out = **in + } if in.CryptoConfig != nil { in, out := &in.CryptoConfig, &out.CryptoConfig *out = new(CryptoConfig) diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 9fcee3a2f..85e79ccd4 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -396,6 +396,9 @@ func convertSubPosition(pos v1alpha1.SubscribePosition) proto.SubscriptionPositi } func generateRetryDetails(maxMessageRetry int32, deadLetterTopic string) *proto.RetryDetails { + if maxMessageRetry <= 0 && deadLetterTopic == "" { + return nil + } return &proto.RetryDetails{ MaxMessageRetries: maxMessageRetry, DeadLetterTopic: deadLetterTopic, diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index 7cc623bb2..1df623737 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -124,11 +124,17 @@ func generateInputSpec(sourceConf v1alpha1.InputConf) map[string]*proto.Consumer if sourceConf.SourceSpecs != nil && len(sourceConf.SourceSpecs) > 0 { for topicName, conf := range sourceConf.SourceSpecs { + var receiverQueueSize *proto.ConsumerSpec_ReceiverQueueSize + if conf.ReceiverQueueSize != nil { + receiverQueueSize = &proto.ConsumerSpec_ReceiverQueueSize{Value: *conf.ReceiverQueueSize} + } else { + receiverQueueSize = nil + } inputSpecs[topicName] = &proto.ConsumerSpec{ SchemaType: conf.SchemaType, SerdeClassName: conf.SerdeClassName, IsRegexPattern: conf.IsRegexPattern, - ReceiverQueueSize: &proto.ConsumerSpec_ReceiverQueueSize{Value: conf.ReceiverQueueSize}, + ReceiverQueueSize: receiverQueueSize, SchemaProperties: conf.SchemaProperties, ConsumerProperties: conf.ConsumerProperties, CryptoSpec: generateCryptoSpec(conf.CryptoConfig), diff --git a/tools/migration.go b/tools/migration.go index 2d5d3a46d..95a8d9e46 100755 --- a/tools/migration.go +++ b/tools/migration.go @@ -82,7 +82,7 @@ func main() { SchemaType: functionConfig.InputSpecs[s].SchemaType, SerdeClassName: functionConfig.InputSpecs[s].SerdeClassName, IsRegexPattern: functionConfig.InputSpecs[s].IsRegexPattern, - ReceiverQueueSize: receiveQueueSize, + ReceiverQueueSize: &receiveQueueSize, } } timeoutMs := int32(0)