From 824947bce1c8c78676156e73ebced781bb0c6581 Mon Sep 17 00:00:00 2001 From: Yarden Siboni Date: Sat, 12 Feb 2022 17:31:54 +0200 Subject: [PATCH] feat: Provide Azure Data Explorer Scaler (#1488) Signed-off-by: Yarden Siboni --- CHANGELOG.md | 2 +- go.mod | 1 + go.sum | 5 + pkg/scalers/azure/azure_data_explorer.go | 132 ++++++++++ pkg/scalers/azure/azure_data_explorer_test.go | 107 ++++++++ pkg/scalers/azure_data_explorer_scaler.go | 194 ++++++++++++++ .../azure_data_explorer_scaler_test.go | 158 ++++++++++++ pkg/scaling/scale_handler.go | 2 + tests/scalers/azure-data-explorer.test.ts | 237 ++++++++++++++++++ 9 files changed, 837 insertions(+), 1 deletion(-) create mode 100644 pkg/scalers/azure/azure_data_explorer.go create mode 100644 pkg/scalers/azure/azure_data_explorer_test.go create mode 100644 pkg/scalers/azure_data_explorer_scaler.go create mode 100644 pkg/scalers/azure_data_explorer_scaler_test.go create mode 100644 tests/scalers/azure-data-explorer.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index fa37d120de4..9d6f4e70e9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,7 @@ ### New -- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) +- Add Azure Data Explorer Scaler ([#pr-num](https://github.com/kedacore/keda/pull/pr-num)) ### Improvements diff --git a/go.mod b/go.mod index bae8c89bf9d..cba453a2db8 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( cloud.google.com/go/monitoring v1.2.0 github.com/Azure/azure-amqp-common-go/v3 v3.2.2 github.com/Azure/azure-event-hubs-go/v3 v3.3.16 + github.com/Azure/azure-kusto-go v0.5.0 github.com/Azure/azure-sdk-for-go v61.4.0+incompatible github.com/Azure/azure-service-bus-go v0.11.5 github.com/Azure/azure-storage-blob-go v0.14.0 diff --git a/go.sum b/go.sum index 0252b3edc74..9090ba7148c 100644 --- a/go.sum +++ b/go.sum @@ -62,11 +62,14 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.2 h1:CJpxNAGxP7UBhDusRUoaOn0uOorQy github.com/Azure/azure-amqp-common-go/v3 v3.2.2/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI= github.com/Azure/azure-event-hubs-go/v3 v3.3.16 h1:e3iHaU6Tgq5A8F313uIUWwwXtXAn75iIC6ekgzW6TG8= github.com/Azure/azure-event-hubs-go/v3 v3.3.16/go.mod h1:xgDvUi1+8/bb11WTEaU7VwZREYufzKzjWE4YiPZixb0= +github.com/Azure/azure-kusto-go v0.5.0 h1:BUD6WK22gXlWp2x81uy8ztWyyAKoP8Mv2RU82XaiZhw= +github.com/Azure/azure-kusto-go v0.5.0/go.mod h1:2xOhBxRcHyyNifFHmNMcqYL6AMdhyrUHCkEJkrZ+EI4= github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go v61.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v61.4.0+incompatible h1:BF2Pm3aQWIa6q9KmxyF1JYKYXtVw67vtvu2Wd54NGuY= github.com/Azure/azure-sdk-for-go v61.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw= @@ -388,6 +391,7 @@ github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/E github.com/gocql/gocql v0.0.0-20211222173705-d73e6b1002a7 h1:jmIMM+nEO+vjz9xaRIg9sZNtNLq5nsSbsxwe1OtRwv4= github.com/gocql/gocql v0.0.0-20211222173705-d73e6b1002a7/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -687,6 +691,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= diff --git a/pkg/scalers/azure/azure_data_explorer.go b/pkg/scalers/azure/azure_data_explorer.go new file mode 100644 index 00000000000..fa912e83b5d --- /dev/null +++ b/pkg/scalers/azure/azure_data_explorer.go @@ -0,0 +1,132 @@ +/* +Copyright 2021 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "context" + "fmt" + "io" + "strconv" + + "github.com/Azure/azure-kusto-go/kusto" + "github.com/Azure/azure-kusto-go/kusto/data/table" + "github.com/Azure/azure-kusto-go/kusto/unsafe" + "github.com/Azure/go-autorest/autorest/azure/auth" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +type DataExplorerMetadata struct { + ClientId string + ClientSecret string + DatabaseName string + Endpoint string + MetricName string + PodIdentity string + Query string + TenantId string + Threshold int64 +} + +var azureDataExplorerLogger = logf.Log.WithName("azure_data_explorer_scaler") + +func CreateAzureDataExplorerClient(metadata *DataExplorerMetadata) (*kusto.Client, error) { + authConfig, err := getDataExplorerAuthConfig(metadata) + if err != nil { + return nil, fmt.Errorf("failed to get data explorer auth config: %v", err) + } + authorizer := kusto.Authorization{Config: *authConfig} + + client, err := kusto.New(metadata.Endpoint, authorizer) + if err != nil { + return nil, fmt.Errorf("failed to create kusto client: %v", err) + } + + return client, nil +} + +func getDataExplorerAuthConfig(metadata *DataExplorerMetadata) (*auth.AuthorizerConfig, error) { + var authConfig auth.AuthorizerConfig + + if metadata.PodIdentity != "" { + authConfig = auth.NewMSIConfig() + azureDataExplorerLogger.Info("Creating Azure Data Explorer Client using Pod Identity") + } else if metadata.ClientId != "" && metadata.ClientSecret != "" && metadata.TenantId != "" { + authConfig = auth.NewClientCredentialsConfig(metadata.ClientId, metadata.ClientSecret, metadata.TenantId) + azureDataExplorerLogger.Info("Creating Azure Data Explorer Client using clientId, clientSecret and tenantId") + } else { + return nil, fmt.Errorf("missing credentials. please reconfigure your scaled object metadata") + } + + return &authConfig, nil +} + +func GetAzureDataExplorerMetricValue(ctx context.Context, client *kusto.Client, db string, query string) (int64, error) { + azureDataExplorerLogger.Info("Querying Azure Data Explorer", "db", db, "query", query) + + iter, err := client.Query(ctx, db, kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{true, false})).UnsafeAdd(query)) + if err != nil { + return -1, fmt.Errorf("failed to get azure data explorer metric result from query %s: %v", query, err) + } + defer iter.Stop() + + // Get the first (and only) row. + row, err := iter.Next() + if err != nil { + return -1, fmt.Errorf("failed to get query %s result: %v", query, err) + } + + if !row.ColumnTypes[0].Type.Valid() { + return -1, fmt.Errorf("column type %s is not valid", row.ColumnTypes[0].Type) + } + + // Return error if there is more than one row. + _, err = iter.Next() + if err != io.EOF { + return -1, fmt.Errorf("query %s result had more than a single result row", query) + } + + metricValue, err := extractDataExplorerMetricValue(row) + if err != nil { + return -1, fmt.Errorf("failed to extract value from query %s: %v", query, err) + } + + return metricValue, nil +} + +func extractDataExplorerMetricValue(row *table.Row) (int64, error) { + if row == nil || len(row.ColumnTypes) == 0 { + return -1, fmt.Errorf("query has no results") + } + + dataType := row.ColumnTypes[0].Type + value := row.Values[0].String() + azureDataExplorerLogger.Info("Query Result", "value", value, "dataType", dataType) + + // Query result validation. + if dataType == "real" || dataType == "int" || dataType == "long" { + convertedValue, err := strconv.Atoi(value) + if err != nil { + return -1, fmt.Errorf("failed to convert result %s to int", value) + } + if convertedValue < 0 { + return -1, fmt.Errorf("query result must be >= 0 but recieved: %s", value) + } + return int64(convertedValue), nil + } + + return -1, fmt.Errorf("failed to extract metric value from Data Explorer request") +} diff --git a/pkg/scalers/azure/azure_data_explorer_test.go b/pkg/scalers/azure/azure_data_explorer_test.go new file mode 100644 index 00000000000..e3f890c2044 --- /dev/null +++ b/pkg/scalers/azure/azure_data_explorer_test.go @@ -0,0 +1,107 @@ +/* +Copyright 2021 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "testing" + + "github.com/Azure/azure-kusto-go/kusto/data/errors" + "github.com/Azure/azure-kusto-go/kusto/data/table" + "github.com/Azure/azure-kusto-go/kusto/data/types" + "github.com/Azure/azure-kusto-go/kusto/data/value" +) + +type testExtractDataExplorerMetricValue struct { + testRow *table.Row + isError bool +} + +type testGetDataExplorerAuthConfig struct { + testMetadata *DataExplorerMetadata + isError bool +} + +type testGetDataExplorerEndpoint struct { + testMetadata *DataExplorerMetadata + isError bool +} + +var ( + clientId = "test_client_id" + clusterName = "test_cluster_name" + region = "test_region" + rowName = "result" + rowType types.Column = "long" + rowValue int64 = 3 + podIdentity = "Azure" + secret = "test_secret" + tenantId = "test_tenant_id" +) + +var testExtractDataExplorerMetricValues = []testExtractDataExplorerMetricValue{ + // pass + {testRow: &table.Row{ColumnTypes: table.Columns{{rowName, rowType}}, Values: value.Values{value.Long{Value: rowValue, Valid: true}}, Op: errors.OpQuery}, isError: false}, + // nil row - fail + {testRow: nil, isError: true}, + // Empty row - fail + {testRow: &table.Row{}, isError: true}, + // Metric value is not bigger than 0 - fail + {testRow: &table.Row{ColumnTypes: table.Columns{{rowName, rowType}}, Values: value.Values{value.Long{Value: -1, Valid: true}}, Op: errors.OpQuery}, isError: true}, + // Metric result is invalid - fail + {testRow: &table.Row{ColumnTypes: table.Columns{{rowName, rowType}}, Values: value.Values{value.String{Value: "invalid", Valid: true}}, Op: errors.OpQuery}, isError: true}, + // Metric Type is not valid - fail + {testRow: &table.Row{ColumnTypes: table.Columns{{rowName, "String"}}, Values: value.Values{value.Long{Value: rowValue, Valid: true}}, Op: errors.OpQuery}, isError: true}, +} + +var testGetDataExplorerAuthConfigs = []testGetDataExplorerAuthConfig{ + // Auth with aad app - pass + {testMetadata: &DataExplorerMetadata{ClientId: clientId, ClientSecret: secret, TenantId: tenantId}, isError: false}, + // Auth with podIdentity - pass + {testMetadata: &DataExplorerMetadata{PodIdentity: podIdentity}, isError: false}, + // Empty metadata - fail + {testMetadata: &DataExplorerMetadata{}, isError: true}, + // Empty tenantId - fail + {testMetadata: &DataExplorerMetadata{ClientId: clientId, ClientSecret: secret}, isError: true}, + // Empty clientId - fail + {testMetadata: &DataExplorerMetadata{ClientSecret: secret, TenantId: tenantId}, isError: true}, + // Empty clientSecret - fail + {testMetadata: &DataExplorerMetadata{ClientId: clientId, TenantId: tenantId}, isError: true}, +} + +func TestExtractDataExplorerMetricValue(t *testing.T) { + for _, testData := range testExtractDataExplorerMetricValues { + _, err := extractDataExplorerMetricValue(testData.testRow) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +func TestGetDataExplorerAuthConfig(t *testing.T) { + for _, testData := range testGetDataExplorerAuthConfigs { + _, err := getDataExplorerAuthConfig(testData.testMetadata) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} diff --git a/pkg/scalers/azure_data_explorer_scaler.go b/pkg/scalers/azure_data_explorer_scaler.go new file mode 100644 index 00000000000..73cadfaffeb --- /dev/null +++ b/pkg/scalers/azure_data_explorer_scaler.go @@ -0,0 +1,194 @@ +/* +Copyright 2021 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scalers + +import ( + "context" + "fmt" + "strconv" + + "github.com/Azure/azure-kusto-go/kusto" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/pkg/scalers/azure" + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type azureDataExplorerScaler struct { + metadata *azure.DataExplorerMetadata + client *kusto.Client + name string + namespace string +} + +const adxName = "azure-data-explorer" + +var dataExplorerLogger = logf.Log.WithName("azure_data_explorer_scaler") + +func NewAzureDataExplorerScaler(config *ScalerConfig) (Scaler, error) { + metadata, err := parseAzureDataExplorerMetadata(config) + if err != nil { + return nil, fmt.Errorf("failed to parse azure data explorer metadata: %s", err) + } + + client, err := azure.CreateAzureDataExplorerClient(metadata) + if err != nil { + return nil, fmt.Errorf("failed to create azure data explorer client: %s", err) + } + + return &azureDataExplorerScaler{ + metadata: metadata, + client: client, + name: config.Name, + namespace: config.Namespace, + }, nil +} + +func parseAzureDataExplorerMetadata(config *ScalerConfig) (*azure.DataExplorerMetadata, error) { + metadata, err := parseAzureDataExplorerAuthParams(config) + + // Get database name. + databaseName, err := getParameterFromConfig(config, "databaseName", false) + if err != nil { + return nil, err + } + metadata.DatabaseName = databaseName + + // Get endpoint. + endpoint, err := getParameterFromConfig(config, "endpoint", false) + if err != nil { + return nil, err + } + metadata.Endpoint = endpoint + + // Get query. + query, err := getParameterFromConfig(config, "query", false) + if err != nil { + return nil, err + } + metadata.Query = query + + // Get threshold, observe that we don't check AuthParams for threshold. + if val, ok := config.TriggerMetadata["threshold"]; ok { + threshold, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing metadata. Details: can't parse threshold. Inner Error: %v", err) + } + metadata.Threshold = threshold + } + + // Resolve metricName. + if val, ok := config.TriggerMetadata["metricName"]; ok { + metadata.MetricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", adxName, val)) + } else { + metadata.MetricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", adxName, metadata.DatabaseName)) + } + + dataExplorerLogger.Info("Parsed azureDataExplorerMetadata", + "database", metadata.DatabaseName, + "endpoint", metadata.Endpoint, + "query", metadata.Query, + "threshold", metadata.Threshold) + + return metadata, nil +} + +func parseAzureDataExplorerAuthParams(config *ScalerConfig) (*azure.DataExplorerMetadata, error) { + metadata := azure.DataExplorerMetadata{} + + switch config.PodIdentity { + case kedav1alpha1.PodIdentityProviderAzure: + metadata.PodIdentity = string(config.PodIdentity) + case "", kedav1alpha1.PodIdentityProviderNone: + // Check whether pod identity is provided through metadata instead of TriggerAuthentication object + podIdentity, err := getParameterFromConfig(config, "podIdentity", true) + if err != nil { + dataExplorerLogger.Info("pod identity is not provided. trying to resolve clientId, clientSecret and tenantId.") + } else { + metadata.PodIdentity = podIdentity + return &metadata, nil + } + + tenantId, err := getParameterFromConfig(config, "tenantId", true) + if err != nil { + return nil, err + } + metadata.TenantId = tenantId + + clientId, err := getParameterFromConfig(config, "clientId", true) + if err != nil { + return nil, err + } + metadata.ClientId = clientId + + clientSecret, err := getParameterFromConfig(config, "clientSecret", true) + if err != nil { + return nil, err + } + metadata.ClientSecret = clientSecret + default: + return nil, fmt.Errorf("error parsing auth params") + } + return &metadata, nil +} + +func (s azureDataExplorerScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + metricValue, err := azure.GetAzureDataExplorerMetricValue(ctx, s.client, s.metadata.DatabaseName, s.metadata.Query) + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("failed to get metrics for scaled object %s in namespace %s: %v", s.name, s.namespace, err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(metricValue, resource.DecimalSI), + Timestamp: metav1.Now(), + } + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +func (s azureDataExplorerScaler) GetMetricSpecForScaling(ctx context.Context) []v2beta2.MetricSpec { + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: s.metadata.MetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: resource.NewQuantity(s.metadata.Threshold, resource.DecimalSI), + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} +} + +func (s azureDataExplorerScaler) IsActive(ctx context.Context) (bool, error) { + metricValue, err := azure.GetAzureDataExplorerMetricValue(ctx, s.client, s.metadata.DatabaseName, s.metadata.Query) + if err != nil { + return false, fmt.Errorf("failed to get azure data explorer metric value: %s", err) + } + + return metricValue > 0, nil +} + +func (s azureDataExplorerScaler) Close(context.Context) error { + return nil +} diff --git a/pkg/scalers/azure_data_explorer_scaler_test.go b/pkg/scalers/azure_data_explorer_scaler_test.go new file mode 100644 index 00000000000..5ed1b9da850 --- /dev/null +++ b/pkg/scalers/azure_data_explorer_scaler_test.go @@ -0,0 +1,158 @@ +/* +Copyright 2021 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scalers + +import ( + "context" + "fmt" + "testing" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" +) + +type parseDataExplorerMetadataTestData struct { + metadata map[string]string + isError bool +} + +type dataExplorerMetricIdentifier struct { + metadataTestData *parseDataExplorerMetadataTestData + scalerIndex int + name string +} + +var ( + aadAppClientID = "eebdbbab-cf74-4791-a5c6-1ef5d90b1fa8" + aadAppSecret = "test_app_secret" + azureTenantID = "8fe57c22-02b1-4b87-8c24-ae21dea4fa6a" + databaseName = "test_database" + dataExplorerQuery = "print 3" + dataExplorerThreshold = "1" + dataExplorerEndpoint = "https://test-keda-e2e.eastus.kusto.windows.net" + metricName = "test_metric" + podIdentity = "azure" +) + +// Valid auth params with aad application and passwd +var dataExplorerResolvedEnv = map[string]string{ + "tenantId": azureTenantID, + "clientId": aadAppClientID, + "clientSecret": aadAppSecret, +} + +var testDataExplorerMetadataWithClientAndSecret = []parseDataExplorerMetadataTestData{ + // Empty metadata - fail + {map[string]string{}, true}, + // Missing tenantId - fail + {map[string]string{"tenantId": "", "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true}, + // Missing clientId - fail + {map[string]string{"tenantId": azureTenantID, "clientId": "", "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true}, + // Missing clientSecret - fail + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": "", "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true}, + // Missing endpoint - fail + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": "", "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true}, + // Missing databaseName - fail + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": "", "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true}, + // Missing query - fail + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": "", "threshold": dataExplorerThreshold}, true}, + // Missing threshold - fail + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": ""}, true}, + // All parameters set - pass + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, false}, +} + +var testDataExplorerMetadataWithPodIdentity = []parseDataExplorerMetadataTestData{ + // Empty metadata - fail + {map[string]string{}, true}, + // Missing endpoint - fail + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": "", "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold}, true}, + // Missing query - fail + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": "", "threshold": dataExplorerThreshold}, true}, + // Missing threshold - fail + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": ""}, true}, + // All parameters set - pass + {map[string]string{"tenantId": azureTenantID, "clientId": aadAppClientID, "clientSecret": aadAppSecret, "endpoint": dataExplorerEndpoint, "databaseName": databaseName, "query": dataExplorerQuery, "threshold": dataExplorerThreshold, "metricName": metricName}, false}, +} + +var testDataExplorerMetricIdentifiers = []dataExplorerMetricIdentifier{ + {&testDataExplorerMetadataWithClientAndSecret[len(testDataExplorerMetadataWithClientAndSecret)-1], 0, fmt.Sprintf("%s-%s", adxName, databaseName)}, + {&testDataExplorerMetadataWithPodIdentity[len(testDataExplorerMetadataWithPodIdentity)-1], 1, fmt.Sprintf("%s-%s", adxName, metricName)}, +} + +func TestDataExplorerParseMetadata(t *testing.T) { + // Auth through clientId, clientSecret and tenantId + for _, testData := range testDataExplorerMetadataWithClientAndSecret { + _, err := parseAzureDataExplorerMetadata( + &ScalerConfig{ + ResolvedEnv: dataExplorerResolvedEnv, + TriggerMetadata: testData.metadata, + AuthParams: map[string]string{}, + PodIdentity: ""}) + + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } + + // Auth through Pod Identity + for _, testData := range testDataExplorerMetadataWithPodIdentity { + _, err := parseAzureDataExplorerMetadata( + &ScalerConfig{ + ResolvedEnv: dataExplorerResolvedEnv, + TriggerMetadata: testData.metadata, + AuthParams: map[string]string{}, + PodIdentity: kedav1alpha1.PodIdentityProviderAzure}) + + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +func TestDataExplorerGetMetricSpecForScaling(t *testing.T) { + for _, testData := range testDataExplorerMetricIdentifiers { + meta, err := parseAzureDataExplorerMetadata( + &ScalerConfig{ + ResolvedEnv: dataExplorerResolvedEnv, + TriggerMetadata: testData.metadataTestData.metadata, + AuthParams: map[string]string{}, + PodIdentity: "", + ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Failed to parse metadata:", err) + } + + mockDataExplorerScaler := azureDataExplorerScaler{ + metadata: meta, + client: nil, + name: "mock_scaled_object", + namespace: "mock_namespace", + } + + metricSpec := mockDataExplorerScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 9f384ee24f2..3e98f28b39f 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -362,6 +362,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewAzureAppInsightsScaler(config) case "azure-blob": return scalers.NewAzureBlobScaler(config) + case "azure-data-explorer": + return scalers.NewAzureDataExplorerScaler(config) case "azure-eventhub": return scalers.NewAzureEventHubScaler(config) case "azure-log-analytics": diff --git a/tests/scalers/azure-data-explorer.test.ts b/tests/scalers/azure-data-explorer.test.ts new file mode 100644 index 00000000000..438bbd36809 --- /dev/null +++ b/tests/scalers/azure-data-explorer.test.ts @@ -0,0 +1,237 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' + +const dataExplorerDb = process.env['TEST_DATA_EXPLORER_DB'] +const dataExplorerEndpoint = process.env['TEST_DATA_EXPLORER_ENDPOINT'] +const spId = process.env['AZURE_SP_ID'] +const spSecret = process.env['AZURE_SP_KEY'] +const spTenantId = process.env['AZURE_SP_TENANT'] + +const testName = 'test-azure-data-explorer' +const dataExplorerNamespace = `${testName}-ns` +const scaleInDesiredReplicaCount = '0' +const scaleInMetricValue = '0' +const scaleOutDesiredReplicaCount = '4' +const scaleOutMetricValue = '18' +const scaledObjectName = `${testName}-scaled-object` +const secretName = `${testName}-secret` +const serviceName = `${testName}-sts` +const triggerAuthThroughClientAndSecretName = `${testName}-trigger-auth-client-and-secret` +const triggerAuthThroughPodIdentityName = `${testName}-trigger-auth-pod-identity` + +test.before(t => { + if (!spId || !spSecret || !spTenantId) { + t.fail('required parameters for data explorer e2e test were not resolved') + } + + sh.config.silent = true + + // Create namespace + t.is( + 0, + sh.exec(`kubectl create namespace ${dataExplorerNamespace}`).code, + 'Create namespace should work.') + + // Create secret + const secretFile = tmp.fileSync() + fs.writeFileSync( + secretFile.name, + secretYaml + .replace('{{CLIENT_ID}}', Buffer.from(spId).toString('base64')) + .replace('{{CLIENT_SECRET}}', Buffer.from(spSecret).toString('base64')) + .replace('{{TENANT_ID}}', Buffer.from(spTenantId).toString('base64'))) + t.is( + 0, + sh.exec(`kubectl apply -f ${secretFile.name} -n ${dataExplorerNamespace}`).code, + 'Creating a secret should work.') + + // Create deployment + t.is( + 0, + sh.exec(`kubectl apply -f ${createYamlFile(stsYaml)} -n ${dataExplorerNamespace}`).code, + 'Creating a statefulset should work.') + + // Validate initial replica count + const replicaCount = sh.exec(`kubectl get sts ${serviceName} -n ${dataExplorerNamespace} -o jsonpath="{.spec.replicas}"`).stdout + t.is( + replicaCount, + scaleInDesiredReplicaCount, + `Replica count should start with ${scaleInDesiredReplicaCount} replicas.`) +}) + +test.serial.cb(`Replica count should be scaled out to ${scaleOutDesiredReplicaCount} replicas [Pod Identity]`, t => { + // Create trigger auth through Pod Identity + t.is( + 0, + sh.exec(`kubectl apply -f ${createYamlFile(triggerAuthPodIdentityYaml)}`).code, + 'Creating a trigger auth should work.') + + // Create scaled object + t.is( + 0, + sh.exec(`kubectl apply -f ${createYamlFile(scaledObjectYaml)}`).code, + 'Creating a scaled object should work.') + + // Test scale out [Pod Identity] + testDeploymentScale(t, scaleOutDesiredReplicaCount) + t.end() +}) + +test.serial.cb(`Replica count should be scaled in to ${scaleInDesiredReplicaCount} replicas [Pod Identity]`, t => { + // Edit azure data explorer query in order to scale down to 0 replicas + const scaledObjectFile = tmp.fileSync() + fs.writeFileSync(scaledObjectFile.name, scaledObjectYaml.replace(scaleOutMetricValue, scaleInMetricValue)) + t.is( + 0, + sh.exec(`kubectl apply -f ${scaledObjectFile.name} -n ${dataExplorerNamespace}`).code, + 'Edit scaled object query should work.') + + // Test scale in [Pod Identity] + testDeploymentScale(t, scaleInDesiredReplicaCount) + t.end() +}) + +test.serial.cb(`Replica count should be scaled out to ${scaleOutDesiredReplicaCount} replicas [clientId & clientSecret]`, t => { + // Create trigger auth through clientId, clientSecret and tenantId + t.is( + 0, + sh.exec(`kubectl apply -f ${createYamlFile(triggerAuthClientIdAndSecretYaml)} -n ${dataExplorerNamespace}`).code, + 'Change trigger of scaled object auth from pod identity to aad app should work.') + + // Change trigger auth of scaled object from pod identity to clientId and clientSecret + const scaledObjectFile = tmp.fileSync() + fs.writeFileSync(scaledObjectFile.name, scaledObjectYaml.replace(triggerAuthThroughPodIdentityName, triggerAuthThroughClientAndSecretName)) + t.is( + 0, + sh.exec(`kubectl apply -f ${scaledObjectFile.name} -n ${dataExplorerNamespace}`).code, + 'Change trigger of scaled object auth from pod identity to aad app should work.') + + // Test scale out [clientId & clientSecret] + testDeploymentScale(t, scaleOutDesiredReplicaCount) + t.end() +}) + +test.after.always.cb('Clean up E2E K8s objects', t => { + const resources = [ + `scaledobject.keda.sh/${scaledObjectName}`, + `triggerauthentications.keda.sh/${triggerAuthThroughClientAndSecretName}`, + `triggerauthentications.keda.sh/${triggerAuthThroughPodIdentityName}`, + `statefulsets.apps/${serviceName}`, + `v1/${secretName}`, + `v1/${dataExplorerNamespace}` + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} -n ${dataExplorerNamespace}`) + } + + t.end() +}) + +function testDeploymentScale(t, desiredReplicaCount: string) { + let currentReplicaCount = '-1' + for (let i = 0; i < 120 && currentReplicaCount !== desiredReplicaCount; i++) { + currentReplicaCount = sh.exec(`kubectl get sts ${serviceName} -n ${dataExplorerNamespace} -o jsonpath="{.spec.replicas}"`).stdout + if (currentReplicaCount !== desiredReplicaCount) { + sh.exec(`sleep 2s`) + } + } + t.is(desiredReplicaCount, currentReplicaCount, `Replica count should be ${desiredReplicaCount} after some time`) +} + +function createYamlFile(yaml: string) { + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, yaml) + return tmpFile.name +} + +const stsYaml = +`apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: ${serviceName} + namespace: ${dataExplorerNamespace} +spec: + serviceName: ${serviceName} + replicas: ${scaleInDesiredReplicaCount} + selector: + matchLabels: + app: ${serviceName} + template: + metadata: + labels: + app: ${serviceName} + spec: + containers: + - name: nginx + image: nginx:1.16.1` + +const scaledObjectYaml = +`apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: ${scaledObjectName} + namespace: ${dataExplorerNamespace} + labels: + deploymentName: ${serviceName} +spec: + scaleTargetRef: + kind: StatefulSet + name: ${serviceName} + cooldownPeriod: 10 + minReplicaCount: 0 + maxReplicaCount: 10 + pollingInterval: 30 + triggers: + - type: azure-data-explorer + metadata: + databaseName: ${dataExplorerDb} + endpoint: ${dataExplorerEndpoint} + query: print result = ${scaleOutMetricValue} + threshold: "5" + authenticationRef: + name: ${triggerAuthThroughPodIdentityName}` + +// K8s manifests for auth through Pod Identity +const triggerAuthPodIdentityYaml = +`apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: ${triggerAuthThroughPodIdentityName} + namespace: ${dataExplorerNamespace} +spec: + podIdentity: + provider: azure` + +// K8s manifests for auth through clientId, clientSecret and tenantId +const secretYaml = +`apiVersion: v1 +kind: Secret +metadata: + name: ${secretName} + namespace: ${dataExplorerNamespace} +type: Opaque +data: + clientId: {{CLIENT_ID}} + clientSecret: {{CLIENT_SECRET}} + tenantId: {{TENANT_ID}}` + +const triggerAuthClientIdAndSecretYaml = +`apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: ${triggerAuthThroughClientAndSecretName} + namespace: ${dataExplorerNamespace} +spec: + secretTargetRef: + - parameter: clientId + name: ${secretName} + key: clientId + - parameter: clientSecret + name: ${secretName} + key: clientSecret + - parameter: tenantId + name: ${secretName} + key: tenantId`