diff --git a/resourcetopo/README.md b/resourcetopo/README.md new file mode 100644 index 0000000..ea118f5 --- /dev/null +++ b/resourcetopo/README.md @@ -0,0 +1,103 @@ +# Resourcetopo + +Resourcetopo is an open source util to manager the resource topology in your kubernetes cluster. +It can help you to obtain the resource topology relation and trigger related events when the topology graph has changed. + +## Quick Start + +The running code of this part is placed in examples/ dir. + +### Create a Resourcetopo Manager + +We need to create a resourcetopo manager instance before all. + +```go +topoMgr, err := resourcetopo.NewResourcesTopoManager( + resourcetopo.ManagerConfig{ + NodeEventQueueSize: 1024, + RelationEventQueueSize: 1024, + TopologyConfig: nil, + }, +) +``` + +### Setup Topology Config + +We need to set up the topology config for the resourcetopo manager. +The topology config is used to describe the resource topology in your kubernetes cluster. + +```go + +func buildExampleTopologyConfig() *resourcetopo.TopologyConfig { + return &resourcetopo.TopologyConfig{ + GetInformer: getInformer, // getInformer function resolve meta and return the related informer. + Resolvers: []resourcetopo.RelationResolver{ + { + PreMeta: deployMeta, + PostMetas: []metav1.TypeMeta{ + podMeta, + }, + Resolve: func(preObject resourcetopo.Object) []resourcetopo.ResourceRelation { + deployObj, ok := preObject.(*appsv1.Deployment) + if !ok { + return nil + } + return []resourcetopo.ResourceRelation{{ + PostMeta: podMeta, + LabelSelector: deployObj.Spec.Selector, + }} + }, + OwnerRelation: []metav1.TypeMeta{ + podMeta, + }, + ReverseNotice: nil, + }, + }, + +} +``` +### Add Event Handler + +This is an optional step, we can add event handlers to handle the node or relation change events. + +```go +func main() { + // ... + err = topoManager.AddNodeHandler(podMeta, &podEventhandler{}) + err = topoManager.AddRelationHandler(virtualSymptomMeta, podMeta, &symptomPodRelationEventHandler{}) +} + +// podEventHandler is an implementation of the interface NodeHandler, which is designed to handle add/update/delete events of resources. +// Specially, we define the OnRelatedUpdate function, to handle event when the related node has been modified. +// For example, if a new pod has been added, except for a pod add event, a deployment related update event will +// also be generated and call the registered handler. +var _ resourcetopo.NodeHandler = &podEventhandler{} + +// symptomPodRelationEventHandler is an impl of preDefined RelationHandler, included function to handler relation add/delete Event. +var _ resourcetopo.RelationHandler = &symptomPodRelationEventHandler{} + +``` + +### Start Manager + +After configuration settled, we can call Start func and wait for the node/relation events coming. + +```go +func main() { + // ... + topoManager.Start() +} +``` + +### Receive and Handle Events + +In the base example demo, we just print out the meta info, but you can do more things according to specific scenario. +Part of the demo output content is as follows: + +``` +I0509 10:50:09.967769 45952 base_example.go:235] received add event for pod etcd/etcd-0 +...... +I0509 10:50:10.835929 45952 base_example.go:247] related node has changed and effected pod etcd/etcd-0 +I0509 10:50:10.835938 45952 base_example.go:248] related pre nodes are {{Service core/v1}:etcd/etcd} +I0509 10:50:10.835941 45952 base_example.go:249] related post nodes are {{PersistentVolumeClaim core/v1}:etcd/etcd-data-dir-etcd-0} +``` diff --git a/resourcetopo/examples/base_example.go b/resourcetopo/examples/base_example.go new file mode 100644 index 0000000..9b3d13c --- /dev/null +++ b/resourcetopo/examples/base_example.go @@ -0,0 +1,270 @@ +/** + * Copyright 2024 KusionStack Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "strings" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + + "kusionstack.io/kube-utils/resourcetopo" +) + +var ( + mgrCache cache.Cache + backGroundCtx context.Context +) + +func main() { + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) + if err != nil { + klog.Fatal(err.Error()) + } + mgrCache = mgr.GetCache() + backGroundCtx = signals.SetupSignalHandler() + + topoManager, err := resourcetopo.NewResourcesTopoManager( + resourcetopo.ManagerConfig{ + NodeEventQueueSize: 1024, + RelationEventQueueSize: 1024, + TopologyConfig: buildExampleTopologyConfig(), // could also be set later by topoManager.AddTopologyConfig + }, + ) + if err != nil { + klog.Fatal(err.Error()) + } + + // AddTopologyConfig could be called multiple times for different relations + if err = topoManager.AddTopologyConfig(*buildVirtualAppTopology()); err != nil { + klog.Fatal(err.Error()) + } + + if err = topoManager.AddNodeHandler(podMeta, &podEventhandler{}); err != nil { + klog.Fatal(err.Error()) + } + if err = topoManager.AddRelationHandler(virtualAppMeta, podMeta, &appPodRelationEventHandler{}); err != nil { + klog.Fatal(err.Error()) + } + + topoManager.Start(backGroundCtx.Done()) + if err = mgr.Start(backGroundCtx); err != nil { + klog.Fatal(err.Error()) + } +} + +var ( + deploymentMeta = metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + } + podMeta = metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "core/v1", + } + persistentVolumeClaimMeta = metav1.TypeMeta{ + Kind: "PersistentVolumeClaim", + APIVersion: "core/v1", + } + serviceMeta = metav1.TypeMeta{ + Kind: "Service", + APIVersion: "core/v1", + } + virtualAppMeta = metav1.TypeMeta{ + Kind: "App", + APIVersion: "virtual.kubernetes.io/v1", + } +) + +func buildExampleTopologyConfig() *resourcetopo.TopologyConfig { + return &resourcetopo.TopologyConfig{ + GetInformer: getInformer, + Resolvers: []resourcetopo.RelationResolver{ + { + // this block define the relation between deployment and pod + PreMeta: deploymentMeta, + PostMetas: []metav1.TypeMeta{podMeta}, + Resolve: func(preObject resourcetopo.Object) []resourcetopo.ResourceRelation { + deployObj, ok := preObject.(*appsv1.Deployment) + if !ok { + return nil + } + return []resourcetopo.ResourceRelation{{ + PostMeta: podMeta, + LabelSelector: deployObj.Spec.Selector, + }} + }, + OwnerRelation: []metav1.TypeMeta{podMeta}, + ReverseNotice: nil, + }, + { + PreMeta: serviceMeta, + PostMetas: []metav1.TypeMeta{podMeta}, + Resolve: func(preObject resourcetopo.Object) []resourcetopo.ResourceRelation { + svcObj, ok := preObject.(*corev1.Service) + if !ok { + return nil + } + return []resourcetopo.ResourceRelation{{ + PostMeta: podMeta, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: svcObj.Spec.Selector, + }, + }} + }, + OwnerRelation: nil, + // Configure to notice pod if upstream service has changed + ReverseNotice: []metav1.TypeMeta{podMeta}, + }, + { + PreMeta: podMeta, + PostMetas: []metav1.TypeMeta{persistentVolumeClaimMeta}, + Resolve: func(preObject resourcetopo.Object) []resourcetopo.ResourceRelation { + podObj, ok := preObject.(*corev1.Pod) + if !ok { + return nil + } + var pvcNames []types.NamespacedName + for _, v := range podObj.Spec.Volumes { + if v.PersistentVolumeClaim != nil { + pvcNames = append(pvcNames, types.NamespacedName{ + Namespace: podObj.Namespace, + Name: v.PersistentVolumeClaim.ClaimName, + }) + } + } + return []resourcetopo.ResourceRelation{{ + PostMeta: persistentVolumeClaimMeta, + DirectRefs: pvcNames, + }} + }, + OwnerRelation: nil, + ReverseNotice: nil, + }, + }, + } +} + +func buildVirtualAppTopology() *resourcetopo.TopologyConfig { + return &resourcetopo.TopologyConfig{ + GetInformer: getInformer, + Discoverers: []resourcetopo.VirtualResourceDiscoverer{ + { + // assume we want to know the relations among pods and apps + // and app name will be added to pod's labels. + PreMeta: virtualAppMeta, + PostMeta: podMeta, + Discover: func(preObject resourcetopo.Object) []types.NamespacedName { + podObj, ok := preObject.(*corev1.Pod) + if !ok { + return nil + } + podLabels := podObj.ObjectMeta.Labels + if len(podLabels) == 0 { + return nil + } + if app := podLabels["app.kubernetes.io/name"]; len(app) > 0 { + return []types.NamespacedName{{ + Name: app, + }} + } else { + return nil + } + }, + }, + }, + } +} + +func getInformer(meta metav1.TypeMeta) resourcetopo.Informer { + var informer cache.Informer + var err error + switch meta { + case deploymentMeta: + informer, err = mgrCache.GetInformer(backGroundCtx, &appsv1.Deployment{}) + case podMeta: + informer, err = mgrCache.GetInformer(backGroundCtx, &corev1.Pod{}) + case serviceMeta: + informer, err = mgrCache.GetInformer(backGroundCtx, &corev1.Service{}) + case persistentVolumeClaimMeta: + informer, err = mgrCache.GetInformer(backGroundCtx, &corev1.PersistentVolumeClaim{}) + default: + klog.Errorf("unexpected type meta %v", meta) + return nil + } + if err != nil { + klog.Errorf("failed to get informer for meta %v: %s", meta, err.Error()) + return nil + } + return informer +} + +var _ resourcetopo.NodeHandler = &podEventhandler{} + +type podEventhandler struct{} + +func (p *podEventhandler) OnAdd(info resourcetopo.NodeInfo) { + klog.Infof("received add event for pod %s", info.NodeInfo().String()) +} + +func (p *podEventhandler) OnUpdate(info resourcetopo.NodeInfo) { + klog.Infof("received update event for pod %s", info.NodeInfo().String()) +} + +func (p *podEventhandler) OnDelete(info resourcetopo.NodeInfo) { + klog.Infof("received delete event for pod %s", info.NodeInfo().String()) +} + +func (p *podEventhandler) OnRelatedUpdate(info resourcetopo.NodeInfo) { + klog.Infof("related node has changed and effected pod %s", info.NodeInfo().String()) + klog.Infof("related pre nodes are %s", nodes2Str(info.GetPreOrders())) + klog.Infof("related post nodes are %s", nodes2Str(info.GetPostOrders())) +} + +var _ resourcetopo.RelationHandler = &appPodRelationEventHandler{} + +type appPodRelationEventHandler struct{} + +func (s *appPodRelationEventHandler) OnAdd(preOrder resourcetopo.NodeInfo, postOrder resourcetopo.NodeInfo) { + klog.Infof("received relation add event for %s -> %s", preOrder.NodeInfo().String(), postOrder.NodeInfo().String()) +} + +func (s *appPodRelationEventHandler) OnDelete(preOrder resourcetopo.NodeInfo, postOrder resourcetopo.NodeInfo) { + klog.Infof("received relation delete event for %s -> %s", preOrder.NodeInfo().String(), postOrder.NodeInfo().String()) +} + +func nodes2Str(nodes []resourcetopo.NodeInfo) string { + if len(nodes) == 0 { + return "nil" + } + strBld := strings.Builder{} + for _, node := range nodes { + if strBld.Len() > 0 { + strBld.WriteString(",") + } + strBld.WriteString(fmt.Sprintf("{%v:%v}", node.TypeInfo(), node.NodeInfo())) + } + return strBld.String() +}