Skip to content

Commit

Permalink
*: support new MetricsGetter interface
Browse files Browse the repository at this point in the history
Signed-off-by: Damien Grisonnet <dgrisonn@redhat.com>
  • Loading branch information
dgrisonnet committed Aug 11, 2022
1 parent 9321bf0 commit cca107d
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 116 deletions.
10 changes: 5 additions & 5 deletions cmd/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/transport"
Expand Down Expand Up @@ -238,15 +238,15 @@ func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client, stop
return err
}

client, err := kubernetes.NewForConfig(rest)
client, err := metadata.NewForConfig(rest)
if err != nil {
return err
}

podInformerFactory := informers.NewFilteredSharedInformerFactory(client, 0, corev1.NamespaceAll, func(options *metav1.ListOptions) {
podInformerFactory := metadatainformer.NewFilteredSharedInformerFactory(client, 0, corev1.NamespaceAll, func(options *metav1.ListOptions) {
options.FieldSelector = "status.phase=Running"
})
podInformer := podInformerFactory.Core().V1().Pods()
podInformer := podInformerFactory.ForResource(corev1.SchemeGroupVersion.WithResource("pods"))

informer, err := cmd.Informers()
if err != nil {
Expand Down
102 changes: 58 additions & 44 deletions pkg/resourceprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
metrics "k8s.io/metrics/pkg/apis/metrics"

Expand Down Expand Up @@ -123,12 +123,11 @@ type nsQueryResults struct {
}

// GetPodMetrics implements the api.MetricsProvider interface.
func (p *resourceProvider) GetPodMetrics(pods ...apitypes.NamespacedName) ([]api.TimeInfo, [][]metrics.ContainerMetrics, error) {
resTimes := make([]api.TimeInfo, len(pods))
resMetrics := make([][]metrics.ContainerMetrics, len(pods))
func (p *resourceProvider) GetPodMetrics(pods ...*metav1.PartialObjectMetadata) ([]metrics.PodMetrics, error) {
resMetrics := make([]metrics.PodMetrics, 0, len(pods))

if len(pods) == 0 {
return resTimes, resMetrics, nil
return resMetrics, nil
}

// TODO(directxman12): figure out how well this scales if we go to list 1000+ pods
Expand Down Expand Up @@ -168,37 +167,40 @@ func (p *resourceProvider) GetPodMetrics(pods ...apitypes.NamespacedName) ([]api

// convert the unorganized per-container results into results grouped
// together by namespace, pod, and container
for i, pod := range pods {
p.assignForPod(pod, resultsByNs, &resMetrics[i], &resTimes[i])
for _, pod := range pods {
podMetric := p.assignForPod(pod, resultsByNs)
if podMetric != nil {
resMetrics = append(resMetrics, *podMetric)
}
}

return resTimes, resMetrics, nil
return resMetrics, nil
}

// assignForPod takes the resource metrics for all containers in the given pod
// from resultsByNs, and places them in MetricsProvider response format in resMetrics,
// also recording the earliest time in resTime. It will return without operating if
// any data is missing.
func (p *resourceProvider) assignForPod(pod apitypes.NamespacedName, resultsByNs map[string]nsQueryResults, resMetrics *[]metrics.ContainerMetrics, resTime *api.TimeInfo) {
func (p *resourceProvider) assignForPod(pod *metav1.PartialObjectMetadata, resultsByNs map[string]nsQueryResults) *metrics.PodMetrics {
// check to make sure everything is present
nsRes, nsResPresent := resultsByNs[pod.Namespace]
if !nsResPresent {
klog.Errorf("unable to fetch metrics for pods in namespace %q, skipping pod %s", pod.Namespace, pod.String())
return
return nil
}
cpuRes, hasResult := nsRes.cpu[pod.Name]
if !hasResult {
klog.Errorf("unable to fetch CPU metrics for pod %s, skipping", pod.String())
return
return nil
}
memRes, hasResult := nsRes.mem[pod.Name]
if !hasResult {
klog.Errorf("unable to fetch memory metrics for pod %s, skipping", pod.String())
return
return nil
}

earliestTs := pmodel.Latest
containerMetrics := make(map[string]metrics.ContainerMetrics)
earliestTs := pmodel.Latest

// organize all the CPU results
for _, cpu := range cpuRes {
Expand Down Expand Up @@ -241,40 +243,50 @@ func (p *resourceProvider) assignForPod(pod apitypes.NamespacedName, resultsByNs
}
}

// store the time in the final format
*resTime = api.TimeInfo{
Timestamp: earliestTs.Time(),
Window: p.window,
podMetric := &metrics.PodMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
Labels: pod.Labels,
CreationTimestamp: metav1.Now(),
},
// store the time in the final format
Timestamp: metav1.NewTime(earliestTs.Time()),
Window: metav1.Duration{Duration: p.window},
}

// store the container metrics in the final format
containerMetricsList := make([]metrics.ContainerMetrics, 0, len(containerMetrics))
podMetric.Containers = make([]metrics.ContainerMetrics, 0, len(containerMetrics))
for _, containerMetric := range containerMetrics {
containerMetricsList = append(containerMetricsList, containerMetric)
podMetric.Containers = append(podMetric.Containers, containerMetric)
}
*resMetrics = containerMetricsList

return podMetric
}

// GetNodeMetrics implements the api.MetricsProvider interface.
func (p *resourceProvider) GetNodeMetrics(nodes ...string) ([]api.TimeInfo, []corev1.ResourceList, error) {
resTimes := make([]api.TimeInfo, len(nodes))
resMetrics := make([]corev1.ResourceList, len(nodes))
func (p *resourceProvider) GetNodeMetrics(nodes ...*corev1.Node) ([]metrics.NodeMetrics, error) {
resMetrics := make([]metrics.NodeMetrics, 0, len(nodes))

if len(nodes) == 0 {
return resTimes, resMetrics, nil
return resMetrics, nil
}

now := pmodel.Now()
nodeNames := make([]string, 0, len(nodes))
for _, node := range nodes {
nodeNames = append(nodeNames, node.Name)
}

// run the actual query
qRes := p.queryBoth(now, nodeResource, "", nodes...)
qRes := p.queryBoth(now, nodeResource, "", nodeNames...)
if qRes.err != nil {
klog.Errorf("failed querying node metrics: %v", qRes.err)
return resTimes, resMetrics, nil
return resMetrics, nil
}

// organize the results
for i, nodeName := range nodes {
for i, nodeName := range nodeNames {
// skip if any data is missing
rawCPUs, gotResult := qRes.cpu[nodeName]
if !gotResult {
Expand All @@ -290,28 +302,30 @@ func (p *resourceProvider) GetNodeMetrics(nodes ...string) ([]api.TimeInfo, []co
rawMem := rawMems[0]
rawCPU := rawCPUs[0]

// store the results
resMetrics[i] = corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(int64(rawCPU.Value*1000.0), resource.DecimalSI),
corev1.ResourceMemory: *resource.NewMilliQuantity(int64(rawMem.Value*1000.0), resource.BinarySI),
}

// use the earliest timestamp available (in order to be conservative
// when determining if metrics are tainted by startup)
if rawMem.Timestamp.Before(rawCPU.Timestamp) {
resTimes[i] = api.TimeInfo{
Timestamp: rawMem.Timestamp.Time(),
Window: p.window,
}
} else {
resTimes[i] = api.TimeInfo{
Timestamp: rawCPU.Timestamp.Time(),
Window: 1 * time.Minute,
}
ts := rawCPU.Timestamp.Time()
if ts.After(rawMem.Timestamp.Time()) {
ts = rawMem.Timestamp.Time()
}

// store the results
resMetrics = append(resMetrics, metrics.NodeMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: nodes[i].Name,
Labels: nodes[i].Labels,
CreationTimestamp: metav1.Now(),
},
Usage: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(int64(rawCPU.Value*1000.0), resource.DecimalSI),
corev1.ResourceMemory: *resource.NewMilliQuantity(int64(rawMem.Value*1000.0), resource.BinarySI),
},
Timestamp: metav1.NewTime(ts),
Window: metav1.Duration{Duration: p.window},
})
}

return resTimes, resMetrics, nil
return resMetrics, nil
}

// queryBoth queries for both CPU and memory metrics on the given
Expand Down
Loading

0 comments on commit cca107d

Please sign in to comment.