Skip to content

Commit

Permalink
add externalScaling health check type, help function for client, redo…
Browse files Browse the repository at this point in the history
…ne scaling logic for external calculator list
  • Loading branch information
gauron99 committed May 29, 2023
1 parent b1655bf commit 5da7297
Show file tree
Hide file tree
Showing 4 changed files with 507 additions and 40 deletions.
8 changes: 7 additions & 1 deletion apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ type AdvancedConfig struct {
ComplexScalingLogic ComplexScalingLogic `json:"complexScalingLogic,omitempty"`
}

// complexScalingLogic describes formula for scaling with its target
// ComplexScalingLogic describes advanced scaling logic options like formula
// and gRPC server for external calculations
type ComplexScalingLogic struct {
// +optional
ExternalCalculations []ExternalCalculation `json:"externalCalculator,omitempty"`
Expand All @@ -114,6 +115,9 @@ type ComplexScalingLogic struct {
Target string `json:"target,omitempty"`
}

// ExternalCalculation structure describes names and URLs of a gRPC server
// that KEDA can connect to with collected metrics and modify them. Each server
// has a timeout and optional Fallback functionality.
type ExternalCalculation struct {
Name string `json:"name"`
URL string `json:"url"`
Expand Down Expand Up @@ -176,6 +180,8 @@ type ScaledObjectStatus struct {
// +optional
CompositeScalerName string `json:"compositeScalerName,omitempty"`
// +optional
ExternalCalculationHealth map[string]HealthStatus `json:"externalCalculationHealth,omitempty"`
// +optional
Conditions Conditions `json:"conditions,omitempty"`
// +optional
Health map[string]HealthStatus `json:"health,omitempty"`
Expand Down
37 changes: 37 additions & 0 deletions pkg/externalscaling/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/apis/keda/v1alpha1"
cl "github.com/kedacore/keda/v2/pkg/externalscaling/api"
)

Expand Down Expand Up @@ -73,3 +76,37 @@ func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, logger logr.Log
}
return true
}

// ConvertToGeneratedStruct converts K8s external metrics list to gRPC generated
// external metrics list
func ConvertToGeneratedStruct(inK8sList []external_metrics.ExternalMetricValue) (outExternal *cl.MetricsList) {
listStruct := cl.MetricsList{}
for _, val := range inK8sList {
// if value is 0, its empty in the list
metric := &cl.Metric{Name: val.MetricName, Value: float32(val.Value.Value())}
listStruct.MetricValues = append(listStruct.MetricValues, metric)
}
return
}

// ConvertFromGeneratedStruct converts gRPC generated external metrics list to
// K8s external_metrics list
func ConvertFromGeneratedStruct(inExternal *cl.MetricsList) (outK8sList []external_metrics.ExternalMetricValue) {
for _, inValue := range inExternal.MetricValues {
outValue := external_metrics.ExternalMetricValue{}
outValue.MetricName = inValue.Name
outValue.Timestamp = v1.Now()
outValue.Value.SetMilli(int64(inValue.Value * 1000))
outK8sList = append(outK8sList, outValue)
}
return
}

func Fallback(err bool, list *cl.MetricsList, ec v1alpha1.ExternalCalculation) (listOut *cl.MetricsList, errOut bool) {
if err {
// returned metrics
return
}

return
}
67 changes: 28 additions & 39 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
externalscaling "github.com/kedacore/keda/v2/pkg/externalscaling"
externalscalingAPI "github.com/kedacore/keda/v2/pkg/externalscaling/api"
"github.com/kedacore/keda/v2/pkg/fallback"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scalers"
Expand Down Expand Up @@ -521,56 +520,46 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
}

if len(matchingMetrics) == 0 {
return nil, fmt.Errorf("no matching metrics found for " + metricName)
return nil, fmt.Errorf("no matching metrics found for " + metricsName)
}

logger.V(0).Info(">>3: MATCHED-METRICS", "metrics", matchingMetrics, "metricsName", metricsName)

// apply external calculations
for i, ec := range scaledObject.Spec.Advanced.ComplexScalingLogic.ExternalCalculations {
// call server on each url and return modified metric list in order
logger.V(0).Info(">>3.1 EXT-CALC CALL", "i", i, "name", ec.Name, "url", ec.URL)

listStruct := externalscalingAPI.MetricsList{}
for _, val := range matchingMetrics {
// if value is 0, its empty in the list
metric := &externalscalingAPI.Metric{Name: val.MetricName, Value: float32(val.Value.Value())}
listStruct.MetricValues = append(listStruct.MetricValues, metric)
}
logger.V(0).Info(">>3.2 LIST", "list", listStruct.MetricValues)
grpcMetricList := externalscaling.ConvertToGeneratedStruct(matchingMetrics)
logger.V(0).Info(">>3.1 LIST", "list", grpcMetricList.MetricValues)

// Apply external calculations - call gRPC server on each url and return
// modified metric list in order
for i, ec := range scaledObject.Spec.Advanced.ComplexScalingLogic.ExternalCalculations {
hasError := false
esGrpcClient, err := externalscaling.NewGrpcClient(ec.URL, logger)
if err != nil {
logger.Error(err, "error connecting external calculation grpc client to server")
break
}

if !esGrpcClient.WaitForConnectionReady(ctx, logger) {
err = fmt.Errorf("client didnt connect to server successfully")
logger.Error(err, "error in connection")
}
logger.Info("connected to gRPC server")

res, err := esGrpcClient.Calculate(ctx, &listStruct, logger)
if err != nil {
logger.Error(err, "error calculating in grpc server for external calculation")
break
}
logger.V(0).Info(">>3.4 CALCULATE GRPC", "result", res)
// TODO: turn to fallback here if error != nil & err is given (remove break 2 lines up)
hasError = true
} else {
if !esGrpcClient.WaitForConnectionReady(ctx, logger) {
err = fmt.Errorf("client didnt connect to server successfully")
logger.Error(err, "error in connection")
hasError = true
}
logger.Info("connected to gRPC server")

var finalList []external_metrics.ExternalMetricValue
grpcMetricList, err = esGrpcClient.Calculate(ctx, grpcMetricList, logger)
if err != nil {
logger.Error(err, "error calculating in grpc server at %s for external calculation", ec.URL)
hasError = true
}

for _, val := range res.MetricValues {
tmp := external_metrics.ExternalMetricValue{}
tmp.MetricName = val.Name
tmp.Timestamp = v1.Now()
tmp.Value.SetMilli(int64(val.Value * 1000))
finalList = append(finalList, tmp)
// run Fallback if error was given
grpcMetricList, hasError = externalscaling.Fallback(hasError, grpcMetricList, ec)
if hasError {
// if fallback metrics not used, stop calculating
break
}
}
matchingMetrics = finalList
logger.V(0).Info(">>3.5 CALCULATE AFTER", "matchingMetrics", matchingMetrics)
logger.V(0).Info(fmt.Sprintf(">>3.5:%d CALCULATE END", i), "metrics", grpcMetricList)
}
matchingMetrics = externalscaling.ConvertFromGeneratedStruct(grpcMetricList)

// apply formula
if scaledObject.Spec.Advanced.ComplexScalingLogic.Formula != "" {
Expand Down
Loading

0 comments on commit 5da7297

Please sign in to comment.