Clean up useless comments and fix inconsistent receiver names #2060

Merged: 1 commit, Apr 25, 2024
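This PR makes two mechanical cleanups: it completes or fixes doc comments, and it unifies method receiver names so every method on a type uses the same short identifier, per the Go Code Review Comments guidance (style checkers such as staticcheck's ST1016 typically flag mixed receivers, though which linter the project relies on is an assumption here). A minimal sketch of the convention, using a hypothetical type rather than code from this PR:

```go
package main

import "fmt"

// counter is a hypothetical type used only to illustrate the convention.
type counter struct{ n int }

// Both methods use the same receiver name `c`. Before this PR, wlReconciler
// mixed `a` and `w`, and ClusterQueue mixed `cq` and `c`.
func (c *counter) Add(delta int) { c.n += delta }
func (c *counter) Value() int    { return c.n }

func main() {
	c := &counter{}
	c.Add(2)
	fmt.Println(c.Value()) // prints 2
}
```

The diffs below apply exactly this rule: wlReconciler settles on `w`, and ClusterQueue settles on `c`.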
40 changes: 20 additions & 20 deletions pkg/controller/admissionchecks/multikueue/workload.go
@@ -152,19 +152,19 @@ func (g *wlGroup) RemoveRemoteObjects(ctx context.Context, cluster string) error
return nil
}

-func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
+func (w *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.V(2).Info("Reconcile Workload")
wl := &kueue.Workload{}
-if err := a.client.Get(ctx, req.NamespacedName, wl); err != nil {
+if err := w.client.Get(ctx, req.NamespacedName, wl); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
// NOTE: the not found needs to be treated and should result in the deletion of all the remote workloads.
// since the list of remotes can only be taken from its list of admission check stats we need to either
// 1. use a finalizer
// 2. try to trigger the remote deletion from an event filter.

-mkAc, err := a.multikueueAC(ctx, wl)
+mkAc, err := w.multikueueAC(ctx, wl)
if err != nil {
return reconcile.Result{}, err
}
@@ -174,7 +174,7 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
return reconcile.Result{}, nil
}

-adapter, owner := a.adapter(wl)
+adapter, owner := w.adapter(wl)
if adapter == nil {
// Reject the workload since there is no chance for it to run.
var rejectionMessage string
@@ -183,24 +183,24 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
} else {
rejectionMessage = "No multikueue adapter found"
}
-return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage)
+return reconcile.Result{}, w.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage)
}

-managed, unmanagedReason, err := adapter.IsJobManagedByKueue(ctx, a.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace})
+managed, unmanagedReason, err := adapter.IsJobManagedByKueue(ctx, w.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace})
if err != nil {
return reconcile.Result{}, err
}

if !managed {
-return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, fmt.Sprintf("The owner is not managed by Kueue: %s", unmanagedReason))
+return reconcile.Result{}, w.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, fmt.Sprintf("The owner is not managed by Kueue: %s", unmanagedReason))
}

-grp, err := a.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name)
+grp, err := w.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name)
if err != nil {
return reconcile.Result{}, err
}

-return a.reconcileGroup(ctx, grp)
+return w.reconcileGroup(ctx, grp)
}

func (w *wlReconciler) updateACS(ctx context.Context, wl *kueue.Workload, acs *kueue.AdmissionCheckState, status kueue.CheckState, message string) error {
@@ -252,8 +252,8 @@ func (w *wlReconciler) adapter(local *kueue.Workload) (jobAdapter, *metav1.Owner
return nil, nil
}

-func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload, acName string, adapter jobAdapter, controllerName string) (*wlGroup, error) {
-rClients, err := a.remoteClientsForAC(ctx, acName)
+func (w *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload, acName string, adapter jobAdapter, controllerName string) (*wlGroup, error) {
+rClients, err := w.remoteClientsForAC(ctx, acName)
if err != nil {
return nil, fmt.Errorf("admission check %q: %w", acName, err)
}
@@ -281,7 +281,7 @@ func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload, acN
return &grp, nil
}

-func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reconcile.Result, error) {
+func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reconcile.Result, error) {
log := ctrl.LoggerFrom(ctx).WithValues("op", "reconcileGroup")
log.V(3).Info("Reconcile Workload Group")

@@ -298,7 +298,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}

if !workload.HasQuotaReservation(group.local) && acs.State == kueue.CheckStateRetry {
-errs = append(errs, a.updateACS(ctx, group.local, acs, kueue.CheckStatePending, "Requeued"))
+errs = append(errs, w.updateACS(ctx, group.local, acs, kueue.CheckStatePending, "Requeued"))
}

return reconcile.Result{}, errors.Join(errs...)
@@ -309,7 +309,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
// it should not be problematic but the "From remote xxxx:" could be lost ....

if group.jobAdapter != nil {
-if err := group.jobAdapter.SyncJob(ctx, a.client, group.remoteClients[remote].client, group.controllerKey, group.local.Name, a.origin); err != nil {
+if err := group.jobAdapter.SyncJob(ctx, w.client, group.remoteClients[remote].client, group.controllerKey, group.local.Name, w.origin); err != nil {
log.V(2).Error(err, "copying remote controller status", "workerCluster", remote)
// we should retry this
return reconcile.Result{}, err
@@ -326,7 +326,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
Reason: remoteFinishedCond.Reason,
Message: remoteFinishedCond.Message,
})
-return reconcile.Result{}, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName+"-finish"), client.ForceOwnership)
+return reconcile.Result{}, w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName+"-finish"), client.ForceOwnership)
}

// 2. delete all workloads that are out of sync or are not in the chosen worker
@@ -355,7 +355,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}

acs := workload.FindAdmissionCheck(group.local.Status.AdmissionChecks, group.acName)
-if err := group.jobAdapter.SyncJob(ctx, a.client, group.remoteClients[reservingRemote].client, group.controllerKey, group.local.Name, a.origin); err != nil {
+if err := group.jobAdapter.SyncJob(ctx, w.client, group.remoteClients[reservingRemote].client, group.controllerKey, group.local.Name, w.origin); err != nil {
log.V(2).Error(err, "creating remote controller object", "remote", reservingRemote)
// We'll retry this in the next reconcile.
return reconcile.Result{}, err
@@ -374,16 +374,16 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco

wlPatch := workload.BaseSSAWorkload(group.local)
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs)
-err := a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)
+err := w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)
if err != nil {
return reconcile.Result{}, err
}
}
-return reconcile.Result{RequeueAfter: a.workerLostTimeout}, nil
+return reconcile.Result{RequeueAfter: w.workerLostTimeout}, nil
} else if acs.State == kueue.CheckStateReady {
// If there is no reserving and the AC is ready, the connection with the reserving remote might
// be lost, keep the workload admitted for keepReadyTimeout and put it back in the queue after that.
-remainingWaitTime := a.workerLostTimeout - time.Since(acs.LastTransitionTime.Time)
+remainingWaitTime := w.workerLostTimeout - time.Since(acs.LastTransitionTime.Time)
if remainingWaitTime > 0 {
log.V(3).Info("Reserving remote lost, retry", "retryAfter", remainingWaitTime)
return reconcile.Result{RequeueAfter: remainingWaitTime}, nil
@@ -393,7 +393,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
acs.LastTransitionTime = metav1.NewTime(time.Now())
wlPatch := workload.BaseSSAWorkload(group.local)
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs)
-return reconcile.Result{}, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)
+return reconcile.Result{}, w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)
}
}

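The NOTE kept in Reconcile above observes that a not-found local Workload should trigger deletion of its remote copies, via either a finalizer or an event filter; this PR implements neither. A minimal, hypothetical finalizer sketch in the controller-runtime style, where the constant name and the removeRemoteObjects helper are illustrative only (wlGroup.RemoveRemoteObjects above does the actual per-cluster cleanup):

```go
// Assumed imports: "context",
//   kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1",
//   "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil",
//   "sigs.k8s.io/controller-runtime/pkg/reconcile".

// multikueueFinalizer is an illustrative name; the PR defines no such constant.
const multikueueFinalizer = "kueue.x-k8s.io/multikueue-cleanup"

func (w *wlReconciler) reconcileDeletion(ctx context.Context, wl *kueue.Workload) (reconcile.Result, error) {
	if wl.DeletionTimestamp.IsZero() {
		// Live object: ensure the finalizer is present so deletion waits for cleanup.
		// AddFinalizer reports whether it modified the object.
		if controllerutil.AddFinalizer(wl, multikueueFinalizer) {
			return reconcile.Result{}, w.client.Update(ctx, wl)
		}
		return reconcile.Result{}, nil
	}
	// Deletion in progress: remove the remote copies first (removeRemoteObjects
	// is hypothetical), then release the finalizer so the API server can
	// complete the delete.
	if err := w.removeRemoteObjects(ctx, wl); err != nil {
		return reconcile.Result{}, err // retried on the next reconcile
	}
	controllerutil.RemoveFinalizer(wl, multikueueFinalizer)
	return reconcile.Result{}, w.client.Update(ctx, wl)
}
```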
2 changes: 1 addition & 1 deletion pkg/controller/jobframework/reconciler.go
@@ -142,7 +142,7 @@ func WithManagerName(n string) Option {
}
}

-// WithLabelKeysToCopy
+// WithLabelKeysToCopy adds the label keys
func WithLabelKeysToCopy(n []string) Option {
return func(o *Options) {
o.LabelKeysToCopy = n
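This one-line comment completion follows the godoc convention applied throughout the rest of this PR: a doc comment on an exported identifier begins with that identifier's name, so `go doc` and pkg.go.dev render it as a complete sentence. A short illustration with hypothetical names, not code from the PR:

```go
package queue

// Queue is a toy stand-in for ClusterQueue.
type Queue struct{ active bool }

// Active reports whether the queue is accepting workloads.
// Starting with "Active" is the convention; the bare "Returns true if..."
// form is what the PR rewrites in cluster_queue.go below.
func (q *Queue) Active() bool { return q.active }
```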
10 changes: 5 additions & 5 deletions pkg/queue/cluster_queue.go
@@ -372,7 +372,7 @@ func (c *ClusterQueue) totalElements() []*workload.Info {
return elements
}

-// Returns true if the queue is active
+// Active returns true if the queue is active
func (c *ClusterQueue) Active() bool {
c.rwm.RLock()
defer c.rwm.RUnlock()
@@ -388,11 +388,11 @@ func (c *ClusterQueue) Active() bool {
// compete with other workloads, until cluster events free up quota.
// The workload should not be reinserted if it's already in the ClusterQueue.
// Returns true if the workload was inserted.
-func (cq *ClusterQueue) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueReason) bool {
-if cq.queueingStrategy == kueue.StrictFIFO {
-return cq.requeueIfNotPresent(wInfo, reason != RequeueReasonNamespaceMismatch)
+func (c *ClusterQueue) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueReason) bool {
+if c.queueingStrategy == kueue.StrictFIFO {
+return c.requeueIfNotPresent(wInfo, reason != RequeueReasonNamespaceMismatch)
}
-return cq.requeueIfNotPresent(wInfo, reason == RequeueReasonFailedAfterNomination || reason == RequeueReasonPendingPreemption)
+return c.requeueIfNotPresent(wInfo, reason == RequeueReasonFailedAfterNomination || reason == RequeueReasonPendingPreemption)
}

// queueOrderingFunc returns a function used by the clusterQueue heap algorithm
3 changes: 2 additions & 1 deletion pkg/util/testingjobs/jobset/wrappers.go
@@ -109,7 +109,7 @@ func (j *JobSetWrapper) Label(k, v string) *JobSetWrapper {
return j
}

-// Annotation sets annotations to the JobSet.
+// Annotations sets annotations to the JobSet.
func (j *JobSetWrapper) Annotations(annotations map[string]string) *JobSetWrapper {
j.ObjectMeta.Annotations = annotations
return j
@@ -162,6 +162,7 @@ func (j *JobSetWrapper) Condition(c metav1.Condition) *JobSetWrapper {
return j
}

+// ManagedBy adds a managedby.
func (j *JobSetWrapper) ManagedBy(c string) *JobSetWrapper {
j.Spec.ManagedBy = &c
return j
2 changes: 1 addition & 1 deletion pkg/util/testingjobs/mxjob/wrappers.go
@@ -185,7 +185,7 @@ func (j *MXJobWrapper) NodeSelector(k, v string) *MXJobWrapper {
RoleNodeSelector(kftraining.MXJobReplicaTypeWorker, k, v)
}

-// NodeSelector updates the nodeSelector of job.
+// RoleNodeSelector updates the nodeSelector of job.
func (j *MXJobWrapper) RoleNodeSelector(role kftraining.ReplicaType, k, v string) *MXJobWrapper {
if j.Spec.MXReplicaSpecs[role].Template.Spec.NodeSelector == nil {
j.Spec.MXReplicaSpecs[role].Template.Spec.NodeSelector = make(map[string]string)
2 changes: 1 addition & 1 deletion pkg/util/testingjobs/pod/wrappers.go
@@ -84,7 +84,7 @@ func (p *PodWrapper) Queue(q string) *PodWrapper {
return p.Label(constants.QueueLabel, q)
}

-// Queue updates the queue name of the Pod
+// PriorityClass updates the priority class name of the Pod
func (p *PodWrapper) PriorityClass(pc string) *PodWrapper {
p.Spec.PriorityClassName = pc
return p
2 changes: 1 addition & 1 deletion pkg/util/testingjobs/raycluster/wrappers.go
@@ -74,7 +74,7 @@ func MakeCluster(name, ns string) *ClusterWrapper {
}}
}

-// NodeSelector adds a node selector to the job's head.
+// NodeSelectorHeadGroup adds a node selector to the job's head.
func (j *ClusterWrapper) NodeSelectorHeadGroup(k, v string) *ClusterWrapper {
j.Spec.HeadGroupSpec.Template.Spec.NodeSelector[k] = v
return j