From 103892dd628301340607582a9eaad7d8272c44ae Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Tue, 7 Nov 2023 21:52:38 +0000 Subject: [PATCH] parallelize job creation --- pkg/controllers/jobset_controller.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index d22942bf..27f004f5 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -41,8 +41,8 @@ import ( ) const ( - RestartsKey string = "jobset.sigs.k8s.io/restart-attempt" - parallelDeletions int = 50 + RestartsKey string = "jobset.sigs.k8s.io/restart-attempt" + maxParallelism int = 50 ) var ( @@ -372,26 +372,37 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow } } + var lock sync.Mutex + var finalErrs []error + for _, rjob := range js.Spec.ReplicatedJobs { jobs, err := constructJobsFromTemplate(js, &rjob, ownedJobs) if err != nil { return err } - for _, job := range jobs { + workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobs), func(i int) { + job := jobs[i] + // Set jobset controller as owner of the job for garbage collection and reconcilation. if err := ctrl.SetControllerReference(js, job, r.Scheme); err != nil { - return err + lock.Lock() + defer lock.Unlock() + finalErrs = append(finalErrs, err) + return } // Create the job. // TODO(#18): Deal with the case where the job exists but is not owned by the jobset. if err := r.Create(ctx, job); err != nil { - return err + lock.Lock() + defer lock.Unlock() + finalErrs = append(finalErrs, err) + return } log.V(2).Info("successfully created job", "job", klog.KObj(job)) - } + }) } - return nil + return errors.Join(finalErrs...) } // TODO: look into adopting service and updating the selector @@ -493,7 +504,7 @@ func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*ba log := ctrl.LoggerFrom(ctx) lock := &sync.Mutex{} var finalErrs []error - workqueue.ParallelizeUntil(ctx, parallelDeletions, len(jobsForDeletion), func(i int) { + workqueue.ParallelizeUntil(ctx, maxParallelism, len(jobsForDeletion), func(i int) { targetJob := jobsForDeletion[i] // Delete job. This deletion event will trigger another reconciliation, // where the jobs are recreated.