From feae2cf3f0ce3cb340dffd67f973917a7966f39b Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Mon, 12 Jun 2023 19:14:22 +0800 Subject: [PATCH] spd indicator plugin init function support spd workload informer --- pkg/controller/spd/indicator-plugin/plugin.go | 4 +-- pkg/controller/spd/spd.go | 30 +++++++++++-------- pkg/controller/spd/spd_test.go | 5 ++-- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/pkg/controller/spd/indicator-plugin/plugin.go b/pkg/controller/spd/indicator-plugin/plugin.go index e673b3191..ac76f19b5 100644 --- a/pkg/controller/spd/indicator-plugin/plugin.go +++ b/pkg/controller/spd/indicator-plugin/plugin.go @@ -21,11 +21,11 @@ import ( "sync" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/cache" apiworkload "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" katalystbase "github.com/kubewharf/katalyst-core/cmd/base" "github.com/kubewharf/katalyst-core/pkg/config/controller" + "github.com/kubewharf/katalyst-core/pkg/util/native" ) // IndicatorPlugin represent an implementation for indicator sources; @@ -68,7 +68,7 @@ func (d DummyIndicatorPlugin) GetSupportedBusinessIndicatorStatus() []apiworkloa var pluginInitializers sync.Map type InitFunc func(ctx context.Context, conf *controller.SPDConfig, extraConf interface{}, - workloadLister map[schema.GroupVersionResource]cache.GenericLister, + spdWorkloadInformer map[schema.GroupVersionResource]native.DynamicInformer, controlCtx *katalystbase.GenericContext, updater IndicatorUpdater) (IndicatorPlugin, error) // RegisterPluginInitializer is used to register user-defined indicator plugins diff --git a/pkg/controller/spd/spd.go b/pkg/controller/spd/spd.go index 9e3f2719a..1475b17cc 100644 --- a/pkg/controller/spd/spd.go +++ b/pkg/controller/spd/spd.go @@ -78,9 +78,10 @@ type SPDController struct { spdIndexer cache.Indexer podIndexer cache.Indexer - podLister corelisters.PodLister - spdLister apiListers.ServiceProfileDescriptorLister - workloadLister map[schema.GroupVersionResource]cache.GenericLister + podLister corelisters.PodLister + spdLister apiListers.ServiceProfileDescriptorLister + workloadLister map[schema.GroupVersionResource]cache.GenericLister + spdWorkloadInformer map[schema.GroupVersionResource]native.DynamicInformer syncedFunc []cache.InformerSynced spdQueue workqueue.RateLimitingInterface @@ -106,15 +107,16 @@ func NewSPDController(ctx context.Context, controlCtx *katalystbase.GenericConte spdInformer := controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors() spdController := &SPDController{ - ctx: ctx, - conf: conf, - podUpdater: &control.DummyPodUpdater{}, - spdControl: &control.DummySPDControl{}, - workloadControl: &control.DummyUnstructuredControl{}, - spdQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "spd"), - workloadSyncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workload"), - metricsEmitter: controlCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(spdControllerName), - workloadLister: make(map[schema.GroupVersionResource]cache.GenericLister), + ctx: ctx, + conf: conf, + podUpdater: &control.DummyPodUpdater{}, + spdControl: &control.DummySPDControl{}, + workloadControl: &control.DummyUnstructuredControl{}, + spdQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "spd"), + workloadSyncQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workload"), + metricsEmitter: controlCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(spdControllerName), + workloadLister: make(map[schema.GroupVersionResource]cache.GenericLister), + spdWorkloadInformer: make(map[schema.GroupVersionResource]native.DynamicInformer), } spdController.podLister = podInformer.Lister() @@ -136,6 +138,7 @@ func NewSPDController(ctx context.Context, controlCtx *katalystbase.GenericConte continue } + spdController.spdWorkloadInformer[wf.GVR] = wf wf.Informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: spdController.addWorkload(workload), UpdateFunc: spdController.updateWorkload(workload), @@ -236,7 +239,8 @@ func (sc *SPDController) initializeIndicatorPlugins(controlCtx *katalystbase.Gen sc.indicatorsStatusBusiness = make(map[apiworkload.ServiceBusinessIndicatorName]interface{}) for pluginName, initFunc := range indicator_plugin.GetPluginInitializers() { - plugin, err := initFunc(sc.ctx, sc.conf, extraConf, sc.workloadLister, controlCtx, sc.indicatorManager) + plugin, err := initFunc(sc.ctx, sc.conf, extraConf, sc.spdWorkloadInformer, + controlCtx, sc.indicatorManager) if err != nil { return err } diff --git a/pkg/controller/spd/spd_test.go b/pkg/controller/spd/spd_test.go index 3361ddf7a..e0082871a 100644 --- a/pkg/controller/spd/spd_test.go +++ b/pkg/controller/spd/spd_test.go @@ -38,6 +38,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/config/controller" "github.com/kubewharf/katalyst-core/pkg/config/generic" indicator_plugin "github.com/kubewharf/katalyst-core/pkg/controller/spd/indicator-plugin" + "github.com/kubewharf/katalyst-core/pkg/util/native" ) var ( @@ -468,12 +469,12 @@ func TestIndicatorUpdater(t *testing.T) { } indicator_plugin.RegisterPluginInitializer("d1", func(_ context.Context, _ *controller.SPDConfig, - _ interface{}, _ map[schema.GroupVersionResource]cache.GenericLister, _ *katalystbase.GenericContext, + _ interface{}, _ map[schema.GroupVersionResource]native.DynamicInformer, _ *katalystbase.GenericContext, _ indicator_plugin.IndicatorUpdater) (indicator_plugin.IndicatorPlugin, error) { return d1, nil }) indicator_plugin.RegisterPluginInitializer("d2", func(_ context.Context, _ *controller.SPDConfig, - _ interface{}, _ map[schema.GroupVersionResource]cache.GenericLister, _ *katalystbase.GenericContext, + _ interface{}, _ map[schema.GroupVersionResource]native.DynamicInformer, _ *katalystbase.GenericContext, _ indicator_plugin.IndicatorUpdater) (indicator_plugin.IndicatorPlugin, error) { return d2, nil })