diff --git a/controllers/keda/hpa.go b/controllers/keda/hpa.go index c0873fe8efc..56d4a3e8ee9 100644 --- a/controllers/keda/hpa.go +++ b/controllers/keda/hpa.go @@ -252,6 +252,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context, // if ComplexScalingLogic struct is not nil, expect Formula or ExternalCalculation // to be non-empty. If target is > 0.0 create a compositeScaler structure + // TODO: rename structures to unified name if !reflect.DeepEqual(scaledObject.Spec.Advanced.ComplexScalingLogic, kedav1alpha1.ComplexScalingLogic{}) { validNumTarget, validMetricType, err := validateCompositeScalingLogic(scaledObject, scaledObjectMetricSpecs) if err != nil { @@ -369,21 +370,13 @@ func validateCompositeScalingLogic(so *kedav1alpha1.ScaledObject, specs []autosc } } - // if both are empty OR both are given its an error - // if (csl.Formula == "" && len(csl.ComplexScalingLogic) == 0) || - // (csl.Formula != "" && len(csl.ComplexScalingLogic) > 0) { - // err := fmt.Errorf("error exactly one of Formula or ExternalCalculator can be given") - // return -1, autoscalingv2.MetricTargetType(""), err - // } - // if target is given, complex custom scaler for metric collection will be // passed to HPA config -> all types need to be the same if csl.Target != "" { // make sure all scalers have the same metricTargetType - for i, metric := range specs { - if i == 0 { - metricType = metric.External.Target.Type - } else if metric.External.Target.Type != metricType { + metricType = specs[0].External.Target.Type + for _, metric := range specs { + if metric.External.Target.Type != metricType { err := fmt.Errorf("error metric target type not the same for composite scaler: %s & %s", metricType, metric.External.Target.Type) return -1, metricType, err } diff --git a/pkg/externalscaling/client.go b/pkg/externalscaling/client.go index 50c18bd5e22..bbf7d400865 100644 --- a/pkg/externalscaling/client.go +++ b/pkg/externalscaling/client.go @@ -56,18 +56,16 @@ func (c *GrpcClient) Calculate(ctx context.Context, list *cl.MetricsList, logger } // WaitForConnectionReady waits for gRPC connection to be ready -// returns true if the connection was successful, false if we hit a timeut from context +// returns true if the connection was successful, false if we hit a timeout from context +// TODO: add timeout instead into time.Sleep() - removed for testing func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, url string, timeout time.Duration, logger logr.Logger) bool { currentState := c.connection.GetState() if currentState != connectivity.Ready { logger.Info(fmt.Sprintf("Waiting for establishing a gRPC connection to server for external calculator at %s", url)) - timer := time.After(timeout) for { select { case <-ctx.Done(): return false - case <-timer: - return false default: c.connection.Connect() time.Sleep(500 * time.Millisecond) diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go index de14500b9cf..077aa770abb 100644 --- a/pkg/scaling/cache/scalers_cache.go +++ b/pkg/scaling/cache/scalers_cache.go @@ -30,16 +30,24 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventreason" + externalscaling "github.com/kedacore/keda/v2/pkg/externalscaling" "github.com/kedacore/keda/v2/pkg/scalers" ) var log = logf.Log.WithName("scalers_cache") type ScalersCache struct { - ScaledObject *kedav1alpha1.ScaledObject - Scalers []ScalerBuilder - ScalableObjectGeneration int64 - Recorder record.EventRecorder + ScaledObject *kedav1alpha1.ScaledObject + Scalers []ScalerBuilder + ScalableObjectGeneration int64 + Recorder record.EventRecorder + ExternalCalculationGrpcClients []ExternalCalculationClient +} + +type ExternalCalculationClient struct { + Name string + Client *externalscaling.GrpcClient + Connected bool } type ScalerBuilder struct { diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index a17e4ef8d4d..1fa0c37a1f7 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -296,6 +296,8 @@ func (h *scaleHandler) getScalersCacheForScaledObject(ctx context.Context, scale // performGetScalersCache returns cache for input scalableObject, it is common code used by GetScalersCache() and getScalersCacheForScaledObject() methods func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, scalableObject interface{}, scalableObjectGeneration *int64, scalableObjectKind, scalableObjectNamespace, scalableObjectName string) (*cache.ScalersCache, error) { + log.Info("<< PERFORM GET SCALERS CACHE BEGIN") + h.scalerCachesLock.RLock() if cache, ok := h.scalerCaches[key]; ok { // generation was specified -> let's include it in the check as well @@ -367,6 +369,53 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s return nil, err } + log.Info("<< CHECK OBJECT", "scalableObjectKind", scalableObjectKind, "so", scalableObject) + log.Info("<< CHECK OBJECT2", "ns", scalableObjectNamespace, "name", scalableObjectName) + externalCalculationClients := []cache.ExternalCalculationClient{} + + // if scalableObject is scaledObject, check for External Calculators and establish + // their connections to gRPC servers + // new: scaledObject is sometimes NOT updated due to unresolved issue (more info https://github.com/kedacore/keda/issues/4389) + switch val := scalableObject.(type) { + case *kedav1alpha1.ScaledObject: + + // if scaledObject has externalCalculators, try to establish a connection to + // its gRPC server and save the client instances + // TODO: check if connection already established first? + log.Info("<< EXT CALCS", "ComplexLogic", val.Spec.Advanced.ComplexScalingLogic) + for _, ec := range val.Spec.Advanced.ComplexScalingLogic.ExternalCalculations { + timeout, err := strconv.ParseInt(ec.Timeout, 10, 64) + if err != nil { + // expect timeout in time format like 1m10s + parsedTime, err := time.ParseDuration(ec.Timeout) + if err != nil { + log.Error(err, "error while converting type of timeout for external calculator") + break + } + timeout = int64(parsedTime.Seconds()) + } + ecClient, err := externalscaling.NewGrpcClient(ec.URL, log) + + var connected bool + if err != nil { + log.Error(err, fmt.Sprintf("error creating new grpc client for external calculator at %s", ec.URL)) + } else { + if !ecClient.WaitForConnectionReady(ctx, ec.URL, time.Duration(timeout), log) { + connected = false + err = fmt.Errorf("client failed to connect to server") + log.Error(err, fmt.Sprintf("error in creating gRPC connection for external calculator '%s' via %s", ec.Name, ec.URL)) + } else { + connected = true + log.Info(fmt.Sprintf("successfully connected to gRPC server external calculator '%s'", ec.Name)) + } + } + ecClientStruct := cache.ExternalCalculationClient{Name: ec.Name, Client: ecClient, Connected: connected} + externalCalculationClients = append(externalCalculationClients, ecClientStruct) + } + log.Info("<< END SO", "so passed", val) + default: + } + newCache := &cache.ScalersCache{ Scalers: scalers, ScalableObjectGeneration: withTriggers.Generation, @@ -375,10 +424,12 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s switch obj := scalableObject.(type) { case *kedav1alpha1.ScaledObject: newCache.ScaledObject = obj + newCache.ExternalCalculationGrpcClients = externalCalculationClients default: } h.scalerCaches[key] = newCache + log.Info("<< FINISH PERFORM") return h.scalerCaches[key], nil } @@ -416,7 +467,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN var matchingMetrics []external_metrics.ExternalMetricValue - cache, err := h.getScalersCacheForScaledObject(ctx, scaledObjectName, scaledObjectNamespace) + cacheObj, err := h.getScalersCacheForScaledObject(ctx, scaledObjectName, scaledObjectNamespace) prommetrics.RecordScaledObjectError(scaledObjectNamespace, scaledObjectName, err) if err != nil { @@ -424,8 +475,8 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN } var scaledObject *kedav1alpha1.ScaledObject - if cache.ScaledObject != nil { - scaledObject = cache.ScaledObject + if cacheObj.ScaledObject != nil { + scaledObject = cacheObj.ScaledObject } else { err := fmt.Errorf("scaledObject not found in the cache") logger.Error(err, "scaledObject not found in the cache") @@ -435,22 +486,27 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN isScalerError := false scaledObjectIdentifier := scaledObject.GenerateIdentifier() - metricsArray := getTrueMetricArray(metricsName, scaledObject) + // returns all relevant metrics for current scaler (standard is one metric, + // composite scaler gets all external metrics for further computation) + metricsArray, _, err := h.getTrueMetricArray(ctx, metricsName, scaledObject) + if err != nil { + logger.Error(err, "error getting true metrics array, probably because of invalid cache") + } metricTriggerPairList := make(map[string]string) // let's check metrics for all scalers in a ScaledObject - scalers, scalerConfigs := cache.GetScalers() + scalers, scalerConfigs := cacheObj.GetScalers() for scalerIndex := 0; scalerIndex < len(scalers); scalerIndex++ { scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1) if scalerConfigs[scalerIndex].TriggerName != "" { scalerName = scalerConfigs[scalerIndex].TriggerName } - metricSpecs, err := cache.GetMetricSpecForScalingForScaler(ctx, scalerIndex) + metricSpecs, err := cacheObj.GetMetricSpecForScalingForScaler(ctx, scalerIndex) if err != nil { isScalerError = true logger.Error(err, "error getting metric spec for the scaler", "scaler", scalerName) - cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + cacheObj.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) } logger.V(0).Info(">>2: MATCHED-METRICS", "metricsArr", metricsArray, "metricsName", metricsName) @@ -495,7 +551,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN if !metricsFoundInCache { var latency int64 - metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) + metrics, _, latency, err = cacheObj.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) if latency != -1 { prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) } @@ -538,65 +594,43 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN // convert k8s list to grpc generated list structure grpcMetricList := externalscaling.ConvertToGeneratedStruct(matchingMetrics, logger) - // grpcMetricList := &externalscalingAPI.MetricsList{} - // for _, val := range matchingMetrics { - // metric := externalscalingAPI.Metric{Name: val.MetricName, Value: float32(val.Value.Value())} - // grpcMetricList.MetricValues = append(grpcMetricList.MetricValues, &metric) - // } // Apply external calculations - call gRPC server on each url and return // modified metric list in order for i, ec := range scaledObject.Spec.Advanced.ComplexScalingLogic.ExternalCalculations { - timeout, err := strconv.ParseInt(ec.Timeout, 10, 64) - if err != nil { - // expect timeout in time format like 1m10s - parsedTime, err := time.ParseDuration(ec.Timeout) - if err != nil { - logger.Error(err, "error while converting type of timeout for external calculation") + // get client instances from cache + ecCacheClient := cache.ExternalCalculationClient{} + var connected bool + for _, ecClient := range cacheObj.ExternalCalculationGrpcClients { + if ecClient.Name == ec.Name { + connected = ecClient.Connected + ecCacheClient = ecClient break } - timeout = int64(parsedTime.Seconds()) } - - esGrpcClient, err := externalscaling.NewGrpcClient(ec.URL, logger) - if err != nil { - logger.Error(err, "error creating new grpc client for external calculator") + if !connected { + err = fmt.Errorf("trying to call Calculate for %s External Calculator when not connected", ec.Name) + logger.Error(err, "grpc client is not connected to external calculator server") } else { - if !esGrpcClient.WaitForConnectionReady(ctx, ec.URL, time.Duration(timeout), logger) { - err = fmt.Errorf("client didnt connect to server successfully") - logger.Error(err, fmt.Sprintf("error in grpc connection for external calculator %s", ec.URL)) - } else { - logger.Info(fmt.Sprintf("connected to gRPC server for external calculation %s", ec.Name)) - } - - grpcMetricList, err = esGrpcClient.Calculate(ctx, grpcMetricList, logger) + grpcMetricList, err = ecCacheClient.Client.Calculate(ctx, grpcMetricList, logger) if err != nil { - logger.Error(err, "error calculating in grpc server at %s for external calculation", ec.URL) + logger.Error(err, fmt.Sprintf("error calculating in grpc server at %s for external calculation", ec.URL)) } if grpcMetricList == nil { err = fmt.Errorf("grpc method Calculate returned nil metric list for external calculator") logger.Error(err, "error in external calculator after Calculate") } + grpcMetricList, errFallback := externalscaling.Fallback(err, grpcMetricList, ec, scaledObject.Spec.Advanced.ComplexScalingLogic.Target, logger) + if errFallback != nil { + logger.Error(errFallback, "subsequent error occurred when trying to apply fallback metrics for external calculation") + break + } + logger.V(0).Info(fmt.Sprintf(">>3.5:%d CALCULATE END", i), "metrics", grpcMetricList) } - grpcMetricList, errFallback := externalscaling.Fallback(err, grpcMetricList, ec, scaledObject.Spec.Advanced.ComplexScalingLogic.Target, logger) - if errFallback != nil { - logger.Error(errFallback, "subsequent error occurred when trying to apply fallback metrics for external calculation") - break - } - logger.V(0).Info(fmt.Sprintf(">>3.5:%d CALCULATE END", i), "metrics", grpcMetricList) } // Convert from generated structure to k8s structure matchingMetrics = externalscaling.ConvertFromGeneratedStruct(grpcMetricList) - // outK8sList := []external_metrics.ExternalMetricValue{} - // for _, inValue := range grpcMetricList.MetricValues { - // outValue := external_metrics.ExternalMetricValue{} - // outValue.MetricName = inValue.Name - // outValue.Timestamp = v1.Now() - // outValue.Value.SetMilli(int64(inValue.Value * 1000)) - // outK8sList = append(outK8sList, outValue) - // } - // matchingMetrics = outK8sList // apply formula matchingMetrics, err = applyComplexLogicFormula(scaledObject.Spec.Advanced.ComplexScalingLogic, matchingMetrics, metricTriggerPairList, logger) @@ -722,12 +756,27 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k // what metrics should be used. In case of composite scaler, all external // metrics will be used (returns all external metrics), otherwise it returns the // same metric given -func getTrueMetricArray(metricName string, so *kedav1alpha1.ScaledObject) []string { +func (h *scaleHandler) getTrueMetricArray(ctx context.Context, metricName string, so *kedav1alpha1.ScaledObject) ([]string, bool, error) { // if composite scaler is given return all external metrics if so.Spec.Advanced.ComplexScalingLogic.Target != "" { - return so.Status.ExternalMetricNames + if len(so.Status.ExternalMetricNames) == 0 { + scaledObject := &kedav1alpha1.ScaledObject{} + err := h.client.Get(ctx, types.NamespacedName{Name: so.Name, Namespace: so.Namespace}, scaledObject) + if err != nil { + log.Error(err, "failed to get ScaledObject", "name", so.Name, "namespace", so.Namespace) + return nil, false, err + } + if len(scaledObject.Status.ExternalMetricNames) == 0 { + err := fmt.Errorf("failed to get ScaledObject.Status.ExternalMetricNames, probably invalid ScaledObject cache") + log.Error(err, "failed to get ScaledObject.Status.ExternalMetricNames, probably invalid ScaledObject cache", "scaledObject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace) + return nil, false, err + } + + so = scaledObject + } + return so.Status.ExternalMetricNames, true, nil } - return []string{metricName} + return []string{metricName}, false, nil } // help function to determine whether or not metricName is the correct one. diff --git a/tests/internals/external_scaling/external_scaling_test.go b/tests/internals/external_scaling/external_scaling_test.go index 605310ad346..746aef2127b 100644 --- a/tests/internals/external_scaling/external_scaling_test.go +++ b/tests/internals/external_scaling/external_scaling_test.go @@ -26,6 +26,13 @@ const ( // Load environment variables from .env file var _ = godotenv.Load("../../.env") +const ( + serverAvgName = "server-avg" + serverAddName = "server-add" + targetAvgPort = 50051 + targetAddPort = 50052 +) + var ( namespace = fmt.Sprintf("%s-ns", testName) deploymentName = fmt.Sprintf("%s-deployment", testName) @@ -36,14 +43,10 @@ var ( secretName = fmt.Sprintf("%s-secret", testName) metricsServerEndpoint = fmt.Sprintf("http://%s.%s.svc.cluster.local:8080/api/value", serviceName, namespace) - serverAvgName = "server-avg" - serverAddName = "server-add" serviceExternalAvgName = fmt.Sprintf("%s-%s-service", testName, serverAvgName) serviceExternalAddName = fmt.Sprintf("%s-%s-service", testName, serverAddName) podExternalAvgName = fmt.Sprintf("%s-pod", serverAvgName) podExternalAddname = fmt.Sprintf("%s-pod", serverAddName) - targetAvgPort = 50051 - targetAddPort = 50052 ) type templateData struct { @@ -165,13 +168,13 @@ spec: advanced: complexScalingLogic: target: '2' - externalCalculator: - - name: first_avg - url: {{.ExternalAvgIP}}:{{.ExternalAvgPort}} - timeout: '20' - - name: second_add - url: {{.ExternalAddIP}}:{{.ExternalAddPort}} - timeout: '20' + externalCalculators: + - name: first_avg + url: {{.ExternalAvgIP}}:{{.ExternalAvgPort}} + timeout: '10s' + - name: second_add + url: {{.ExternalAddIP}}:{{.ExternalAddPort}} + timeout: '10s' pollingInterval: 5 cooldownPeriod: 5 minReplicaCount: 0 @@ -205,6 +208,10 @@ spec: scaleTargetRef: name: {{.DeploymentName}} advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 5 complexScalingLogic: target: '2' formula: metrics_api + kw_trig @@ -229,6 +236,53 @@ spec: value: '1' ` + soBothTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObject}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 5 + complexScalingLogic: + target: '2' + externalCalculators: + - name: first_avg + url: {{.ExternalAvgIP}}:{{.ExternalAvgPort}} + timeout: '10s' + - name: second_add + url: {{.ExternalAddIP}}:{{.ExternalAddPort}} + timeout: '10s' + formula: second_add + 10 + pollingInterval: 5 + cooldownPeriod: 5 + minReplicaCount: 0 + maxReplicaCount: 10 + triggers: + - type: metrics-api + name: metrics_api + metadata: + targetValue: "2" + url: "{{.MetricsServerEndpoint}}" + valueLocation: 'value' + method: "query" + authenticationRef: + name: {{.TriggerAuthName}} + - type: kubernetes-workload + name: kw_trig + metadata: + podSelector: pod=workload-test + value: '1' +` + workloadDeploymentTemplate = ` apiVersion: apps/v1 kind: Deployment @@ -269,6 +323,7 @@ spec: command: ["curl", "-X", "POST", "{{.MetricsServerEndpoint}}/{{.MetricValue}}"] restartPolicy: OnFailure ` + // image contains python grpc server that creates average from given metrics podExternalAvgTemplate = ` apiVersion: v1 @@ -350,20 +405,20 @@ func TestExternalScaling(t *testing.T) { CreateKubernetesResources(t, kc, namespace, data, templates) // check grpc server pods are running - assert.True(t, waitForPodsReadyInNamespace(t, kc, namespace, []string{serverAddName, serverAvgName}, 6, 10), - fmt.Sprintf("pods '%v' should be ready after 1 minute", []string{serverAddName, serverAvgName})) + assert.True(t, waitForPodsReadyInNamespace(t, kc, namespace, []string{serverAddName, serverAvgName}, 12, 10), + fmt.Sprintf("pods '%v' should be ready after 2 minutes", []string{serverAddName, serverAvgName})) - ADDIP, err := ExecuteCommand(fmt.Sprintf("kubectl get service %s -o custom-columns=IP:spec.clusterIP -n %s", serviceExternalAddName, namespace)) + ADDIP, err := ExecuteCommand(fmt.Sprintf("kubectl get endpoints %s -o custom-columns=IP:.subsets[0].addresses[0].ip -n %s", serviceExternalAddName, namespace)) assert.NoErrorf(t, err, "cannot get service ADD - %s", err) - AVGIP, err := ExecuteCommand(fmt.Sprintf("kubectl get service %s -o custom-columns=IP:spec.clusterIP -n %s", serviceExternalAvgName, namespace)) + AVGIP, err := ExecuteCommand(fmt.Sprintf("kubectl get endpoints %s -o custom-columns=IP:.subsets[0].addresses[0].ip -n %s", serviceExternalAvgName, namespace)) assert.NoErrorf(t, err, "cannot get service AVG - %s", err) data.ExternalAvgIP = strings.Split(string(AVGIP), "\n")[1] data.ExternalAddIP = strings.Split(string(ADDIP), "\n")[1] testTwoExternalCalculators(t, kc, data) testComplexFormula(t, kc, data) - // testFormulaAndEC() + testFormulaAndEC(t, kc, data) DeleteKubernetesResources(t, namespace, data, templates) } @@ -371,7 +426,6 @@ func TestExternalScaling(t *testing.T) { func testTwoExternalCalculators(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("-- testTwoExternalCalculators ---") KubectlApplyWithTemplate(t, data, "soExternalCalculatorTwoTemplate", soExternalCalculatorTwoTemplate) - // metrics calculation: avg-> 3 + 3 = 6 / 2 = 3; add-> 3 + 2 = 5; target=2 ==> 3 data.MetricValue = 3 KubectlApplyWithTemplate(t, data, "updateMetricsTemplate", updateMetricsTemplate) @@ -384,34 +438,51 @@ func testTwoExternalCalculators(t *testing.T, kc *kubernetes.Clientset, data tem assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, namespace, 3, 12, 10), "replica count should be %d after 2 minutes", 3) - // metrics calculation: avg-> 5 + 5 = 10 / 2 = 5; add-> 5 + 2 = 7; target=2 ==> 4 - data.MetricValue = 5 + // // ------------------------------------------------------------------ // // + + // metrics calculation: avg-> 11 + 5 = 16 / 2 = 8; add-> 8 + 2 = 10; target=2 ==> 5 + data.MetricValue = 11 KubectlApplyWithTemplate(t, data, "updateMetricsTemplate", updateMetricsTemplate) _, err = ExecuteCommand(fmt.Sprintf("kubectl scale deployment/depl-workload-base --replicas=5 -n %s", namespace)) assert.NoErrorf(t, err, "cannot scale workload deployment - %s", err) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, "depl-workload-base", namespace, 5, 6, 10), "replica count should be %d after 1 minute", 5) - - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, namespace, 4, 12, 10), - "replica count should be %d after 2 minutes", 4) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, namespace, 5, 12, 10), + "replica count should be %d after 2 minutes", 5) } func testComplexFormula(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testComplexFormula ---") - // formula simply adds 2 metrics together - data.MetricValue = 5 + // formula simply adds 2 metrics together (3+2=5; target = 2 -> 5/2 replicas should be 3) + data.MetricValue = 3 KubectlApplyWithTemplate(t, data, "updateMetricsTemplate", updateMetricsTemplate) KubectlApplyWithTemplate(t, data, "soFormulaTemplate", soFormulaTemplate) + _, err := ExecuteCommand(fmt.Sprintf("kubectl scale deployment/depl-workload-base --replicas=2 -n %s", namespace)) + assert.NoErrorf(t, err, "cannot scale workload deployment - %s", err) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, "depl-workload-base", namespace, 2, 6, 10), + "replica count should be %d after 1 minute", 2) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, namespace, 3, 12, 10), + "replica count should be %d after 2 minutes", 3) +} + +func testFormulaAndEC(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testFormulaAndEC ---") + KubectlApplyWithTemplate(t, data, "soBothTemplate", soBothTemplate) + + data.MetricValue = 5 + KubectlApplyWithTemplate(t, data, "updateMetricsTemplate", updateMetricsTemplate) + _, err := ExecuteCommand(fmt.Sprintf("kubectl scale deployment/depl-workload-base --replicas=5 -n %s", namespace)) assert.NoErrorf(t, err, "cannot scale workload deployment - %s", err) + // first -> 5 + 5 = 10 / 2 = 5; add 5 + 2 = 7; formula 7 + 10 = 17 / 2 -> 9 assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, "depl-workload-base", namespace, 5, 6, 10), "replica count should be %d after 1 minute", 5) - - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, namespace, 5, 12, 10), - "replica count should be %d after 2 minutes", 5) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, namespace, 9, 12, 10), + "replica count should be %d after 2 minutes", 9) } func getTemplateData() (templateData, []Template) {