Skip to content

Commit

Permalink
Refactor Artemis scaler config (#5836)
Browse files Browse the repository at this point in the history
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
  • Loading branch information
dttung2905 authored Jun 4, 2024
1 parent cd9e021 commit e25aca9
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 130 deletions.
189 changes: 64 additions & 125 deletions pkg/scalers/artemis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/go-logr/logr"
Expand All @@ -27,17 +26,17 @@ type artemisScaler struct {

//revive:disable:var-naming breaking change on restApiTemplate, wouldn't bring any benefit to users
type artemisMetadata struct {
managementEndpoint string
queueName string
brokerName string
brokerAddress string
username string
password string
restAPITemplate string
queueLength int64
activationQueueLength int64
corsHeader string
triggerIndex int
TriggerIndex int
ManagementEndpoint string `keda:"name=managementEndpoint, order=triggerMetadata, optional"`
QueueName string `keda:"name=queueName, order=triggerMetadata, optional"`
BrokerName string `keda:"name=brokerName, order=triggerMetadata, optional"`
BrokerAddress string `keda:"name=brokerAddress, order=triggerMetadata, optional"`
Username string `keda:"name=username, order=authParams;triggerMetadata;resolvedEnv"`
Password string `keda:"name=password, order=authParams;triggerMetadata;resolvedEnv"`
RestAPITemplate string `keda:"name=restApiTemplate, order=triggerMetadata, optional"`
QueueLength int64 `keda:"name=queueLength, order=triggerMetadata, optional, default=10"`
ActivationQueueLength int64 `keda:"name=activationQueueLength, order=triggerMetadata, optional, default=10"`
CorsHeader string `keda:"name=corsHeader, order=triggerMetadata, optional"`
}

//revive:enable:var-naming
Expand All @@ -49,13 +48,38 @@ type artemisMonitoring struct {
}

const (
artemisMetricType = "External"
defaultArtemisQueueLength = 10
defaultArtemisActivationQueueLength = 0
defaultRestAPITemplate = "http://<<managementEndpoint>>/console/jolokia/read/org.apache.activemq.artemis:broker=\"<<brokerName>>\",component=addresses,address=\"<<brokerAddress>>\",subcomponent=queues,routing-type=\"anycast\",queue=\"<<queueName>>\"/MessageCount"
defaultCorsHeader = "http://%s"
artemisMetricType = "External"
defaultRestAPITemplate = "http://<<managementEndpoint>>/console/jolokia/read/org.apache.activemq.artemis:broker=\"<<brokerName>>\",component=addresses,address=\"<<brokerAddress>>\",subcomponent=queues,routing-type=\"anycast\",queue=\"<<queueName>>\"/MessageCount"
defaultCorsHeader = "http://%s"
)

func (a *artemisMetadata) Validate() error {
if a.RestAPITemplate != "" {
var err error
if *a, err = getAPIParameters(*a); err != nil {
return fmt.Errorf("can't parse restApiTemplate : %s ", err)
}
} else {
a.RestAPITemplate = defaultRestAPITemplate
if a.ManagementEndpoint == "" {
return errors.New("no management endpoint given")
}
if a.QueueName == "" {
return errors.New("no queue name given")
}
if a.BrokerName == "" {
return errors.New("no broker name given")
}
if a.BrokerAddress == "" {
return errors.New("no broker address given")
}
}
if a.CorsHeader == "" {
a.CorsHeader = fmt.Sprintf(defaultCorsHeader, a.ManagementEndpoint)
}
return nil
}

// NewArtemisQueueScaler creates a new artemis queue Scaler
func NewArtemisQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
// do we need to guarantee this timeout for a specific
Expand All @@ -82,108 +106,23 @@ func NewArtemisQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}

func parseArtemisMetadata(config *scalersconfig.ScalerConfig) (*artemisMetadata, error) {
meta := artemisMetadata{}

meta.queueLength = defaultArtemisQueueLength
meta.activationQueueLength = defaultArtemisActivationQueueLength

if val, ok := config.TriggerMetadata["restApiTemplate"]; ok && val != "" {
meta.restAPITemplate = config.TriggerMetadata["restApiTemplate"]
var err error
if meta, err = getAPIParameters(meta); err != nil {
return nil, fmt.Errorf("can't parse restApiTemplate : %s ", err)
}
} else {
meta.restAPITemplate = defaultRestAPITemplate
if config.TriggerMetadata["managementEndpoint"] == "" {
return nil, errors.New("no management endpoint given")
}
meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]

if config.TriggerMetadata["queueName"] == "" {
return nil, errors.New("no queue name given")
}
meta.queueName = config.TriggerMetadata["queueName"]

if config.TriggerMetadata["brokerName"] == "" {
return nil, errors.New("no broker name given")
}
meta.brokerName = config.TriggerMetadata["brokerName"]

if config.TriggerMetadata["brokerAddress"] == "" {
return nil, errors.New("no broker address given")
}
meta.brokerAddress = config.TriggerMetadata["brokerAddress"]
meta := &artemisMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing prometheus metadata: %w", err)
}

if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" {
meta.corsHeader = config.TriggerMetadata["corsHeader"]
} else {
meta.corsHeader = fmt.Sprintf(defaultCorsHeader, meta.managementEndpoint)
}

