Skip to content

Commit

Permalink
Merge pull request #285 from kitianFresh/feature/use-workoadinfo-to-q…
Browse files Browse the repository at this point in the history
…uery-container-for-metricserver

add workload selector for evpa
  • Loading branch information
kitianFresh authored Apr 28, 2022
2 parents 4114dd5 + 59deb96 commit 07afecd
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 110 deletions.
14 changes: 8 additions & 6 deletions cmd/craned/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti
scaleKindResolver,
)

targetSelectorFetcher := target.NewSelectorFetcher(mgr.GetScheme(), mgr.GetRESTMapper(), scaleClient, mgr.GetClient())

podOOMRecorder := &oom.PodOOMRecorder{
Client: mgr.GetClient(),
}
Expand Down Expand Up @@ -254,17 +256,17 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti
}

if err := (&evpa.EffectiveVPAController{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("effective-vpa-controller"),
OOMRecorder: podOOMRecorder,
Predictor: predictorMgr.GetPredictor(predictionapi.AlgorithmTypePercentile),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("effective-vpa-controller"),
OOMRecorder: podOOMRecorder,
Predictor: predictorMgr.GetPredictor(predictionapi.AlgorithmTypePercentile),
TargetFetcher: targetSelectorFetcher,
}).SetupWithManager(mgr); err != nil {
klog.Exit(err, "unable to create controller", "controller", "EffectiveVPAController")
}
}

