From 59deb960f58a260391128c1e51f1f190217cc334 Mon Sep 17 00:00:00 2001 From: kitianFresh <1549722424@qq.com> Date: Tue, 26 Apr 2022 12:40:28 +0800 Subject: [PATCH] add workload selector for evpa --- cmd/craned/app/manager.go | 14 +-- pkg/autoscaling/estimator/estimator.go | 12 ++- pkg/autoscaling/estimator/percentile.go | 94 ++++++++++++++----- pkg/controller/evpa/container_policy.go | 10 +- .../evpa/effective_vpa_controller.go | 4 +- pkg/metricquery/type.go | 8 +- pkg/prediction/dsp/prediction.go | 19 ++-- pkg/prediction/generic.go | 16 +--- pkg/prediction/percentile/prediction.go | 17 +++- .../metricserver/rest_metric_client.go | 83 ++++++++-------- .../metricserver/builder_test.go | 8 +- 11 files changed, 175 insertions(+), 110 deletions(-) diff --git a/cmd/craned/app/manager.go b/cmd/craned/app/manager.go index f8ba3c739..4350c0c58 100644 --- a/cmd/craned/app/manager.go +++ b/cmd/craned/app/manager.go @@ -210,6 +210,8 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti scaleKindResolver, ) + targetSelectorFetcher := target.NewSelectorFetcher(mgr.GetScheme(), mgr.GetRESTMapper(), scaleClient, mgr.GetClient()) + podOOMRecorder := &oom.PodOOMRecorder{ Client: mgr.GetClient(), } @@ -254,17 +256,17 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti } if err := (&evpa.EffectiveVPAController{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("effective-vpa-controller"), - OOMRecorder: podOOMRecorder, - Predictor: predictorMgr.GetPredictor(predictionapi.AlgorithmTypePercentile), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("effective-vpa-controller"), + OOMRecorder: podOOMRecorder, + Predictor: predictorMgr.GetPredictor(predictionapi.AlgorithmTypePercentile), + TargetFetcher: targetSelectorFetcher, }).SetupWithManager(mgr); err != nil { klog.Exit(err, "unable to create controller", "controller", "EffectiveVPAController") } } - targetSelectorFetcher := target.NewSelectorFetcher(mgr.GetScheme(), mgr.GetRESTMapper(), scaleClient, mgr.GetClient()) // TspController if utilfeature.DefaultFeatureGate.Enabled(features.CraneTimeSeriesPrediction) { tspController := timeseriesprediction.NewController( diff --git a/pkg/autoscaling/estimator/estimator.go b/pkg/autoscaling/estimator/estimator.go index 6ae8308d0..5269b3f38 100644 --- a/pkg/autoscaling/estimator/estimator.go +++ b/pkg/autoscaling/estimator/estimator.go @@ -11,6 +11,7 @@ import ( "github.com/gocrane/crane/pkg/oom" "github.com/gocrane/crane/pkg/prediction" + "github.com/gocrane/crane/pkg/utils/target" ) // ResourceEstimatorManager controls how to get or delete estimators for EffectiveVPA @@ -30,18 +31,19 @@ type estimatorManager struct { estimatorMap map[string]ResourceEstimator } -func NewResourceEstimatorManager(client client.Client, oomRecorder oom.Recorder, predictor prediction.Interface) ResourceEstimatorManager { +func NewResourceEstimatorManager(client client.Client, fetcher target.SelectorFetcher, oomRecorder oom.Recorder, predictor prediction.Interface) ResourceEstimatorManager { resourceEstimatorManager := &estimatorManager{ estimatorMap: make(map[string]ResourceEstimator), } - resourceEstimatorManager.buildEstimators(client, oomRecorder, predictor) + resourceEstimatorManager.buildEstimators(client, fetcher, oomRecorder, predictor) return resourceEstimatorManager } -func (m *estimatorManager) buildEstimators(client client.Client, oomRecorder oom.Recorder, predictor prediction.Interface) 
{ +func (m *estimatorManager) buildEstimators(client client.Client, fetcher target.SelectorFetcher, oomRecorder oom.Recorder, predictor prediction.Interface) { percentileEstimator := &PercentileResourceEstimator{ - Predictor: predictor, - Client: client, + Predictor: predictor, + Client: client, + TargetFetcher: fetcher, } m.registerEstimator("Percentile", percentileEstimator) oomEstimator := &OOMResourceEstimator{ diff --git a/pkg/autoscaling/estimator/percentile.go b/pkg/autoscaling/estimator/percentile.go index 1751ba4b8..dd6e98378 100644 --- a/pkg/autoscaling/estimator/percentile.go +++ b/pkg/autoscaling/estimator/percentile.go @@ -1,11 +1,11 @@ package estimator import ( + "context" "fmt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -16,19 +16,29 @@ import ( "github.com/gocrane/crane/pkg/metricquery" "github.com/gocrane/crane/pkg/prediction" predictionconfig "github.com/gocrane/crane/pkg/prediction/config" - "github.com/gocrane/crane/pkg/utils" + "github.com/gocrane/crane/pkg/utils/target" ) const callerFormat = "EVPACaller-%s-%s" type PercentileResourceEstimator struct { - Predictor prediction.Interface - Client client.Client + Predictor prediction.Interface + Client client.Client + TargetFetcher target.SelectorFetcher } func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler, config map[string]string, containerName string, currRes *corev1.ResourceRequirements) (corev1.ResourceList, error) { recommendResource := corev1.ResourceList{} + selector, err := e.TargetFetcher.Fetch(&corev1.ObjectReference{ + APIVersion: evpa.Spec.TargetRef.APIVersion, + Kind: evpa.Spec.TargetRef.Kind, + Name: evpa.Spec.TargetRef.Name, + Namespace: evpa.Namespace, + }) + if err != nil { + klog.ErrorS(err, "Failed to fetch evpa target workload selector.", "evpa", klog.KObj(evpa)) + } caller := fmt.Sprintf(callerFormat, klog.KObj(evpa), string(evpa.UID)) cpuMetricNamer := &metricnaming.GeneralMetricNamer{ CallerName: caller, @@ -39,23 +49,12 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi Namespace: evpa.Namespace, WorkloadName: evpa.Spec.TargetRef.Name, ContainerName: containerName, - Selector: labels.Everything(), + Selector: selector, }, }, } cpuConfig := getCpuConfig(config) - tsList, err := utils.QueryPredictedValues(e.Predictor, caller, cpuConfig, cpuMetricNamer) - if err != nil { - return nil, err - } - - if len(tsList) < 1 || len(tsList[0].Samples) < 1 { - return nil, fmt.Errorf("no value retured for queryExpr: %s", cpuMetricNamer.BuildUniqueKey()) - } - - cpuValue := int64(tsList[0].Samples[0].Value * 1000) - recommendResource[corev1.ResourceCPU] = *resource.NewMilliQuantity(cpuValue, resource.DecimalSI) memoryMetricNamer := &metricnaming.GeneralMetricNamer{ CallerName: caller, @@ -66,28 +65,71 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi Namespace: evpa.Namespace, WorkloadName: evpa.Spec.TargetRef.Name, ContainerName: containerName, - Selector: labels.Everything(), + Selector: selector, }, }, } - memConfig := getMemConfig(config) - tsList, err = utils.QueryPredictedValues(e.Predictor, caller, memConfig, memoryMetricNamer) + + var errs []error + // first register cpu & memory, or the memory will be not registered before the cpu prediction succeed + err1 := e.Predictor.WithQuery(cpuMetricNamer, caller, *cpuConfig) + if err1 != nil { + errs = append(errs, 
err1)
+	}
+	err2 := e.Predictor.WithQuery(memoryMetricNamer, caller, *memConfig)
+	if err2 != nil {
+		errs = append(errs, err2)
+	}
+	if len(errs) > 0 {
+		return nil, fmt.Errorf("failed to register metricNamer: %v", errs)
+	}
+
+	var predictErrs []error
+	var noValueErrs []error
+	tsList, err := e.Predictor.QueryRealtimePredictedValues(context.TODO(), cpuMetricNamer)
+	if err != nil {
+		predictErrs = append(predictErrs, err)
+	}
+
+	if len(tsList) > 0 && len(tsList[0].Samples) > 0 {
+		cpuValue := int64(tsList[0].Samples[0].Value * 1000)
+		recommendResource[corev1.ResourceCPU] = *resource.NewMilliQuantity(cpuValue, resource.DecimalSI)
+	} else {
+		noValueErrs = append(noValueErrs, fmt.Errorf("no value returned for queryExpr: %s", cpuMetricNamer.BuildUniqueKey()))
+	}
+
+	tsList, err = e.Predictor.QueryRealtimePredictedValues(context.TODO(), memoryMetricNamer)
 	if err != nil {
-		return nil, err
+		predictErrs = append(predictErrs, err)
 	}
 
-	if len(tsList) < 1 || len(tsList[0].Samples) < 1 {
-		return nil, fmt.Errorf("no value retured for queryExpr: %s", memoryMetricNamer.BuildUniqueKey())
+	if len(tsList) > 0 && len(tsList[0].Samples) > 0 {
+		memValue := int64(tsList[0].Samples[0].Value)
+		recommendResource[corev1.ResourceMemory] = *resource.NewQuantity(memValue, resource.BinarySI)
+	} else {
+		noValueErrs = append(noValueErrs, fmt.Errorf("no value returned for queryExpr: %s", memoryMetricNamer.BuildUniqueKey()))
 	}
 
-	memValue := int64(tsList[0].Samples[0].Value)
-	recommendResource[corev1.ResourceMemory] = *resource.NewQuantity(memValue, resource.BinarySI)
+	// all failed
+	if len(recommendResource) == 0 {
+		return recommendResource, fmt.Errorf("all resource predictions failed, predictErrs: %v, noValueErrs: %v", predictErrs, noValueErrs)
+	}
+	// at least one succeeded
 	return recommendResource, nil
 }
 
 func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler) {
+	selector, err := e.TargetFetcher.Fetch(&corev1.ObjectReference{
+		APIVersion: evpa.Spec.TargetRef.APIVersion,
+		Kind:       evpa.Spec.TargetRef.Kind,
+		Name:       evpa.Spec.TargetRef.Name,
+		Namespace:  evpa.Namespace,
+	})
+	if err != nil {
+		klog.ErrorS(err, "Failed to fetch evpa target workload selector.", "evpa", klog.KObj(evpa))
+	}
 	for _, containerPolicy := range evpa.Spec.ResourcePolicy.ContainerPolicies {
 		caller := fmt.Sprintf(callerFormat, klog.KObj(evpa), string(evpa.UID))
 		cpuMetricNamer := &metricnaming.GeneralMetricNamer{
@@ -99,7 +141,7 @@ func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.Effe
 				Namespace:     evpa.Namespace,
 				WorkloadName:  evpa.Spec.TargetRef.Name,
 				ContainerName: containerPolicy.ContainerName,
-				Selector:      labels.Everything(),
+				Selector:      selector,
 			},
 		},
 	}
@@ -116,7 +158,7 @@ func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.Effe
 				Namespace:     evpa.Namespace,
 				WorkloadName:  evpa.Spec.TargetRef.Name,
 				ContainerName: containerPolicy.ContainerName,
-				Selector:      labels.Everything(),
+				Selector:      selector,
 			},
 		},
 	}