if val, ok := config.TriggerMetadata["queueLength"]; ok {
queueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("can't parse queueLength: %w", err)
}

meta.queueLength = queueLength
}

if val, ok := config.TriggerMetadata["activationQueueLength"]; ok {
activationQueueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("can't parse activationQueueLength: %w", err)
}

meta.activationQueueLength = activationQueueLength
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok && val != "" {
username := val

if val, ok := config.ResolvedEnv[username]; ok && val != "" {
meta.username = val
} else {
meta.username = username
}
}

if meta.username == "" {
return nil, fmt.Errorf("username cannot be empty")
}

if val, ok := config.AuthParams["password"]; ok && val != "" {
meta.password = val
} else if val, ok := config.TriggerMetadata["password"]; ok && val != "" {
password := val

if val, ok := config.ResolvedEnv[password]; ok && val != "" {
meta.password = val
} else {
meta.password = password
}
}

if meta.password == "" {
return nil, fmt.Errorf("password cannot be empty")
}

meta.triggerIndex = config.TriggerIndex
meta.TriggerIndex = config.TriggerIndex

return &meta, nil
return meta, nil
}

// getAPIParameters parse restAPITemplate to provide managementEndpoint , brokerName, brokerAddress, queueName
func getAPIParameters(meta artemisMetadata) (artemisMetadata, error) {
u, err := url.ParseRequestURI(meta.restAPITemplate)
u, err := url.ParseRequestURI(meta.RestAPITemplate)
if err != nil {
return meta, fmt.Errorf("unable to parse the artemis restAPITemplate: %w", err)
}
meta.managementEndpoint = u.Host
meta.ManagementEndpoint = u.Host
splitURL := strings.Split(strings.Split(u.RawPath, ":")[1], "/")[0] // This returns : broker="<<brokerName>>",component=addresses,address="<<brokerAddress>>",subcomponent=queues,routing-type="anycast",queue="<<queueName>>"
replacer := strings.NewReplacer(",", "&", "\"\"", "")
v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[address:["<<brokerAddress>>"] broker:["<<brokerName>>"] component:[addresses] queue:["<<queueName>>"] routing-type:["anycast"] subcomponent:[queues]]
Expand All @@ -194,28 +133,28 @@ func getAPIParameters(meta artemisMetadata) (artemisMetadata, error) {
if len(v["address"][0]) == 0 {
return meta, errors.New("no brokerAddress given")
}
meta.brokerAddress = v["address"][0]
meta.BrokerAddress = v["address"][0]

if len(v["queue"][0]) == 0 {
return meta, errors.New("no queueName is given")
}
meta.queueName = v["queue"][0]
meta.QueueName = v["queue"][0]

if len(v["broker"][0]) == 0 {
return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate)
return meta, fmt.Errorf("no brokerName given: %s", meta.RestAPITemplate)
}
meta.brokerName = v["broker"][0]
meta.BrokerName = v["broker"][0]

return meta, nil
}

