diff --git a/pkg/handler/scale_handler.go b/pkg/handler/scale_handler.go index 3a1347b7823..4b9e3bd6cb4 100644 --- a/pkg/handler/scale_handler.go +++ b/pkg/handler/scale_handler.go @@ -306,6 +306,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn return scalers.NewPostgresScaler(resolvedEnv, triggerMetadata, authParams) case "mysql": return scalers.NewMySQLScaler(resolvedEnv, triggerMetadata, authParams) + case "azure-monitor": + return scalers.NewAzureMonitorScaler(resolvedEnv, triggerMetadata, authParams) default: return nil, fmt.Errorf("no scaler found for type: %s", triggerType) } diff --git a/pkg/scalers/azure_monitor.go b/pkg/scalers/azure_monitor.go new file mode 100644 index 00000000000..2753c1ead3d --- /dev/null +++ b/pkg/scalers/azure_monitor.go @@ -0,0 +1,202 @@ +package scalers + +import ( + "context" + "fmt" + "math" + "strconv" + "strings" + "time" + + "github.com/Azure/azure-sdk-for-go/services/preview/monitor/mgmt/2018-03-01/insights" + "github.com/Azure/go-autorest/autorest/azure/auth" + "k8s.io/klog" +) + +// Much of the code in this file is taken from the Azure Kubernetes Metrics Adapter +// https://github.com/Azure/azure-k8s-metrics-adapter/tree/master/pkg/azure/externalmetrics + +type azureExternalMetricRequest struct { + MetricName string + SubscriptionID string + ResourceName string + ResourceProviderNamespace string + ResourceType string + Aggregation string + Timespan string + Filter string + ResourceGroup string +} + +// GetAzureMetricValue returns the value of an Azure Monitor metric, rounded to the nearest int +func GetAzureMetricValue(ctx context.Context, metricMetadata *azureMonitorMetadata) (int32, error) { + client := createMetricsClient(metricMetadata) + + requestPtr, err := createMetricsRequest(metricMetadata) + if err != nil { + return -1, err + } + + return executeRequest(client, requestPtr) +} + +func createMetricsClient(metadata *azureMonitorMetadata) insights.MetricsClient { + client := insights.NewMetricsClient(metadata.subscriptionID) + config := auth.NewClientCredentialsConfig(metadata.clientID, metadata.clientPassword, metadata.tenantID) + + authorizer, _ := config.Authorizer() + client.Authorizer = authorizer + + return client +} + +func createMetricsRequest(metadata *azureMonitorMetadata) (*azureExternalMetricRequest, error) { + metricRequest := azureExternalMetricRequest{ + MetricName: metadata.name, + SubscriptionID: metadata.subscriptionID, + Aggregation: metadata.aggregationType, + Filter: metadata.filter, + ResourceGroup: metadata.resourceGroupName, + } + + resourceInfo := strings.Split(metadata.resourceURI, "/") + metricRequest.ResourceProviderNamespace = resourceInfo[0] + metricRequest.ResourceType = resourceInfo[1] + metricRequest.ResourceName = resourceInfo[2] + + // if no timespan is provided, defaults to 5 minutes + timespan, err := formatTimeSpan(metadata.aggregationInterval) + if err != nil { + return nil, err + } + + metricRequest.Timespan = timespan + + return &metricRequest, nil +} + +func executeRequest(client insights.MetricsClient, request *azureExternalMetricRequest) (int32, error) { + metricResponse, err := getAzureMetric(client, *request) + if err != nil { + azureMonitorLog.Error(err, "error getting azure monitor metric") + return -1, fmt.Errorf("Error getting azure monitor metric %s: %s", request.MetricName, err.Error()) + } + + // casting drops everything after decimal, so round first + metricValue := int32(math.Round(metricResponse)) + + return metricValue, nil +} + +func getAzureMetric(client insights.MetricsClient, azMetricRequest azureExternalMetricRequest) (float64, error) { + err := azMetricRequest.validate() + if err != nil { + return -1, err + } + + metricResourceURI := azMetricRequest.metricResourceURI() + klog.V(2).Infof("resource uri: %s", metricResourceURI) + + metricResult, err := client.List(context.Background(), metricResourceURI, + azMetricRequest.Timespan, nil, + azMetricRequest.MetricName, azMetricRequest.Aggregation, nil, + "", azMetricRequest.Filter, "", "") + if err != nil { + return -1, err + } + + value, err := extractValue(azMetricRequest, metricResult) + + return value, err +} + +func extractValue(azMetricRequest azureExternalMetricRequest, metricResult insights.Response) (float64, error) { + metricVals := *metricResult.Value + + if len(metricVals) == 0 { + err := fmt.Errorf("Got an empty response for metric %s/%s and aggregate type %s", azMetricRequest.ResourceProviderNamespace, azMetricRequest.MetricName, insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation))) + return -1, err + } + + timeseries := *metricVals[0].Timeseries + if timeseries == nil { + err := fmt.Errorf("Got metric result for %s/%s and aggregate type %s without timeseries", azMetricRequest.ResourceProviderNamespace, azMetricRequest.MetricName, insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation))) + return -1, err + } + + data := *timeseries[0].Data + if data == nil { + err := fmt.Errorf("Got metric result for %s/%s and aggregate type %s without any metric values", azMetricRequest.ResourceProviderNamespace, azMetricRequest.MetricName, insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation))) + return -1, err + } + + valuePtr, err := verifyAggregationTypeIsSupported(azMetricRequest.Aggregation, data) + if err != nil { + return -1, fmt.Errorf("Unable to get value for metric %s/%s with aggregation %s. No value returned by Azure Monitor", azMetricRequest.ResourceProviderNamespace, azMetricRequest.MetricName, azMetricRequest.Aggregation) + } + + klog.V(2).Infof("metric type: %s %f", azMetricRequest.Aggregation, *valuePtr) + + return *valuePtr, nil +} + +func (amr azureExternalMetricRequest) validate() error { + if amr.MetricName == "" { + return fmt.Errorf("metricName is required") + } + if amr.ResourceGroup == "" { + return fmt.Errorf("resourceGroup is required") + } + if amr.SubscriptionID == "" { + return fmt.Errorf("subscriptionID is required. set a default or pass via label selectors") + } + return nil +} + +func (amr azureExternalMetricRequest) metricResourceURI() string { + return fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/%s/%s/%s", + amr.SubscriptionID, + amr.ResourceGroup, + amr.ResourceProviderNamespace, + amr.ResourceType, + amr.ResourceName) +} + +// formatTimeSpan defaults to a 5 minute timespan if the user does not provide one +func formatTimeSpan(timeSpan string) (string, error) { + endtime := time.Now().UTC().Format(time.RFC3339) + starttime := time.Now().Add(-(5 * time.Minute)).UTC().Format(time.RFC3339) + if timeSpan != "" { + aggregationInterval := strings.Split(timeSpan, ":") + hours, herr := strconv.Atoi(aggregationInterval[0]) + minutes, merr := strconv.Atoi(aggregationInterval[1]) + seconds, serr := strconv.Atoi(aggregationInterval[2]) + + if herr != nil || merr != nil || serr != nil { + return "", fmt.Errorf("Errors parsing metricAggregationInterval: %v, %v, %v", herr, merr, serr) + } + + starttime = time.Now().Add(-(time.Duration(hours)*time.Hour + time.Duration(minutes)*time.Minute + time.Duration(seconds)*time.Second)).UTC().Format(time.RFC3339) + } + return fmt.Sprintf("%s/%s", starttime, endtime), nil +} + +func verifyAggregationTypeIsSupported(aggregationType string, data []insights.MetricValue) (*float64, error) { + var valuePtr *float64 + if strings.EqualFold(string(insights.Average), aggregationType) && data[len(data)-1].Average != nil { + valuePtr = data[len(data)-1].Average + } else if strings.EqualFold(string(insights.Total), aggregationType) && data[len(data)-1].Total != nil { + valuePtr = data[len(data)-1].Total + } else if strings.EqualFold(string(insights.Maximum), aggregationType) && data[len(data)-1].Maximum != nil { + valuePtr = data[len(data)-1].Maximum + } else if strings.EqualFold(string(insights.Minimum), aggregationType) && data[len(data)-1].Minimum != nil { + valuePtr = data[len(data)-1].Minimum + } else if strings.EqualFold(string(insights.Count), aggregationType) && data[len(data)-1].Count != nil { + fValue := float64(*data[len(data)-1].Count) + valuePtr = &fValue + } else { + err := fmt.Errorf("Unsupported aggregation type %s", insights.AggregationType(strings.ToTitle(aggregationType))) + return nil, err + } + return valuePtr, nil +} diff --git a/pkg/scalers/azure_monitor_scaler.go b/pkg/scalers/azure_monitor_scaler.go new file mode 100644 index 00000000000..a27d83a34a7 --- /dev/null +++ b/pkg/scalers/azure_monitor_scaler.go @@ -0,0 +1,194 @@ +package scalers + +import ( + "context" + "fmt" + "strconv" + "strings" + + v2beta1 "k8s.io/api/autoscaling/v2beta1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + azureMonitorMetricName = "metricName" + targetValueName = "targetValue" + defaultClientIDSetting = "" + defaultClientPasswordSetting = "" +) + +type azureMonitorScaler struct { + metadata *azureMonitorMetadata +} + +type azureMonitorMetadata struct { + resourceURI string + tenantID string + subscriptionID string + resourceGroupName string + name string + filter string + aggregationInterval string + aggregationType string + clientID string + clientPassword string + targetValue int +} + +var azureMonitorLog = logf.Log.WithName("azure_monitor_scaler") + +// NewAzureMonitorScaler creates a new AzureMonitorScaler +func NewAzureMonitorScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) { + meta, err := parseAzureMonitorMetadata(metadata, resolvedEnv, authParams) + if err != nil { + return nil, fmt.Errorf("error parsing azure monitor metadata: %s", err) + } + + return &azureMonitorScaler{ + metadata: meta, + }, nil +} + +func parseAzureMonitorMetadata(metadata, resolvedEnv, authParams map[string]string) (*azureMonitorMetadata, error) { + meta := azureMonitorMetadata{} + + if val, ok := metadata[targetValueName]; ok && val != "" { + targetValue, err := strconv.Atoi(val) + if err != nil { + azureMonitorLog.Error(err, "Error parsing azure monitor metadata", "targetValue", targetValueName) + return nil, fmt.Errorf("Error parsing azure monitor metadata %s: %s", targetValueName, err.Error()) + } + meta.targetValue = targetValue + } else { + return nil, fmt.Errorf("no targetValue given") + } + + if val, ok := metadata["resourceURI"]; ok && val != "" { + resourceURI := strings.Split(val, "/") + if len(resourceURI) != 3 { + return nil, fmt.Errorf("resourceURI not in the correct format. Should be namespace/resource_type/resource_name") + } + meta.resourceURI = val + } else { + return nil, fmt.Errorf("no resourceURI given") + } + + if val, ok := metadata["resourceGroupName"]; ok && val != "" { + meta.resourceGroupName = val + } else { + return nil, fmt.Errorf("no resourceGroupName given") + } + + if val, ok := metadata[azureMonitorMetricName]; ok && val != "" { + meta.name = val + } else { + return nil, fmt.Errorf("no metricName given") + } + + if val, ok := metadata["metricAggregationType"]; ok && val != "" { + meta.aggregationType = val + } else { + return nil, fmt.Errorf("no metricAggregationType given") + } + + if val, ok := metadata["metricFilter"]; ok && val != "" { + meta.filter = val + } + + if val, ok := metadata["metricAggregationInterval"]; ok && val != "" { + aggregationInterval := strings.Split(val, ":") + if len(aggregationInterval) != 3 { + return nil, fmt.Errorf("metricAggregationInterval not in the correct format. Should be hh:mm:ss") + } + meta.aggregationInterval = val + } + + // Required authentication parameters below + + if val, ok := metadata["subscriptionId"]; ok && val != "" { + meta.subscriptionID = val + } else { + return nil, fmt.Errorf("no subscriptionId given") + } + + if val, ok := metadata["tenantId"]; ok && val != "" { + meta.tenantID = val + } else { + return nil, fmt.Errorf("no tenantId given") + } + + if val, ok := authParams["activeDirectoryClientId"]; ok && val != "" { + meta.clientID = val + } else { + clientIDSetting := defaultClientIDSetting + if val, ok := metadata["activeDirectoryClientId"]; ok && val != "" { + clientIDSetting = val + } + + if val, ok := resolvedEnv[clientIDSetting]; ok { + meta.clientID = val + } else { + return nil, fmt.Errorf("no activeDirectoryClientId given") + } + } + + if val, ok := authParams["activeDirectoryClientPassword"]; ok && val != "" { + meta.clientPassword = val + } else { + clientPasswordSetting := defaultClientPasswordSetting + if val, ok := metadata["activeDirectoryClientPassword"]; ok && val != "" { + clientPasswordSetting = val + } + + if val, ok := resolvedEnv[clientPasswordSetting]; ok { + meta.clientPassword = val + } else { + return nil, fmt.Errorf("no activeDirectoryClientPassword given") + } + } + + return &meta, nil +} + +// Returns true if the Azure Monitor metric value is greater than zero +func (s *azureMonitorScaler) IsActive(ctx context.Context) (bool, error) { + val, err := GetAzureMetricValue(ctx, s.metadata) + if err != nil { + azureMonitorLog.Error(err, "error getting azure monitor metric") + return false, err + } + + return val > 0, nil +} + +func (s *azureMonitorScaler) Close() error { + return nil +} + +func (s *azureMonitorScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { + targetMetricVal := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI) + externalMetric := &v2beta1.ExternalMetricSource{MetricName: azureMonitorMetricName, TargetAverageValue: targetMetricVal} + metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta1.MetricSpec{metricSpec} +} + +// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric +func (s *azureMonitorScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + val, err := GetAzureMetricValue(ctx, s.metadata) + if err != nil { + azureMonitorLog.Error(err, "error getting azure monitor metric") + return []external_metrics.ExternalMetricValue{}, err + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(val), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} diff --git a/pkg/scalers/azure_monitor_test.go b/pkg/scalers/azure_monitor_test.go new file mode 100644 index 00000000000..883be47df33 --- /dev/null +++ b/pkg/scalers/azure_monitor_test.go @@ -0,0 +1,64 @@ +package scalers + +import ( + "testing" +) + +type parseAzMonitorMetadataTestData struct { + metadata map[string]string + isError bool + resolvedEnv map[string]string + authParams map[string]string +} + +var testAzMonitorResolvedEnv = map[string]string{ + "CLIENT_ID": "xxx", + "CLIENT_PASSWORD": "yyy", +} + +var testParseAzMonitorMetadata = []parseAzMonitorMetadataTestData{ + // nothing passed + {map[string]string{}, true, map[string]string{}, map[string]string{}}, + // properly formed + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, false, testAzMonitorResolvedEnv, map[string]string{}}, + // no optional parameters + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, false, testAzMonitorResolvedEnv, map[string]string{}}, + // incorrectly formatted resourceURI + {map[string]string{"resourceURI": "bad/format", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // improperly formatted aggregationInterval + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:1", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // missing resourceURI + {map[string]string{"tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // missing tenantId + {map[string]string{"resourceURI": "test/resource/uri", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // missing subscriptionId + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // missing resourceGroupName + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // missing metricName + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // missing metricAggregationType + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // filter included + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricFilter": "namespace eq 'default'", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, false, testAzMonitorResolvedEnv, map[string]string{}}, + // missing activeDirectoryClientId + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientPassword": "CLIENT_PASSWORD", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // missing activeDirectoryClientPassword + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "targetValue": "5"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // missing targetValue + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "activeDirectoryClientId": "CLIENT_ID", "activeDirectoryClientPassword": "CLIENT_PASSWORD"}, true, testAzMonitorResolvedEnv, map[string]string{}}, + // connection from authParams + {map[string]string{"resourceURI": "test/resource/uri", "tenantId": "123", "subscriptionId": "456", "resourceGroupName": "test", "metricName": "metric", "metricAggregationInterval": "0:15:0", "metricAggregationType": "Average", "targetValue": "5"}, false, map[string]string{}, map[string]string{"activeDirectoryClientId": "zzz", "activeDirectoryClientPassword": "password"}}, +} + +func TestAzMonitorParseMetadata(t *testing.T) { + for _, testData := range testParseAzMonitorMetadata { + _, err := parseAzureMonitorMetadata(testData.metadata, testData.resolvedEnv, testData.authParams) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Errorf("Expected error but got success. testData: %v", testData) + } + } +}