From b6fa79eba448643a4bc54212358dcb33fa35aac3 Mon Sep 17 00:00:00 2001 From: "chang.qiangqiang" Date: Wed, 13 Nov 2024 20:10:17 +0800 Subject: [PATCH] feat(binding): create or update work parallel Signed-off-by: chang.qiangqiang --- pkg/controllers/binding/common.go | 47 ++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/pkg/controllers/binding/common.go b/pkg/controllers/binding/common.go index 3556989ed75e..e529f73e1d8e 100644 --- a/pkg/controllers/binding/common.go +++ b/pkg/controllers/binding/common.go @@ -81,6 +81,7 @@ func ensureWork( } } + var createOrUpdateWorkArgs []*CreateOrUpdateWorkArg for i := range targetClusters { targetCluster := targetClusters[i] clonedWorkload := workload.DeepCopy() @@ -135,16 +136,24 @@ func ensureWork( Labels: workLabel, Annotations: annotations, } - - if err = helper.CreateOrUpdateWork( - ctx, - c, - workMeta, - clonedWorkload, - helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)), - helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false)), - ); err != nil { - return err + createOrUpdateWorkArgs = append(createOrUpdateWorkArgs, &CreateOrUpdateWorkArg{ + WorkMeta: workMeta, + ClonedWorkload: clonedWorkload, + Options: []helper.WorkOption{helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)), helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false))}, + }) + } + resChan := make(chan error) + for _, item := range createOrUpdateWorkArgs { + go CreateOrUpdateWorkParallel(ctx, resChan, c, item) + } + for i := 0; i < len(targetClusters); i++ { + select { + case res := <-resChan: + if res != nil { + return err + } + case <-ctx.Done(): + return nil } } return nil @@ -293,3 +302,21 @@ func shouldSuspendDispatching(suspension *policyv1alpha1.Suspension, targetClust } return suspendDispatching } + +type CreateOrUpdateWorkArg struct { + WorkMeta metav1.ObjectMeta + ClonedWorkload *unstructured.Unstructured + Options []helper.WorkOption +} + +// CreateOrUpdateWorkParallel creates or update work object parallel. +func CreateOrUpdateWorkParallel(ctx context.Context, resChan chan error, c client.Client, createOrUpdateWorkArg *CreateOrUpdateWorkArg) { + err := helper.CreateOrUpdateWork( + ctx, + c, + createOrUpdateWorkArg.WorkMeta, + createOrUpdateWorkArg.ClonedWorkload, + createOrUpdateWorkArg.Options..., + ) + resChan <- err +}