hack for recent cache error, added tests
gauron99 committed Jul 4, 2023
1 parent 537b7f6 commit e8bd08f
Showing 5 changed files with 216 additions and 97 deletions.
15 changes: 4 additions & 11 deletions controllers/keda/hpa.go
@@ -252,6 +252,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context,

// if ComplexScalingLogic struct is not nil, expect Formula or ExternalCalculation
// to be non-empty. If target is > 0.0 create a compositeScaler structure
// TODO: rename structures to unified name
if !reflect.DeepEqual(scaledObject.Spec.Advanced.ComplexScalingLogic, kedav1alpha1.ComplexScalingLogic{}) {
validNumTarget, validMetricType, err := validateCompositeScalingLogic(scaledObject, scaledObjectMetricSpecs)
if err != nil {
@@ -369,21 +370,13 @@ func validateCompositeScalingLogic(so *kedav1alpha1.ScaledObject, specs []autosc
}
}

// if both are empty OR both are given its an error
// if (csl.Formula == "" && len(csl.ComplexScalingLogic) == 0) ||
// (csl.Formula != "" && len(csl.ComplexScalingLogic) > 0) {
// err := fmt.Errorf("error exactly one of Formula or ExternalCalculator can be given")
// return -1, autoscalingv2.MetricTargetType(""), err
// }

// if target is given, complex custom scaler for metric collection will be
// passed to HPA config -> all types need to be the same
if csl.Target != "" {
// make sure all scalers have the same metricTargetType
for i, metric := range specs {
if i == 0 {
metricType = metric.External.Target.Type
} else if metric.External.Target.Type != metricType {
metricType = specs[0].External.Target.Type
for _, metric := range specs {
if metric.External.Target.Type != metricType {
err := fmt.Errorf("error metric target type not the same for composite scaler: %s & %s", metricType, metric.External.Target.Type)
return -1, metricType, err
}
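
A minimal standalone sketch of the target-type check the hpa.go hunk above performs: when a composite target is set, every external metric spec must share one MetricTargetType, and the first spec's type is taken as the reference. The helper name and the example values are illustrative, not KEDA code.

package main

import (
	"fmt"

	autoscalingv2 "k8s.io/api/autoscaling/v2"
)

// validateUniformTargetType mirrors the loop above: take the first external
// spec's target type as the reference and reject any spec that differs.
func validateUniformTargetType(specs []autoscalingv2.MetricSpec) (autoscalingv2.MetricTargetType, error) {
	if len(specs) == 0 || specs[0].External == nil {
		return "", fmt.Errorf("no external metric specs given")
	}
	metricType := specs[0].External.Target.Type
	for _, metric := range specs {
		if metric.External == nil {
			continue // only external metric specs carry a target type here
		}
		if metric.External.Target.Type != metricType {
			return metricType, fmt.Errorf("error metric target type not the same for composite scaler: %s & %s",
				metricType, metric.External.Target.Type)
		}
	}
	return metricType, nil
}

func main() {
	specs := []autoscalingv2.MetricSpec{
		{External: &autoscalingv2.ExternalMetricSource{Target: autoscalingv2.MetricTarget{Type: autoscalingv2.AverageValueMetricType}}},
		{External: &autoscalingv2.ExternalMetricSource{Target: autoscalingv2.MetricTarget{Type: autoscalingv2.ValueMetricType}}},
	}
	_, err := validateUniformTargetType(specs)
	fmt.Println(err) // mixed target types -> error
}
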
6 changes: 2 additions & 4 deletions pkg/externalscaling/client.go
@@ -56,18 +56,16 @@ func (c *GrpcClient) Calculate(ctx context.Context, list *cl.MetricsList, logger
}

// WaitForConnectionReady waits for gRPC connection to be ready
// returns true if the connection was successful, false if we hit a timeut from context
// returns true if the connection was successful, false if we hit a timeout from context
// TODO: add timeout instead into time.Sleep() - removed for testing
func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, url string, timeout time.Duration, logger logr.Logger) bool {
currentState := c.connection.GetState()
if currentState != connectivity.Ready {
logger.Info(fmt.Sprintf("Waiting for establishing a gRPC connection to server for external calculator at %s", url))
timer := time.After(timeout)
for {
select {
case <-ctx.Done():
return false
case <-timer:
return false
default:
c.connection.Connect()
time.Sleep(500 * time.Millisecond)
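
For reference, a hedged sketch of the wait loop WaitForConnectionReady implements after this change: keep nudging the channel with Connect() until it reports Ready, and give up only when the caller's context is done (the explicit timer case was dropped by the commit, per the TODO above). The function, server address, and wiring below are illustrative assumptions, not the KEDA implementation.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-logr/logr"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
)

// waitForReady polls the connection state until it is Ready or ctx is done.
func waitForReady(ctx context.Context, conn *grpc.ClientConn, url string, logger logr.Logger) bool {
	if conn.GetState() == connectivity.Ready {
		return true
	}
	logger.Info(fmt.Sprintf("waiting for a gRPC connection to the external calculator at %s", url))
	for {
		select {
		case <-ctx.Done():
			return false // cancellation or deadline is carried by the context
		default:
			conn.Connect() // nudge an Idle channel into connecting
			if conn.GetState() == connectivity.Ready {
				return true
			}
			time.Sleep(500 * time.Millisecond)
		}
	}
}

func main() {
	// Assumed address, for illustration only.
	conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	fmt.Println("ready:", waitForReady(ctx, conn, "localhost:50051", logr.Discard()))
}
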
16 changes: 12 additions & 4 deletions pkg/scaling/cache/scalers_cache.go
@@ -30,16 +30,24 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
externalscaling "github.com/kedacore/keda/v2/pkg/externalscaling"
"github.com/kedacore/keda/v2/pkg/scalers"
)

var log = logf.Log.WithName("scalers_cache")

type ScalersCache struct {
ScaledObject *kedav1alpha1.ScaledObject
Scalers []ScalerBuilder
ScalableObjectGeneration int64
Recorder record.EventRecorder
ScaledObject *kedav1alpha1.ScaledObject
Scalers []ScalerBuilder
ScalableObjectGeneration int64
Recorder record.EventRecorder
ExternalCalculationGrpcClients []ExternalCalculationClient
}

type ExternalCalculationClient struct {
Name string
Client *externalscaling.GrpcClient
Connected bool
}

type ScalerBuilder struct {
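
The new ExternalCalculationGrpcClients field is consumed later in scale_handler.go by scanning for the client whose Name matches an external calculator. An illustrative lookup helper of that shape (not part of the commit) could live alongside the struct:

// findExternalCalculationClient returns the cached gRPC client registered
// under the given external calculator name, and whether one was found.
func (c *ScalersCache) findExternalCalculationClient(name string) (ExternalCalculationClient, bool) {
	for _, ecClient := range c.ExternalCalculationGrpcClients {
		if ecClient.Name == name {
			return ecClient, true
		}
	}
	return ExternalCalculationClient{}, false
}
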
151 changes: 100 additions & 51 deletions pkg/scaling/scale_handler.go
@@ -296,6 +296,8 @@ func (h *scaleHandler) getScalersCacheForScaledObject(ctx context.Context, scale

// performGetScalersCache returns cache for input scalableObject, it is common code used by GetScalersCache() and getScalersCacheForScaledObject() methods
func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, scalableObject interface{}, scalableObjectGeneration *int64, scalableObjectKind, scalableObjectNamespace, scalableObjectName string) (*cache.ScalersCache, error) {
log.Info("<< PERFORM GET SCALERS CACHE BEGIN")

h.scalerCachesLock.RLock()
if cache, ok := h.scalerCaches[key]; ok {
// generation was specified -> let's include it in the check as well
@@ -367,6 +369,53 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
return nil, err
}

log.Info("<< CHECK OBJECT", "scalableObjectKind", scalableObjectKind, "so", scalableObject)
log.Info("<< CHECK OBJECT2", "ns", scalableObjectNamespace, "name", scalableObjectName)
externalCalculationClients := []cache.ExternalCalculationClient{}

// if scalableObject is scaledObject, check for External Calculators and establish
// their connections to gRPC servers
// new: scaledObject is sometimes NOT updated due to unresolved issue (more info https://github.com/kedacore/keda/issues/4389)
switch val := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:

// if scaledObject has externalCalculators, try to establish a connection to
// its gRPC server and save the client instances
// TODO: check if connection already established first?
log.Info("<< EXT CALCS", "ComplexLogic", val.Spec.Advanced.ComplexScalingLogic)
for _, ec := range val.Spec.Advanced.ComplexScalingLogic.ExternalCalculations {
timeout, err := strconv.ParseInt(ec.Timeout, 10, 64)
if err != nil {
// expect timeout in time format like 1m10s
parsedTime, err := time.ParseDuration(ec.Timeout)
if err != nil {
log.Error(err, "error while converting type of timeout for external calculator")
break
}
timeout = int64(parsedTime.Seconds())
}
ecClient, err := externalscaling.NewGrpcClient(ec.URL, log)

var connected bool
if err != nil {
log.Error(err, fmt.Sprintf("error creating new grpc client for external calculator at %s", ec.URL))
} else {
if !ecClient.WaitForConnectionReady(ctx, ec.URL, time.Duration(timeout), log) {
connected = false
err = fmt.Errorf("client failed to connect to server")
log.Error(err, fmt.Sprintf("error in creating gRPC connection for external calculator '%s' via %s", ec.Name, ec.URL))
} else {
connected = true
log.Info(fmt.Sprintf("successfully connected to gRPC server external calculator '%s'", ec.Name))
}
}
ecClientStruct := cache.ExternalCalculationClient{Name: ec.Name, Client: ecClient, Connected: connected}
externalCalculationClients = append(externalCalculationClients, ecClientStruct)
}
log.Info("<< END SO", "so passed", val)
default:
}

newCache := &cache.ScalersCache{
Scalers: scalers,
ScalableObjectGeneration: withTriggers.Generation,
@@ -375,10 +424,12 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
newCache.ScaledObject = obj
newCache.ExternalCalculationGrpcClients = externalCalculationClients
default:
}

h.scalerCaches[key] = newCache
log.Info("<< FINISH PERFORM")

return h.scalerCaches[key], nil
}
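
The timeout handling in the hunk above accepts either a bare integer or a Go duration string such as "1m10s"; the code this commit removes from GetScaledObjectMetrics further down did the same. A self-contained sketch of that parsing, normalized to a time.Duration. Note that wrapping a bare second count with time.Duration(), as the hunk does, yields a nanosecond count rather than seconds, which is one reason a helper like this may be preferable; parseCalculatorTimeout is a hypothetical name, not part of the commit.

package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseCalculatorTimeout accepts either a plain integer (interpreted as
// seconds) or a Go duration string such as "1m10s".
func parseCalculatorTimeout(raw string) (time.Duration, error) {
	if secs, err := strconv.ParseInt(raw, 10, 64); err == nil {
		return time.Duration(secs) * time.Second, nil
	}
	d, err := time.ParseDuration(raw) // fallback, e.g. "1m10s"
	if err != nil {
		return 0, fmt.Errorf("cannot parse external calculator timeout %q: %w", raw, err)
	}
	return d, nil
}

func main() {
	for _, raw := range []string{"90", "1m10s", "bogus"} {
		d, err := parseCalculatorTimeout(raw)
		fmt.Println(raw, "->", d, err)
	}
}
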
@@ -416,16 +467,16 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN

var matchingMetrics []external_metrics.ExternalMetricValue

cache, err := h.getScalersCacheForScaledObject(ctx, scaledObjectName, scaledObjectNamespace)
cacheObj, err := h.getScalersCacheForScaledObject(ctx, scaledObjectName, scaledObjectNamespace)
prommetrics.RecordScaledObjectError(scaledObjectNamespace, scaledObjectName, err)

if err != nil {
return nil, fmt.Errorf("error getting scalers %w", err)
}

var scaledObject *kedav1alpha1.ScaledObject
if cache.ScaledObject != nil {
scaledObject = cache.ScaledObject
if cacheObj.ScaledObject != nil {
scaledObject = cacheObj.ScaledObject
} else {
err := fmt.Errorf("scaledObject not found in the cache")
logger.Error(err, "scaledObject not found in the cache")
@@ -435,22 +486,27 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
isScalerError := false
scaledObjectIdentifier := scaledObject.GenerateIdentifier()

metricsArray := getTrueMetricArray(metricsName, scaledObject)
// returns all relevant metrics for current scaler (standard is one metric,
// composite scaler gets all external metrics for further computation)
metricsArray, _, err := h.getTrueMetricArray(ctx, metricsName, scaledObject)
if err != nil {
logger.Error(err, "error getting true metrics array, probably because of invalid cache")
}
metricTriggerPairList := make(map[string]string)

// let's check metrics for all scalers in a ScaledObject
scalers, scalerConfigs := cache.GetScalers()
scalers, scalerConfigs := cacheObj.GetScalers()
for scalerIndex := 0; scalerIndex < len(scalers); scalerIndex++ {
scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1)
if scalerConfigs[scalerIndex].TriggerName != "" {
scalerName = scalerConfigs[scalerIndex].TriggerName
}

metricSpecs, err := cache.GetMetricSpecForScalingForScaler(ctx, scalerIndex)
metricSpecs, err := cacheObj.GetMetricSpecForScalingForScaler(ctx, scalerIndex)
if err != nil {
isScalerError = true
logger.Error(err, "error getting metric spec for the scaler", "scaler", scalerName)
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
cacheObj.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
}

logger.V(0).Info(">>2: MATCHED-METRICS", "metricsArr", metricsArray, "metricsName", metricsName)
@@ -495,7 +551,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN

if !metricsFoundInCache {
var latency int64
metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
metrics, _, latency, err = cacheObj.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
if latency != -1 {
prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency))
}
@@ -538,65 +594,43 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN

// convert k8s list to grpc generated list structure
grpcMetricList := externalscaling.ConvertToGeneratedStruct(matchingMetrics, logger)
// grpcMetricList := &externalscalingAPI.MetricsList{}
// for _, val := range matchingMetrics {
// metric := externalscalingAPI.Metric{Name: val.MetricName, Value: float32(val.Value.Value())}
// grpcMetricList.MetricValues = append(grpcMetricList.MetricValues, &metric)
// }

// Apply external calculations - call gRPC server on each url and return
// modified metric list in order
for i, ec := range scaledObject.Spec.Advanced.ComplexScalingLogic.ExternalCalculations {
timeout, err := strconv.ParseInt(ec.Timeout, 10, 64)
if err != nil {
// expect timeout in time format like 1m10s
parsedTime, err := time.ParseDuration(ec.Timeout)
if err != nil {
logger.Error(err, "error while converting type of timeout for external calculation")
// get client instances from cache
ecCacheClient := cache.ExternalCalculationClient{}
var connected bool
for _, ecClient := range cacheObj.ExternalCalculationGrpcClients {
if ecClient.Name == ec.Name {
connected = ecClient.Connected
ecCacheClient = ecClient
break
}
timeout = int64(parsedTime.Seconds())
}

esGrpcClient, err := externalscaling.NewGrpcClient(ec.URL, logger)
if err != nil {
logger.Error(err, "error creating new grpc client for external calculator")
if !connected {
err = fmt.Errorf("trying to call Calculate for %s External Calculator when not connected", ec.Name)
logger.Error(err, "grpc client is not connected to external calculator server")
} else {
if !esGrpcClient.WaitForConnectionReady(ctx, ec.URL, time.Duration(timeout), logger) {
err = fmt.Errorf("client didnt connect to server successfully")
logger.Error(err, fmt.Sprintf("error in grpc connection for external calculator %s", ec.URL))
} else {
logger.Info(fmt.Sprintf("connected to gRPC server for external calculation %s", ec.Name))
}

grpcMetricList, err = esGrpcClient.Calculate(ctx, grpcMetricList, logger)
grpcMetricList, err = ecCacheClient.Client.Calculate(ctx, grpcMetricList, logger)
if err != nil {
logger.Error(err, "error calculating in grpc server at %s for external calculation", ec.URL)
logger.Error(err, fmt.Sprintf("error calculating in grpc server at %s for external calculation", ec.URL))
}
if grpcMetricList == nil {
err = fmt.Errorf("grpc method Calculate returned nil metric list for external calculator")
logger.Error(err, "error in external calculator after Calculate")
}
grpcMetricList, errFallback := externalscaling.Fallback(err, grpcMetricList, ec, scaledObject.Spec.Advanced.ComplexScalingLogic.Target, logger)
if errFallback != nil {
logger.Error(errFallback, "subsequent error occurred when trying to apply fallback metrics for external calculation")
break
}
logger.V(0).Info(fmt.Sprintf(">>3.5:%d CALCULATE END", i), "metrics", grpcMetricList)
}
grpcMetricList, errFallback := externalscaling.Fallback(err, grpcMetricList, ec, scaledObject.Spec.Advanced.ComplexScalingLogic.Target, logger)
if errFallback != nil {
logger.Error(errFallback, "subsequent error occurred when trying to apply fallback metrics for external calculation")
break
}
logger.V(0).Info(fmt.Sprintf(">>3.5:%d CALCULATE END", i), "metrics", grpcMetricList)
}

// Convert from generated structure to k8s structure
matchingMetrics = externalscaling.ConvertFromGeneratedStruct(grpcMetricList)
// outK8sList := []external_metrics.ExternalMetricValue{}
// for _, inValue := range grpcMetricList.MetricValues {
// outValue := external_metrics.ExternalMetricValue{}
// outValue.MetricName = inValue.Name
// outValue.Timestamp = v1.Now()
// outValue.Value.SetMilli(int64(inValue.Value * 1000))
// outK8sList = append(outK8sList, outValue)
// }
// matchingMetrics = outK8sList

// apply formula
matchingMetrics, err = applyComplexLogicFormula(scaledObject.Spec.Advanced.ComplexScalingLogic, matchingMetrics, metricTriggerPairList, logger)
@@ -722,12 +756,27 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
// what metrics should be used. In case of composite scaler, all external
// metrics will be used (returns all external metrics), otherwise it returns the
// same metric given
func getTrueMetricArray(metricName string, so *kedav1alpha1.ScaledObject) []string {
func (h *scaleHandler) getTrueMetricArray(ctx context.Context, metricName string, so *kedav1alpha1.ScaledObject) ([]string, bool, error) {
// if composite scaler is given return all external metrics
if so.Spec.Advanced.ComplexScalingLogic.Target != "" {
return so.Status.ExternalMetricNames
if len(so.Status.ExternalMetricNames) == 0 {
scaledObject := &kedav1alpha1.ScaledObject{}
err := h.client.Get(ctx, types.NamespacedName{Name: so.Name, Namespace: so.Namespace}, scaledObject)
if err != nil {
log.Error(err, "failed to get ScaledObject", "name", so.Name, "namespace", so.Namespace)
return nil, false, err
}
if len(scaledObject.Status.ExternalMetricNames) == 0 {
err := fmt.Errorf("failed to get ScaledObject.Status.ExternalMetricNames, probably invalid ScaledObject cache")
log.Error(err, "failed to get ScaledObject.Status.ExternalMetricNames, probably invalid ScaledObject cache", "scaledObject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace)
return nil, false, err
}

so = scaledObject
}
return so.Status.ExternalMetricNames, true, nil
}
return []string{metricName}
return []string{metricName}, false, nil
}

// help function to determine whether or not metricName is the correct one.
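
The core of the "recent cache error" workaround sits in getTrueMetricArray above: when a composite target is configured but the cached ScaledObject has an empty Status.ExternalMetricNames, the handler re-reads the object from the API server (see kedacore/keda issue #4389, referenced in the hunk above). A hedged standalone sketch of that refresh step using the controller-runtime client; refreshExternalMetricNames and its package placement are illustrative assumptions, not taken from the commit.

package scaling

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

// refreshExternalMetricNames returns the external metric names used by a
// composite scaler, re-fetching the ScaledObject when the cached copy looks
// stale (empty Status.ExternalMetricNames), mirroring getTrueMetricArray.
func refreshExternalMetricNames(ctx context.Context, c client.Client, so *kedav1alpha1.ScaledObject) ([]string, error) {
	if len(so.Status.ExternalMetricNames) > 0 {
		return so.Status.ExternalMetricNames, nil
	}

	fresh := &kedav1alpha1.ScaledObject{}
	if err := c.Get(ctx, types.NamespacedName{Name: so.Name, Namespace: so.Namespace}, fresh); err != nil {
		return nil, fmt.Errorf("failed to get ScaledObject %s/%s: %w", so.Namespace, so.Name, err)
	}
	if len(fresh.Status.ExternalMetricNames) == 0 {
		return nil, fmt.Errorf("ScaledObject %s/%s has no external metric names, probably invalid cache", fresh.Namespace, fresh.Name)
	}
	return fresh.Status.ExternalMetricNames, nil
}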
