diff --git a/CHANGELOG.md b/CHANGELOG.md index 877da376b4f..37515d0f275 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **AWS Scalers**: Add setting AWS endpoint url. ([#3337](https://github.com/kedacore/keda/issues/3337)) - **Azure Service Bus Scaler**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920)) - **Azure Service Bus Scaler:** Support regex usage in queueName / subscriptionName parameters. ([#1624](https://github.com/kedacore/keda/issues/1624)) +- **ElasticSearch Scaler**: Support for ElasticSearch Service on Elastic Cloud ([#3785]https://github.com/kedacore/keda/issues/3785) - **Selenium Grid Scaler:** Allow setting url trigger parameter from TriggerAuthentication/ClusterTriggerAuthentication ([#3752](https://github.com/kedacore/keda/pull/3752)) ### Improvements diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index 3b2803e4343..88832c558e5 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -33,6 +33,8 @@ type elasticsearchMetadata struct { unsafeSsl bool username string password string + cloudID string + apiKey string indexes []string searchTemplateName string parameters []string @@ -70,25 +72,36 @@ func NewElasticsearchScaler(config *ScalerConfig) (Scaler, error) { const defaultUnsafeSsl = false -func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) { - meta := elasticsearchMetadata{} +func hasCloudConfig(meta *elasticsearchMetadata) bool { + if meta.cloudID != "" { + return true + } + if meta.apiKey != "" { + return true + } + return false +} - var err error - addresses, err := GetFromAuthOrMeta(config, "addresses") - if err != nil { - return nil, err +func hasEndpointsConfig(meta *elasticsearchMetadata) bool { + if len(meta.addresses) > 0 { + return true } - meta.addresses = splitAndTrimBySep(addresses, ",") + if meta.username != "" { + return true + } + if meta.password != "" { + return true + } + return false +} - if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { - meta.unsafeSsl, err = strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("error parsing unsafeSsl: %s", err) - } - } else { - meta.unsafeSsl = defaultUnsafeSsl +func extractEndpointsConfig(config *ScalerConfig, meta *elasticsearchMetadata) error { + addresses, err := GetFromAuthOrMeta(config, "addresses") + if err != nil { + return err } + meta.addresses = splitAndTrimBySep(addresses, ",") if val, ok := config.AuthParams["username"]; ok { meta.username = val } else if val, ok := config.TriggerMetadata["username"]; ok { @@ -101,6 +114,60 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]] } + return nil +} + +func extractCloudConfig(config *ScalerConfig, meta *elasticsearchMetadata) error { + cloudID, err := GetFromAuthOrMeta(config, "cloudID") + if err != nil { + return err + } + meta.cloudID = cloudID + + apiKey, err := GetFromAuthOrMeta(config, "apiKey") + if err != nil { + return err + } + meta.apiKey = apiKey + return nil +} + +func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) { + meta := elasticsearchMetadata{} + + var err error + addresses, err := GetFromAuthOrMeta(config, "addresses") + cloudID, errCloudConfig := GetFromAuthOrMeta(config, "cloudID") + if err != nil && errCloudConfig != nil { + return nil, fmt.Errorf("must provide either endpoint addresses or cloud config") + } + + if err == nil && addresses != "" { + err = extractEndpointsConfig(config, &meta) + if err != nil { + return nil, err + } + } + if errCloudConfig == nil && cloudID != "" { + err = extractCloudConfig(config, &meta) + if err != nil { + return nil, err + } + } + + if hasEndpointsConfig(&meta) && hasCloudConfig(&meta) { + return nil, fmt.Errorf("can't provide endpoint addresses and cloud config at the same time") + } + + if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { + meta.unsafeSsl, err = strconv.ParseBool(val) + if err != nil { + return nil, fmt.Errorf("error parsing unsafeSsl: %s", err) + } + } else { + meta.unsafeSsl = defaultUnsafeSsl + } + index, err := GetFromAuthOrMeta(config, "index") if err != nil { return nil, err @@ -144,12 +211,23 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e // newElasticsearchClient creates elasticsearch db connection func newElasticsearchClient(meta *elasticsearchMetadata, logger logr.Logger) (*elasticsearch.Client, error) { - config := elasticsearch.Config{Addresses: meta.addresses} - if meta.username != "" { - config.Username = meta.username - } - if meta.password != "" { - config.Password = meta.password + var config elasticsearch.Config + + if hasCloudConfig(meta) { + config = elasticsearch.Config{ + CloudID: meta.cloudID, + APIKey: meta.apiKey, + } + } else { + config = elasticsearch.Config{ + Addresses: meta.addresses, + } + if meta.username != "" { + config.Username = meta.username + } + if meta.password != "" { + config.Password = meta.password + } } transport := http.DefaultTransport.(*http.Transport) diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index 477dac688c8..7ec1f1a4149 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -33,10 +33,22 @@ type elasticsearchMetricIdentifier struct { var testCases = []parseElasticsearchMetadataTestData{ { - name: "no addresses given", + name: "must provide either endpoint addresses or cloud config", metadata: map[string]string{}, authParams: map[string]string{}, - expectedError: errors.New("no addresses given"), + expectedError: errors.New("must provide either endpoint addresses or cloud config"), + }, + { + name: "no apiKey given", + metadata: map[string]string{"cloudID": "my-cluster:xxxxxxxxxxx"}, + authParams: map[string]string{}, + expectedError: errors.New("no apiKey given"), + }, + { + name: "can't provide endpoint addresses and cloud config at the same time", + metadata: map[string]string{"addresses": "http://localhost:9200", "cloudID": "my-cluster:xxxxxxxxxxx"}, + authParams: map[string]string{"username": "admin", "apiKey": "xxxxxxxxx"}, + expectedError: errors.New("can't provide endpoint addresses and cloud config at the same time"), }, { name: "no index given", @@ -447,6 +459,10 @@ func TestElasticsearchGetMetricSpecForScaling(t *testing.T) { AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex, }) + if testData.metadataTestData.expectedError != nil { + assert.Equal(t, err, testData.metadataTestData.expectedError) + continue + } if err != nil { t.Fatal("Could not parse metadata:", err) }