@@ -54,14 +54,8 @@ type ResourceDetector struct {
54
54
// ResourceInterpreter knows the details of resource structure.
55
55
ResourceInterpreter resourceinterpreter.ResourceInterpreter
56
56
EventRecorder record.EventRecorder
57
- // policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and
58
- // a reconcile function to consume the items in queue.
59
- policyReconcileWorker util.AsyncWorker
60
- propagationPolicyLister cache.GenericLister
61
57
62
- // clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and
63
- // a reconcile function to consume the items in queue.
64
- clusterPolicyReconcileWorker util.AsyncWorker
58
+ propagationPolicyLister cache.GenericLister
65
59
clusterPropagationPolicyLister cache.GenericLister
66
60
67
61
// bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and
@@ -85,30 +79,17 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
85
79
d .waitingObjects = make (map [keys.ClusterWideKey ]struct {})
86
80
d .stopCh = ctx .Done ()
87
81
88
- // setup policy reconcile worker
89
- d .policyReconcileWorker = util .NewAsyncWorker ("propagationPolicy reconciler" , ClusterWideKeyFunc , d .ReconcilePropagationPolicy )
90
- d .policyReconcileWorker .Run (1 , d .stopCh )
91
- d .clusterPolicyReconcileWorker = util .NewAsyncWorker ("clusterPropagationPolicy reconciler" , ClusterWideKeyFunc , d .ReconcileClusterPropagationPolicy )
92
- d .clusterPolicyReconcileWorker .Run (1 , d .stopCh )
93
-
94
- // watch and enqueue PropagationPolicy changes.
95
82
propagationPolicyGVR := schema.GroupVersionResource {
96
83
Group : policyv1alpha1 .GroupVersion .Group ,
97
84
Version : policyv1alpha1 .GroupVersion .Version ,
98
85
Resource : "propagationpolicies" ,
99
86
}
100
- policyHandler := informermanager .NewHandlerOnEvents (d .OnPropagationPolicyAdd , d .OnPropagationPolicyUpdate , d .OnPropagationPolicyDelete )
101
- d .InformerManager .ForResource (propagationPolicyGVR , policyHandler )
102
- d .propagationPolicyLister = d .InformerManager .Lister (propagationPolicyGVR )
103
-
104
- // watch and enqueue ClusterPropagationPolicy changes.
105
87
clusterPropagationPolicyGVR := schema.GroupVersionResource {
106
88
Group : policyv1alpha1 .GroupVersion .Group ,
107
89
Version : policyv1alpha1 .GroupVersion .Version ,
108
90
Resource : "clusterpropagationpolicies" ,
109
91
}
110
- clusterPolicyHandler := informermanager .NewHandlerOnEvents (d .OnClusterPropagationPolicyAdd , d .OnClusterPropagationPolicyUpdate , d .OnClusterPropagationPolicyDelete )
111
- d .InformerManager .ForResource (clusterPropagationPolicyGVR , clusterPolicyHandler )
92
+ d .propagationPolicyLister = d .InformerManager .Lister (propagationPolicyGVR )
112
93
d .clusterPropagationPolicyLister = d .InformerManager .Lister (clusterPropagationPolicyGVR )
113
94
114
95
// setup binding reconcile worker
@@ -135,6 +116,8 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
135
116
d .InformerManager .ForResource (clusterResourceBindingGVR , clusterBindingHandler )
136
117
137
118
d .EventHandler = informermanager .NewFilteringHandlerOnAllEvents (d .EventFilter , d .OnAdd , d .OnUpdate , d .OnDelete )
119
+ d .InformerManager .ForResource (propagationPolicyGVR , d .EventHandler )
120
+ d .InformerManager .ForResource (clusterPropagationPolicyGVR , d .EventHandler )
138
121
d .Processor = util .NewAsyncWorker ("resource detector" , ClusterWideKeyFunc , d .Reconcile )
139
122
d .Processor .Run (1 , d .stopCh )
140
123
go d .discoverResources (30 * time .Second )
@@ -145,8 +128,10 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
145
128
}
146
129
147
130
// Check if our ResourceDetector implements necessary interfaces
148
- var _ manager.Runnable = & ResourceDetector {}
149
- var _ manager.LeaderElectionRunnable = & ResourceDetector {}
131
+ var (
132
+ _ manager.Runnable = & ResourceDetector {}
133
+ _ manager.LeaderElectionRunnable = & ResourceDetector {}
134
+ )
150
135
151
136
func (d * ResourceDetector ) discoverResources (period time.Duration ) {
152
137
wait .Until (func () {
@@ -206,6 +191,15 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
206
191
}
207
192
klog .Infof ("Reconciling object: %s" , clusterWideKey )
208
193
194
+ if clusterWideKey .Group == policyv1alpha1 .GroupName {
195
+ switch clusterWideKey .Kind {
196
+ case "PropagationPolicy" :
197
+ return d .ReconcilePropagationPolicy (key )
198
+ case "ClusterPropagationPolicy" :
199
+ return d .ReconcileClusterPropagationPolicy (key )
200
+ }
201
+ }
202
+
209
203
object , err := d .GetUnstructuredObject (clusterWideKey )
210
204
if err != nil {
211
205
if apierrors .IsNotFound (err ) {
@@ -265,7 +259,8 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
265
259
// - karmada-es-*
266
260
// All objects which API group defined by Karmada should be ignored:
267
261
// - cluster.karmada.io
268
- // - policy.karmada.io
262
+ // - work.karmada.io
263
+ // - config.karmada.io
269
264
//
270
265
// The api objects listed above will be ignored by default, as we don't want users to manually input the things
271
266
// they don't care when trying to skip something else.
@@ -286,6 +281,12 @@ func (d *ResourceDetector) EventFilter(obj interface{}) bool {
286
281
return false
287
282
}
288
283
284
+ if clusterWideKey .Group == policyv1alpha1 .GroupName {
285
+ if clusterWideKey .Kind == "PropagationPolicy" || clusterWideKey .Kind == "ClusterPropagationPolicy" {
286
+ return true
287
+ }
288
+ }
289
+
289
290
if names .IsReservedNamespace (clusterWideKey .Namespace ) {
290
291
return false
291
292
}
@@ -323,7 +324,15 @@ func (d *ResourceDetector) OnAdd(obj interface{}) {
323
324
324
325
// OnUpdate handles object update event and push the object to queue.
325
326
func (d * ResourceDetector ) OnUpdate (oldObj , newObj interface {}) {
326
- d .OnAdd (newObj )
327
+ switch newObj .(type ) {
328
+ case * policyv1alpha1.PropagationPolicy :
329
+ // currently do nothing, since a policy's resource selector can not be updated.
330
+ return
331
+ case * policyv1alpha1.ClusterPropagationPolicy :
332
+ return
333
+ default :
334
+ d .OnAdd (newObj )
335
+ }
327
336
}
328
337
329
338
// OnDelete handles object delete event and push the object to queue.
@@ -504,7 +513,6 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
504
513
bindingCopy .Spec .Replicas = binding .Spec .Replicas
505
514
return nil
506
515
})
507
-
508
516
if err != nil {
509
517
klog .Errorf ("Failed to apply cluster policy(%s) for object: %s. error: %v" , policy .Name , objectKey , err )
510
518
return err
@@ -534,7 +542,6 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
534
542
bindingCopy .Spec .Replicas = binding .Spec .Replicas
535
543
return nil
536
544
})
537
-
538
545
if err != nil {
539
546
klog .Errorf ("Failed to apply cluster policy(%s) for object: %s. error: %v" , policy .Name , objectKey , err )
540
547
return err
@@ -742,33 +749,6 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour
742
749
return matchedResult
743
750
}
744
751
745
- // OnPropagationPolicyAdd handles object add event and push the object to queue.
746
- func (d * ResourceDetector ) OnPropagationPolicyAdd (obj interface {}) {
747
- key , err := ClusterWideKeyFunc (obj )
748
- if err != nil {
749
- return
750
- }
751
-
752
- klog .V (2 ).Infof ("Create PropagationPolicy(%s)" , key )
753
- d .policyReconcileWorker .Add (key )
754
- }
755
-
756
- // OnPropagationPolicyUpdate handles object update event and push the object to queue.
757
- func (d * ResourceDetector ) OnPropagationPolicyUpdate (oldObj , newObj interface {}) {
758
- // currently do nothing, since a policy's resource selector can not be updated.
759
- }
760
-
761
- // OnPropagationPolicyDelete handles object delete event and push the object to queue.
762
- func (d * ResourceDetector ) OnPropagationPolicyDelete (obj interface {}) {
763
- key , err := ClusterWideKeyFunc (obj )
764
- if err != nil {
765
- return
766
- }
767
-
768
- klog .V (2 ).Infof ("Delete PropagationPolicy(%s)" , key )
769
- d .policyReconcileWorker .Add (key )
770
- }
771
-
772
752
// ReconcilePropagationPolicy handles PropagationPolicy resource changes.
773
753
// When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
774
754
// put the object to queue.
@@ -800,33 +780,6 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
800
780
return d .HandlePropagationPolicyCreation (propagationObject )
801
781
}
802
782
803
- // OnClusterPropagationPolicyAdd handles object add event and push the object to queue.
804
- func (d * ResourceDetector ) OnClusterPropagationPolicyAdd (obj interface {}) {
805
- key , err := ClusterWideKeyFunc (obj )
806
- if err != nil {
807
- return
808
- }
809
-
810
- klog .V (2 ).Infof ("Create ClusterPropagationPolicy(%s)" , key )
811
- d .clusterPolicyReconcileWorker .Add (key )
812
- }
813
-
814
- // OnClusterPropagationPolicyUpdate handles object update event and push the object to queue.
815
- func (d * ResourceDetector ) OnClusterPropagationPolicyUpdate (oldObj , newObj interface {}) {
816
- // currently do nothing, since a policy's resource selector can not be updated.
817
- }
818
-
819
- // OnClusterPropagationPolicyDelete handles object delete event and push the object to queue.
820
- func (d * ResourceDetector ) OnClusterPropagationPolicyDelete (obj interface {}) {
821
- key , err := ClusterWideKeyFunc (obj )
822
- if err != nil {
823
- return
824
- }
825
-
826
- klog .V (2 ).Infof ("Delete ClusterPropagationPolicy(%s)" , key )
827
- d .clusterPolicyReconcileWorker .Add (key )
828
- }
829
-
830
783
// ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes.
831
784
// When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
832
785
// put the object to queue.
@@ -1063,7 +1016,6 @@ func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error {
1063
1016
1064
1017
// OnClusterResourceBindingAdd handles object add event.
1065
1018
func (d * ResourceDetector ) OnClusterResourceBindingAdd (obj interface {}) {
1066
-
1067
1019
}
1068
1020
1069
1021
// OnClusterResourceBindingUpdate handles object update event and push the object to queue.
0 commit comments