diff --git a/CHANGELOG.md b/CHANGELOG.md index 419ba137ec6..e7f6d962eb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269)) - TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) +- **General:** Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234)) ### Improvements diff --git a/pkg/scalers/solr_scaler.go b/pkg/scalers/solr_scaler.go new file mode 100644 index 00000000000..486eb0c4cd7 --- /dev/null +++ b/pkg/scalers/solr_scaler.go @@ -0,0 +1,186 @@ +package scalers + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + + "github.com/go-logr/logr" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type solrScaler struct { + metricType v2.MetricTargetType + metadata *solrMetadata + httpClient *http.Client + logger logr.Logger +} + +type solrMetadata struct { + host string + collection string + targetQueryValue float64 + activationTargetQueryValue float64 + query string + scalerIndex int + + // Authentication + username string + password string +} + +type solrResponse struct { + Response struct { + NumFound int `json:"numFound"` + } `json:"response"` +} + +// NewSolrScaler creates a new solr Scaler +func NewSolrScaler(config *ScalerConfig) (Scaler, error) { + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("error getting scaler metric type: %w", err) + } + + meta, err := parseSolrMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing Solr metadata: %w", err) + } + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) + + logger := InitializeLogger(config, "solr_scaler") + + return &solrScaler{ + metricType: metricType, + metadata: meta, + httpClient: httpClient, + logger: logger, + }, nil +} + +// parseSolrMetadata parses the metadata and returns a solrMetadata or an error if the ScalerConfig is invalid. +func parseSolrMetadata(config *ScalerConfig) (*solrMetadata, error) { + meta := solrMetadata{} + + if val, ok := config.TriggerMetadata["host"]; ok { + meta.host = val + } else { + return nil, fmt.Errorf("no host given") + } + + if val, ok := config.TriggerMetadata["collection"]; ok { + meta.collection = val + } else { + return nil, fmt.Errorf("no collection given") + } + + if val, ok := config.TriggerMetadata["query"]; ok { + meta.query = val + } else { + meta.query = "*:*" + } + + if val, ok := config.TriggerMetadata["targetQueryValue"]; ok { + targetQueryValue, err := strconv.ParseFloat(val, 64) + if err != nil { + return nil, fmt.Errorf("targetQueryValue parsing error %w", err) + } + meta.targetQueryValue = targetQueryValue + } else { + return nil, fmt.Errorf("no targetQueryValue given") + } + + meta.activationTargetQueryValue = 0 + if val, ok := config.TriggerMetadata["activationTargetQueryValue"]; ok { + activationTargetQueryValue, err := strconv.ParseFloat(val, 64) + if err != nil { + return nil, fmt.Errorf("invalid activationTargetQueryValue - must be an integer") + } + meta.activationTargetQueryValue = activationTargetQueryValue + } + // Parse Authentication + if val, ok := config.AuthParams["username"]; ok { + meta.username = val + } else { + return nil, fmt.Errorf("no username given") + } + + if val, ok := config.AuthParams["password"]; ok { + meta.password = val + } else { + return nil, fmt.Errorf("no password given") + } + + meta.scalerIndex = config.ScalerIndex + return &meta, nil +} + +func (s *solrScaler) getItemCount(ctx context.Context) (float64, error) { + var SolrResponse1 *solrResponse + var itemCount float64 + + url := fmt.Sprintf("%s/solr/%s/select?q=%s&wt=json", + s.metadata.host, s.metadata.collection, s.metadata.query) + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return -1, err + } + // Add BasicAuth + req.SetBasicAuth(s.metadata.username, s.metadata.password) + + resp, err := s.httpClient.Do(req) + if err != nil { + return -1, fmt.Errorf("error sending request to solr, %w", err) + } + + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return -1, err + } + + err = json.Unmarshal(body, &SolrResponse1) + if err != nil { + return -1, fmt.Errorf("%w, make sure you enter username, password and collection values correctly in the yaml file", err) + } + itemCount = float64(SolrResponse1.Response.NumFound) + return itemCount, nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *solrScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString("solr")), + }, + Target: GetMetricTargetMili(s.metricType, s.metadata.targetQueryValue), + } + metricSpec := v2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2.MetricSpec{metricSpec} +} + +// GetMetricsAndActivity query from Solr,and return to external metrics and activity +func (s *solrScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + result, err := s.getItemCount(ctx) + if err != nil { + return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("failed to inspect solr, because of %w", err) + } + + metric := GenerateMetricInMili(metricName, result) + + return append([]external_metrics.ExternalMetricValue{}, metric), result > s.metadata.activationTargetQueryValue, nil +} + +// Close closes the http client connection. +func (s *solrScaler) Close(context.Context) error { + return nil +} diff --git a/pkg/scalers/solr_scaler_test.go b/pkg/scalers/solr_scaler_test.go new file mode 100644 index 00000000000..a91f1ea4bb9 --- /dev/null +++ b/pkg/scalers/solr_scaler_test.go @@ -0,0 +1,77 @@ +package scalers + +import ( + "context" + "net/http" + "testing" +) + +type parseSolrMetadataTestData struct { + metadata map[string]string + isError bool + authParams map[string]string +} + +type solrMetricIdentifier struct { + metadataTestData *parseSolrMetadataTestData + scalerIndex int + name string +} + +var testSolrMetadata = []parseSolrMetadataTestData{ + // nothing passed + {map[string]string{}, true, map[string]string{}}, + // properly formed metadata + {map[string]string{"host": "http://192.168.49.2:30217", "collection": "my_core", "query": "*:*", "targetQueryValue": "1"}, false, map[string]string{"username": "test_username", "password": "test_password"}}, + // no query passed + {map[string]string{"host": "http://192.168.49.2:30217", "collection": "my_core", "targetQueryValue": "1"}, false, map[string]string{"username": "test_username", "password": "test_password"}}, + // no host passed + {map[string]string{"collection": "my_core", "query": "*:*", "targetQueryValue": "1"}, true, map[string]string{"username": "test_username", "password": "test_password"}}, + // no collection passed + {map[string]string{"host": "http://192.168.49.2:30217", "query": "*:*", "targetQueryValue": "1"}, true, map[string]string{"username": "test_username", "password": "test_password"}}, + // no targetQueryValue passed + {map[string]string{"host": "http://192.168.49.2:30217", "collection": "my_core", "query": "*:*"}, true, map[string]string{"username": "test_username", "password": "test_password"}}, + // no username passed + {map[string]string{"host": "http://192.168.49.2:30217", "collection": "my_core", "query": "*:*", "targetQueryValue": "1"}, true, map[string]string{"password": "test_password"}}, + // no password passed + {map[string]string{"host": "http://192.168.49.2:30217", "collection": "my_core", "query": "*:*", "targetQueryValue": "1"}, true, map[string]string{"username": "test_username"}}, +} + +var solrMetricIdentifiers = []solrMetricIdentifier{ + {&testSolrMetadata[1], 0, "s0-solr"}, + {&testSolrMetadata[2], 1, "s1-solr"}, +} + +func TestSolrParseMetadata(t *testing.T) { + testCaseNum := 1 + for _, testData := range testSolrMetadata { + _, err := parseSolrMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + if err != nil && !testData.isError { + t.Errorf("Expected success but got error for unit test # %v", testCaseNum) + } + if testData.isError && err == nil { + t.Errorf("Expected error but got success for unit test # %v", testCaseNum) + } + testCaseNum++ + } +} + +func TestSolrGetMetricSpecForScaling(t *testing.T) { + for _, testData := range solrMetricIdentifiers { + ctx := context.Background() + meta, err := parseSolrMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ScalerIndex: testData.scalerIndex, AuthParams: testData.metadataTestData.authParams}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockSolrScaler := solrScaler{ + metadata: meta, + httpClient: http.DefaultClient, + } + + metricSpec := mockSolrScaler.GetMetricSpecForScaling(ctx) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Errorf("Wrong External metric source name: %s, expected: %s", metricName, testData.name) + } + } +} diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 4f198040558..30a0c2e9ef0 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -225,6 +225,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewSeleniumGridScaler(config) case "solace-event-queue": return scalers.NewSolaceScaler(config) + case "solr": + return scalers.NewSolrScaler(config) case "stan": return scalers.NewStanScaler(config) default: diff --git a/tests/scalers/solr/solr_test.go b/tests/scalers/solr/solr_test.go new file mode 100644 index 00000000000..ffbf6c5d44e --- /dev/null +++ b/tests/scalers/solr/solr_test.go @@ -0,0 +1,295 @@ +//go:build e2e +// +build e2e + +package solr_test + +import ( + "encoding/base64" + "errors" + "fmt" + "strings" + "testing" + "time" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" // For helper methods +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "solr-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + secretName = fmt.Sprintf("%s-secret", testName) + solrUsername = "solr" + solrPassword = "SolrRocks" + solrCollection = "my_core" + solrPodName = "solr-0" + solrPath = "bin/solr" + + minReplicaCount = 0 + maxReplicaCount = 2 +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + SecretName string + SolrUsernameBase64 string + SolrPasswordBase64 string +} + +const ( + secretTemplate = `apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +data: + solr_username: {{.SolrUsernameBase64}} + solr_password: {{.SolrPasswordBase64}} +` + + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-solr-secret + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: username + name: {{.SecretName}} + key: solr_username + - parameter: password + name: {{.SecretName}} + key: solr_password +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: nginx + image: nginxinc/nginx-unprivileged + ports: + - containerPort: 80 +` + + solrDeploymentTemplate = ` +apiVersion: apps/v1 +kind: StatefulSet +metadata: + labels: + app: solr-app + name: solr + namespace: {{.TestNamespace}} +spec: + serviceName: {{.DeploymentName}} + replicas: 1 + selector: + matchLabels: + app: solr-app + template: + metadata: + labels: + app: solr-app + spec: + containers: + - name: solr + image: solr:latest + ports: + - containerPort: 8983 + volumeMounts: + - name: data + mountPath: /var/solr + volumes: + - name: data + emptyDir: {} +` + + serviceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} +spec: + selector: + app: solr-app + type: ClusterIP + ports: + - protocol: TCP + port: 8983 + targetPort: 8983 +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + minReplicaCount: 0 + maxReplicaCount: 2 + pollingInterval: 1 + cooldownPeriod: 1 + triggers: + - type: solr + metadata: + host: "http://{{.DeploymentName}}.{{.TestNamespace}}.svc.cluster.local:8983" + collection: "my_core" + query: "*:*" + targetQueryValue: "1" + activationTargetQueryValue: "5" + authenticationRef: + name: keda-trigger-auth-solr-secret +` +) + +func TestSolrScaler(t *testing.T) { + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + // setup solr + setupSolr(t, kc) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", minReplicaCount) + + // test scaling + testActivation(t, kc) + testScaleOut(t, kc) + testScaleIn(t, kc) + + // cleanup + DeleteKubernetesResources(t, kc, testNamespace, data, templates) +} + +func setupSolr(t *testing.T, kc *kubernetes.Clientset) { + assert.True(t, WaitForStatefulsetReplicaReadyCount(t, kc, "solr", testNamespace, 1, 60, 3), + "solr should be up") + err := checkIfSolrStatusIsReady(t, solrPodName) + assert.NoErrorf(t, err, "%s", err) + + // Create the collection + out, errOut, _ := ExecCommandOnSpecificPod(t, solrPodName, testNamespace, fmt.Sprintf("%s create_core -c %s", solrPath, solrCollection)) + t.Logf("Output: %s, Error: %s", out, errOut) + + // Enable BasicAuth + out, errOut, _ = ExecCommandOnSpecificPod(t, solrPodName, testNamespace, "echo '{\"authentication\":{\"class\":\"solr.BasicAuthPlugin\",\"credentials\":{\"solr\":\"IV0EHq1OnNrj6gvRCwvFwTrZ1+z1oBbnQdiVC3otuq0= Ndd7LKvVBAaZIF0QAVi1ekCfAJXr1GGfLtRUXhgrF8c=\"}},\"authorization\":{\"class\":\"solr.RuleBasedAuthorizationPlugin\",\"permissions\":[{\"name\":\"security-edit\",\"role\":\"admin\"}],\"user-role\":{\"solr\":\"admin\"}}}' > /var/solr/data/security.json") + t.Logf("Output: %s, Error: %s", out, errOut) + + // Restart solr to apply auth + out, errOut, _ = ExecCommandOnSpecificPod(t, solrPodName, testNamespace, fmt.Sprintf("%s restart", solrPath)) + t.Logf("Output: %s, Error: %s", out, errOut) + + err = checkIfSolrStatusIsReady(t, solrPodName) + assert.NoErrorf(t, err, "%s", err) + t.Log("--- BasicAuth plugin activated ---") + + t.Log("--- solr is ready ---") +} + +func checkIfSolrStatusIsReady(t *testing.T, name string) error { + t.Log("--- checking solr status ---") + + time.Sleep(time.Second * 10) + for i := 0; i < 60; i++ { + out, errOut, _ := ExecCommandOnSpecificPod(t, name, testNamespace, fmt.Sprintf("%s status –", solrPath)) + t.Logf("Output: %s, Error: %s", out, errOut) + if !strings.Contains(out, "running on port") { + time.Sleep(time.Second * 10) + continue + } + return nil + } + return errors.New("solr is not ready") +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + SecretName: secretName, + SolrUsernameBase64: base64.StdEncoding.EncodeToString([]byte(solrUsername)), + SolrPasswordBase64: base64.StdEncoding.EncodeToString([]byte(solrPassword)), + }, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "serviceTemplate", Config: serviceTemplate}, + {Name: "solrDeploymentTemplate", Config: solrDeploymentTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} + +// add 3 documents to solr -> activation should not happen (activationTargetValue = 5) +func testActivation(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing activation ---") + + // Add documents + out, errOut, _ := ExecCommandOnSpecificPod(t, solrPodName, testNamespace, fmt.Sprintf("curl -u %s:%s -X POST -H 'Content-Type: application/json' 'http://localhost:8983/solr/%s/update' --data-binary '[{\"id\": \"1\",\"title\": \"Doc 1\"},,{\"id\": \"2\",\"title\": \"Doc 2\"},{\"id\": \"3\",\"title\":\"Doc 3\"}]'", solrUsername, solrPassword, solrCollection)) + t.Logf("Output: %s, Error: %s", out, errOut) + // Update documents + out, errOut, _ = ExecCommandOnSpecificPod(t, solrPodName, testNamespace, fmt.Sprintf("curl -u %s:%s -X POST 'http://localhost:8983/solr/%s/update' --data-binary '{\"commit\":{}}' -H 'Content-type:application/json'", solrUsername, solrPassword, solrCollection)) + t.Logf("Output: %s, Error: %s", out, errOut) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) +} + +// add 3 more documents to solr, which in total is 6 -> should be scaled up +func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale out ---") + + // Add documents + out, errOut, _ := ExecCommandOnSpecificPod(t, solrPodName, testNamespace, fmt.Sprintf("curl -u %s:%s -X POST -H 'Content-Type: application/json' 'http://localhost:8983/solr/%s/update' --data-binary '[{ \"id\": \"10\",\"title\": \"Doc 10\"},{ \"id\": \"20\",\"title\": \"Doc 20\"},{ \"id\": \"30\",\"title\": \"Doc 30\"}]'", solrUsername, solrPassword, solrCollection)) + t.Logf("Output: %s, Error: %s", out, errOut) + // Update documents + out, errOut, _ = ExecCommandOnSpecificPod(t, solrPodName, testNamespace, fmt.Sprintf("curl -u %s:%s -X POST 'http://localhost:8983/solr/%s/update' --data-binary '{\"commit\":{}}' -H 'Content-type:application/json'", solrUsername, solrPassword, solrCollection)) + t.Logf("Output: %s, Error: %s", out, errOut) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", maxReplicaCount) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + + _, _, err := ExecCommandOnSpecificPod(t, solrPodName, testNamespace, fmt.Sprintf("bin/post -u %s:%s -c %s -d \"*:*\"", solrUsername, solrPassword, solrCollection)) + assert.NoErrorf(t, err, "cannot enqueue messages - %s", err) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 3 minutes", minReplicaCount) +}