From cca107d97c6f72fdedc5893ae0ae770bbb6c5d28 Mon Sep 17 00:00:00 2001 From: Damien Grisonnet Date: Tue, 9 Aug 2022 18:55:29 +0200 Subject: [PATCH] *: support new MetricsGetter interface Signed-off-by: Damien Grisonnet --- cmd/adapter/adapter.go | 10 +- pkg/resourceprovider/provider.go | 102 ++++++++++-------- pkg/resourceprovider/provider_test.go | 148 ++++++++++++++------------ 3 files changed, 144 insertions(+), 116 deletions(-) diff --git a/cmd/adapter/adapter.go b/cmd/adapter/adapter.go index 0169f40ac..a1e9dbaa8 100644 --- a/cmd/adapter/adapter.go +++ b/cmd/adapter/adapter.go @@ -32,8 +32,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericapiserver "k8s.io/apiserver/pkg/server" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/transport" @@ -238,15 +238,15 @@ func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client, stop return err } - client, err := kubernetes.NewForConfig(rest) + client, err := metadata.NewForConfig(rest) if err != nil { return err } - podInformerFactory := informers.NewFilteredSharedInformerFactory(client, 0, corev1.NamespaceAll, func(options *metav1.ListOptions) { + podInformerFactory := metadatainformer.NewFilteredSharedInformerFactory(client, 0, corev1.NamespaceAll, func(options *metav1.ListOptions) { options.FieldSelector = "status.phase=Running" }) - podInformer := podInformerFactory.Core().V1().Pods() + podInformer := podInformerFactory.ForResource(corev1.SchemeGroupVersion.WithResource("pods")) informer, err := cmd.Informers() if err != nil { diff --git a/pkg/resourceprovider/provider.go b/pkg/resourceprovider/provider.go index 483377fac..ff2740923 100644 --- a/pkg/resourceprovider/provider.go +++ b/pkg/resourceprovider/provider.go @@ -26,9 +26,9 @@ import ( corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" - apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" metrics "k8s.io/metrics/pkg/apis/metrics" @@ -123,12 +123,11 @@ type nsQueryResults struct { } // GetPodMetrics implements the api.MetricsProvider interface. -func (p *resourceProvider) GetPodMetrics(pods ...apitypes.NamespacedName) ([]api.TimeInfo, [][]metrics.ContainerMetrics, error) { - resTimes := make([]api.TimeInfo, len(pods)) - resMetrics := make([][]metrics.ContainerMetrics, len(pods)) +func (p *resourceProvider) GetPodMetrics(pods ...*metav1.PartialObjectMetadata) ([]metrics.PodMetrics, error) { + resMetrics := make([]metrics.PodMetrics, 0, len(pods)) if len(pods) == 0 { - return resTimes, resMetrics, nil + return resMetrics, nil } // TODO(directxman12): figure out how well this scales if we go to list 1000+ pods @@ -168,37 +167,40 @@ func (p *resourceProvider) GetPodMetrics(pods ...apitypes.NamespacedName) ([]api // convert the unorganized per-container results into results grouped // together by namespace, pod, and container - for i, pod := range pods { - p.assignForPod(pod, resultsByNs, &resMetrics[i], &resTimes[i]) + for _, pod := range pods { + podMetric := p.assignForPod(pod, resultsByNs) + if podMetric != nil { + resMetrics = append(resMetrics, *podMetric) + } } - return resTimes, resMetrics, nil + return resMetrics, nil } // assignForPod takes the resource metrics for all containers in the given pod // from resultsByNs, and places them in MetricsProvider response format in resMetrics, // also recording the earliest time in resTime. It will return without operating if // any data is missing. -func (p *resourceProvider) assignForPod(pod apitypes.NamespacedName, resultsByNs map[string]nsQueryResults, resMetrics *[]metrics.ContainerMetrics, resTime *api.TimeInfo) { +func (p *resourceProvider) assignForPod(pod *metav1.PartialObjectMetadata, resultsByNs map[string]nsQueryResults) *metrics.PodMetrics { // check to make sure everything is present nsRes, nsResPresent := resultsByNs[pod.Namespace] if !nsResPresent { klog.Errorf("unable to fetch metrics for pods in namespace %q, skipping pod %s", pod.Namespace, pod.String()) - return + return nil } cpuRes, hasResult := nsRes.cpu[pod.Name] if !hasResult { klog.Errorf("unable to fetch CPU metrics for pod %s, skipping", pod.String()) - return + return nil } memRes, hasResult := nsRes.mem[pod.Name] if !hasResult { klog.Errorf("unable to fetch memory metrics for pod %s, skipping", pod.String()) - return + return nil } - earliestTs := pmodel.Latest containerMetrics := make(map[string]metrics.ContainerMetrics) + earliestTs := pmodel.Latest // organize all the CPU results for _, cpu := range cpuRes { @@ -241,40 +243,50 @@ func (p *resourceProvider) assignForPod(pod apitypes.NamespacedName, resultsByNs } } - // store the time in the final format - *resTime = api.TimeInfo{ - Timestamp: earliestTs.Time(), - Window: p.window, + podMetric := &metrics.PodMetrics{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + Labels: pod.Labels, + CreationTimestamp: metav1.Now(), + }, + // store the time in the final format + Timestamp: metav1.NewTime(earliestTs.Time()), + Window: metav1.Duration{Duration: p.window}, } // store the container metrics in the final format - containerMetricsList := make([]metrics.ContainerMetrics, 0, len(containerMetrics)) + podMetric.Containers = make([]metrics.ContainerMetrics, 0, len(containerMetrics)) for _, containerMetric := range containerMetrics { - containerMetricsList = append(containerMetricsList, containerMetric) + podMetric.Containers = append(podMetric.Containers, containerMetric) } - *resMetrics = containerMetricsList + + return podMetric } // GetNodeMetrics implements the api.MetricsProvider interface. -func (p *resourceProvider) GetNodeMetrics(nodes ...string) ([]api.TimeInfo, []corev1.ResourceList, error) { - resTimes := make([]api.TimeInfo, len(nodes)) - resMetrics := make([]corev1.ResourceList, len(nodes)) +func (p *resourceProvider) GetNodeMetrics(nodes ...*corev1.Node) ([]metrics.NodeMetrics, error) { + resMetrics := make([]metrics.NodeMetrics, 0, len(nodes)) if len(nodes) == 0 { - return resTimes, resMetrics, nil + return resMetrics, nil } now := pmodel.Now() + nodeNames := make([]string, 0, len(nodes)) + for _, node := range nodes { + nodeNames = append(nodeNames, node.Name) + } // run the actual query - qRes := p.queryBoth(now, nodeResource, "", nodes...) + qRes := p.queryBoth(now, nodeResource, "", nodeNames...) if qRes.err != nil { klog.Errorf("failed querying node metrics: %v", qRes.err) - return resTimes, resMetrics, nil + return resMetrics, nil } // organize the results - for i, nodeName := range nodes { + for i, nodeName := range nodeNames { // skip if any data is missing rawCPUs, gotResult := qRes.cpu[nodeName] if !gotResult { @@ -290,28 +302,30 @@ func (p *resourceProvider) GetNodeMetrics(nodes ...string) ([]api.TimeInfo, []co rawMem := rawMems[0] rawCPU := rawCPUs[0] - // store the results - resMetrics[i] = corev1.ResourceList{ - corev1.ResourceCPU: *resource.NewMilliQuantity(int64(rawCPU.Value*1000.0), resource.DecimalSI), - corev1.ResourceMemory: *resource.NewMilliQuantity(int64(rawMem.Value*1000.0), resource.BinarySI), - } - // use the earliest timestamp available (in order to be conservative // when determining if metrics are tainted by startup) - if rawMem.Timestamp.Before(rawCPU.Timestamp) { - resTimes[i] = api.TimeInfo{ - Timestamp: rawMem.Timestamp.Time(), - Window: p.window, - } - } else { - resTimes[i] = api.TimeInfo{ - Timestamp: rawCPU.Timestamp.Time(), - Window: 1 * time.Minute, - } + ts := rawCPU.Timestamp.Time() + if ts.After(rawMem.Timestamp.Time()) { + ts = rawMem.Timestamp.Time() } + + // store the results + resMetrics = append(resMetrics, metrics.NodeMetrics{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodes[i].Name, + Labels: nodes[i].Labels, + CreationTimestamp: metav1.Now(), + }, + Usage: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(int64(rawCPU.Value*1000.0), resource.DecimalSI), + corev1.ResourceMemory: *resource.NewMilliQuantity(int64(rawMem.Value*1000.0), resource.BinarySI), + }, + Timestamp: metav1.NewTime(ts), + Window: metav1.Duration{Duration: p.window}, + }) } - return resTimes, resMetrics, nil + return resMetrics, nil } // queryBoth queries for both CPU and memory metrics on the given diff --git a/pkg/resourceprovider/provider_test.go b/pkg/resourceprovider/provider_test.go index f3fe8d7b4..3b9b688be 100644 --- a/pkg/resourceprovider/provider_test.go +++ b/pkg/resourceprovider/provider_test.go @@ -23,9 +23,9 @@ import ( corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/metrics/pkg/apis/metrics" "sigs.k8s.io/metrics-server/pkg/api" @@ -122,10 +122,10 @@ var _ = Describe("Resource Metrics Provider", func() { }) It("should be able to list metrics pods across different namespaces", func() { - pods := []types.NamespacedName{ - {Namespace: "some-ns", Name: "pod1"}, - {Namespace: "some-ns", Name: "pod3"}, - {Namespace: "other-ns", Name: "pod27"}, + pods := []*metav1.PartialObjectMetadata{ + {ObjectMeta: metav1.ObjectMeta{Namespace: "some-ns", Name: "pod1"}}, + {ObjectMeta: metav1.ObjectMeta{Namespace: "some-ns", Name: "pod3"}}, + {ObjectMeta: metav1.ObjectMeta{Namespace: "other-ns", Name: "pod27"}}, } fakeProm.QueryResults = map[prom.Selector]prom.QueryResult{ mustBuild(cpuQueries.contQuery.Build("", podResource, "some-ns", []string{cpuQueries.containerLabel}, labels.Everything(), "pod1", "pod3")): buildQueryRes("container_cpu_usage_seconds_total", @@ -149,28 +149,34 @@ var _ = Describe("Resource Metrics Provider", func() { } By("querying for metrics for some pods") - times, metricVals, err := prov.GetPodMetrics(pods...) + podMetrics, err := prov.GetPodMetrics(pods...) Expect(err).NotTo(HaveOccurred()) + By("verifying that metrics have been fetched for all the pods") + Expect(podMetrics).To(HaveLen(3)) + By("verifying that the reported times for each are the earliest times for each pod") - Expect(times).To(Equal([]api.TimeInfo{ - {Timestamp: pmodel.Time(10).Time(), Window: 1 * time.Minute}, - {Timestamp: pmodel.Time(10).Time(), Window: 1 * time.Minute}, - {Timestamp: pmodel.Time(270).Time(), Window: 1 * time.Minute}, - })) + Expect(podMetrics[0].Timestamp.Time).To(Equal(pmodel.Time(10).Time())) + Expect(podMetrics[0].Window.Duration).To(Equal(time.Minute)) + + Expect(podMetrics[1].Timestamp.Time).To(Equal(pmodel.Time(10).Time())) + Expect(podMetrics[1].Window.Duration).To(Equal(time.Minute)) + + Expect(podMetrics[2].Timestamp.Time).To(Equal(pmodel.Time(270).Time())) + Expect(podMetrics[2].Window.Duration).To(Equal(time.Minute)) By("verifying that the right metrics were fetched") - Expect(metricVals).To(HaveLen(3)) - Expect(metricVals[0]).To(ConsistOf( + Expect(podMetrics).To(HaveLen(3)) + Expect(podMetrics[0].Containers).To(ConsistOf( metrics.ContainerMetrics{Name: "cont1", Usage: buildResList(1100.0, 3100.0)}, metrics.ContainerMetrics{Name: "cont2", Usage: buildResList(1110.0, 3110.0)}, )) - Expect(metricVals[1]).To(ConsistOf( + Expect(podMetrics[1].Containers).To(ConsistOf( metrics.ContainerMetrics{Name: "cont1", Usage: buildResList(1300.0, 3300.0)}, metrics.ContainerMetrics{Name: "cont2", Usage: buildResList(1310.0, 3310.0)}, )) - Expect(metricVals[2]).To(ConsistOf( + Expect(podMetrics[2].Containers).To(ConsistOf( metrics.ContainerMetrics{Name: "cont1", Usage: buildResList(2200.0, 4200.0)}, )) }) @@ -188,23 +194,22 @@ var _ = Describe("Resource Metrics Provider", func() { } By("querying for metrics for some pods, one of which is missing") - times, metricVals, err := prov.GetPodMetrics( - types.NamespacedName{Namespace: "some-ns", Name: "pod1"}, - types.NamespacedName{Namespace: "some-ns", Name: "pod-nonexistant"}, + podMetrics, err := prov.GetPodMetrics( + &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: "some-ns", Name: "pod1"}}, + &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: "some-ns", Name: "pod-nonexistant"}}, ) Expect(err).NotTo(HaveOccurred()) - By("verifying that the missing pod had nil metrics") - Expect(metricVals).To(HaveLen(2)) - Expect(metricVals[1]).To(BeNil()) + By("verifying that the missing pod had no metrics") + Expect(podMetrics).To(HaveLen(1)) By("verifying that the rest of time metrics and times are correct") - Expect(metricVals[0]).To(ConsistOf( + Expect(podMetrics[0].Timestamp.Time).To(Equal(pmodel.Time(10).Time())) + Expect(podMetrics[0].Window.Duration).To(Equal(time.Minute)) + Expect(podMetrics[0].Containers).To(ConsistOf( metrics.ContainerMetrics{Name: "cont1", Usage: buildResList(1100.0, 3100.0)}, metrics.ContainerMetrics{Name: "cont2", Usage: buildResList(1110.0, 3110.0)}, )) - Expect(times).To(HaveLen(2)) - Expect(times[0]).To(Equal(api.TimeInfo{Timestamp: pmodel.Time(10).Time(), Window: 1 * time.Minute})) }) It("should return metrics of value zero when pod metrics have NaN or negative values", func() { @@ -224,25 +229,27 @@ var _ = Describe("Resource Metrics Provider", func() { } By("querying for metrics for some pods") - times, metricVals, err := prov.GetPodMetrics( - types.NamespacedName{Namespace: "some-ns", Name: "pod1"}, - types.NamespacedName{Namespace: "some-ns", Name: "pod3"}, + podMetrics, err := prov.GetPodMetrics( + &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: "some-ns", Name: "pod1"}}, + &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: "some-ns", Name: "pod3"}}, ) Expect(err).NotTo(HaveOccurred()) + By("verifying that metrics have been fetched for all the pods") + Expect(podMetrics).To(HaveLen(2)) + By("verifying that the reported times for each are the earliest times for each pod") - Expect(times).To(Equal([]api.TimeInfo{ - {Timestamp: pmodel.Time(10).Time(), Window: 1 * time.Minute}, - {Timestamp: pmodel.Time(10).Time(), Window: 1 * time.Minute}, - })) + Expect(podMetrics[0].Timestamp.Time).To(Equal(pmodel.Time(10).Time())) + Expect(podMetrics[0].Window.Duration).To(Equal(time.Minute)) + Expect(podMetrics[1].Timestamp.Time).To(Equal(pmodel.Time(10).Time())) + Expect(podMetrics[1].Window.Duration).To(Equal(time.Minute)) By("verifying that NaN and negative values were replaced by zero") - Expect(metricVals).To(HaveLen(2)) - Expect(metricVals[0]).To(ConsistOf( + Expect(podMetrics[0].Containers).To(ConsistOf( metrics.ContainerMetrics{Name: "cont1", Usage: buildResList(0, 3100.0)}, metrics.ContainerMetrics{Name: "cont2", Usage: buildResList(0, 0)}, )) - Expect(metricVals[1]).To(ConsistOf( + Expect(podMetrics[1].Containers).To(ConsistOf( metrics.ContainerMetrics{Name: "cont1", Usage: buildResList(0, 0)}, metrics.ContainerMetrics{Name: "cont2", Usage: buildResList(1310.0, 0)}, )) @@ -260,20 +267,24 @@ var _ = Describe("Resource Metrics Provider", func() { ), } By("querying for metrics for some nodes") - times, metricVals, err := prov.GetNodeMetrics("node1", "node2") + nodeMetrics, err := prov.GetNodeMetrics( + &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2"}}, + ) Expect(err).NotTo(HaveOccurred()) - By("verifying that the reported times for each are the earliest times for each pod") - Expect(times).To(Equal([]api.TimeInfo{ - {Timestamp: pmodel.Time(10).Time(), Window: 1 * time.Minute}, - {Timestamp: pmodel.Time(12).Time(), Window: 1 * time.Minute}, - })) + By("verifying that metrics have been fetched for all the nodes") + Expect(nodeMetrics).To(HaveLen(2)) + + By("verifying that the reported times for each are the earliest times for each node") + Expect(nodeMetrics[0].Timestamp.Time).To(Equal(pmodel.Time(10).Time())) + Expect(nodeMetrics[0].Window.Duration).To(Equal(time.Minute)) + Expect(nodeMetrics[1].Timestamp.Time).To(Equal(pmodel.Time(12).Time())) + Expect(nodeMetrics[1].Window.Duration).To(Equal(time.Minute)) By("verifying that the right metrics were fetched") - Expect(metricVals).To(Equal([]corev1.ResourceList{ - buildResList(1100.0, 2100.0), - buildResList(1200.0, 2200.0), - })) + Expect(nodeMetrics[0].Usage).To(Equal(buildResList(1100.0, 2100.0))) + Expect(nodeMetrics[1].Usage).To(Equal(buildResList(1200.0, 2200.0))) }) It("should return nil metrics for missing nodes, but still return partial results", func() { @@ -288,24 +299,23 @@ var _ = Describe("Resource Metrics Provider", func() { ), } By("querying for metrics for some nodes, one of which is missing") - times, metricVals, err := prov.GetNodeMetrics("node1", "node2", "node3") + nodeMetrics, err := prov.GetNodeMetrics( + &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2"}}, + &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node3"}}, + ) Expect(err).NotTo(HaveOccurred()) - By("verifying that the missing pod had nil metrics") - Expect(metricVals).To(HaveLen(3)) - Expect(metricVals[2]).To(BeNil()) + By("verifying that the missing pod had no metrics") + Expect(nodeMetrics).To(HaveLen(2)) By("verifying that the rest of time metrics and times are correct") - Expect(metricVals).To(Equal([]corev1.ResourceList{ - buildResList(1100.0, 2100.0), - buildResList(1200.0, 2200.0), - nil, - })) - Expect(times).To(Equal([]api.TimeInfo{ - {Timestamp: pmodel.Time(10).Time(), Window: 1 * time.Minute}, - {Timestamp: pmodel.Time(12).Time(), Window: 1 * time.Minute}, - {}, - })) + Expect(nodeMetrics[0].Usage).To(Equal(buildResList(1100.0, 2100.0))) + Expect(nodeMetrics[0].Timestamp.Time).To(Equal(pmodel.Time(10).Time())) + Expect(nodeMetrics[0].Window.Duration).To(Equal(time.Minute)) + Expect(nodeMetrics[1].Usage).To(Equal(buildResList(1200.0, 2200.0))) + Expect(nodeMetrics[1].Timestamp.Time).To(Equal(pmodel.Time(12).Time())) + Expect(nodeMetrics[1].Window.Duration).To(Equal(time.Minute)) }) It("should return metrics of value zero when node metrics have NaN or negative values", func() { @@ -320,19 +330,23 @@ var _ = Describe("Resource Metrics Provider", func() { ), } By("querying for metrics for some nodes") - times, metricVals, err := prov.GetNodeMetrics("node1", "node2") + nodeMetrics, err := prov.GetNodeMetrics( + &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2"}}, + ) Expect(err).NotTo(HaveOccurred()) + By("verifying that metrics have been fetched for all the nodes") + Expect(nodeMetrics).To(HaveLen(2)) + By("verifying that the reported times for each are the earliest times for each pod") - Expect(times).To(Equal([]api.TimeInfo{ - {Timestamp: pmodel.Time(10).Time(), Window: 1 * time.Minute}, - {Timestamp: pmodel.Time(12).Time(), Window: 1 * time.Minute}, - })) + Expect(nodeMetrics[0].Timestamp.Time).To(Equal(pmodel.Time(10).Time())) + Expect(nodeMetrics[0].Window.Duration).To(Equal(time.Minute)) + Expect(nodeMetrics[1].Timestamp.Time).To(Equal(pmodel.Time(12).Time())) + Expect(nodeMetrics[1].Window.Duration).To(Equal(time.Minute)) By("verifying that NaN and negative values were replaced by zero") - Expect(metricVals).To(Equal([]corev1.ResourceList{ - buildResList(0, 2100.0), - buildResList(1200.0, 0), - })) + Expect(nodeMetrics[0].Usage).To(Equal(buildResList(0, 2100.0))) + Expect(nodeMetrics[1].Usage).To(Equal(buildResList(1200.0, 0))) }) })