Skip to content

Commit

Permalink
e2e fallback test, timeout, close connection on close cache
Browse files Browse the repository at this point in the history
  • Loading branch information
gauron99 committed Jul 4, 2023
1 parent d13ae11 commit 81ea333
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 64 deletions.
21 changes: 18 additions & 3 deletions pkg/externalscaling/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,20 @@ func (c *GrpcClient) Calculate(ctx context.Context, list *cl.MetricsList, logger
}

// WaitForConnectionReady waits for gRPC connection to be ready
// returns true if the connection was successful, false if we hit a timeout from context
// TODO: add timeout instead into time.Sleep() - removed for testing
// returns true if the connection was successful, false if we hit a timeout or context canceled
func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, url string, timeout time.Duration, logger logr.Logger) bool {
currentState := c.connection.GetState()
if currentState != connectivity.Ready {
logger.Info(fmt.Sprintf("Waiting for establishing a gRPC connection to server for external calculator at %s", url))
logger.Info(fmt.Sprintf("Waiting for %v to establish a gRPC connection to server for external calculator at %s", timeout, url))
timeoutTimer := time.After(timeout)
for {
select {
case <-ctx.Done():
return false
case <-timeoutTimer:
err := fmt.Errorf("hit '%v' timeout trying to connect externalCalculator at '%s'", timeout, url)
logger.Error(err, "error while waiting for connection for externalCalculator")
return false
default:
c.connection.Connect()
time.Sleep(500 * time.Millisecond)
Expand Down Expand Up @@ -101,3 +105,14 @@ func ConvertFromGeneratedStruct(inExternal *cl.MetricsList) []external_metrics.E
}
return outK8sList
}

// close connection
func (c *GrpcClient) CloseConnection() error {
if c.connection != nil {
err := c.connection.Close()
if err != nil {
return err
}
}
return nil
}
24 changes: 12 additions & 12 deletions pkg/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,9 @@ func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.Me
return true
}

func GetMetricsWithFallbackExternalCalculator(ctx context.Context, client runtimeclient.Client, metrics *cl.MetricsList, suppressedError error, metricName string, scaledObject *kedav1alpha1.ScaledObject) error {
if metrics == nil {
metrics = &cl.MetricsList{}
metrics.Reset()
}
func GetMetricsWithFallbackExternalCalculator(ctx context.Context, client runtimeclient.Client, metrics *cl.MetricsList, suppressedError error, metricName string, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
// init empty list if it is nil because of an error earlier

const determiner string = "externalcalculator"
status := scaledObject.Status.DeepCopy()

Expand All @@ -74,7 +72,7 @@ func GetMetricsWithFallbackExternalCalculator(ctx context.Context, client runtim
if healthStatus == nil {
// should never be nil
err := fmt.Errorf("internal error getting health status in GetMetricsWithFallbackExternalCalculator - wrong determiner")
return err
return false, err
}

// if there is no error
Expand All @@ -86,7 +84,7 @@ func GetMetricsWithFallbackExternalCalculator(ctx context.Context, client runtim

updateStatus(ctx, client, scaledObject, status, v2.MetricSpec{})

return nil
return false, nil
}

healthStatus.Status = kedav1alpha1.HealthStatusFailing
Expand All @@ -96,15 +94,16 @@ func GetMetricsWithFallbackExternalCalculator(ctx context.Context, client runtim

switch {
case !isFallbackEnabled(scaledObject, v2.MetricSpec{}, determiner):
return suppressedError
return false, suppressedError
case !validateFallback(scaledObject, determiner):
log.Info("Failed to validate ScaledObject ComplexScalingLogic Fallback. Please check that parameters are positive integers", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name)
return suppressedError
return false, suppressedError
case *healthStatus.NumberOfFailures > scaledObject.Spec.Advanced.ComplexScalingLogic.Fallback.FailureThreshold:
doExternalCalculationFallback(scaledObject, metrics, metricName, suppressedError)
return nil

return true, nil
default:
return suppressedError
return false, suppressedError
}
}

Expand Down Expand Up @@ -210,10 +209,11 @@ func doExternalCalculationFallback(scaledObject *kedav1alpha1.ScaledObject, metr
}
metric := cl.Metric{
Name: metricName,
Value: float32(normalisationValue * 1000 * float64(replicas)),
Value: float32(normalisationValue * float64(replicas)),
}

metrics.MetricValues = []*cl.Metric{&metric}

log.Info(fmt.Sprintf("Suppressing error, externalCalculator falling back to %d fallback.replicas", scaledObject.Spec.Advanced.ComplexScalingLogic.Fallback.Replicas), "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "suppressedError", suppressedError)
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {

// Close closes all scalers in the cache
func (c *ScalersCache) Close(ctx context.Context) {
for _, client := range c.ExternalCalculationGrpcClients {
err := client.Client.CloseConnection()
if err != nil {
log.Error(err, fmt.Sprintf("couldn't close grpc connection for externalCalculator '%s'", client.Name))
} else {
log.V(0).Info(fmt.Sprintf("successfully closed grpc connection for externalCalculator '%s'", client.Name))
}
}
scalers := c.Scalers
c.Scalers = nil
for _, s := range scalers {
Expand Down
48 changes: 17 additions & 31 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

expr "github.com/antonmedv/expr"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -41,6 +40,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
externalscaling "github.com/kedacore/keda/v2/pkg/externalscaling"
externalscalingAPI "github.com/kedacore/keda/v2/pkg/externalscaling/api"
"github.com/kedacore/keda/v2/pkg/fallback"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scalers"
Expand Down Expand Up @@ -371,13 +371,10 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
externalCalculationClients := []cache.ExternalCalculationClient{}

// if scalableObject is scaledObject, check for External Calculators and establish
// their connections to gRPC servers
// their connections to gRPC servers and save the client instances
// new: scaledObject is sometimes NOT updated due to unresolved issue (more info https://github.com/kedacore/keda/issues/4389)
switch val := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:

// if scaledObject has externalCalculators, try to establish a connection to
// its gRPC server and save the client instances
// TODO: check if connection already established first?
for _, ec := range val.Spec.Advanced.ComplexScalingLogic.ExternalCalculations {
timeout, err := strconv.ParseInt(ec.Timeout, 10, 64)
Expand All @@ -396,13 +393,13 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
if err != nil {
log.Error(err, fmt.Sprintf("error creating new grpc client for external calculator at %s", ec.URL))
} else {
if !ecClient.WaitForConnectionReady(ctx, ec.URL, time.Duration(timeout), log) {
if !ecClient.WaitForConnectionReady(ctx, ec.URL, time.Duration(timeout)*time.Second, log) {
connected = false
err = fmt.Errorf("client failed to connect to server")
log.Error(err, fmt.Sprintf("error in creating gRPC connection for external calculator '%s' via %s", ec.Name, ec.URL))
log.Error(err, fmt.Sprintf("error in creating gRPC connection for external calculator '%s' via '%s'", ec.Name, ec.URL))
} else {
connected = true
log.Info(fmt.Sprintf("successfully connected to gRPC server external calculator '%s'", ec.Name))
log.Info(fmt.Sprintf("successfully connected to gRPC server ExternalCalculator '%s' at '%s'", ec.Name, ec.URL))
}
}
ecClientStruct := cache.ExternalCalculationClient{Name: ec.Name, Client: ecClient, Connected: connected}
Expand Down Expand Up @@ -457,7 +454,6 @@ func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject int
// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricsName string) (*external_metrics.ExternalMetricValueList, error) {
logger := log.WithValues("scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName)

var matchingMetrics []external_metrics.ExternalMetricValue

cacheObj, err := h.getScalersCacheForScaledObject(ctx, scaledObjectName, scaledObjectNamespace)
Expand Down Expand Up @@ -507,6 +503,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
logger.Error(err, "error metricsArray is empty")
// TODO: add cache.Recorder?
}

for _, spec := range metricSpecs {
// skip cpu/memory resource scaler
if spec.External == nil {
Expand Down Expand Up @@ -595,40 +592,34 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
connected = ecClient.Connected
ecCacheClient = ecClient
break
}
}

// simulate false connection for first externalCalculator
if ec.Name == "first_avg" {
connected = false
} //TODO: didnt find client in cache??
}

if connected {
grpcMetricList, err = ecCacheClient.Client.Calculate(ctx, grpcMetricList, logger)
// if err != nil {
// logger.Info(fmt.Sprintf("Warning - will try fallback: %s: externalCalculator '%s' (at '%s') method Calculate returned an error (server error)", err, ec.Name, ec.URL))
// }
} else {
err = fmt.Errorf("trying to call method Calculate for '%s' externalCalculator when not connected", ec.Name)
// logger.Info(fmt.Sprintf("Warning: %s: grpc client is not connected to externalCalculator server", err))
}
if grpcMetricList == nil {
grpcMetricList = &externalscalingAPI.MetricsList{}
err = errors.Join(err, fmt.Errorf("grpc method Calculate returned nil metric list for externalCalculator"))
logger.Error(err, "metric list is nil for externalCalculator")
}

err = fallback.GetMetricsWithFallbackExternalCalculator(ctx, h.client, grpcMetricList, err, ec.Name, scaledObject)
fallbackApplied, err := fallback.GetMetricsWithFallbackExternalCalculator(ctx, h.client, grpcMetricList, err, ec.Name, scaledObject)
if err != nil {
logger.Error(err, fmt.Sprintf("error remained after trying to apply fallback metrics for externalCalculator '%s'", ec.Name))
break
}
// if fallback was applied, continue immediately
if fallbackApplied {
break
}
}

// Convert from generated structure to k8s structure
matchingMetrics = externalscaling.ConvertFromGeneratedStruct(grpcMetricList)

// apply formula
matchingMetrics, err = applyComplexLogicFormula(scaledObject.Spec.Advanced.ComplexScalingLogic, matchingMetrics, metricTriggerPairList, logger)
matchingMetrics, err = applyComplexLogicFormula(scaledObject.Spec.Advanced.ComplexScalingLogic, matchingMetrics, metricTriggerPairList)
if err != nil {
logger.Error(err, "error applying custom compositeScaler formula")
}
Expand Down Expand Up @@ -788,7 +779,7 @@ func arrayContainsElement(el string, arr []string) bool {
}

// if given right conditions, try to apply the given custom formula in SO
func applyComplexLogicFormula(csl kedav1alpha1.ComplexScalingLogic, metrics []external_metrics.ExternalMetricValue, pairList map[string]string, logger logr.Logger) ([]external_metrics.ExternalMetricValue, error) {
func applyComplexLogicFormula(csl kedav1alpha1.ComplexScalingLogic, metrics []external_metrics.ExternalMetricValue, pairList map[string]string) ([]external_metrics.ExternalMetricValue, error) {
if csl.Formula != "" {
// add last external calculation name as a possible trigger (user can
// manipulate with metrics in ExternalCalculation service and it is expected
Expand All @@ -799,16 +790,14 @@ func applyComplexLogicFormula(csl kedav1alpha1.ComplexScalingLogic, metrics []ex
// expect last element of external calculation array via its name
pairList[lastElem] = lastElem
}
logger.V(0).Info(">>3.2 FORMULA", "pairlist", pairList)
logger.V(0).Info(">>3.2 FORMULA", "formula", csl.Formula, "target", csl.Target)
metrics, err := calculateComplexLogicFormula(metrics, csl.Formula, pairList, logger)
metrics, err := calculateComplexLogicFormula(metrics, csl.Formula, pairList)
return metrics, err
}
return metrics, nil
}

// calculate custom formula to metrics and return calculated and finalized metric
func calculateComplexLogicFormula(list []external_metrics.ExternalMetricValue, formula string, pairList map[string]string, logger logr.Logger) ([]external_metrics.ExternalMetricValue, error) {
func calculateComplexLogicFormula(list []external_metrics.ExternalMetricValue, formula string, pairList map[string]string) ([]external_metrics.ExternalMetricValue, error) {
var ret external_metrics.ExternalMetricValue
var out float64
ret.MetricName = "composite-metric-name"
Expand All @@ -819,12 +808,10 @@ func calculateComplexLogicFormula(list []external_metrics.ExternalMetricValue, f
for _, v := range list {
data[pairList[v.MetricName]] = v.Value.AsApproximateFloat64()
}
logger.Info(">>3.6 FORMULA", "data", data)
program, err := expr.Compile(formula)
if err != nil {
return nil, fmt.Errorf("error trying to compile custom formula: %s", err)
}
logger.Info(">>3.7 FORMULA", "compile", program)

tmp, err := expr.Run(program, data)
if err != nil {
Expand All @@ -833,7 +820,6 @@ func calculateComplexLogicFormula(list []external_metrics.ExternalMetricValue, f

out = tmp.(float64)
ret.Value.SetMilli(int64(out * 1000))
logger.V(0).Info(">>3.8: RUN", "tmp", tmp, "out", out, "struct", ret)
return []external_metrics.ExternalMetricValue{ret}, nil
}

Expand Down
Loading

0 comments on commit 81ea333

Please sign in to comment.