Skip to content

Commit

Permalink
Add pod owner attributes for RDMA metrics
Browse files Browse the repository at this point in the history
Signed-off-by: lou-lan <loulan@loulan.me>
  • Loading branch information
lou-lan committed Nov 19, 2024
1 parent cdc0e82 commit 0be0d4c
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 29 deletions.
4 changes: 4 additions & 0 deletions cmd/spiderpool-agent/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spf13/pflag"
"go.uber.org/atomic"
"gopkg.in/yaml.v3"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/spidernet-io/spiderpool/api/v1/agent/client"
Expand Down Expand Up @@ -121,6 +122,9 @@ type AgentContext struct {
SubnetManager subnetmanager.SubnetManager
KubevirtManager kubevirtmanager.KubevirtManager

// k8s client
ClientSet *kubernetes.Clientset

// handler
HttpServer *server.Server
UnixServer *server.Server
Expand Down
18 changes: 18 additions & 0 deletions cmd/spiderpool-agent/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -147,6 +148,13 @@ func DaemonMain() {
}
agentContext.CRDManager = mgr

logger.Info("Begin to initialize k8s clientSet")
clientSet, err := initK8sClientSet()
if nil != err {
logger.Fatal(err.Error())
}
agentContext.ClientSet = clientSet

logger.Info("Begin to initialize spiderpool-agent metrics HTTP server")
initAgentMetricsServer(agentContext.InnerCtx)

Expand Down Expand Up @@ -325,6 +333,16 @@ func waitAPIServerReady(ctx context.Context) error {
return errors.New("failed to talk to API Server")
}

// initK8sClientSet will new kubernetes ClientSet
func initK8sClientSet() (*kubernetes.Clientset, error) {
clientSet, err := kubernetes.NewForConfig(ctrl.GetConfigOrDie())
if nil != err {
return nil, fmt.Errorf("failed to init K8s clientset: %v", err)
}

return clientSet, nil
}

func initAgentServiceManagers(ctx context.Context) {
logger.Debug("Begin to initialize Node manager")
nodeManager, err := nodemanager.NewNodeManager(
Expand Down
16 changes: 15 additions & 1 deletion cmd/spiderpool-agent/cmd/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"fmt"
"net/http"

"k8s.io/client-go/informers"

"github.com/spidernet-io/spiderpool/pkg/constant"
"github.com/spidernet-io/spiderpool/pkg/metric"
"github.com/spidernet-io/spiderpool/pkg/podownercache"
)

// initAgentMetricsServer will start an opentelemetry http server for spiderpool agent.
Expand All @@ -23,7 +26,18 @@ func initAgentMetricsServer(ctx context.Context) {
logger.Fatal(err.Error())
}

err = metric.InitSpiderpoolAgentMetrics(ctx, agentContext.Cfg.EnableRDMAMetric, agentContext.CRDManager.GetClient())
var cache podownercache.CacheInterface
if agentContext.Cfg.EnableRDMAMetric { //nolint:golint
informerFactory := informers.NewSharedInformerFactory(agentContext.ClientSet, 0)
podInformer := informerFactory.Core().V1().Pods().Informer()

cache, err = podownercache.New(ctx, podInformer, agentContext.CRDManager.GetClient())
if err != nil {
logger.Fatal(err.Error())
}
}

err = metric.InitSpiderpoolAgentMetrics(ctx, cache)
if nil != err {
logger.Fatal(err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ippoolmanager/ippool_manager_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var _ = BeforeSuite(func() {
Build()
_, err = metric.InitMetric(context.TODO(), constant.SpiderpoolAgent, false, false)
Expect(err).NotTo(HaveOccurred())
err = metric.InitSpiderpoolAgentMetrics(context.TODO(), false, fakeClient)
err = metric.InitSpiderpoolAgentMetrics(context.TODO(), nil)
Expect(err).NotTo(HaveOccurred())

tracker = k8stesting.NewObjectTracker(scheme, k8sscheme.Codecs.UniversalDecoder())
Expand Down
8 changes: 4 additions & 4 deletions pkg/metric/metrics_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"go.opentelemetry.io/otel/attribute"
api "go.opentelemetry.io/otel/metric"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/spidernet-io/spiderpool/pkg/lock"
"github.com/spidernet-io/spiderpool/pkg/podownercache"
"github.com/spidernet-io/spiderpool/pkg/rdmametrics"
)

Expand Down Expand Up @@ -221,10 +221,10 @@ func (a *asyncInt64Gauge) Record(value int64, attrs ...attribute.KeyValue) {
}

// InitSpiderpoolAgentMetrics serves for spiderpool agent metrics initialization
func InitSpiderpoolAgentMetrics(ctx context.Context, enableRDMAMetric bool, client client.Client) error {
func InitSpiderpoolAgentMetrics(ctx context.Context, cache podownercache.CacheInterface) error {
// for rdma
if enableRDMAMetric {
err := rdmametrics.Register(ctx, meter, client)
if cache != nil {
err := rdmametrics.Register(ctx, meter, cache)
if err != nil {
return err
}
Expand Down
169 changes: 169 additions & 0 deletions pkg/podownercache/pod_owner_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2024 Authors of spidernet-io
// SPDX-License-Identifier: Apache-2.0

package podownercache

import (
"context"
"fmt"
"github.com/spidernet-io/spiderpool/pkg/lock"
"github.com/spidernet-io/spiderpool/pkg/logutils"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type PodOwnerCache struct {
ctx context.Context
apiReader client.Reader

cacheLock lock.RWMutex
pods map[types.NamespacedName]Pod
ipToPod map[string]types.NamespacedName
}

type Pod struct {
types.NamespacedName
OwnerInfo OwnerInfo
IPs []string
}

type OwnerInfo struct {
APIVersion string
Kind string
Namespace string
Name string
}

type CacheInterface interface {
GetPodByIP(ip string) *Pod
}

var logger *zap.Logger

func New(ctx context.Context, podInformer cache.SharedIndexInformer, apiReader client.Reader) (CacheInterface, error) {
logger = logutils.Logger.Named("PodOwnerCache")
logger.Info("create PodOwnerCache informer")

res := &PodOwnerCache{
ctx: ctx,
apiReader: apiReader,
cacheLock: lock.RWMutex{},
pods: make(map[types.NamespacedName]Pod),
ipToPod: make(map[string]types.NamespacedName),
}

_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: res.onPodAdd,
UpdateFunc: res.onPodUpdate,
DeleteFunc: res.onPodDel,
})
if nil != err {
logger.Error(err.Error())
return nil, err
}

Check warning on line 68 in pkg/podownercache/pod_owner_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/podownercache/pod_owner_cache.go#L66-L68

Added lines #L66 - L68 were not covered by tests

return res, nil
}

func (s *PodOwnerCache) onPodAdd(obj interface{}) {
if pod, ok := obj.(*corev1.Pod); ok {
if len(pod.Status.PodIPs) > 0 {
ips := make([]string, 0, len(pod.Status.PodIPs))
for _, p := range pod.Status.PodIPs {
ips = append(ips, p.IP)
}
owner, err := s.getFinalOwner(pod)
if err != nil {
logger.Warn("", zap.Error(err))
return
}

Check warning on line 84 in pkg/podownercache/pod_owner_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/podownercache/pod_owner_cache.go#L82-L84

Added lines #L82 - L84 were not covered by tests
s.cacheLock.Lock()
defer s.cacheLock.Unlock()
key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
s.pods[key] = Pod{
NamespacedName: key,
OwnerInfo: *owner,
IPs: ips,
}
for _, ip := range ips {
s.ipToPod[ip] = key
}
}
}
}

func (s *PodOwnerCache) onPodUpdate(oldObj, newObj interface{}) {
s.onPodAdd(newObj)
}

func (s *PodOwnerCache) onPodDel(obj interface{}) {
if pod, ok := obj.(*corev1.Pod); ok {
s.cacheLock.Lock()
defer s.cacheLock.Unlock()

key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
if _, ok := s.pods[key]; !ok {
return
}

Check warning on line 112 in pkg/podownercache/pod_owner_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/podownercache/pod_owner_cache.go#L111-L112

Added lines #L111 - L112 were not covered by tests
for _, ip := range s.pods[key].IPs {
delete(s.ipToPod, ip)
}
delete(s.pods, key)
}
}

func (s *PodOwnerCache) getFinalOwner(obj metav1.Object) (*OwnerInfo, error) {
var finalOwner *OwnerInfo

for {
ownerRefs := obj.GetOwnerReferences()
if len(ownerRefs) == 0 {
break
}

ownerRef := ownerRefs[0] // Assuming the first owner reference
finalOwner = &OwnerInfo{
APIVersion: ownerRef.APIVersion,
Kind: ownerRef.Kind,
Namespace: obj.GetNamespace(),
Name: ownerRef.Name,
}

// Prepare an empty object of the owner kind
ownerObj := &unstructured.Unstructured{}
ownerObj.SetAPIVersion(ownerRef.APIVersion)
ownerObj.SetKind(ownerRef.Kind)

err := s.apiReader.Get(s.ctx, client.ObjectKey{
Namespace: obj.GetNamespace(),
Name: ownerRef.Name,
}, ownerObj)
if err != nil {
return nil, fmt.Errorf("error fetching owner: %v", err)
}

Check warning on line 148 in pkg/podownercache/pod_owner_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/podownercache/pod_owner_cache.go#L147-L148

Added lines #L147 - L148 were not covered by tests

// Set obj to the current owner to continue the loop
obj = ownerObj
}

return finalOwner, nil
}

func (s *PodOwnerCache) GetPodByIP(ip string) *Pod {
s.cacheLock.RLock()
defer s.cacheLock.RUnlock()
item, exists := s.ipToPod[ip]
if !exists {
return nil
}
pod, exists := s.pods[item]
if !exists {
return nil
}

Check warning on line 167 in pkg/podownercache/pod_owner_cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/podownercache/pod_owner_cache.go#L166-L167

Added lines #L166 - L167 were not covered by tests
return &pod
}
Loading

0 comments on commit 0be0d4c

Please sign in to comment.