Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sync mcs work when cluster joined or unjoin #4360

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 60 additions & 5 deletions pkg/controllers/multiclusterservice/mcs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Req
return
}
_ = c.updateMCSStatus(mcs, metav1.ConditionTrue, "ServiceAppliedSucceed", "Service is propagated to target clusters.")
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, "Service is propagated to target clusters.")
}()

if err = c.handleMCSCreateOrUpdate(ctx, mcs.DeepCopy()); err != nil {
Expand Down Expand Up @@ -191,13 +192,13 @@ func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.Mu
klog.Errorf("Failed to get member cluster name for work %s/%s:%v", work.Namespace, work.Name, work)
continue
}
if !deleteAll && provisionClusters.Has(clusterName) {

if !deleteAll && provisionClusters.Has(clusterName) && c.IsClusterReady(clusterName) {
continue
}

if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Error while updating work(%s/%s) deletion timestamp: %s",
work.Namespace, work.Name, err)
klog.Errorf("Error while deleting work(%s/%s): %v", work.Namespace, work.Name, err)
return err
}
}
Expand Down Expand Up @@ -275,13 +276,16 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ
}

func (c *MCSController) ensureMultiClusterServiceWork(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
provisionCluster, err := helper.GetProvisionClusters(c.Client, mcs)
provisionClusters, err := helper.GetProvisionClusters(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return err
}

for clusterName := range provisionCluster {
for clusterName := range provisionClusters {
if !c.IsClusterReady(clusterName) {
continue
}
workMeta := metav1.ObjectMeta{
Name: names.GenerateMCSWorkName(mcs.Kind, mcs.Name, mcs.Namespace, clusterName),
Namespace: names.GenerateExecutionSpaceName(clusterName),
Expand Down Expand Up @@ -392,6 +396,16 @@ func (c *MCSController) updateMCSStatus(mcs *networkingv1alpha1.MultiClusterServ
})
}

func (c *MCSController) IsClusterReady(clusterName string) bool {
cluster := &clusterv1alpha1.Cluster{}
if err := c.Client.Get(context.TODO(), types.NamespacedName{Name: clusterName}, cluster); err != nil {
klog.ErrorS(err, "failed to get cluster object", "Name", clusterName)
return false
}

return util.IsClusterReady(&cluster.Status)
}

// SetupWithManager creates a controller and register to controller manager.
func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
mcsPredicateFunc := predicate.Funcs{
Expand Down Expand Up @@ -460,6 +474,47 @@ func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).
For(&networkingv1alpha1.MultiClusterService{}, builder.WithPredicates(mcsPredicateFunc)).
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(svcMapFunc), builder.WithPredicates(svcPredicateFunc)).
Watches(&clusterv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(c.clusterMapFunc())).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jwcesign
Do we need to add an event filter?
Note that each cluster change would lead to list of all MultiClusterService in clusterMapFunc().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can only accept changes to the cluster status condition.

WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}).
Complete(c)
}

func (c *MCSController) clusterMapFunc() handler.MapFunc {
return func(ctx context.Context, a client.Object) []reconcile.Request {
var clusterName string
switch t := a.(type) {
case *clusterv1alpha1.Cluster:
clusterName = t.Name
default:
return nil
}

klog.V(4).Infof("Begin to sync mcs with cluster %s.", clusterName)
mcsList := &networkingv1alpha1.MultiClusterServiceList{}
if err := c.Client.List(context.TODO(), mcsList, &client.ListOptions{}); err != nil {
klog.Errorf("Failed to list MultiClusterService, error: %v", err)
return nil
}

var requests []reconcile.Request
for index := range mcsList.Items {
if !needSyncMultiClusterService(&mcsList.Items[index], clusterName) {
continue
}

requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: mcsList.Items[index].Namespace,
Name: mcsList.Items[index].Name}})
}

return requests
}
}

func needSyncMultiClusterService(mcs *networkingv1alpha1.MultiClusterService, clusterName string) bool {
if len(mcs.Spec.ServiceProvisionClusters) == 0 || len(mcs.Spec.ServiceConsumptionClusters) == 0 {
return true
}
clusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
clusters.Insert(mcs.Spec.ServiceConsumptionClusters...)
return clusters.Has(clusterName)
}
28 changes: 14 additions & 14 deletions pkg/util/helper/mcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,26 +108,26 @@ func MultiClusterServiceCrossClusterEnabled(mcs *networkingv1alpha1.MultiCluster

func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
provisionClusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
existClusters, err := util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
if len(provisionClusters) == 0 {
var err error
provisionClusters, err = util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
return existClusters, nil
}
return provisionClusters, nil
return provisionClusters.Intersection(existClusters), nil
}

func GetConsumptionClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...)
existClusters, err := util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
if len(consumptionClusters) == 0 {
var err error
consumptionClusters, err = util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
return existClusters, nil
}
return consumptionClusters, nil
return consumptionClusters.Intersection(existClusters), nil
}