diff --git a/pkg/controller/evpa/container_policy.go b/pkg/controller/evpa/container_policy.go
index b44388a3f..61ad2bc1a 100644
--- a/pkg/controller/evpa/container_policy.go
+++ b/pkg/controller/evpa/container_policy.go
@@ -191,11 +191,17 @@ func GetScaleEventKey(namespace string, workload string, container string, direc
 
 // UpdateRecommendStatus update recommend resource
 func UpdateRecommendStatus(recommendation *vpatypes.RecommendedPodResources, containerName string, recommendResource corev1.ResourceList) {
+	if recommendation == nil
{ + recommendation = &vpatypes.RecommendedPodResources{ + ContainerRecommendations: make([]vpatypes.RecommendedContainerResources, 0), + } + } for i := range recommendation.ContainerRecommendations { if recommendation.ContainerRecommendations[i].ContainerName == containerName { ResourceWithTolerance(recommendResource, recommendation.ContainerRecommendations[i].Target) - recommendation.ContainerRecommendations[i].Target = recommendResource - + for resource, quantity := range recommendResource { + recommendation.ContainerRecommendations[i].Target[resource] = quantity + } return } } diff --git a/pkg/controller/evpa/effective_vpa_controller.go b/pkg/controller/evpa/effective_vpa_controller.go index 6056ff2d8..d8e59c802 100644 --- a/pkg/controller/evpa/effective_vpa_controller.go +++ b/pkg/controller/evpa/effective_vpa_controller.go @@ -23,6 +23,7 @@ import ( "github.com/gocrane/crane/pkg/oom" "github.com/gocrane/crane/pkg/prediction" "github.com/gocrane/crane/pkg/utils" + "github.com/gocrane/crane/pkg/utils/target" ) // EffectiveVPAController is responsible for scaling workload's replica based on EffectiveVerticalPodAutoscaler spec @@ -34,6 +35,7 @@ type EffectiveVPAController struct { EstimatorManager estimator.ResourceEstimatorManager lastScaleTime map[string]metav1.Time Predictor prediction.Interface + TargetFetcher target.SelectorFetcher mu sync.Mutex } @@ -146,7 +148,7 @@ func (c *EffectiveVPAController) UpdateStatus(ctx context.Context, evpa *autosca } func (c *EffectiveVPAController) SetupWithManager(mgr ctrl.Manager) error { - estimatorManager := estimator.NewResourceEstimatorManager(mgr.GetClient(), c.OOMRecorder, c.Predictor) + estimatorManager := estimator.NewResourceEstimatorManager(mgr.GetClient(), c.TargetFetcher, c.OOMRecorder, c.Predictor) c.EstimatorManager = estimatorManager return ctrl.NewControllerManagedBy(mgr). For(&autoscalingapi.EffectiveVerticalPodAutoscaler{}). 
diff --git a/pkg/metricquery/type.go b/pkg/metricquery/type.go index e5f24adae..bc75dc2a1 100644 --- a/pkg/metricquery/type.go +++ b/pkg/metricquery/type.go @@ -54,7 +54,8 @@ type WorkloadNamerInfo struct { Kind string Name string APIVersion string - Selector labels.Selector + // used to fetch workload pods and containers, when use metric server, it is required + Selector labels.Selector } type ContainerNamerInfo struct { @@ -62,9 +63,9 @@ type ContainerNamerInfo struct { WorkloadName string Kind string APIVersion string - PodName string ContainerName string - Selector labels.Selector + // used to fetch workload pods and containers, when use metric server, it is required + Selector labels.Selector } type PodNamerInfo struct { @@ -166,7 +167,6 @@ func (m *Metric) keyByContainer() string { strings.ToLower(m.MetricName), m.Container.Namespace, m.Container.WorkloadName, - m.Container.PodName, m.Container.ContainerName, selectorStr}, "_") } diff --git a/pkg/prediction/dsp/prediction.go b/pkg/prediction/dsp/prediction.go index f4d9085b9..1aa86d587 100644 --- a/pkg/prediction/dsp/prediction.go +++ b/pkg/prediction/dsp/prediction.go @@ -30,9 +30,11 @@ const ( type periodicSignalPrediction struct { prediction.GenericPrediction - a aggregateSignals - stopChMap sync.Map - modelConfig config.AlgorithmModelConfig + a aggregateSignals + stopChMap sync.Map + // record the query routine already started + queryRoutines sync.Map + modelConfig config.AlgorithmModelConfig } func (p *periodicSignalPrediction) QueryPredictionStatus(ctx context.Context, metricNamer metricnaming.MetricNamer) (prediction.Status, error) { @@ -45,6 +47,7 @@ func NewPrediction(realtimeProvider providers.RealTime, historyProvider provider GenericPrediction: prediction.NewGenericPrediction(realtimeProvider, historyProvider, withCh, delCh), a: newAggregateSignals(), stopChMap: sync.Map{}, + queryRoutines: sync.Map{}, modelConfig: mc, } } @@ -159,11 +162,13 @@ func (p *periodicSignalPrediction) Run(stopCh <-chan struct{}) { for { // Waiting for a WithQuery request qc := <-p.WithCh - if !p.a.Add(qc) { + // update if the query config updated, idempotent + p.a.Add(qc) + QueryExpr := qc.MetricNamer.BuildUniqueKey() + + if _, ok := p.queryRoutines.Load(QueryExpr); ok { continue } - - QueryExpr := qc.MetricNamer.BuildUniqueKey() if _, ok := p.stopChMap.Load(QueryExpr); ok { continue } @@ -171,6 +176,7 @@ func (p *periodicSignalPrediction) Run(stopCh <-chan struct{}) { go func(namer metricnaming.MetricNamer) { queryExpr := namer.BuildUniqueKey() + p.queryRoutines.Store(queryExpr, struct{}{}) ticker := time.NewTicker(p.modelConfig.UpdateInterval) defer ticker.Stop() @@ -184,6 +190,7 @@ func (p *periodicSignalPrediction) Run(stopCh <-chan struct{}) { select { case <-predStopCh: + p.queryRoutines.Delete(queryExpr) klog.V(4).InfoS("Prediction routine stopped.", "queryExpr", queryExpr) return case <-ticker.C: diff --git a/pkg/prediction/generic.go b/pkg/prediction/generic.go index 79e420efe..aa105b2f3 100644 --- a/pkg/prediction/generic.go +++ b/pkg/prediction/generic.go @@ -40,8 +40,6 @@ type WithMetricEvent struct { type GenericPrediction struct { historyProvider providers.History realtimeProvider providers.RealTime - metricsMap map[string][]common.QueryCondition - querySet map[string]struct{} WithCh chan QueryExprWithCaller DelCh chan QueryExprWithCaller mutex sync.Mutex @@ -52,8 +50,6 @@ func NewGenericPrediction(realtimeProvider providers.RealTime, historyProvider p WithCh: withCh, DelCh: delCh, mutex: sync.Mutex{}, - metricsMap: 
map[string][]common.QueryCondition{}, - querySet: map[string]struct{}{}, realtimeProvider: realtimeProvider, historyProvider: historyProvider, } @@ -81,11 +77,8 @@ func (p *GenericPrediction) WithQuery(namer metricnaming.MetricNamer, caller str Config: config, } - if _, exists := p.querySet[q.String()]; !exists { - p.querySet[q.String()] = struct{}{} - klog.V(4).InfoS("Put tuple{query,caller,config} into with channel.", "query", q.MetricNamer.BuildUniqueKey(), "caller", q.Caller) - p.WithCh <- q - } + klog.V(4).InfoS("Put tuple{query,caller,config} into with channel.", "query", q.MetricNamer.BuildUniqueKey(), "caller", q.Caller) + p.WithCh <- q return nil } @@ -103,10 +96,7 @@ func (p *GenericPrediction) DeleteQuery(namer metricnaming.MetricNamer, caller s Caller: caller, } - if _, exists := p.querySet[q.String()]; exists { - delete(p.querySet, q.String()) - p.DelCh <- q - } + p.DelCh <- q return nil } diff --git a/pkg/prediction/percentile/prediction.go b/pkg/prediction/percentile/prediction.go index 0079e9501..b665d7bb5 100644 --- a/pkg/prediction/percentile/prediction.go +++ b/pkg/prediction/percentile/prediction.go @@ -19,8 +19,10 @@ var _ prediction.Interface = &percentilePrediction{} type percentilePrediction struct { prediction.GenericPrediction - a aggregateSignals - stopChMap sync.Map + a aggregateSignals + // record the query routine already started + queryRoutines sync.Map + stopChMap sync.Map } func (p *percentilePrediction) QueryPredictionStatus(ctx context.Context, metricNamer metricnaming.MetricNamer) (prediction.Status, error) { @@ -265,6 +267,7 @@ func NewPrediction(realtimeProvider providers.RealTime, historyProvider provider return &percentilePrediction{ GenericPrediction: prediction.NewGenericPrediction(realtimeProvider, historyProvider, withCh, delCh), a: newAggregateSignals(), + queryRoutines: sync.Map{}, stopChMap: sync.Map{}, } } @@ -273,11 +276,15 @@ func (p *percentilePrediction) Run(stopCh <-chan struct{}) { go func() { for { qc := <-p.WithCh - if !p.a.Add(qc) { + // update if the query config updated, idempotent + p.a.Add(qc) + + QueryExpr := qc.MetricNamer.BuildUniqueKey() + + if _, ok := p.queryRoutines.Load(QueryExpr); ok { continue } - QueryExpr := qc.MetricNamer.BuildUniqueKey() if _, ok := p.stopChMap.Load(QueryExpr); ok { continue } @@ -313,6 +320,7 @@ func (p *percentilePrediction) Run(stopCh <-chan struct{}) { // this is our default policy, one metric only has one config at a time. 
go func(namer metricnaming.MetricNamer) { queryExpr := namer.BuildUniqueKey() + p.queryRoutines.Store(queryExpr, struct{}{}) if c := p.a.GetConfig(queryExpr); c != nil { ticker := time.NewTicker(c.sampleInterval) defer ticker.Stop() @@ -324,6 +332,7 @@ func (p *percentilePrediction) Run(stopCh <-chan struct{}) { p.addSamples(namer) select { case <-predStopCh: + p.queryRoutines.Delete(queryExpr) klog.V(4).InfoS("Prediction routine stopped.", "queryExpr", queryExpr) return case <-ticker.C: diff --git a/pkg/providers/metricserver/rest_metric_client.go b/pkg/providers/metricserver/rest_metric_client.go index acc8af287..65379437a 100644 --- a/pkg/providers/metricserver/rest_metric_client.go +++ b/pkg/providers/metricserver/rest_metric_client.go @@ -132,7 +132,8 @@ func (c *resourceMetricsClient) workloadMetric(metric *metricquery.Metric) (Reso if workload.Selector != nil { selector = workload.Selector.String() } - metrics, err := c.client.PodMetricses(workload.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector}) + // use resourceVersion=0 to avoid traffic for apiserver to etcd + metrics, err := c.client.PodMetricses(workload.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector, ResourceVersion: "0"}) if err != nil { return nil, time.Time{}, fmt.Errorf("unable to fetch metrics from resource metrics API: %v", err) } @@ -141,8 +142,7 @@ func (c *resourceMetricsClient) workloadMetric(metric *metricquery.Metric) (Reso return nil, time.Time{}, fmt.Errorf("no metrics returned from resource metrics API") } - res := getWorkloadMetrics(v1.ResourceName(metric.MetricName), metrics.Items) - timestamp := metrics.Items[0].Timestamp.Time + res, timestamp := getWorkloadMetrics(v1.ResourceName(metric.MetricName), metrics.Items) return res, timestamp, nil } @@ -153,16 +153,23 @@ func (c *resourceMetricsClient) containerMetric(metric *metricquery.Metric) (Res return nil, time.Time{}, fmt.Errorf("metric ContainerNamerInfo is null") } - podMetrics, err := c.client.PodMetricses(container.Namespace).Get(context.TODO(), container.PodName, metav1.GetOptions{}) + selector := "" + if container.Selector != nil { + selector = container.Selector.String() + } + // now if we use workloadName info only, then we should first fetch workload pods by kube client, then use PodMetricses to get pods metrics + // each metric model's addSample will trigger this two listing. 
+ // so we give the workload label selector directly to get pod metricses, use resourceVersion=0 to avoid traffic for apiserver to etcd + podMetrics, err := c.client.PodMetricses(container.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector, ResourceVersion: "0"}) if err != nil { return nil, time.Time{}, fmt.Errorf("unable to fetch metrics from resource metrics API: %v", err) } - res := getContainerMetrics(v1.ResourceName(metric.MetricName), podMetrics, container.ContainerName) - if err != nil { - return nil, time.Time{}, err + if len(podMetrics.Items) == 0 { + return nil, time.Time{}, fmt.Errorf("no metrics returned from resource metrics API") } - timestamp := podMetrics.Timestamp.Time + + res, timestamp := getContainerMetrics(v1.ResourceName(metric.MetricName), podMetrics.Items, container.ContainerName) return res, timestamp, nil } @@ -172,13 +179,12 @@ func (c *resourceMetricsClient) podMetric(metric *metricquery.Metric) (ResourceM return nil, time.Time{}, fmt.Errorf("metric PodNamerInfo is null") } - podMetrics, err := c.client.PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) + podMetrics, err := c.client.PodMetricses(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{ResourceVersion: "0"}) if err != nil { return nil, time.Time{}, fmt.Errorf("unable to fetch metrics from resource metrics API: %v", err) } - res := getPodMetrics(v1.ResourceName(metric.MetricName), podMetrics) - timestamp := podMetrics.Timestamp.Time + res, timestamp := getPodMetrics(v1.ResourceName(metric.MetricName), podMetrics) return res, timestamp, nil } @@ -193,40 +199,39 @@ func (c *resourceMetricsClient) nodeMetric(metric *metricquery.Metric) (Resource return nil, time.Time{}, fmt.Errorf("unable to fetch metrics from resource metrics API: %v", err) } - res := getNodeMetrics(v1.ResourceName(metric.MetricName), metrics) - if err != nil { - return nil, time.Time{}, err - } - timestamp := metrics.Timestamp.Time + res, timestamp := getNodeMetrics(v1.ResourceName(metric.MetricName), metrics) return res, timestamp, nil } -func getContainerMetrics(resource v1.ResourceName, podMetric *metricsapi.PodMetrics, container string) ResourceMetricInfo { +func getContainerMetrics(resource v1.ResourceName, podMetricsList []metricsapi.PodMetrics, container string) (ResourceMetricInfo, time.Time) { res := make(ResourceMetricInfo, 0) var total int64 var timestamp metav1.Time var window metav1.Duration - timestamp = podMetric.Timestamp - window = podMetric.Window - for _, containerMetric := range podMetric.Containers { - if containerMetric.Name != container { - continue - } - if usage, ok := containerMetric.Usage[resource]; ok { - total += usage.MilliValue() + + for _, podMetric := range podMetricsList { + timestamp = podMetric.Timestamp + window = podMetric.Window + for _, containerMetric := range podMetric.Containers { + if containerMetric.Name != container { + continue + } + if usage, ok := containerMetric.Usage[resource]; ok { + total += usage.MilliValue() + } + // now workload has no labels for promql + res = append(res, ResourceMetric{ + Timestamp: timestamp.Time, + Window: window.Duration, + Value: float64(total) / 1000., + Labels: []common.Label{}, + }) } } - // now workload has no labels for promql - res = append(res, ResourceMetric{ - Timestamp: timestamp.Time, - Window: window.Duration, - Value: float64(total) / 1000., - Labels: []common.Label{}, - }) - return res + return res, timestamp.Time } -func getPodMetrics(resource v1.ResourceName, podMetric 
*metricsapi.PodMetrics) ResourceMetricInfo { +func getPodMetrics(resource v1.ResourceName, podMetric *metricsapi.PodMetrics) (ResourceMetricInfo, time.Time) { res := make(ResourceMetricInfo, 0) var total int64 var timestamp metav1.Time @@ -245,10 +250,10 @@ func getPodMetrics(resource v1.ResourceName, podMetric *metricsapi.PodMetrics) R Value: float64(total) / 1000., Labels: []common.Label{}, }) - return res + return res, timestamp.Time } -func getWorkloadMetrics(resource v1.ResourceName, podMetrics []metricsapi.PodMetrics) ResourceMetricInfo { +func getWorkloadMetrics(resource v1.ResourceName, podMetrics []metricsapi.PodMetrics) (ResourceMetricInfo, time.Time) { res := make(ResourceMetricInfo, 0) var total int64 var timestamp metav1.Time @@ -269,10 +274,10 @@ func getWorkloadMetrics(resource v1.ResourceName, podMetrics []metricsapi.PodMet Value: float64(total) / 1000., Labels: []common.Label{}, }) - return res + return res, timestamp.Time } -func getNodeMetrics(resource v1.ResourceName, nodeMetric *metricsapi.NodeMetrics) ResourceMetricInfo { +func getNodeMetrics(resource v1.ResourceName, nodeMetric *metricsapi.NodeMetrics) (ResourceMetricInfo, time.Time) { res := make(ResourceMetricInfo, 0) var total int64 var timestamp metav1.Time @@ -290,7 +295,7 @@ func getNodeMetrics(resource v1.ResourceName, nodeMetric *metricsapi.NodeMetrics Value: float64(total) / 1000., Labels: []common.Label{}, }) - return res + return res, timestamp.Time } func (cm *customMetricsClient) GetObjectMetric(metric *metricquery.Metric) (*customapi.MetricValue, time.Time, error) { diff --git a/pkg/querybuilder-providers/metricserver/builder_test.go b/pkg/querybuilder-providers/metricserver/builder_test.go index 56073e5d8..45211f601 100644 --- a/pkg/querybuilder-providers/metricserver/builder_test.go +++ b/pkg/querybuilder-providers/metricserver/builder_test.go @@ -90,7 +90,7 @@ func TestBuildQuery(t *testing.T) { Type: metricquery.ContainerMetricType, Container: &metricquery.ContainerNamerInfo{ Namespace: "default", - PodName: "pod-xxx", + WorkloadName: "workload-xxx", ContainerName: "container", }, }, @@ -100,7 +100,7 @@ func TestBuildQuery(t *testing.T) { Type: metricquery.ContainerMetricType, Container: &metricquery.ContainerNamerInfo{ Namespace: "default", - PodName: "pod-xxx", + WorkloadName: "workload-xxx", ContainerName: "container", }, }, @@ -113,7 +113,7 @@ func TestBuildQuery(t *testing.T) { Type: metricquery.ContainerMetricType, Container: &metricquery.ContainerNamerInfo{ Namespace: "default", - PodName: "pod-xxx", + WorkloadName: "workload-xxx", ContainerName: "container", }, }, @@ -123,7 +123,7 @@ func TestBuildQuery(t *testing.T) { Type: metricquery.ContainerMetricType, Container: &metricquery.ContainerNamerInfo{ Namespace: "default", - PodName: "pod-xxx", + WorkloadName: "workload-xxx", ContainerName: "container", }, },