Skip to content

Commit

Permalink
Add apiserver client to add and remove containers (sustaina…
Browse files Browse the repository at this point in the history
…ble-computing-io#635)

* manifests: add access permission to list pods in apiserver

Signed-off-by: Marcelo Amaral <marcelo.amaral1@ibm.com>

* cmd: add option to configure apiserver client with kubeconfig file

Signed-off-by: Marcelo Amaral <marcelo.amaral1@ibm.com>

* collector: add apiserver client to add and remove containers

Signed-off-by: Marcelo Amaral <marcelo.amaral1@ibm.com>

* vendor: add apiserver libs dependencies

Signed-off-by: Marcelo Amaral <marcelo.amaral1@ibm.com>

---------

Signed-off-by: Marcelo Amaral <marcelo.amaral1@ibm.com>
  • Loading branch information
Marcelo Carneiro do Amaral authored Jun 17, 2023
1 parent 4f69b9b commit bc981ed
Show file tree
Hide file tree
Showing 56 changed files with 8,283 additions and 6 deletions.
Binary file modified .DS_Store
Binary file not shown.
4 changes: 4 additions & 0 deletions cmd/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var (
profileDuration = flag.Int("profile-duration", 60, "duration in seconds")
enabledMSR = flag.Bool("enable-msr", false, "whether MSR is allowed to obtain energy data")
enabledBPFBatchDelete = flag.Bool("enable-bpf-batch-del", true, "bpf map batch deletion can be enabled for backported kernels older than 5.6")
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file, if empty we use the in-cluster configuration")
apiserverEnabled = flag.Bool("apiserver", true, "if apiserver is disabled, we collect pod information from kubelet")
kernelSourceDirPath = flag.String("kernel-source-dir", "", "path to the kernel source directory")
)

Expand Down Expand Up @@ -152,6 +154,8 @@ func main() {
config.SetEnabledHardwareCounterMetrics(*exposeHardwareCounterMetrics)
config.SetEnabledGPU(*enableGPU)
config.EnabledMSR = *enabledMSR
config.SetKubeConfig(*kubeconfig)
config.SetEnableAPIServer(*apiserverEnabled)
if err := config.SetKernelSourceDir(*kernelSourceDirPath); err != nil {
klog.Warningf("failed to set kernel source dir to %q: %v", *kernelSourceDirPath, err)
}
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 h1:A3dhViTeFDSQcGOXuUi6ukCQSMyDtDISBp2z6OOo2YM=
Expand Down
4 changes: 4 additions & 0 deletions manifests/config/exporter/exporter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumes:
- name: lib-modules
hostPath:
Expand Down
5 changes: 3 additions & 2 deletions manifests/config/rbac/k8s_clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ rules:
- apiGroups: [""]
resources:
- nodes/metrics # access /metrics/resource
- nodes/proxy # access /pods
- nodes/stats # access /pods
- nodes/proxy
- nodes/stats
- pods
verbs:
- 'get'
- 'watch'
Expand Down
6 changes: 6 additions & 0 deletions pkg/cgroup/cgroup_stats_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func (c CCgroupV1StatManager) SetCGroupStat(containerID string, cgroupStatMap ma
if err != nil {
return err
}
if stat.Memory == nil {
return fmt.Errorf("cgroup metrics does not exist, the cgroup might be deleted")
}
// cgroup v1 memory
if stat.Memory != nil {
cgroupStatMap[config.CgroupfsMemory].SetAggrStat(containerID, stat.Memory.Usage.Usage)
Expand Down Expand Up @@ -111,6 +114,9 @@ func (c CCgroupV2StatManager) SetCGroupStat(containerID string, cgroupStatMap ma
if err != nil {
return err
}
if stat.Memory == nil {
return fmt.Errorf("cgroup metrics does not exist, the cgroup might be deleted")
}
// memory
if stat.Memory != nil {
cgroupStatMap[config.CgroupfsMemory].SetAggrStat(containerID, stat.Memory.Usage)
Expand Down
4 changes: 2 additions & 2 deletions pkg/collector/metric/container_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (c *ContainerMetrics) SumAllDynAggrValues() uint64 {

func (c *ContainerMetrics) String() string {
return fmt.Sprintf("energy from pod/container (%d active processes): name: %s/%s namespace: %s \n"+
"\tcgrouppid: %d pid: %d comm: %s\n"+
"\tcgrouppid: %d pid: %d comm: %s containerid:%s\n"+
"\tDyn ePkg (mJ): %s (eCore: %s eDram: %s eUncore: %s) eGPU (mJ): %s eOther (mJ): %s \n"+
"\tIdle ePkg (mJ): %s (eCore: %s eDram: %s eUncore: %s) eGPU (mJ): %s eOther (mJ): %s \n"+
"\tCPUTime: %d (%d)\n"+
Expand All @@ -250,7 +250,7 @@ func (c *ContainerMetrics) String() string {
"\tcgroupfs: %v\n"+
"\tkubelets: %v\n",
c.CurrProcesses, c.PodName, c.ContainerName, c.Namespace,
c.CGroupPID, c.PIDS, c.Command,
c.CGroupPID, c.PIDS, c.Command, c.ContainerID,
c.DynEnergyInPkg, c.DynEnergyInCore, c.DynEnergyInDRAM, c.DynEnergyInUncore, c.DynEnergyInGPU, c.DynEnergyInOther,
c.IdleEnergyInPkg, c.IdleEnergyInCore, c.IdleEnergyInDRAM, c.IdleEnergyInUncore, c.IdleEnergyInGPU, c.IdleEnergyInOther,
c.CPUTime.Delta, c.CPUTime.Aggr,
Expand Down
3 changes: 3 additions & 0 deletions pkg/collector/metric/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func isCounterStatEnabled(label string) bool {
}

func getNodeName() string {
if nodeName := os.Getenv("NODE_NAME"); nodeName != "" {
return nodeName
}
nodeName, err := os.Hostname()
if err != nil {
klog.Fatalf("could not get the node name: %s", err)
Expand Down
22 changes: 20 additions & 2 deletions pkg/collector/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,30 @@ package collector
import (
"github.com/sustainable-computing-io/kepler/pkg/cgroup"
collector_metric "github.com/sustainable-computing-io/kepler/pkg/collector/metric"

"github.com/sustainable-computing-io/kepler/pkg/kubernetes"
)

// createContainersMetricsIfNotExist registers a ContainerMetrics entry for
// containerID when the collector does not have one yet.
//
// It is only needed when the pod watcher is late in syncing the container
// information, or when the watcher is not enabled at all.
//
// cGroupID, pid and withCGroupID are forwarded to cgroup.GetContainerInfo and
// are only used when the watcher is disabled.
func (c *Collector) createContainersMetricsIfNotExist(containerID string, cGroupID, pid uint64, withCGroupID bool) {
	if _, ok := c.ContainersMetrics[containerID]; ok {
		return
	}

	// We fill the info with generic values because, when the watcher is
	// enabled, it will eventually update the entry with the real names.
	containerName := c.systemProcessName
	podName := c.systemProcessName
	namespace := c.systemProcessNamespace

	// In case the pod watcher is not enabled, we need to retrieve the information about the
	// pod and container from the kubelet API. However, we prefer to use the watcher approach
	// as accessing the kubelet API might be restricted in certain systems.
	// Additionally, the code that fetches the information from the kubelet API utilizes a cache
	// for performance reasons. Therefore, if the kubelet API delays the information of the
	// containerID (which occasionally occurs), the container will be wrongly identified for
	// its entire lifetime.
	if !kubernetes.IsWatcherEnabled {
		// The lookup error is ignored on purpose, exactly as before this change.
		// NOTE(review): this relies on GetContainerInfo returning a usable value
		// even on error — confirm against pkg/cgroup.
		info, _ := cgroup.GetContainerInfo(cGroupID, pid, withCGroupID)
		containerName = info.ContainerName
		podName = info.PodName
		namespace = info.Namespace
	}

	// Create the entry exactly once; NewContainerMetrics takes the container
	// name first, then the pod name.
	c.ContainersMetrics[containerID] = collector_metric.NewContainerMetrics(
		containerName, podName, namespace, containerID)
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ var (
FixedModelNameKey = "MODEL"
ModelFiltersKey = "FILTERS"
////////////////////////////////////

// KubeConfig is used to start k8s client with the pod running outside the cluster
KubeConfig = ""
EnableAPIServer = false
)

func logBoolConfigs() {
Expand Down Expand Up @@ -222,6 +226,16 @@ func SetEnabledGPU(enabled bool) {
EnabledGPU = enabled || EnabledGPU
}

// SetKubeConfig stores k, the path to a kubeconfig file, in the package-level
// KubeConfig variable. Any previously stored value is overwritten.
func SetKubeConfig(k string) {
KubeConfig = k
}

// SetEnableAPIServer records in the package-level EnableAPIServer variable
// whether Kepler should watch the apiserver. The value is overwritten
// unconditionally, so passing false disables a previously enabled setting.
func SetEnableAPIServer(enabled bool) {
EnableAPIServer = enabled
}

func (c config) getUnixName() (unix.Utsname, error) {
var utsname unix.Utsname
err := unix.Uname(&utsname)
Expand Down
223 changes: 223 additions & 0 deletions pkg/kubernetes/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubernetes

import (
"fmt"
"regexp"
"sync"
"time"

k8sv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

collector_metric "github.com/sustainable-computing-io/kepler/pkg/collector/metric"
"github.com/sustainable-computing-io/kepler/pkg/config"
)

const (
// informerTimeout is the maximum time Run waits for the informer cache to sync.
informerTimeout = time.Minute
// podResourceType is the resource name used for the list/watch and the only
// ResourceKind the event handlers support.
podResourceType = "pods"
)

var (
// regexReplaceContainerIDPrefix matches everything up to and including "//";
// ParseContainerIDFromPodStatus uses it to strip that prefix from a container ID.
regexReplaceContainerIDPrefix = regexp.MustCompile(`.*//`)
// IsWatcherEnabled is set to true by NewObjListWatcher once the pod informer
// and its event handlers have been configured.
IsWatcherEnabled = false
// managedPods is keyed by pod UID and holds the pods whose containers were all
// added to ContainersMetrics; handleUpdate skips pods that are already in it.
managedPods = make(map[string]bool)
)

// ObjListWatcher watches objects of ResourceKind through a shared informer and
// mirrors their containers into ContainersMetrics.
type ObjListWatcher struct {
// Lock to synchronize the collector update with the watcher.
// It is not set by NewObjListWatcher; the event handlers lock it, so it has
// to be assigned before Run is called.
Mx *sync.Mutex

// k8sCli is the client used to build the list/watch; nil when no client could be created.
k8sCli *kubernetes.Clientset
// ResourceKind is the resource name being watched (podResourceType).
ResourceKind string
// informer delivers the update and delete events to the handlers.
informer cache.SharedInformer
// stopChannel is passed to the informer in Run and closed by Stop.
stopChannel chan struct{}

// ContainersMetrics holds all container energy and resource usage metrics.
// It is not set by NewObjListWatcher; the event handlers read and write the
// map it points to.
ContainersMetrics *map[string]*collector_metric.ContainerMetrics
}

// newK8sClient builds a Kubernetes clientset.
//
// When config.KubeConfig is empty the in-cluster configuration is used,
// otherwise the configuration is loaded from the given kubeconfig file.
//
// It returns nil when the configuration cannot be loaded, so the caller can
// carry on without a client. A failure to build the clientset from a valid
// configuration is fatal.
func newK8sClient() *kubernetes.Clientset {
	var (
		restConf *rest.Config
		err      error
	)
	if config.KubeConfig == "" {
		// creates the in-cluster config
		klog.Infoln("Using in cluster k8s config")
		restConf, err = rest.InClusterConfig()
	} else {
		// use the current context in kubeconfig
		klog.Infoln("Using out cluster k8s config: ", config.KubeConfig)
		restConf, err = clientcmd.BuildConfigFromFlags("", config.KubeConfig)
	}
	if err != nil {
		// Infoln does not interpret format verbs; use a formatting call so the
		// error is rendered instead of a literal "%v".
		klog.Warningf("could not load the k8s config: %v", err)
		return nil
	}

	// creates the clientset
	clientset, err := kubernetes.NewForConfig(restConf)
	if err != nil {
		klog.Fatalf("%v", err)
	}
	return clientset
}

// NewObjListWatcher creates a watcher for the pods scheduled on this node.
//
// When the apiserver is disabled (config.EnableAPIServer is false) or no
// Kubernetes client can be created, the returned watcher has no informer and
// IsWatcherEnabled stays false. Otherwise the informer is configured, its
// event handlers are registered and IsWatcherEnabled is set to true.
//
// The caller must set Mx and ContainersMetrics on the returned watcher before
// calling Run, because the event handlers use both.
func NewObjListWatcher() *ObjListWatcher {
	w := &ObjListWatcher{
		stopChannel:  make(chan struct{}),
		ResourceKind: podResourceType,
	}
	// Do not even build a client when the apiserver is disabled: building it
	// can log misleading messages or abort the process for a feature that is off.
	if !config.EnableAPIServer {
		return w
	}
	w.k8sCli = newK8sClient()
	if w.k8sCli == nil {
		return w
	}

	optionsModifier := func(options *metav1.ListOptions) {
		options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", collector_metric.NodeName) // to filter events per node
	}
	objListWatcher := cache.NewFilteredListWatchFromClient(
		w.k8sCli.CoreV1().RESTClient(),
		w.ResourceKind,
		metav1.NamespaceAll,
		optionsModifier,
	)

	// Tell the informer which object type it handles instead of passing nil.
	w.informer = cache.NewSharedInformer(objListWatcher, &k8sv1.Pod{}, 0)
	w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		// Pods that already exist when the informer starts are delivered as
		// add events by the initial list. Without this handler they would only
		// be registered if they happened to be updated later.
		AddFunc: func(obj interface{}) {
			w.handleUpdate(obj)
		},
		UpdateFunc: func(oldObj interface{}, newObj interface{}) {
			w.handleUpdate(newObj)
		},
		DeleteFunc: func(obj interface{}) {
			w.handleDeleted(obj)
		},
	})
	IsWatcherEnabled = true
	return w
}

// Run starts the informer in its own goroutine and blocks until its cache has
// synced. It does nothing when the watcher is not enabled, and aborts the
// process if the cache has not synced within informerTimeout.
func (w *ObjListWatcher) Run() {
	if !IsWatcherEnabled {
		return
	}

	go w.informer.Run(w.stopChannel)

	// WaitForCacheSync needs a stop channel, so turn the timeout into one.
	deadline := make(chan struct{})
	timer := time.AfterFunc(informerTimeout, func() { close(deadline) })
	defer timer.Stop()

	synced := cache.WaitForCacheSync(deadline, w.informer.HasSynced)
	if !synced {
		klog.Fatalf("watcher timed out waiting for caches to sync")
	}
}

// Stop closes the stop channel that Run hands to the informer.
// NOTE: closing an already closed channel panics, so Stop must be called at
// most once per watcher.
func (w *ObjListWatcher) Stop() {
close(w.stopChannel)
}

// handleUpdate registers the containers of a pod in ContainersMetrics once
// the pod reports that all of its containers are ready.
//
// obj is expected to be a *k8sv1.Pod; anything else is logged and ignored.
// Pods already recorded in managedPods are skipped. A pod is only added to
// managedPods when every regular, init and ephemeral container could be
// registered, so a partially started pod is retried on its next update.
func (w *ObjListWatcher) handleUpdate(obj interface{}) {
	switch w.ResourceKind {
	case podResourceType:
		pod, ok := obj.(*k8sv1.Pod)
		if !ok {
			klog.Infof("Could not convert obj: %v", w.ResourceKind)
			return
		}
		podID := string(pod.GetUID())
		// Pod object can have many updates such as change in the annotations and labels.
		// We only add the pod information when all containers are ready, then when the
		// pod is in our managed list we can skip the information update.
		if _, exist := managedPods[podID]; exist {
			return
		}
		for _, condition := range pod.Status.Conditions {
			// Skip every condition except "ContainersReady == True". The two
			// tests must be OR-ed: with AND, unrelated conditions that are True
			// (and a ContainersReady that is False) would trigger the update.
			if condition.Type != k8sv1.ContainersReady || condition.Status != k8sv1.ConditionTrue {
				continue
			}
			w.Mx.Lock()
			err1 := w.fillInfo(pod, pod.Status.ContainerStatuses)
			err2 := w.fillInfo(pod, pod.Status.InitContainerStatuses)
			err3 := w.fillInfo(pod, pod.Status.EphemeralContainerStatuses)
			w.Mx.Unlock()
			// only add pod to cache if all containers successfully added to map
			if err1 == nil && err2 == nil && err3 == nil {
				managedPods[podID] = true
			}
			// The matching condition was handled; there is nothing left to do.
			break
		}

	default:
		klog.Infof("Watcher does not support object type %s", w.ResourceKind)
		return
	}
}

// fillInfo makes sure every container in containers has an entry in
// ContainersMetrics carrying the container, pod and namespace names of pod.
//
// Containers without a container ID yet are skipped. In that case the
// returned error describes the last such container; it is nil when all
// containers were processed.
func (w *ObjListWatcher) fillInfo(pod *k8sv1.Pod, containers []k8sv1.ContainerStatus) error {
	var lastErr error
	for i := range containers {
		status := &containers[i]
		id := ParseContainerIDFromPodStatus(status.ContainerID)
		if id == "" {
			// the container ID is not initialized yet
			lastErr = fmt.Errorf("container %s did not start yet", status.Name)
			continue
		}

		entry, known := (*w.ContainersMetrics)[id]
		if !known {
			entry = collector_metric.NewContainerMetrics(status.Name, pod.Name, pod.Namespace, id)
			(*w.ContainersMetrics)[id] = entry
		}
		// Refresh the names even for an entry that already existed.
		entry.ContainerName = status.Name
		entry.PodName = pod.Name
		entry.Namespace = pod.Namespace
	}
	return lastErr
}

// handleDeleted removes a deleted pod from managedPods and drops the metrics
// of all its regular, init and ephemeral containers.
//
// obj is normally a *k8sv1.Pod. When the informer missed the delete event it
// delivers a cache.DeletedFinalStateUnknown wrapping the last known object
// instead; that tombstone is unwrapped here. Any other type is logged and
// ignored rather than aborting the process.
func (w *ObjListWatcher) handleDeleted(obj interface{}) {
	switch w.ResourceKind {
	case podResourceType:
		pod, ok := obj.(*k8sv1.Pod)
		if !ok {
			tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
			if !isTombstone {
				klog.Infof("Could not convert obj: %v", w.ResourceKind)
				return
			}
			if pod, ok = tombstone.Obj.(*k8sv1.Pod); !ok {
				klog.Infof("Could not convert obj: %v", w.ResourceKind)
				return
			}
		}
		delete(managedPods, string(pod.GetUID()))
		w.Mx.Lock()
		w.deleteInfo(pod.Status.ContainerStatuses)
		w.deleteInfo(pod.Status.InitContainerStatuses)
		w.deleteInfo(pod.Status.EphemeralContainerStatuses)
		w.Mx.Unlock()
	default:
		klog.Infof("Watcher does not support object type %s", w.ResourceKind)
		return
	}
}

// deleteInfo drops the ContainersMetrics entry of every container in containers.
//
// TODO: instead of delete, it might be better to mark it to delete since k8s takes time to really delete an object
func (w *ObjListWatcher) deleteInfo(containers []k8sv1.ContainerStatus) {
	for i := range containers {
		id := ParseContainerIDFromPodStatus(containers[i].ContainerID)
		delete(*w.ContainersMetrics, id)
	}
}

// ParseContainerIDFromPodStatus returns containerID with everything up to and
// including "//" removed. A value without "//" is returned unchanged.
func ParseContainerIDFromPodStatus(containerID string) string {
	const noPrefix = ""
	stripped := regexReplaceContainerIDPrefix.ReplaceAllString(containerID, noPrefix)
	return stripped
}
Loading

0 comments on commit bc981ed

Please sign in to comment.