diff --git a/cmd/katalyst-agent/app/options/metaserver/metaserver.go b/cmd/katalyst-agent/app/options/metaserver/metaserver.go index 5ccc145458..44c1a8123e 100644 --- a/cmd/katalyst-agent/app/options/metaserver/metaserver.go +++ b/cmd/katalyst-agent/app/options/metaserver/metaserver.go @@ -41,6 +41,7 @@ const ( const ( defaultServiceProfileSkipCorruptionError = true defaultServiceProfileCacheTTL = 1 * time.Minute + defaultSPDGetFromRemote = false ) const ( @@ -74,6 +75,7 @@ type MetaServerOptions struct { // configurations for spd ServiceProfileSkipCorruptionError bool ServiceProfileCacheTTL time.Duration + SPDGetFromRemote bool // configurations for pod-cache KubeletPodCacheSyncPeriod time.Duration @@ -104,6 +106,7 @@ func NewMetaServerOptions() *MetaServerOptions { ServiceProfileSkipCorruptionError: defaultServiceProfileSkipCorruptionError, ServiceProfileCacheTTL: defaultServiceProfileCacheTTL, + SPDGetFromRemote: defaultSPDGetFromRemote, KubeletPodCacheSyncPeriod: defaultKubeletPodCacheSyncPeriod, KubeletPodCacheSyncMaxRate: defaultKubeletPodCacheSyncMaxRate, @@ -142,6 +145,7 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) { "Whether to skip corruption error when loading spd checkpoint") fs.DurationVar(&o.ServiceProfileCacheTTL, "service-profile-cache-ttl", o.ServiceProfileCacheTTL, "The ttl of service profile manager cache remote spd") + fs.BoolVar(&o.SPDGetFromRemote, "spd-get-from-remote", o.SPDGetFromRemote, "get spd from remote if not in cache") fs.DurationVar(&o.KubeletPodCacheSyncPeriod, "kubelet-pod-cache-sync-period", o.KubeletPodCacheSyncPeriod, "The period of meta server to sync pod from kubelet 10255 port") @@ -174,6 +178,7 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error c.ServiceProfileSkipCorruptionError = o.ServiceProfileSkipCorruptionError c.ServiceProfileCacheTTL = o.ServiceProfileCacheTTL + c.SPDGetFromRemote = o.SPDGetFromRemote c.KubeletPodCacheSyncPeriod = o.KubeletPodCacheSyncPeriod 
c.KubeletPodCacheSyncMaxRate = rate.Limit(o.KubeletPodCacheSyncMaxRate) diff --git a/pkg/config/agent/metaserver/spd.go b/pkg/config/agent/metaserver/spd.go index 45a4f99c33..fb98af243c 100644 --- a/pkg/config/agent/metaserver/spd.go +++ b/pkg/config/agent/metaserver/spd.go @@ -21,6 +21,7 @@ import "time" type SPDConfiguration struct { ServiceProfileSkipCorruptionError bool ServiceProfileCacheTTL time.Duration + SPDGetFromRemote bool } func NewSPDConfiguration() *SPDConfiguration { diff --git a/pkg/controller/spd/indicator-plugin/manager.go b/pkg/controller/spd/indicator-plugin/manager.go index 1e92da6cba..6072e1c49e 100644 --- a/pkg/controller/spd/indicator-plugin/manager.go +++ b/pkg/controller/spd/indicator-plugin/manager.go @@ -34,7 +34,7 @@ const ( // IndicatorUpdater is used by IndicatorPlugin as a unified implementation // to trigger indicator updating logic. type IndicatorUpdater interface { - // UpdateExtendedIndicatorSpec + UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus + // UpdateExtendedIndicatorSpec + UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus + UpdateAggMetrics // for indicator add functions, IndicatorUpdater will try to merge them in local stores. 
UpdateExtendedIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceExtendedIndicatorSpec) UpdateBusinessIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceBusinessIndicatorSpec) @@ -136,7 +136,7 @@ func (u *IndicatorManager) UpdateSystemIndicatorSpec(nn types.NamespacedName, in } } -func (u *IndicatorManager) UpdateBusinessIndicatorStatus(nn types.NamespacedName, indicators []apiworkload.ServiceBusinessIndicatorStatus) { +func (u *IndicatorManager) UpdateBusinessIndicatorStatus(nn types.NamespacedName, businessIndicators []apiworkload.ServiceBusinessIndicatorStatus) { u.statusMtx.Lock() insert := false @@ -144,7 +144,7 @@ func (u *IndicatorManager) UpdateBusinessIndicatorStatus(nn types.NamespacedName insert = true u.statusMap[nn] = initServiceProfileDescriptorStatus() } - for _, indicator := range indicators { + for _, indicator := range businessIndicators { util.InsertSPDBusinessIndicatorStatus(u.statusMap[nn], &indicator) } @@ -223,5 +223,6 @@ func initServiceProfileDescriptorSpec() *apiworkload.ServiceProfileDescriptorSpe func initServiceProfileDescriptorStatus() *apiworkload.ServiceProfileDescriptorStatus { return &apiworkload.ServiceProfileDescriptorStatus{ BusinessStatus: []apiworkload.ServiceBusinessIndicatorStatus{}, + AggMetrics: []apiworkload.AggPodMetrics{}, } } diff --git a/pkg/controller/spd/indicator-plugin/plugin.go b/pkg/controller/spd/indicator-plugin/plugin.go index c19c8a8602..93f0ad8e99 100644 --- a/pkg/controller/spd/indicator-plugin/plugin.go +++ b/pkg/controller/spd/indicator-plugin/plugin.go @@ -20,6 +20,7 @@ import ( "context" "sync" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" apiworkload "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" @@ -44,6 +45,9 @@ type IndicatorPlugin interface { GetSupportedExtendedIndicatorSpec() []string GetSupportedBusinessIndicatorStatus() []apiworkload.ServiceBusinessIndicatorName GetSupportedAggMetricsStatus() []string + // 
GetAggMetrics is only used in SPD creation for timing-sensitive status. + // The other non-timing-sensitive status are updated at UpdateAggMetrics in indicatorUpdater. + GetAggMetrics(workload *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error) } type DummyIndicatorPlugin struct { @@ -78,6 +82,10 @@ func (d DummyIndicatorPlugin) GetSupportedAggMetricsStatus() []string { return nil } +func (d DummyIndicatorPlugin) GetAggMetrics(_ *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error) { + return nil, nil +} + // pluginInitializers is used to store the initializing function for each plugin var pluginInitializers sync.Map diff --git a/pkg/controller/spd/indicator-plugin/plugins/resource-portrait/indicator_plugin.go b/pkg/controller/spd/indicator-plugin/plugins/resource-portrait/indicator_plugin.go index e9a19d456b..442b88f814 100644 --- a/pkg/controller/spd/indicator-plugin/plugins/resource-portrait/indicator_plugin.go +++ b/pkg/controller/spd/indicator-plugin/plugins/resource-portrait/indicator_plugin.go @@ -121,6 +121,10 @@ func (p *ResourcePortraitIndicatorPlugin) GetSupportedAggMetricsStatus() []strin return []string{resourcePortraitExtendedSpecName} } +func (p *ResourcePortraitIndicatorPlugin) GetAggMetrics(_ *unstructured.Unstructured) ([]apiworkload.AggPodMetrics, error) { + return nil, nil +} + // resyncSpecWorker is used to synchronize global configuration to SPD. 
func (p *ResourcePortraitIndicatorPlugin) resyncSpecWorker() { spdList, err := p.spdLister.List(labels.Everything()) diff --git a/pkg/controller/spd/spd.go b/pkg/controller/spd/spd.go index 6a04084a74..407ffb2213 100644 --- a/pkg/controller/spd/spd.go +++ b/pkg/controller/spd/spd.go @@ -62,6 +62,7 @@ const ( spdWorkerCount = 1 indicatorSpecWorkerCount = 1 indicatorStatusWorkerCount = 1 + spdCreateWorkerCount = 5 ) // SPDController is responsible to maintain lifecycle of SPD CR, @@ -89,9 +90,10 @@ type SPDController struct { workloadLister map[schema.GroupVersionResource]cache.GenericLister spdWorkloadInformer map[schema.GroupVersionResource]native.DynamicInformer - syncedFunc []cache.InformerSynced - spdQueue workqueue.RateLimitingInterface - workloadSyncQueue workqueue.RateLimitingInterface + syncedFunc []cache.InformerSynced + spdQueue workqueue.RateLimitingInterface + workloadSyncQueue workqueue.RateLimitingInterface + createSPDWorkloadQueue workqueue.RateLimitingInterface metricsEmitter metrics.MetricEmitter @@ -119,18 +121,19 @@ func NewSPDController(ctx context.Context, controlCtx *katalystbase.GenericConte cncInformer := controlCtx.InternalInformerFactory.Config().V1alpha1().CustomNodeConfigs() spdController := &SPDController{ - ctx: ctx, - conf: conf, - qosConfig: qosConfig, - podUpdater: &control.DummyPodUpdater{}, - spdControl: &control.DummySPDControl{}, - workloadControl: &control.DummyUnstructuredControl{}, - spdQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "spd"), - workloadSyncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workload"), - metricsEmitter: controlCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(spdControllerName), - workloadGVKLister: make(map[schema.GroupVersionKind]cache.GenericLister), - workloadLister: make(map[schema.GroupVersionResource]cache.GenericLister), - spdWorkloadInformer: make(map[schema.GroupVersionResource]native.DynamicInformer), + ctx: 
ctx, + conf: conf, + qosConfig: qosConfig, + podUpdater: &control.DummyPodUpdater{}, + spdControl: &control.DummySPDControl{}, + workloadControl: &control.DummyUnstructuredControl{}, + spdQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "spd"), + workloadSyncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workload"), + createSPDWorkloadQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "createWorkloadSPD"), + metricsEmitter: controlCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(spdControllerName), + workloadGVKLister: make(map[schema.GroupVersionKind]cache.GenericLister), + workloadLister: make(map[schema.GroupVersionResource]cache.GenericLister), + spdWorkloadInformer: make(map[schema.GroupVersionResource]native.DynamicInformer), } spdController.podLister = podInformer.Lister() @@ -238,7 +241,10 @@ func (sc *SPDController) Run() { go wait.Until(sc.workloadWorker, time.Second, sc.ctx.Done()) } for i := 0; i < spdWorkerCount; i++ { - go wait.Until(sc.spdWorker, time.Second, sc.ctx.Done()) + go wait.Until(sc.syncSPDWorker, time.Second, sc.ctx.Done()) + } + for i := 0; i < spdCreateWorkerCount; i++ { + go wait.Until(sc.createSPDWorker, time.Second, sc.ctx.Done()) } go wait.Until(sc.cleanSPD, time.Minute*5, sc.ctx.Done()) @@ -311,18 +317,33 @@ func (sc *SPDController) addWorkload(workloadGVR string) func(obj interface{}) { klog.Errorf("[spd] cannot convert obj to metav1.Object") return } + + if util.WorkloadSPDEnabled(workload) { + sc.enqueueWorkloadForSPDCreate(workloadGVR, workload) + } + sc.enqueueWorkload(workloadGVR, workload) } } func (sc *SPDController) updateWorkload(workloadGVR string) func(oldObj, newObj interface{}) { - return func(_, cur interface{}) { - workload, ok := cur.(metav1.Object) + return func(oldObj, newObj interface{}) { + newWorkload, ok := newObj.(metav1.Object) if !ok { - klog.Errorf("[spd] cannot convert cur obj to metav1.Object") 
+ klog.Errorf("[spd] cannot convert new obj to metav1.Object") return } - sc.enqueueWorkload(workloadGVR, workload) + oldWorkload, ok := oldObj.(metav1.Object) + if !ok { + klog.Errorf("[spd] cannot convert old obj to metav1.Object") + return + } + + if util.WorkloadSPDEnabled(newWorkload) && !util.WorkloadSPDEnabled(oldWorkload) { + sc.enqueueWorkloadForSPDCreate(workloadGVR, newWorkload) + } + + sc.enqueueWorkload(workloadGVR, newWorkload) } } @@ -341,6 +362,21 @@ func (sc *SPDController) enqueueWorkload(workloadGVR string, workload metav1.Obj sc.workloadSyncQueue.Add(key) } +func (sc *SPDController) enqueueWorkloadForSPDCreate(workloadGVR string, workload metav1.Object) { + if workload == nil { + klog.Warning("[spd] trying to enqueue a nil workload") + return + } + + key, err := native.GenerateUniqGVRNameKey(workloadGVR, workload) + if err != nil { + utilruntime.HandleError(err) + return + } + + sc.createSPDWorkloadQueue.Add(key) +} + func (sc *SPDController) workloadWorker() { for sc.processNextWorkload() { } @@ -348,12 +384,13 @@ func (sc *SPDController) workloadWorker() { func (sc *SPDController) processNextWorkload() bool { key, quit := sc.workloadSyncQueue.Get() + klog.Infof("[spd] process next workload key: %v", key) if quit { return false } defer sc.workloadSyncQueue.Done(key) - err := sc.syncWorkload(key.(string)) + err := sc.syncSPDAnnotation(key.(string)) if err == nil { sc.workloadSyncQueue.Forget(key) return true @@ -365,9 +402,8 @@ func (sc *SPDController) processNextWorkload() bool { return true } -// syncWorkload is mainly responsible to maintain the lifecycle of spd for each -// workload, without handling the service profile calculation logic. -func (sc *SPDController) syncWorkload(key string) error { +// syncSPDAnnotation is mainly responsible to patch pod spd annotation. 
+func (sc *SPDController) syncSPDAnnotation(key string) error { klog.V(5).Infof("[spd] syncing workload [%v]", key) workloadGVR, namespace, name, err := native.ParseUniqGVRNameKey(key) if err != nil { @@ -405,14 +441,67 @@ func (sc *SPDController) syncWorkload(key string) error { return nil } - spd, err := sc.getOrCreateSPDForWorkload(workload) + if err := sc.setPodListSPDAnnotation(podList, workload); err != nil { + klog.Errorf("[spd] set pod list annotations for workload %s/%s failed: %v", namespace, name, err) + return err + } + return nil +} + +func (sc *SPDController) createSPDWorker() { + for sc.processNextSPDCreation() { + } +} + +func (sc *SPDController) processNextSPDCreation() bool { + key, quit := sc.createSPDWorkloadQueue.Get() + klog.Infof("[spd] create next SPD with workload: %s", key) + if quit { + klog.InfoS("[spd] createSPDWorkloadQueue shutdown") + return false + } + defer sc.createSPDWorkloadQueue.Done(key) + + err := sc.syncWorkloadCreateSPD(key.(string)) + if err == nil { + sc.createSPDWorkloadQueue.Forget(key) + return true + } + + utilruntime.HandleError(fmt.Errorf("[spd] sync %q failed with %v", key, err)) + sc.createSPDWorkloadQueue.AddRateLimited(key) + + return true +} + +// syncWorkloadCreateSPD manages workload spd creation lifecycle and patch first batch of annotation. 
+func (sc *SPDController) syncWorkloadCreateSPD(key string) error { + klog.Infof("[spd] syncing workload create SPD [%v]", key) + workloadGVR, namespace, name, err := native.ParseUniqGVRNameKey(key) if err != nil { - klog.Errorf("[spd] get or create spd for workload %s/%s failed: %v", namespace, name, err) + klog.Errorf("[spd] failed to parse key %s to workload", key) return err } - if err := sc.setPodListSPDAnnotation(podList, spd.Name); err != nil { - klog.Errorf("[spd] set pod list annotations for workload %s/%s failed: %v", namespace, name, err) + gvr, _ := schema.ParseResourceArg(workloadGVR) + if gvr == nil { + err = fmt.Errorf("[spd] ParseResourceArg worload %v failed", workloadGVR) + klog.Error(err) + return err + } + + workload, err := sc.getWorkload(*gvr, namespace, name) + if err != nil { + klog.Errorf("[spd] failed to get workload %s/%s", namespace, name) + if errors.IsNotFound(err) { + return nil + } + return err + } + + _, err = sc.getOrCreateSPDForWorkload(workload) + if err != nil { + klog.Errorf("[spd] get or create spd for workload %s/%s failed: %v", namespace, name, err) return err } return nil @@ -451,7 +540,7 @@ func (sc *SPDController) enqueueSPD(spd *apiworkload.ServiceProfileDescriptor) { sc.spdQueue.Add(key) } -func (sc *SPDController) spdWorker() { +func (sc *SPDController) syncSPDWorker() { for sc.processNextSPD() { } } @@ -498,11 +587,13 @@ func (sc *SPDController) syncSPD(key string) error { newSPD := spd.DeepCopy() err = sc.updateSPDAnnotations(newSPD) if err != nil { + klog.Errorf("[spd] failed to update SPD key [%v] annotations: %v", key, err) return err } _, err = sc.spdControl.PatchSPD(sc.ctx, spd, newSPD) if err != nil { + klog.Errorf("[spd] failed to patch SPD key [%v]: %v", key, err) return err } @@ -612,7 +703,7 @@ func (sc *SPDController) getOrCreateSPDForWorkload(workload *unstructured.Unstru spd, err := util.GetSPDForWorkload(workload, sc.spdIndexer, sc.spdLister) if err != nil { if errors.IsNotFound(err) { - spd := 
&apiworkload.ServiceProfileDescriptor{ + spd = &apiworkload.ServiceProfileDescriptor{ ObjectMeta: metav1.ObjectMeta{ Name: workload.GetName(), Namespace: workload.GetNamespace(), @@ -632,12 +723,19 @@ func (sc *SPDController) getOrCreateSPDForWorkload(workload *unstructured.Unstru }, } - err := sc.updateSPDAnnotations(spd) + err = sc.updateSPDAnnotations(spd) if err != nil { - return nil, err + return nil, fmt.Errorf("[spd] failed to update spd [%v] annotation: %v", spd.Name, err) + } + + spd, err = sc.spdControl.CreateSPD(sc.ctx, spd, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("[spd] failed to create spd: %v", err) } - return sc.spdControl.CreateSPD(sc.ctx, spd, metav1.CreateOptions{}) + cost := spd.GetCreationTimestamp().Sub(workload.GetCreationTimestamp().Time) + _ = sc.metricsEmitter.StoreInt64(metricsNameCreateSPDByWorkloadCost, cost.Microseconds(), metrics.MetricTypeNameRaw) + return sc.setSPDStatus(workload, spd) } return nil, err @@ -646,11 +744,45 @@ func (sc *SPDController) getOrCreateSPDForWorkload(workload *unstructured.Unstru return spd, nil } -func (sc *SPDController) setPodListSPDAnnotation(podList []*core.Pod, spdName string) error { +func (sc *SPDController) setSPDStatus(workload *unstructured.Unstructured, spd *apiworkload.ServiceProfileDescriptor) (*apiworkload.ServiceProfileDescriptor, error) { + klog.Infof("[spd] setting status for new SPD") + for _, plugin := range sc.indicatorPlugins { + aggMetrics, err := plugin.GetAggMetrics(workload) + if err != nil { + klog.Errorf("[spd] failed to get spd aggMetrics for workload %s: %v", workload, err) + continue + } + for _, aggPodMetric := range aggMetrics { + spd.Status.AggMetrics = append(spd.Status.AggMetrics, aggPodMetric) + } + } + + err := sc.updateHash(spd) + if err != nil { + klog.Errorf("[spd] failed to update hash for workload %s: %v", workload, err) + return nil, err + } + + spd, err = sc.spdControl.UpdateSPDStatus(sc.ctx, spd, metav1.UpdateOptions{}) + if err != nil { 
+ return nil, fmt.Errorf("[spd] failed to update spd status for workload %s: %v", workload.GetName(), err) + } + + return spd, nil +} + +func (sc *SPDController) setPodListSPDAnnotation(podList []*core.Pod, workload *unstructured.Unstructured) error { var mtx sync.Mutex var errList []error + + spd, err := util.GetSPDForWorkload(workload, sc.spdIndexer, sc.spdLister) + if err != nil { + return fmt.Errorf("[spd] failed to get spd %s: %v", workload.GetName(), err) + } + spdName := spd.GetName() + spdCreationTime := spd.GetCreationTimestamp() setPodAnnotations := func(i int) { - err := sc.setPodSPDAnnotation(podList[i], spdName) + err := sc.setPodSPDAnnotation(podList[i], spdName, spdCreationTime) if err != nil { mtx.Lock() errList = append(errList, err) @@ -669,7 +801,7 @@ func (sc *SPDController) setPodListSPDAnnotation(podList []*core.Pod, spdName st } // setPodSPDAnnotation add spd name in pod annotations -func (sc *SPDController) setPodSPDAnnotation(pod *core.Pod, spdName string) error { +func (sc *SPDController) setPodSPDAnnotation(pod *core.Pod, spdName string, spdCreationTime metav1.Time) error { if pod.GetAnnotations()[apiconsts.PodAnnotationSPDNameKey] == spdName { return nil } @@ -684,9 +816,12 @@ func (sc *SPDController) setPodSPDAnnotation(pod *core.Pod, spdName string) erro err := sc.podUpdater.PatchPod(sc.ctx, pod, podCopy) if err != nil { - return err + return fmt.Errorf("[spd] failed to patch pod spd annotation: %v", err) } + if spdCreationTime.Sub(pod.GetCreationTimestamp().Time) > 0 { + _ = sc.metricsEmitter.StoreInt64(metricsNameSPDCreatedAfterPod, 1, metrics.MetricTypeNameCount) + } klog.Infof("[spd] successfully set annotations for pod %v to %v", pod.GetName(), spdName) return nil } @@ -726,7 +861,7 @@ func (sc *SPDController) cleanPodSPDAnnotation(pod *core.Pod) error { err := sc.podUpdater.PatchPod(sc.ctx, pod, podCopy) if err != nil { - return err + return fmt.Errorf("[spd] failed to patch clean spd annotation: %v", err) } klog.Infof("[spd] successfully clear annotations 
for pod %v", pod.GetName()) diff --git a/pkg/controller/spd/spd_indicator.go b/pkg/controller/spd/spd_indicator.go index 02b4df6c27..9d3196f4d1 100644 --- a/pkg/controller/spd/spd_indicator.go +++ b/pkg/controller/spd/spd_indicator.go @@ -37,6 +37,8 @@ const ( metricsNameSyncIndicatorStatusCost = "sync_indicator_status_cost" metricsNameIndicatorSpecChanLength = "indicator_spec_chan_length" metricsNameIndicatorStatusChanLength = "indicator_status_chan_length" + metricsNameCreateSPDByWorkloadCost = "create_spd_by_workload_cost" + metricsNameSPDCreatedAfterPod = "spd_created_after_pod" ) func (sc *SPDController) syncIndicatorSpec() { diff --git a/pkg/controller/spd/spd_test.go b/pkg/controller/spd/spd_test.go index ab50636f25..927f0d5353 100644 --- a/pkg/controller/spd/spd_test.go +++ b/pkg/controller/spd/spd_test.go @@ -25,16 +25,19 @@ import ( "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" + metrics "k8s.io/metrics/pkg/apis/metrics/v1beta1" "k8s.io/utils/pointer" apis "github.com/kubewharf/katalyst-api/pkg/apis/autoscaling/v1alpha1" apiworkload "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + apimetricpod "github.com/kubewharf/katalyst-api/pkg/metric/pod" katalystbase "github.com/kubewharf/katalyst-core/cmd/base" "github.com/kubewharf/katalyst-core/pkg/config/controller" "github.com/kubewharf/katalyst-core/pkg/config/generic" @@ -491,189 +494,603 @@ func TestPodIndexerDuplicate(t *testing.T) { assert.Equal(t, true, exist) } -func TestIndicatorUpdater(t *testing.T) { +func TestSPDController_Run_(t *testing.T) { t.Parallel() - var current float32 = 8.3 - var value float32 = 23.1 - - workload := &appsv1.StatefulSet{ - TypeMeta: 
metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "sts1", - Namespace: "default", - Annotations: map[string]string{ - apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, - }, - }, - Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "workload": "sts1", - }, - }, - }, + type fields struct { + pod *v1.Pod + workload *appsv1.StatefulSet + spd *apiworkload.ServiceProfileDescriptor } - - spd := &apiworkload.ServiceProfileDescriptor{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "spd1", - ResourceVersion: "1", - }, - Spec: apiworkload.ServiceProfileDescriptorSpec{ - TargetRef: apis.CrossVersionObjectReference{ - Kind: stsGVK.Kind, - Name: "sts1", - APIVersion: stsGVK.GroupVersion().String(), - }, - BusinessIndicator: []apiworkload.ServiceBusinessIndicatorSpec{ - { - Name: "none-exist-b", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelLowerBound, - Value: 10.2, + tests := []struct { + name string + fields fields + wantWorkload *appsv1.StatefulSet + wantSPD *apiworkload.ServiceProfileDescriptor + }{ + { + name: "delete unwanted spd", + fields: fields{ + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: "sts1", + }, + }, + Annotations: map[string]string{ + apiconsts.PodAnnotationSPDNameKey: "spd1", + }, + Labels: map[string]string{ + "workload": "sts1", }, }, }, - }, - SystemIndicator: []apiworkload.ServiceSystemIndicatorSpec{ - { - Name: "none-exist-s", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 10.5, + workload: &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sts1", + Namespace: 
"default", + Annotations: map[string]string{}, + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "sts1", + }, }, }, }, - { - Name: "system-3", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 4.5, + spd: &apiworkload.ServiceProfileDescriptor{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "sts1", + }, + Spec: apiworkload.ServiceProfileDescriptorSpec{ + TargetRef: apis.CrossVersionObjectReference{ + Kind: stsGVK.Kind, + Name: "sts1", + APIVersion: stsGVK.GroupVersion().String(), }, }, + Status: apiworkload.ServiceProfileDescriptorStatus{}, }, }, - }, - Status: apiworkload.ServiceProfileDescriptorStatus{ - BusinessStatus: []apiworkload.ServiceBusinessIndicatorStatus{ - { - Name: "none-exist-status", - Current: &current, + wantWorkload: &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", }, - { - Name: "system-2", - Current: &current, + ObjectMeta: metav1.ObjectMeta{ + Name: "sts1", + Namespace: "default", }, - }, - }, - } - - expectedSpd := &apiworkload.ServiceProfileDescriptor{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "spd1", - }, - Spec: apiworkload.ServiceProfileDescriptorSpec{ - TargetRef: apis.CrossVersionObjectReference{ - Kind: stsGVK.Kind, - Name: "sts1", - APIVersion: stsGVK.GroupVersion().String(), - }, - BaselinePercent: pointer.Int32(20), - ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{ - { - Name: "TestExtended", - Indicators: runtime.RawExtension{ - Raw: func() []byte { - data, _ := json.Marshal(&apiworkload.TestExtendedIndicators{ - Indicators: &apiworkload.TestIndicators{}, - }) - return data - }(), + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "sts1", + }, }, }, }, - BusinessIndicator: []apiworkload.ServiceBusinessIndicatorSpec{ - { - Name: "business-1", - 
Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelLowerBound, - Value: 10.2, - }, + wantSPD: nil, + }, + { + name: "auto create spd", + fields: fields{ + workload: &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", }, - }, - { - Name: "business-2", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 18.3, + ObjectMeta: metav1.ObjectMeta{ + Name: "sts1", + Namespace: "default", + Annotations: map[string]string{ + apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, }, }, - }, - { - Name: "business-3", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 16.8, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "sts1", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "katalyst.kubewharf.io/qos_level": "dedicated_cores", + }, + }, + Spec: v1.PodSpec{}, }, }, }, + spd: nil, }, - SystemIndicator: []apiworkload.ServiceSystemIndicatorSpec{ - { - Name: "system-3", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 4.5, - }, + wantWorkload: &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sts1", + Namespace: "default", + Annotations: map[string]string{ + apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, }, }, - { - Name: "system-1", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelLowerBound, - Value: 10.5, - }, - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 10.5, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "sts1", }, }, - 
}, - { - Name: "system-2", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 10.5, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "katalyst.kubewharf.io/qos_level": "dedicated_cores", + }, }, + Spec: v1.PodSpec{}, }, }, }, - }, + wantSPD: &apiworkload.ServiceProfileDescriptor{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "sts1", + Annotations: map[string]string{ + consts.ServiceProfileDescriptorAnnotationKeyConfigHash: "51131be1b092", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: "sts1", + }, + }, + }, + Spec: apiworkload.ServiceProfileDescriptorSpec{ + TargetRef: apis.CrossVersionObjectReference{ + Kind: stsGVK.Kind, + Name: "sts1", + APIVersion: stsGVK.GroupVersion().String(), + }, + BaselinePercent: pointer.Int32(100), + }, + Status: apiworkload.ServiceProfileDescriptorStatus{ + AggMetrics: []apiworkload.AggPodMetrics{}, + }, + }, + }, + { + name: "auto create spd(dedicated_cores)", + fields: fields{ + workload: &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sts1", + Namespace: "default", + Annotations: map[string]string{ + apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, + }, + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "sts1", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"katalyst.kubewharf.io/qos_level": "dedicated_cores"}, + }, + Spec: v1.PodSpec{}, + }, + }, + }, + spd: nil, + }, + wantWorkload: &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sts1", + Namespace: "default", + Annotations: map[string]string{ 
+ apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, + }, + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "sts1", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"katalyst.kubewharf.io/qos_level": "dedicated_cores"}, + }, + Spec: v1.PodSpec{}, + }, + }, + }, + wantSPD: &apiworkload.ServiceProfileDescriptor{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "sts1", + Annotations: map[string]string{ + consts.ServiceProfileDescriptorAnnotationKeyConfigHash: "51131be1b092", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: "sts1", + }, + }, + }, + Spec: apiworkload.ServiceProfileDescriptorSpec{ + TargetRef: apis.CrossVersionObjectReference{ + Kind: stsGVK.Kind, + Name: "sts1", + APIVersion: stsGVK.GroupVersion().String(), + }, + BaselinePercent: pointer.Int32(100), + }, + Status: apiworkload.ServiceProfileDescriptorStatus{ + AggMetrics: []apiworkload.AggPodMetrics{}, + }, + }, + }, + { + name: "auto create spd(shared_cores)", + fields: fields{ + workload: &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sts1", + Namespace: "default", + Annotations: map[string]string{ + apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, + }, + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "sts1", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"katalyst.kubewharf.io/qos_level": "shared_cores"}, + }, + Spec: v1.PodSpec{}, + }, + }, + }, + spd: nil, + }, + wantWorkload: &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, + ObjectMeta: 
metav1.ObjectMeta{ + Name: "sts1", + Namespace: "default", + Annotations: map[string]string{ + apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, + }, + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "sts1", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"katalyst.kubewharf.io/qos_level": "shared_cores"}, + }, + Spec: v1.PodSpec{}, + }, + }, + }, + wantSPD: &apiworkload.ServiceProfileDescriptor{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "sts1", + Annotations: map[string]string{ + consts.ServiceProfileDescriptorAnnotationKeyConfigHash: "a62e4c90e3ed", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: "sts1", + }, + }, + }, + Spec: apiworkload.ServiceProfileDescriptorSpec{ + TargetRef: apis.CrossVersionObjectReference{ + Kind: stsGVK.Kind, + Name: "sts1", + APIVersion: stsGVK.GroupVersion().String(), + }, + BaselinePercent: pointer.Int32(0), + }, + Status: apiworkload.ServiceProfileDescriptorStatus{ + AggMetrics: []apiworkload.AggPodMetrics{}, + }, + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + spdConfig := &controller.SPDConfig{ + SPDWorkloadGVResources: []string{"statefulsets.v1.apps"}, + BaselinePercent: map[string]int64{ + apiconsts.PodAnnotationQoSLevelSharedCores: 0, + apiconsts.PodAnnotationQoSLevelDedicatedCores: 100, + }, + } + genericConfig := &generic.GenericConfiguration{} + controllerConf := &controller.GenericControllerConfiguration{ + DynamicGVResources: []string{"statefulsets.v1.apps"}, + } + + ctx := context.TODO() + controlCtx, err := katalystbase.GenerateFakeGenericContext([]runtime.Object{tt.fields.pod}, + []runtime.Object{tt.fields.spd}, []runtime.Object{tt.fields.workload}) + assert.NoError(t, err) + + spdController, err := 
NewSPDController(ctx, controlCtx, genericConfig, controllerConf, spdConfig, generic.NewQoSConfiguration(), struct{}{}) + assert.NoError(t, err) + + controlCtx.StartInformer(ctx) + go spdController.Run() + synced := cache.WaitForCacheSync(ctx.Done(), spdController.syncedFunc...) + assert.True(t, synced) + time.Sleep(1 * time.Second) + + targetSPD := tt.fields.spd + if targetSPD == nil { + targetSPD = tt.wantSPD + } + newSPD, _ := controlCtx.Client.InternalClient.WorkloadV1alpha1(). + ServiceProfileDescriptors(targetSPD.Namespace).Get(ctx, targetSPD.Name, metav1.GetOptions{}) + assert.Equal(t, tt.wantSPD, newSPD) + + newObject, _ := controlCtx.Client.DynamicClient.Resource(stsGVR). + Namespace(tt.fields.workload.GetNamespace()).Get(ctx, tt.fields.workload.GetName(), metav1.GetOptions{}) + + newWorkload := &appsv1.StatefulSet{} + err = runtime.DefaultUnstructuredConverter.FromUnstructured(newObject.UnstructuredContent(), newWorkload) + assert.NoError(t, err) + assert.Equal(t, tt.wantWorkload, newWorkload) + }) + } +} + +func TestIndicatorUpdater(t *testing.T) { + t.Parallel() + + var current float32 = 8.3 + var value float32 = 23.1 + + workload := &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sts1", + Namespace: "default", + Annotations: map[string]string{ + apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, + }, + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "workload": "sts1", + }, + }, + }, + } + + spd := &apiworkload.ServiceProfileDescriptor{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "spd1", + ResourceVersion: "1", + }, + Spec: apiworkload.ServiceProfileDescriptorSpec{ + TargetRef: apis.CrossVersionObjectReference{ + Kind: stsGVK.Kind, + Name: "sts1", + APIVersion: stsGVK.GroupVersion().String(), + }, + BusinessIndicator: 
[]apiworkload.ServiceBusinessIndicatorSpec{ + { + Name: "none-exist-b", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelLowerBound, + Value: 10.2, + }, + }, + }, + }, + SystemIndicator: []apiworkload.ServiceSystemIndicatorSpec{ + { + Name: "none-exist-s", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 10.5, + }, + }, + }, + { + Name: "system-3", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 4.5, + }, + }, + }, + }, + }, + Status: apiworkload.ServiceProfileDescriptorStatus{ + BusinessStatus: []apiworkload.ServiceBusinessIndicatorStatus{ + { + Name: "none-exist-status", + Current: &current, + }, + { + Name: "system-2", + Current: &current, + }, + }, + }, + } + + expectedSpd := &apiworkload.ServiceProfileDescriptor{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "spd1", + }, + Spec: apiworkload.ServiceProfileDescriptorSpec{ + TargetRef: apis.CrossVersionObjectReference{ + Kind: stsGVK.Kind, + Name: "sts1", + APIVersion: stsGVK.GroupVersion().String(), + }, + BaselinePercent: pointer.Int32(20), + ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{ + { + Name: "TestExtended", + Indicators: runtime.RawExtension{ + Raw: func() []byte { + data, _ := json.Marshal(&apiworkload.TestExtendedIndicators{ + Indicators: &apiworkload.TestIndicators{}, + }) + return data + }(), + }, + }, + }, + BusinessIndicator: []apiworkload.ServiceBusinessIndicatorSpec{ + { + Name: "business-1", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelLowerBound, + Value: 10.2, + }, + }, + }, + { + Name: "business-2", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 18.3, + }, + }, + }, + { + Name: "business-3", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 16.8, + }, + },
+ }, + }, + SystemIndicator: []apiworkload.ServiceSystemIndicatorSpec{ + { + Name: "system-3", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 4.5, + }, + }, + }, + { + Name: "system-1", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelLowerBound, + Value: 10.5, + }, + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 10.5, + }, + }, + }, + { + Name: "system-2", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 10.5, + }, + }, + }, + }, + }, Status: apiworkload.ServiceProfileDescriptorStatus{ BusinessStatus: []apiworkload.ServiceBusinessIndicatorStatus{ { Name: "system-2", Current: &value, }, - }, - AggMetrics: []apiworkload.AggPodMetrics{ + }, + AggMetrics: []apiworkload.AggPodMetrics{ + { + Aggregator: apiworkload.Avg, + Items: []metrics.PodMetrics{ + { + Timestamp: metav1.NewTime(time.Date(2022, 1, 1, 1, 0, 0, 0, time.Local)), + Window: metav1.Duration{Duration: time.Hour}, + Containers: []metrics.ContainerMetrics{ + { + Name: "c1", + Usage: map[v1.ResourceName]resource.Quantity{ + apimetricpod.CustomMetricPodCPULoad1Min: resource.MustParse("20"), + }, + }, + }, + }, + }, + }, { - Aggregator: apiworkload.Avg, + Aggregator: apiworkload.Sum, }, }, }, @@ -773,482 +1190,100 @@ func TestIndicatorUpdater(t *testing.T) { Indicators: []apiworkload.Indicator{ { IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 18.3, - }, - }, - }, - }) - sc.indicatorManager.UpdateBusinessIndicatorSpec(nn, []apiworkload.ServiceBusinessIndicatorSpec{ - { - Name: "business-3", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 13.3, - }, - }, - }, - { - Name: "business-3", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 16.8, - }, - }, - }, - }) - - 
sc.indicatorManager.UpdateSystemIndicatorSpec(nn, []apiworkload.ServiceSystemIndicatorSpec{ - { - Name: "system-1", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelLowerBound, - Value: 10.5, - }, - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 10.5, - }, - }, - }, - }) - sc.indicatorManager.UpdateSystemIndicatorSpec(nn, []apiworkload.ServiceSystemIndicatorSpec{ - { - Name: "system-2", - Indicators: []apiworkload.Indicator{ - { - IndicatorLevel: apiworkload.IndicatorLevelUpperBound, - Value: 10.5, - }, - }, - }, - }) - - sc.indicatorManager.UpdateBusinessIndicatorStatus(nn, []apiworkload.ServiceBusinessIndicatorStatus{ - { - Name: "system-1", - Current: &value, - }, - { - Name: "system-2", - Current: &value, - }, - }) - - sc.indicatorManager.UpdateAggMetrics(nn, []apiworkload.AggPodMetrics{ - { - Aggregator: apiworkload.Avg, - }, - { - Aggregator: apiworkload.Sum, - }, - }) - - time.Sleep(time.Second) - newSPD, err := controlCtx.Client.InternalClient.WorkloadV1alpha1(). 
- ServiceProfileDescriptors("default").Get(ctx, "spd1", metav1.GetOptions{}) - assert.NoError(t, err) - assert.Equal(t, expectedSpd.Spec.ExtendedIndicator, newSPD.Spec.ExtendedIndicator) - assert.Equal(t, expectedSpd.Spec.BusinessIndicator, newSPD.Spec.BusinessIndicator) - assert.Equal(t, expectedSpd.Spec.SystemIndicator, newSPD.Spec.SystemIndicator) - assert.Equal(t, expectedSpd.Status.BusinessStatus, newSPD.Status.BusinessStatus) -} - -func TestSPDController_Run_(t *testing.T) { - t.Parallel() - - type fields struct { - pod *v1.Pod - workload *appsv1.StatefulSet - spd *apiworkload.ServiceProfileDescriptor - } - tests := []struct { - name string - fields fields - wantWorkload *appsv1.StatefulSet - wantSPD *apiworkload.ServiceProfileDescriptor - }{ - { - name: "delete unwanted spd", - fields: fields{ - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "default", - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "StatefulSet", - Name: "sts1", - }, - }, - Annotations: map[string]string{ - apiconsts.PodAnnotationSPDNameKey: "spd1", - }, - Labels: map[string]string{ - "workload": "sts1", - }, - }, - }, - workload: &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "sts1", - Namespace: "default", - Annotations: map[string]string{}, - }, - Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "workload": "sts1", - }, - }, - }, - }, - spd: &apiworkload.ServiceProfileDescriptor{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "sts1", - }, - Spec: apiworkload.ServiceProfileDescriptorSpec{ - TargetRef: apis.CrossVersionObjectReference{ - Kind: stsGVK.Kind, - Name: "sts1", - APIVersion: stsGVK.GroupVersion().String(), - }, - }, - Status: apiworkload.ServiceProfileDescriptorStatus{}, - }, - }, - wantWorkload: &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: 
"StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "sts1", - Namespace: "default", - }, - Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "workload": "sts1", - }, - }, - }, - }, - wantSPD: nil, - }, - { - name: "auto create spd", - fields: fields{ - workload: &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "sts1", - Namespace: "default", - Annotations: map[string]string{ - apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, - }, - }, - Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "workload": "sts1", - }, - }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - "katalyst.kubewharf.io/qos_level": "dedicated_cores", - }, - }, - Spec: v1.PodSpec{}, - }, - }, - }, - spd: nil, - }, - wantWorkload: &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "sts1", - Namespace: "default", - Annotations: map[string]string{ - apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, - }, - }, - Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "workload": "sts1", - }, - }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - "katalyst.kubewharf.io/qos_level": "dedicated_cores", - }, - }, - Spec: v1.PodSpec{}, - }, - }, - }, - wantSPD: &apiworkload.ServiceProfileDescriptor{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "sts1", - Annotations: map[string]string{ - consts.ServiceProfileDescriptorAnnotationKeyConfigHash: "51131be1b092", - }, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "StatefulSet", - Name: 
"sts1", - }, - }, - }, - Spec: apiworkload.ServiceProfileDescriptorSpec{ - TargetRef: apis.CrossVersionObjectReference{ - Kind: stsGVK.Kind, - Name: "sts1", - APIVersion: stsGVK.GroupVersion().String(), - }, - BaselinePercent: pointer.Int32(100), - }, - Status: apiworkload.ServiceProfileDescriptorStatus{ - AggMetrics: []apiworkload.AggPodMetrics{}, + Value: 18.3, }, }, }, + }) + sc.indicatorManager.UpdateBusinessIndicatorSpec(nn, []apiworkload.ServiceBusinessIndicatorSpec{ { - name: "auto create spd(dedicated_cores)", - fields: fields{ - workload: &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "sts1", - Namespace: "default", - Annotations: map[string]string{ - apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, - }, - }, - Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "workload": "sts1", - }, - }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{"katalyst.kubewharf.io/qos_level": "dedicated_cores"}, - }, - Spec: v1.PodSpec{}, - }, - }, + Name: "business-3", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 13.3, }, - spd: nil, }, - wantWorkload: &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "sts1", - Namespace: "default", - Annotations: map[string]string{ - apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, - }, - }, - Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "workload": "sts1", - }, - }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{"katalyst.kubewharf.io/qos_level": "dedicated_cores"}, - }, - Spec: v1.PodSpec{}, - }, + }, + { + Name: 
"business-3", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 16.8, }, }, - wantSPD: &apiworkload.ServiceProfileDescriptor{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "sts1", - Annotations: map[string]string{ - consts.ServiceProfileDescriptorAnnotationKeyConfigHash: "51131be1b092", - }, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "StatefulSet", - Name: "sts1", - }, - }, - }, - Spec: apiworkload.ServiceProfileDescriptorSpec{ - TargetRef: apis.CrossVersionObjectReference{ - Kind: stsGVK.Kind, - Name: "sts1", - APIVersion: stsGVK.GroupVersion().String(), - }, - BaselinePercent: pointer.Int32(100), + }, + }) + + sc.indicatorManager.UpdateSystemIndicatorSpec(nn, []apiworkload.ServiceSystemIndicatorSpec{ + { + Name: "system-1", + Indicators: []apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelLowerBound, + Value: 10.5, }, - Status: apiworkload.ServiceProfileDescriptorStatus{ - AggMetrics: []apiworkload.AggPodMetrics{}, + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 10.5, }, }, }, + }) + sc.indicatorManager.UpdateSystemIndicatorSpec(nn, []apiworkload.ServiceSystemIndicatorSpec{ { - name: "auto create spd(shared_cores)", - fields: fields{ - workload: &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "sts1", - Namespace: "default", - Annotations: map[string]string{ - apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, - }, - }, - Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "workload": "sts1", - }, - }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{"katalyst.kubewharf.io/qos_level": "shared_cores"}, - }, - Spec: v1.PodSpec{}, - }, - }, + Name: "system-2", + Indicators: 
[]apiworkload.Indicator{ + { + IndicatorLevel: apiworkload.IndicatorLevelUpperBound, + Value: 10.5, }, - spd: nil, }, - wantWorkload: &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "sts1", - Namespace: "default", - Annotations: map[string]string{ - apiconsts.WorkloadAnnotationSPDEnableKey: apiconsts.WorkloadAnnotationSPDEnabled, - }, - }, - Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "workload": "sts1", - }, - }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{"katalyst.kubewharf.io/qos_level": "shared_cores"}, - }, - Spec: v1.PodSpec{}, - }, - }, + }, + }) + + sc.indicatorManager.UpdateBusinessIndicatorStatus( + nn, []apiworkload.ServiceBusinessIndicatorStatus{ + { + Name: "system-1", + Current: &value, }, - wantSPD: &apiworkload.ServiceProfileDescriptor{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "sts1", - Annotations: map[string]string{ - consts.ServiceProfileDescriptorAnnotationKeyConfigHash: "a62e4c90e3ed", - }, - OwnerReferences: []metav1.OwnerReference{ + { + Name: "system-2", + Current: &value, + }, + }, + ) + sc.indicatorManager.UpdateAggMetrics(nn, []apiworkload.AggPodMetrics{ + { + Aggregator: apiworkload.Avg, + Items: []metrics.PodMetrics{ + { + Timestamp: metav1.NewTime(time.Date(2022, 1, 1, 1, 0, 0, 0, time.Local)), + Window: metav1.Duration{Duration: time.Hour}, + Containers: []metrics.ContainerMetrics{ { - APIVersion: "apps/v1", - Kind: "StatefulSet", - Name: "sts1", + Name: "c1", + Usage: map[v1.ResourceName]resource.Quantity{ + apimetricpod.CustomMetricPodCPULoad1Min: resource.MustParse("20"), + }, }, }, }, - Spec: apiworkload.ServiceProfileDescriptorSpec{ - TargetRef: apis.CrossVersionObjectReference{ - Kind: stsGVK.Kind, - Name: "sts1", - APIVersion: stsGVK.GroupVersion().String(), - }, - BaselinePercent: pointer.Int32(0), 
- }, - Status: apiworkload.ServiceProfileDescriptorStatus{ - AggMetrics: []apiworkload.AggPodMetrics{}, - }, }, }, - } - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - spdConfig := &controller.SPDConfig{ - SPDWorkloadGVResources: []string{"statefulsets.v1.apps"}, - BaselinePercent: map[string]int64{ - apiconsts.PodAnnotationQoSLevelSharedCores: 0, - apiconsts.PodAnnotationQoSLevelDedicatedCores: 100, - }, - } - genericConfig := &generic.GenericConfiguration{} - controllerConf := &controller.GenericControllerConfiguration{ - DynamicGVResources: []string{"statefulsets.v1.apps"}, - } - - ctx := context.TODO() - controlCtx, err := katalystbase.GenerateFakeGenericContext([]runtime.Object{tt.fields.pod}, - []runtime.Object{tt.fields.spd}, []runtime.Object{tt.fields.workload}) - assert.NoError(t, err) - - spdController, err := NewSPDController(ctx, controlCtx, genericConfig, controllerConf, spdConfig, generic.NewQoSConfiguration(), struct{}{}) - assert.NoError(t, err) - - controlCtx.StartInformer(ctx) - go spdController.Run() - synced := cache.WaitForCacheSync(ctx.Done(), spdController.syncedFunc...) - assert.True(t, synced) - time.Sleep(1 * time.Second) - - targetSPD := tt.fields.spd - if targetSPD == nil { - targetSPD = tt.wantSPD - } - newSPD, _ := controlCtx.Client.InternalClient.WorkloadV1alpha1(). - ServiceProfileDescriptors(targetSPD.Namespace).Get(ctx, targetSPD.Name, metav1.GetOptions{}) - assert.Equal(t, tt.wantSPD, newSPD) - - newObject, _ := controlCtx.Client.DynamicClient.Resource(stsGVR). 
- Namespace(tt.fields.workload.GetNamespace()).Get(ctx, tt.fields.workload.GetName(), metav1.GetOptions{}) - - newWorkload := &appsv1.StatefulSet{} - err = runtime.DefaultUnstructuredConverter.FromUnstructured(newObject.UnstructuredContent(), newWorkload) - assert.NoError(t, err) - assert.Equal(t, tt.wantWorkload, newWorkload) - }) - } + { + Aggregator: apiworkload.Sum, + }, + }) + time.Sleep(time.Second) + newSPD, err := controlCtx.Client.InternalClient.WorkloadV1alpha1(). + ServiceProfileDescriptors("default").Get(ctx, "spd1", metav1.GetOptions{}) + assert.NoError(t, err) + assert.Equal(t, expectedSpd.Spec.ExtendedIndicator, newSPD.Spec.ExtendedIndicator) + assert.Equal(t, expectedSpd.Spec.BusinessIndicator, newSPD.Spec.BusinessIndicator) + assert.Equal(t, expectedSpd.Spec.SystemIndicator, newSPD.Spec.SystemIndicator) + assert.Equal(t, expectedSpd.Status.BusinessStatus, newSPD.Status.BusinessStatus) + assert.Equal(t, expectedSpd.Status.AggMetrics, newSPD.Status.AggMetrics) } diff --git a/pkg/metaserver/spd/fetcher.go b/pkg/metaserver/spd/fetcher.go index 4dffe383fe..19b3b4228e 100644 --- a/pkg/metaserver/spd/fetcher.go +++ b/pkg/metaserver/spd/fetcher.go @@ -78,8 +78,9 @@ func (d DummySPDFetcher) Run(_ context.Context) { } type spdFetcher struct { - started *atomic.Bool - mux sync.Mutex + started *atomic.Bool + spdGetFromRemote bool + mux sync.Mutex client *client.GenericClientSet emitter metrics.MetricEmitter @@ -106,6 +107,7 @@ func NewSPDFetcher(clientSet *client.GenericClientSet, emitter metrics.MetricEmi emitter: emitter, checkpointManager: checkpointManager, cncFetcher: cncFetcher, + spdGetFromRemote: conf.SPDGetFromRemote, } m.getPodSPDNameFunc = util.GetPodSPDName @@ -148,7 +150,7 @@ func (s *spdFetcher) Run(ctx context.Context) { <-ctx.Done() } -func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name string) (*workloadapis.ServiceProfileDescriptor, error) { +func (s *spdFetcher) getSPDByNamespaceName(ctx context.Context, namespace, 
name string) (*workloadapis.ServiceProfileDescriptor, error) { key := native.GenerateNamespaceNameKey(namespace, name) baseTag := []metrics.MetricTag{ {Key: "spdNamespace", Val: namespace}, @@ -159,6 +161,17 @@ func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name st currentSPD := s.spdCache.GetSPD(key, true) if currentSPD != nil { return currentSPD, nil + } else if s.spdGetFromRemote { + klog.Infof("[spd-manager] need to get spd %s from remote", key) + err := s.tryUpdateSPDCache(ctx, namespace, name, currentSPD, baseTag, s.spdGetFromRemote) + if err != nil { + return currentSPD, err + } + + currentSPD = s.spdCache.GetSPD(key, true) + if currentSPD != nil { + return currentSPD, nil + } } _ = s.emitter.StoreInt64(metricsNameCacheNotFound, 1, metrics.MetricTypeNameCount, baseTag...) @@ -198,43 +211,49 @@ func (s *spdFetcher) sync(ctx context.Context) { // first get spd origin spd from local cache originSPD := s.spdCache.GetSPD(key, false) - // get spd current target config from cnc to limit rate of get remote spd by comparing local spd - // hash with cnc target config hash, if cnc target config not found it will get remote spd directly - targetConfig, err := s.getSPDTargetConfig(ctx, namespace, name) - if err != nil { - klog.Warningf("[spd-manager] get spd targetConfig config failed: %v, use local cache instead", err) - targetConfig = &configapis.TargetConfig{ - ConfigNamespace: namespace, - ConfigName: name, - } - _ = s.emitter.StoreInt64(metricsNameGetCNCTargetConfigFailed, 1, metrics.MetricTypeNameCount, baseTag...) 
- } + _ = s.tryUpdateSPDCache(ctx, namespace, name, originSPD, baseTag, false) + } +} - // try to update spd cache from remote if cache spd hash is not equal to target config hash, - // the rate of getting remote spd will be limited by spd ServiceProfileCacheTTL - err = s.updateSPDCacheIfNeed(ctx, originSPD, targetConfig) - if err != nil { - klog.Errorf("[spd-manager] failed update spd cache from remote: %v, use local cache instead", err) - _ = s.emitter.StoreInt64(metricsNameUpdateCacheFailed, 1, metrics.MetricTypeNameCount, baseTag...) +func (s *spdFetcher) tryUpdateSPDCache(ctx context.Context, namespace, name string, + currentSPD *workloadapis.ServiceProfileDescriptor, baseTag []metrics.MetricTag, spdGetFromRemote bool, +) error { + // get spd current target config from cnc to limit rate of get remote spd by comparing local spd + // hash with cnc target config hash, if cnc target config not found it will get remote spd directly + targetConfig, err := s.getSPDTargetConfig(ctx, namespace, name) + if err != nil { + klog.Warningf("[spd-manager] get spd targetConfig config failed: %v, use local cache instead", err) + targetConfig = &configapis.TargetConfig{ + ConfigNamespace: namespace, + ConfigName: name, } + _ = s.emitter.StoreInt64(metricsNameGetCNCTargetConfigFailed, 1, metrics.MetricTypeNameCount, baseTag...) } + + err = s.updateSPDCacheIfNeed(ctx, currentSPD, targetConfig, spdGetFromRemote) + if err != nil { + klog.Errorf("[spd-manager] failed update spd cache from remote: %v, use local cache instead", err) + _ = s.emitter.StoreInt64(metricsNameUpdateCacheFailed, 1, metrics.MetricTypeNameCount, baseTag...) + return err + } + return nil } // updateSPDCacheIfNeed checks if the previous spd has changed, and // re-get from APIServer if the previous is out-of date. 
func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *workloadapis.ServiceProfileDescriptor, - targetConfig *configapis.TargetConfig, + targetConfig *configapis.TargetConfig, spdGetFromRemote bool, ) error { if originSPD == nil && targetConfig == nil { return nil } now := time.Now() - if originSPD == nil || util.GetSPDHash(originSPD) != targetConfig.Hash { + if originSPD == nil || util.GetSPDHash(originSPD) != targetConfig.Hash || spdGetFromRemote { key := native.GenerateNamespaceNameKey(targetConfig.ConfigNamespace, targetConfig.ConfigName) // Skip the backoff delay if the configuration hash of the CNC target changes, ensuring the // local SPD cache is always updated with the latest configuration. - if nextFetchRemoteTime := s.spdCache.GetNextFetchRemoteTime(key, now, targetConfig.Hash != ""); nextFetchRemoteTime.After(time.Now()) { + if nextFetchRemoteTime := s.spdCache.GetNextFetchRemoteTime(key, now, targetConfig.Hash != ""); nextFetchRemoteTime.After(time.Now()) && !spdGetFromRemote { return nil } else { // first update the timestamp of the last attempt to fetch the remote spd to diff --git a/pkg/metaserver/spd/fetcher_test.go b/pkg/metaserver/spd/fetcher_test.go index 6aafeb722b..95ff2afa28 100644 --- a/pkg/metaserver/spd/fetcher_test.go +++ b/pkg/metaserver/spd/fetcher_test.go @@ -46,6 +46,7 @@ func generateTestConfiguration(t *testing.T, nodeName string, checkpoint string) testConfiguration.NodeName = nodeName testConfiguration.CheckpointManagerDir = checkpoint + testConfiguration.SPDGetFromRemote = true return testConfiguration } diff --git a/pkg/util/spd.go b/pkg/util/spd.go index 90b7fa773c..889238236d 100644 --- a/pkg/util/spd.go +++ b/pkg/util/spd.go @@ -313,6 +313,7 @@ func InsertSPDBusinessIndicatorStatus(status *apiworkload.ServiceProfileDescript status.BusinessStatus = append(status.BusinessStatus, *serviceBusinessIndicatorStatus) } +// InsertSPDAggMetricsStatus inserts aggMetrics into spd status.
func InsertSPDAggMetricsStatus(status *apiworkload.ServiceProfileDescriptorStatus, serviceAggPodMetrics *apiworkload.AggPodMetrics, ) {