func (s *artemisScaler) getMonitoringEndpoint() string {
replacer := strings.NewReplacer("<<managementEndpoint>>", s.metadata.managementEndpoint,
"<<queueName>>", s.metadata.queueName,
"<<brokerName>>", s.metadata.brokerName,
"<<brokerAddress>>", s.metadata.brokerAddress)
replacer := strings.NewReplacer("<<managementEndpoint>>", s.metadata.ManagementEndpoint,
"<<queueName>>", s.metadata.QueueName,
"<<brokerName>>", s.metadata.BrokerName,
"<<brokerAddress>>", s.metadata.BrokerAddress)

monitoringEndpoint := replacer.Replace(s.metadata.restAPITemplate)
monitoringEndpoint := replacer.Replace(s.metadata.RestAPITemplate)

return monitoringEndpoint
}
Expand All @@ -231,8 +170,8 @@ func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int64, error)
if err != nil {
return -1, err
}
req.SetBasicAuth(s.metadata.username, s.metadata.password)
req.Header.Set("Origin", s.metadata.corsHeader)
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)
req.Header.Set("Origin", s.metadata.CorsHeader)

resp, err := client.Do(req)
if err != nil {
Expand All @@ -250,17 +189,17 @@ func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int64, error)
return -1, fmt.Errorf("artemis management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}

s.logger.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.queueLength))
s.logger.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.QueueLength))

return messageCount, nil
}

func (s *artemisScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("artemis-%s", s.metadata.queueName))),
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("artemis-%s", s.metadata.QueueName))),
},
Target: GetMetricTarget(s.metricType, s.metadata.queueLength),
Target: GetMetricTarget(s.metricType, s.metadata.QueueLength),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: artemisMetricType}
return []v2.MetricSpec{metricSpec}
Expand All @@ -271,13 +210,13 @@ func (s *artemisScaler) GetMetricsAndActivity(ctx context.Context, metricName st
messages, err := s.getQueueMessageCount(ctx)

if err != nil {
s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.ManagementEndpoint)
return []external_metrics.ExternalMetricValue{}, false, err
}

metric := GenerateMetricInMili(metricName, float64(messages))

return []external_metrics.ExternalMetricValue{metric}, messages > s.metadata.activationQueueLength, nil
return []external_metrics.ExternalMetricValue{metric}, messages > s.metadata.ActivationQueueLength, nil
}

func (s *artemisScaler) Close(context.Context) error {
Expand Down
10 changes: 5 additions & 5 deletions pkg/scalers/artemis_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var artemisMetricIdentifiers = []artemisMetricIdentifier{
var testArtemisMetadataWithEmptyAuthParams = []parseArtemisMetadataTestData{
// nothing passed
{map[string]string{}, true},
// Missing missing managementEndpoint should fail
// Missing managementEndpoint should fail
{map[string]string{"managementEndpoint": "", "queueName": "queue1", "brokerName": "broker-activemq", "brokerAddress": "address1"}, true},
// Missing queue name, should fail
{map[string]string{"managementEndpoint": "localhost:8161", "queueName": "", "brokerName": "broker-activemq", "brokerAddress": "address1"}, true},
Expand All @@ -93,8 +93,8 @@ func TestArtemisDefaultCorsHeader(t *testing.T) {
if err != nil {
t.Error("Expected success but got error", err)
}
if !(meta.corsHeader == "http://localhost:8161") {
t.Errorf("Expected http://localhost:8161 but got %s", meta.corsHeader)
if !(meta.CorsHeader == "http://localhost:8161") {
t.Errorf("Expected http://localhost:8161 but got %s", meta.CorsHeader)
}
}

Expand All @@ -105,8 +105,8 @@ func TestArtemisCorsHeader(t *testing.T) {
if err != nil {
t.Error("Expected success but got error", err)
}
if !(meta.corsHeader == "test") {
t.Errorf("Expected test but got %s", meta.corsHeader)
if !(meta.CorsHeader == "test") {
t.Errorf("Expected test but got %s", meta.CorsHeader)
}
}

Expand Down

0 comments on commit e25aca9

Please sign in to comment.