Skip to content

Commit

Permalink
Add connection name for the rabbitmq scaler (#6093)
Browse files Browse the repository at this point in the history
* add a static connection name

Signed-off-by: robpickerill <r.pickerill@gmail.com>

* Update pkg/scalers/rabbitmq_scaler.go

Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
Signed-off-by: Rob Pickerill <r.pickerill@gmail.com>

* add improvement to changelog

Signed-off-by: robpickerill <r.pickerill@gmail.com>

* add namepace and so name to conn name

Signed-off-by: robpickerill <r.pickerill@gmail.com>

* Update comment

Signed-off-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>

---------

Signed-off-by: robpickerill <r.pickerill@gmail.com>
Signed-off-by: Rob Pickerill <r.pickerill@gmail.com>
Signed-off-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
  • Loading branch information
robpickerill and JorTurFer authored Aug 21, 2024
1 parent 0771b35 commit f4261e3
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ Here is an overview of all new **experimental** features:
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

### Fixes

Expand Down
36 changes: 24 additions & 12 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type rabbitMQScaler struct {

type rabbitMQMetadata struct {
queueName string
connectionName string // name used for the AMQP connection
mode string // QueueLength or MessageRate
value float64 // trigger value (queue length or publish/sec. rate)
activationValue float64 // activation value
Expand Down Expand Up @@ -232,7 +233,9 @@ func resolveTLSAuthParams(config *scalersconfig.ScalerConfig, meta *rabbitMQMeta
}

func parseRabbitMQMetadata(config *scalersconfig.ScalerConfig) (*rabbitMQMetadata, error) {
meta := rabbitMQMetadata{}
meta := rabbitMQMetadata{
connectionName: connectionName(config),
}

// Resolve protocol type
if err := resolveProtocol(config, &meta); err != nil {
Expand Down Expand Up @@ -445,22 +448,25 @@ func parseTrigger(meta *rabbitMQMetadata, config *scalersconfig.ScalerConfig) (*
}

// getConnectionAndChannel returns an amqp connection. If enableTLS is true tls connection is made using
//
// the given ceClient cert, ceClient key,and CA certificate. If clientKeyPassword is not empty the provided password will be used to
//
// the given ceClient cert, ceClient key,and CA certificate. If clientKeyPassword is not empty the provided password will be used to
// decrypt the given key. If enableTLS is disabled then amqp connection will be created without tls.
func getConnectionAndChannel(host string, meta *rabbitMQMetadata) (*amqp.Connection, *amqp.Channel, error) {
var conn *amqp.Connection
var err error
amqpConfig := amqp.Config{
Properties: amqp.Table{
"connection_name": meta.connectionName,
},
}

if meta.enableTLS {
tlsConfig, configErr := kedautil.NewTLSConfigWithPassword(meta.cert, meta.key, meta.keyPassword, meta.ca, meta.unsafeSsl)
if configErr != nil {
return nil, nil, configErr
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.cert, meta.key, meta.keyPassword, meta.ca, meta.unsafeSsl)
if err != nil {
return nil, nil, err
}
conn, err = amqp.DialTLS(host, tlsConfig)
} else {
conn, err = amqp.Dial(host)

amqpConfig.TLSClientConfig = tlsConfig
}

conn, err := amqp.DialConfig(host, amqpConfig)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -715,3 +721,9 @@ func (s *rabbitMQScaler) anonymizeRabbitMQError(err error) error {
errorMessage := fmt.Sprintf("error inspecting rabbitMQ: %s", err)
return fmt.Errorf(rabbitMQAnonymizePattern.ReplaceAllString(errorMessage, "user:password@"))
}

// connectionName is used to provide a deterministic AMQP connection name when
// connecting to RabbitMQ
func connectionName(config *scalersconfig.ScalerConfig) string {
return fmt.Sprintf("keda-%s-%s", config.ScalableObjectNamespace, config.ScalableObjectName)
}
13 changes: 13 additions & 0 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,3 +746,16 @@ func TestRegexQueueMissingError(t *testing.T) {
}
}
}

func TestConnectionName(t *testing.T) {
c := scalersconfig.ScalerConfig{
ScalableObjectNamespace: "test-namespace",
ScalableObjectName: "test-name",
}

connectionName := connectionName(&c)

if connectionName != "keda-test-namespace-test-name" {
t.Error("Expected connection name to be keda-test-namespace-test-name but got", connectionName)
}
}

0 comments on commit f4261e3

Please sign in to comment.