Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added alternative expression field in AWS CloudWatch scaler #2997

Merged
merged 4 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Updated HTTPClient to be proxy-aware, if available, from environment variables. ([#2577](https://github.com/kedacore/keda/issues/2577))
- **General:** Using manager client in KEDA Metrics Server to avoid flush request to Kubernetes Apiserver([2914](https://github.com/kedacore/keda/issues/2914))
- **ActiveMQ Scaler:** Add CorsHeader information to ActiveMQ Scaler ([#2884](https://github.com/kedacore/keda/issues/2884))
- **AWS CloudWatch:** Add support to use expressions([#2998](https://github.com/kedacore/keda/issues/2998))
- **Azure Application Insights Scaler:** Provide support for non-public clouds ([#2735](https://github.com/kedacore/keda/issues/2735))
- **Azure Blob Storage Scaler:** Add optional parameters for counting blobs recursively ([#1789](https://github.com/kedacore/keda/issues/1789))
- **Azure Event Hub Scaler:** Improve logging when blob container not found ([#2363](https://github.com/kedacore/keda/issues/2363))
Expand Down
161 changes: 90 additions & 71 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type awsCloudwatchMetadata struct {
metricsName string
dimensionName []string
dimensionValue []string
expression string

targetMetricValue float64
minMetricValue float64
targetMetricValue int64
minMetricValue int64

metricCollectionTime int64
metricStat string
Expand Down Expand Up @@ -95,22 +96,6 @@ func getIntMetadataValue(metadata map[string]string, key string, required bool,
return defaultValue, nil
}

func getFloatMetadataValue(metadata map[string]string, key string, required bool, defaultValue float64) (float64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %v", key, err)
}
return value, nil
}

if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

return defaultValue, nil
}

func createCloudwatchClient(metadata *awsCloudwatchMetadata) *cloudwatch.CloudWatch {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
Expand Down Expand Up @@ -153,28 +138,41 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, fmt.Errorf("metric name not given")
}

if val, ok := config.TriggerMetadata["dimensionName"]; ok && val != "" {
meta.dimensionName = strings.Split(val, ";")
if config.TriggerMetadata["expression"] != "" {
if val, ok := config.TriggerMetadata["expression"]; ok && val != "" {
meta.expression = val
} else {
return nil, fmt.Errorf("expression not given")
}
} else {
return nil, fmt.Errorf("dimension name not given")
}
if val, ok := config.TriggerMetadata["dimensionName"]; ok && val != "" {
meta.dimensionName = strings.Split(val, ";")
} else {
return nil, fmt.Errorf("dimension name not given")
}

if val, ok := config.TriggerMetadata["dimensionValue"]; ok && val != "" {
meta.dimensionValue = strings.Split(val, ";")
} else {
return nil, fmt.Errorf("dimension value not given")
}
if val, ok := config.TriggerMetadata["dimensionValue"]; ok && val != "" {
meta.dimensionValue = strings.Split(val, ";")
} else {
return nil, fmt.Errorf("dimension value not given")
}

if len(meta.dimensionName) != len(meta.dimensionValue) {
return nil, fmt.Errorf("dimensionName and dimensionValue are not matching in size")
}

if len(meta.dimensionName) != len(meta.dimensionValue) {
return nil, fmt.Errorf("dimensionName and dimensionValue are not matching in size")
meta.metricUnit = config.TriggerMetadata["metricUnit"]
if err = checkMetricUnit(meta.metricUnit); err != nil {
return nil, err
}
}

meta.targetMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
meta.targetMetricValue, err = getIntMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
if err != nil {
return nil, err
}

meta.minMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
meta.minMetricValue, err = getIntMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -210,11 +208,6 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, err
}

meta.metricUnit = config.TriggerMetadata["metricUnit"]
if err = checkMetricUnit(meta.metricUnit); err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
Expand Down Expand Up @@ -287,19 +280,27 @@ func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string,

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(metricValue), resource.DecimalSI),
Value: *resource.NewQuantity(metricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (c *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
var metricNameSuffix string

if c.metadata.expression != "" {
metricNameSuffix = c.metadata.metricsName
} else {
metricNameSuffix = c.metadata.dimensionName[0]
}

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", c.metadata.dimensionName[0]))),
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
},
Target: GetMetricTarget(c.metricType, int64(c.metadata.targetMetricValue)),
Target: GetMetricTarget(c.metricType, c.metadata.targetMetricValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
Expand All @@ -319,42 +320,60 @@ func (c *awsCloudwatchScaler) Close(context.Context) error {
return nil
}

func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
dimensions := []*cloudwatch.Dimension{}
for i := range c.metadata.dimensionName {
dimensions = append(dimensions, &cloudwatch.Dimension{
Name: &c.metadata.dimensionName[i],
Value: &c.metadata.dimensionValue[i],
})
}
func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (int64, error) {
var input cloudwatch.GetMetricDataInput

startTime, endTime := computeQueryWindow(time.Now(), c.metadata.metricStatPeriod, c.metadata.metricEndTimeOffset, c.metadata.metricCollectionTime)

var metricUnit *string
if c.metadata.metricUnit != "" {
metricUnit = aws.String(c.metadata.metricUnit)
}

input := cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Id: aws.String("c1"),
MetricStat: &cloudwatch.MetricStat{
Metric: &cloudwatch.Metric{
Namespace: aws.String(c.metadata.namespace),
Dimensions: dimensions,
MetricName: aws.String(c.metadata.metricsName),
if c.metadata.expression != "" {
input = cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Expression: aws.String(c.metadata.expression),
Id: aws.String("q1"),
Period: aws.Int64(c.metadata.metricStatPeriod),
Label: aws.String(c.metadata.metricsName),
},
},
}
} else {
dimensions := []*cloudwatch.Dimension{}
for i := range c.metadata.dimensionName {
dimensions = append(dimensions, &cloudwatch.Dimension{
Name: &c.metadata.dimensionName[i],
Value: &c.metadata.dimensionValue[i],
})
}

var metricUnit *string
if c.metadata.metricUnit != "" {
metricUnit = aws.String(c.metadata.metricUnit)
}

input = cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Id: aws.String("c1"),
MetricStat: &cloudwatch.MetricStat{
Metric: &cloudwatch.Metric{
Namespace: aws.String(c.metadata.namespace),
Dimensions: dimensions,
MetricName: aws.String(c.metadata.metricsName),
},
Period: aws.Int64(c.metadata.metricStatPeriod),
Stat: aws.String(c.metadata.metricStat),
Unit: metricUnit,
},
Period: aws.Int64(c.metadata.metricStatPeriod),
Stat: aws.String(c.metadata.metricStat),
Unit: metricUnit,
ReturnData: aws.Bool(true),
},
ReturnData: aws.Bool(true),
},
},
}
}

output, err := c.cwClient.GetMetricData(&input)
Expand All @@ -365,9 +384,9 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
}

cloudwatchLog.V(1).Info("Received Metric Data", "data", output)
var metricValue float64
var metricValue int64
if len(output.MetricDataResults) > 0 && len(output.MetricDataResults[0].Values) > 0 {
metricValue = *output.MetricDataResults[0].Values[0]
metricValue = int64(*output.MetricDataResults[0].Values[0])
} else {
cloudwatchLog.Info("empty metric data received, returning minMetricValue")
metricValue = c.metadata.minMetricValue
Expand Down
44 changes: 37 additions & 7 deletions pkg/scalers/aws_cloudwatch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
testAWSAuthentication,
false,
"properly formed cloudwatch query and awsRegion"},
// properly formed cloudwatch expression query and awsRegion
{map[string]string{
"namespace": "AWS/SQS",
"expression": "SELECT MIN(MessageCount) FROM \"AWS/AmazonMQ\" WHERE Broker = 'production' and Queue = 'worker'",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"awsRegion": "eu-west-1"},
testAWSAuthentication,
false,
"properly formed cloudwatch expression query and awsRegion"},
// Properly formed cloudwatch query with optional parameters
{map[string]string{
"namespace": "AWS/SQS",
Expand Down Expand Up @@ -343,6 +354,7 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
var awsCloudwatchMetricIdentifiers = []awsCloudwatchMetricIdentifier{
{&testAWSCloudwatchMetadata[1], 0, "s0-aws-cloudwatch-QueueName"},
{&testAWSCloudwatchMetadata[1], 3, "s3-aws-cloudwatch-QueueName"},
{&testAWSCloudwatchMetadata[2], 5, "s5-aws-cloudwatch-ApproximateNumberOfMessagesVisible"},
}

var awsCloudwatchGetMetricTestData = []awsCloudwatchMetadata{
Expand Down Expand Up @@ -410,21 +422,39 @@ var awsCloudwatchGetMetricTestData = []awsCloudwatchMetadata{
awsAuthorization: awsAuthorizationMetadata{podIdentityOwner: false},
scalerIndex: 0,
},
{
namespace: "Custom",
metricsName: "HasDataFromExpression",
expression: "SELECT MIN(MessageCount) FROM \"AWS/AmazonMQ\" WHERE Broker = 'production' and Queue = 'worker'",
targetMetricValue: 100,
minMetricValue: 0,
metricCollectionTime: 60,
metricStat: "Average",
metricUnit: "SampleCount",
metricStatPeriod: 60,
metricEndTimeOffset: 60,
awsRegion: "us-west-2",
awsAuthorization: awsAuthorizationMetadata{podIdentityOwner: false},
scalerIndex: 0,
},
}

type mockCloudwatch struct {
cloudwatchiface.CloudWatchAPI
}

func (m *mockCloudwatch) GetMetricData(input *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) {
switch *input.MetricDataQueries[0].MetricStat.Metric.MetricName {
case testAWSCloudwatchErrorMetric:
return nil, errors.New("error")
case testAWSCloudwatchNoValueMetric:
return &cloudwatch.GetMetricDataOutput{
MetricDataResults: []*cloudwatch.MetricDataResult{},
}, nil
if input.MetricDataQueries[0].MetricStat != nil {
switch *input.MetricDataQueries[0].MetricStat.Metric.MetricName {
case testAWSCloudwatchErrorMetric:
return nil, errors.New("error")
case testAWSCloudwatchNoValueMetric:
return &cloudwatch.GetMetricDataOutput{
MetricDataResults: []*cloudwatch.MetricDataResult{},
}, nil
}
}

return &cloudwatch.GetMetricDataOutput{
MetricDataResults: []*cloudwatch.MetricDataResult{
{
Expand Down
Loading