Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve AWS Cloudwatch Scaler metric exporting logic #2243

Merged
merged 7 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 117 additions & 67 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
defaultMetricCollectionTime = 300
defaultMetricStat = "Average"
defaultMetricStatPeriod = 300
defaultMetricEndTimeOffset = 0
)

type awsCloudwatchScaler struct {
Expand All @@ -44,7 +45,9 @@ type awsCloudwatchMetadata struct {

metricCollectionTime int64
metricStat string
metricUnit string
metricStatPeriod int64
metricEndTimeOffset int64

awsRegion string

Expand All @@ -67,44 +70,41 @@ func NewAwsCloudwatchScaler(config *ScalerConfig) (Scaler, error) {
}, nil
}

func parseMetricValues(config *ScalerConfig) (*awsCloudwatchMetadata, error) {
metricsMeta := awsCloudwatchMetadata{}

if val, ok := config.TriggerMetadata["metricCollectionTime"]; ok && val != "" {
if n, ok := strconv.ParseInt(val, 10, 64); ok == nil {
metricsMeta.metricCollectionTime = n
} else {
return nil, fmt.Errorf("metricCollectionTime not a valid number")
func getIntMetadataValue(metadata map[string]string, key string, required bool, defaultValue int64) (int64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.Atoi(val)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %v", key, err)
}
} else {
metricsMeta.metricCollectionTime = defaultMetricCollectionTime
return int64(value), nil
}

if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

if val, ok := config.TriggerMetadata["metricStatPeriod"]; ok && val != "" {
if n, ok := strconv.ParseInt(val, 10, 64); ok == nil {
metricsMeta.metricStatPeriod = n
} else {
return nil, fmt.Errorf("metricStatPeriod not a valid number")
return defaultValue, nil
}

func getFloatMetadataValue(metadata map[string]string, key string, required bool, defaultValue float64) (float64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %v", key, err)
}
} else {
metricsMeta.metricStatPeriod = defaultMetricStatPeriod
return value, nil
}

if val, ok := config.TriggerMetadata["metricStat"]; ok && val != "" {
metricsMeta.metricStat = val
} else {
metricsMeta.metricStat = defaultMetricStat
if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

return &metricsMeta, nil
return defaultValue, nil
}

func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, error) {
meta, err := parseMetricValues(config)

if err != nil {
return nil, fmt.Errorf("an error occurred when the scaler tried to get the metrics values")
}
var err error
meta := awsCloudwatchMetadata{}

if val, ok := config.TriggerMetadata["namespace"]; ok && val != "" {
meta.namespace = val
Expand Down Expand Up @@ -134,48 +134,50 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, fmt.Errorf("dimensionName and dimensionValue are not matching in size")
}

if val, ok := config.TriggerMetadata["targetMetricValue"]; ok && val != "" {
targetMetricValue, err := strconv.ParseFloat(val, 64)
if err != nil {
cloudwatchLog.Error(err, "Error parsing targetMetricValue metadata")
} else {
meta.targetMetricValue = targetMetricValue
}
} else {
return nil, fmt.Errorf("target Metric Value not given")
}

if val, ok := config.TriggerMetadata["minMetricValue"]; ok && val != "" {
minMetricValue, err := strconv.ParseFloat(val, 64)
if err != nil {
cloudwatchLog.Error(err, "Error parsing minMetricValue metadata")
} else {
meta.minMetricValue = minMetricValue
}
} else {
return nil, fmt.Errorf("min metric value not given")
meta.targetMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
if err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["metricCollectionTime"]; ok && val != "" {
metricCollectionTime, err := strconv.Atoi(val)
if err != nil {
cloudwatchLog.Error(err, "Error parsing metricCollectionTime metadata")
} else {
meta.metricCollectionTime = int64(metricCollectionTime)
}
meta.minMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
if err != nil {
return nil, err
}

meta.metricStat = defaultMetricStat
if val, ok := config.TriggerMetadata["metricStat"]; ok && val != "" {
meta.metricStat = val
}
if err = checkMetricStat(meta.metricStat); err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["metricStatPeriod"]; ok && val != "" {
metricStatPeriod, err := strconv.Atoi(val)
if err != nil {
cloudwatchLog.Error(err, "Error parsing metricStatPeriod metadata")
} else {
meta.metricStatPeriod = int64(metricStatPeriod)
}
meta.metricStatPeriod, err = getIntMetadataValue(config.TriggerMetadata, "metricStatPeriod", false, defaultMetricStatPeriod)
if err != nil {
return nil, err
}

if err = checkMetricStatPeriod(meta.metricStatPeriod); err != nil {
return nil, err
}

meta.metricCollectionTime, err = getIntMetadataValue(config.TriggerMetadata, "metricCollectionTime", false, defaultMetricCollectionTime)
if err != nil {
return nil, err
}

if meta.metricCollectionTime < 0 || meta.metricCollectionTime%meta.metricStatPeriod != 0 {
return nil, fmt.Errorf("metricCollectionTime must be greater than 0 and a multiple of metricStatPeriod(%d), %d is given", meta.metricStatPeriod, meta.metricCollectionTime)
}

meta.metricEndTimeOffset, err = getIntMetadataValue(config.TriggerMetadata, "metricEndTimeOffset", false, defaultMetricEndTimeOffset)
if err != nil {
return nil, err
}

meta.metricUnit = config.TriggerMetadata["metricUnit"]
if err = checkMetricUnit(meta.metricUnit); err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
Expand All @@ -184,16 +186,54 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, fmt.Errorf("no awsRegion given")
}

auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
meta.awsAuthorization, err = getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
if err != nil {
return nil, err
}

meta.awsAuthorization = auth

meta.scalerIndex = config.ScalerIndex

return meta, nil
return &meta, nil
}

func checkMetricStat(stat string) error {
for _, s := range cloudwatch.Statistic_Values() {
if stat == s {
return nil
}
}
return fmt.Errorf("metricStat '%s' is not one of %v", stat, cloudwatch.Statistic_Values())
}

func checkMetricUnit(unit string) error {
if unit == "" {
return nil
}
for _, u := range cloudwatch.StandardUnit_Values() {
if unit == u {
return nil
}
}
return fmt.Errorf("metricUnit '%s' is not one of %v", unit, cloudwatch.StandardUnit_Values())
}

func checkMetricStatPeriod(period int64) error {
if period < 1 {
return fmt.Errorf("metricStatPeriod can not be smaller than 1, however, %d is provided", period)
} else if period <= 60 {
switch period {
case 1, 5, 10, 30, 60:
return nil
default:
return fmt.Errorf("metricStatPeriod < 60 has to be one of [1, 5, 10, 30], however, %d is provided", period)
}
}

if period%60 != 0 {
return fmt.Errorf("metricStatPeriod >= 60 has to be a multiple of 60, however, %d is provided", period)
}

return nil
}

func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
Expand Down Expand Up @@ -273,9 +313,18 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
})
}

endTime := time.Now().Add(time.Second * -1 * time.Duration(c.metadata.metricEndTimeOffset)).Truncate(time.Duration(c.metadata.metricStatPeriod) * time.Second)
startTime := endTime.Add(time.Second * -1 * time.Duration(c.metadata.metricCollectionTime))

var metricUnit *string
if c.metadata.metricUnit != "" {
metricUnit = aws.String(c.metadata.metricUnit)
}

input := cloudwatch.GetMetricDataInput{
StartTime: aws.Time(time.Now().Add(time.Second * -1 * time.Duration(c.metadata.metricCollectionTime))),
EndTime: aws.Time(time.Now()),
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Id: aws.String("c1"),
Expand All @@ -287,6 +336,7 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
},
Period: aws.Int64(c.metadata.metricStatPeriod),
Stat: aws.String(c.metadata.metricStat),
Unit: metricUnit,
},
ReturnData: aws.Bool(true),
},
Expand Down
75 changes: 75 additions & 0 deletions pkg/scalers/aws_cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,81 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
"awsRegion": "eu-west-1"},
testAWSAuthentication, false,
"Missing metricStatPeriod not generate error because will get the default value"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStat": "Average",
"metricUnit": "Count",
"metricEndTimeOffset": "60",
"awsRegion": "eu-west-1"},
testAWSAuthentication, false,
"set a supported metricUnit"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricCollectionTime": "300",
"metricStat": "SomeStat",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"metricStat is not supported"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStatPeriod": "300",
"metricCollectionTime": "100",
"metricStat": "Average",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"metricCollectionTime smaller than metricStatPeriod"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStatPeriod": "250",
"metricStat": "Average",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"unsupported metricStatPeriod"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStatPeriod": "25",
"metricStat": "Average",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"unsupported metricStatPeriod"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStatPeriod": "25",
"metricStat": "Average",
"metricUnit": "Hour",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"unsupported metricUnit"},
}

var awsCloudwatchMetricIdentifiers = []awsCloudwatchMetricIdentifier{
Expand Down