targetSelectorFetcher := target.NewSelectorFetcher(mgr.GetScheme(), mgr.GetRESTMapper(), scaleClient, mgr.GetClient())
// TspController
if utilfeature.DefaultFeatureGate.Enabled(features.CraneTimeSeriesPrediction) {
tspController := timeseriesprediction.NewController(
Expand Down
12 changes: 7 additions & 5 deletions pkg/autoscaling/estimator/estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gocrane/crane/pkg/oom"
"github.com/gocrane/crane/pkg/prediction"
"github.com/gocrane/crane/pkg/utils/target"
)

// ResourceEstimatorManager controls how to get or delete estimators for EffectiveVPA
Expand All @@ -30,18 +31,19 @@ type estimatorManager struct {
estimatorMap map[string]ResourceEstimator
}

func NewResourceEstimatorManager(client client.Client, oomRecorder oom.Recorder, predictor prediction.Interface) ResourceEstimatorManager {
func NewResourceEstimatorManager(client client.Client, fetcher target.SelectorFetcher, oomRecorder oom.Recorder, predictor prediction.Interface) ResourceEstimatorManager {
resourceEstimatorManager := &estimatorManager{
estimatorMap: make(map[string]ResourceEstimator),
}
resourceEstimatorManager.buildEstimators(client, oomRecorder, predictor)
resourceEstimatorManager.buildEstimators(client, fetcher, oomRecorder, predictor)
return resourceEstimatorManager
}

func (m *estimatorManager) buildEstimators(client client.Client, oomRecorder oom.Recorder, predictor prediction.Interface) {
func (m *estimatorManager) buildEstimators(client client.Client, fetcher target.SelectorFetcher, oomRecorder oom.Recorder, predictor prediction.Interface) {
percentileEstimator := &PercentileResourceEstimator{
Predictor: predictor,
Client: client,
Predictor: predictor,
Client: client,
TargetFetcher: fetcher,
}
m.registerEstimator("Percentile", percentileEstimator)
oomEstimator := &OOMResourceEstimator{
Expand Down
94 changes: 68 additions & 26 deletions pkg/autoscaling/estimator/percentile.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package estimator

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -16,19 +16,29 @@ import (
"github.com/gocrane/crane/pkg/metricquery"
"github.com/gocrane/crane/pkg/prediction"
predictionconfig "github.com/gocrane/crane/pkg/prediction/config"
"github.com/gocrane/crane/pkg/utils"
"github.com/gocrane/crane/pkg/utils/target"
)

const callerFormat = "EVPACaller-%s-%s"

type PercentileResourceEstimator struct {
Predictor prediction.Interface
Client client.Client
Predictor prediction.Interface
Client client.Client
TargetFetcher target.SelectorFetcher
}

func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler, config map[string]string, containerName string, currRes *corev1.ResourceRequirements) (corev1.ResourceList, error) {
recommendResource := corev1.ResourceList{}

selector, err := e.TargetFetcher.Fetch(&corev1.ObjectReference{
APIVersion: evpa.Spec.TargetRef.APIVersion,
Kind: evpa.Spec.TargetRef.Kind,
Name: evpa.Spec.TargetRef.Name,
Namespace: evpa.Namespace,
})
if err != nil {
klog.ErrorS(err, "Failed to fetch evpa target workload selector.", "evpa", klog.KObj(evpa))
}
caller := fmt.Sprintf(callerFormat, klog.KObj(evpa), string(evpa.UID))
cpuMetricNamer := &metricnaming.GeneralMetricNamer{
CallerName: caller,
Expand All @@ -39,23 +49,12 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi
Namespace: evpa.Namespace,
WorkloadName: evpa.Spec.TargetRef.Name,
ContainerName: containerName,
Selector: labels.Everything(),
Selector: selector,
},
},
}

cpuConfig := getCpuConfig(config)
tsList, err := utils.QueryPredictedValues(e.Predictor, caller, cpuConfig, cpuMetricNamer)
if err != nil {
return nil, err
}

if len(tsList) < 1 || len(tsList[0].Samples) < 1 {
return nil, fmt.Errorf("no value retured for queryExpr: %s", cpuMetricNamer.BuildUniqueKey())
}

cpuValue := int64(tsList[0].Samples[0].Value * 1000)
recommendResource[corev1.ResourceCPU] = *resource.NewMilliQuantity(cpuValue, resource.DecimalSI)

memoryMetricNamer := &metricnaming.GeneralMetricNamer{
CallerName: caller,
Expand All @@ -66,28 +65,71 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi
Namespace: evpa.Namespace,
WorkloadName: evpa.Spec.TargetRef.Name,
ContainerName: containerName,
Selector: labels.Everything(),
Selector: selector,
},
},
}

memConfig := getMemConfig(config)
tsList, err = utils.QueryPredictedValues(e.Predictor, caller, memConfig, memoryMetricNamer)

var errs []error
// first register cpu & memory, or the memory will be not registered before the cpu prediction succeed
err1 := e.Predictor.WithQuery(cpuMetricNamer, caller, *cpuConfig)
if err1 != nil {
errs = append(errs, err1)
}
err2 := e.Predictor.WithQuery(memoryMetricNamer, caller, *memConfig)
if err2 != nil {
errs = append(errs, err2)
}
if len(errs) > 0 {
return nil, fmt.Errorf("failed to register metricNamer: %v", errs)
}

var predictErrs []error
var noValueErrs []error
tsList, err := e.Predictor.QueryRealtimePredictedValues(context.TODO(), cpuMetricNamer)
if err != nil {
predictErrs = append(predictErrs, err)
}

if len(tsList) > 1 && len(tsList[0].Samples) > 1 {
cpuValue := int64(tsList[0].Samples[0].Value * 1000)
recommendResource[corev1.ResourceCPU] = *resource.NewMilliQuantity(cpuValue, resource.DecimalSI)
} else {
noValueErrs = append(noValueErrs, fmt.Errorf("no value retured for queryExpr: %s", cpuMetricNamer.BuildUniqueKey()))
}

tsList, err = e.Predictor.QueryRealtimePredictedValues(context.TODO(), memoryMetricNamer)
if err != nil {
return nil, err
predictErrs = append(predictErrs, err)
}

if len(tsList) < 1 || len(tsList[0].Samples) < 1 {
return nil, fmt.Errorf("no value retured for queryExpr: %s", memoryMetricNamer.BuildUniqueKey())
if len(tsList) > 1 && len(tsList[0].Samples) > 1 {
memValue := int64(tsList[0].Samples[0].Value)
recommendResource[corev1.ResourceMemory] = *resource.NewQuantity(memValue, resource.BinarySI)
} else {
noValueErrs = append(noValueErrs, fmt.Errorf("no value retured for queryExpr: %s", memoryMetricNamer.BuildUniqueKey()))
}

memValue := int64(tsList[0].Samples[0].Value)
recommendResource[corev1.ResourceMemory] = *resource.NewQuantity(memValue, resource.BinarySI)
// all failed
if len(recommendResource) == 0 {
return recommendResource, fmt.Errorf("all resource predicted failed, predictErrs: %v, noValueErrs: %v", predictErrs, noValueErrs)
}

// at least one succeed
return recommendResource, nil
}

func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler) {
selector, err := e.TargetFetcher.Fetch(&corev1.ObjectReference{
APIVersion: evpa.Spec.TargetRef.APIVersion,
Kind: evpa.Spec.TargetRef.Kind,
Name: evpa.Spec.TargetRef.Name,
Namespace: evpa.Namespace,
})
if err != nil {
klog.ErrorS(err, "Failed to fetch evpa target workload selector.", "evpa", klog.KObj(evpa))
}
for _, containerPolicy := range evpa.Spec.ResourcePolicy.ContainerPolicies {
caller := fmt.Sprintf(callerFormat, klog.KObj(evpa), string(evpa.UID))
cpuMetricNamer := &metricnaming.GeneralMetricNamer{
Expand All @@ -99,7 +141,7 @@ func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.Effe
Namespace: evpa.Namespace,
WorkloadName: evpa.Spec.TargetRef.Name,
ContainerName: containerPolicy.ContainerName,
Selector: labels.Everything(),
Selector: selector,
},
},
}
Expand All @@ -116,7 +158,7 @@ func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.Effe
Namespace: evpa.Namespace,
WorkloadName: evpa.Spec.TargetRef.Name,
ContainerName: containerPolicy.ContainerName,
Selector: labels.Everything(),
Selector: selector,
},
},
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/controller/evpa/container_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,17 @@ func GetScaleEventKey(namespace string, workload string, container string, direc

// UpdateRecommendStatus update recommend resource
func UpdateRecommendStatus(recommendation *vpatypes.RecommendedPodResources, containerName string, recommendResource corev1.ResourceList) {
if recommendation == nil {
recommendation = &vpatypes.RecommendedPodResources{
ContainerRecommendations: make([]vpatypes.RecommendedContainerResources, 0),
}
}
for i := range recommendation.ContainerRecommendations {
if recommendation.ContainerRecommendations[i].ContainerName == containerName {
ResourceWithTolerance(recommendResource, recommendation.ContainerRecommendations[i].Target)
recommendation.ContainerRecommendations[i].Target = recommendResource

for resource, quantity := range recommendResource {
recommendation.ContainerRecommendations[i].Target[resource] = quantity
}
return
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/evpa/effective_vpa_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gocrane/crane/pkg/oom"
"github.com/gocrane/crane/pkg/prediction"
"github.com/gocrane/crane/pkg/utils"
"github.com/gocrane/crane/pkg/utils/target"
)

// EffectiveVPAController is responsible for scaling workload's replica based on EffectiveVerticalPodAutoscaler spec
Expand All @@ -34,6 +35,7 @@ type EffectiveVPAController struct {
EstimatorManager estimator.ResourceEstimatorManager
lastScaleTime map[string]metav1.Time
Predictor prediction.Interface
TargetFetcher target.SelectorFetcher
mu sync.Mutex
}

Expand Down Expand Up @@ -146,7 +148,7 @@ func (c *EffectiveVPAController) UpdateStatus(ctx context.Context, evpa *autosca
}

func (c *EffectiveVPAController) SetupWithManager(mgr ctrl.Manager) error {
estimatorManager := estimator.NewResourceEstimatorManager(mgr.GetClient(), c.OOMRecorder, c.Predictor)
estimatorManager := estimator.NewResourceEstimatorManager(mgr.GetClient(), c.TargetFetcher, c.OOMRecorder, c.Predictor)
c.EstimatorManager = estimatorManager
return ctrl.NewControllerManagedBy(mgr).
For(&autoscalingapi.EffectiveVerticalPodAutoscaler{}).
Expand Down
8 changes: 4 additions & 4 deletions pkg/metricquery/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,18 @@ type WorkloadNamerInfo struct {
Kind string
Name string
APIVersion string
Selector labels.Selector
// used to fetch workload pods and containers, when use metric server, it is required
Selector labels.Selector
}

type ContainerNamerInfo struct {
Namespace string
WorkloadName string
Kind string
APIVersion string
PodName string
ContainerName string
Selector labels.Selector
// used to fetch workload pods and containers, when use metric server, it is required
Selector labels.Selector
}

type PodNamerInfo struct {
Expand Down Expand Up @@ -166,7 +167,6 @@ func (m *Metric) keyByContainer() string {
strings.ToLower(m.MetricName),
m.Container.Namespace,
m.Container.WorkloadName,
m.Container.PodName,
m.Container.ContainerName,
selectorStr}, "_")
}
Expand Down
19 changes: 13 additions & 6 deletions pkg/prediction/dsp/prediction.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ const (

type periodicSignalPrediction struct {
prediction.GenericPrediction
a aggregateSignals
stopChMap sync.Map
modelConfig config.AlgorithmModelConfig
a aggregateSignals
stopChMap sync.Map
// record the query routine already started
queryRoutines sync.Map
modelConfig config.AlgorithmModelConfig
}

func (p *periodicSignalPrediction) QueryPredictionStatus(ctx context.Context, metricNamer metricnaming.MetricNamer) (prediction.Status, error) {
Expand All @@ -45,6 +47,7 @@ func NewPrediction(realtimeProvider providers.RealTime, historyProvider provider
GenericPrediction: prediction.NewGenericPrediction(realtimeProvider, historyProvider, withCh, delCh),
a: newAggregateSignals(),
stopChMap: sync.Map{},
queryRoutines: sync.Map{},
modelConfig: mc,
}
}
Expand Down Expand Up @@ -159,18 +162,21 @@ func (p *periodicSignalPrediction) Run(stopCh <-chan struct{}) {
for {
// Waiting for a WithQuery request
qc := <-p.WithCh
if !p.a.Add(qc) {
// update if the query config updated, idempotent
p.a.Add(qc)
QueryExpr := qc.MetricNamer.BuildUniqueKey()

if _, ok := p.queryRoutines.Load(QueryExpr); ok {
continue
}

QueryExpr := qc.MetricNamer.BuildUniqueKey()
if _, ok := p.stopChMap.Load(QueryExpr); ok {
continue
}
klog.V(6).InfoS("Register a query expression for prediction.", "queryExpr", QueryExpr, "caller", qc.Caller)

go func(namer metricnaming.MetricNamer) {
queryExpr := namer.BuildUniqueKey()
p.queryRoutines.Store(queryExpr, struct{}{})
ticker := time.NewTicker(p.modelConfig.UpdateInterval)
defer ticker.Stop()

Expand All @@ -184,6 +190,7 @@ func (p *periodicSignalPrediction) Run(stopCh <-chan struct{}) {

select {
case <-predStopCh:
p.queryRoutines.Delete(queryExpr)
klog.V(4).InfoS("Prediction routine stopped.", "queryExpr", queryExpr)
return
case <-ticker.C:
Expand Down
Loading

0 comments on commit 07afecd

Please sign in to comment.