Skip to content

Commit

Permalink
refactor: testing idea to wrap coscheduling
Browse files Browse the repository at this point in the history
This is the "skeleton" of a new idea to wrap coscheduling, adding
in the logic for fluence only where it is needed, likely in
the PodGroup (in the new fluence/core/core that wraps the same
in coscheduling). This is just a skeleton because we are deploying
the sidecar with the wrapped scheduling and absolutely no logic
ported over to AskFlux. I think I have a sense of where to put
this, but wanted to save this vanilla/skeleton state in case
we need to go back to it. Note that it did not work to have
fluence inherit the functions from coscheduler, so I opted for
a strategy of adding it as a helper field, and then just
using it when necessary.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Mar 7, 2024
1 parent 71156c5 commit af71f88
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 596 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ Fluence enables HPC-grade pod scheduling in Kubernetes via the [Kubernetes Sched

**Important** Fluence does not currently support use in conjunction with the kube-scheduler. Pods must all be scheduled by Fluence, and *you should not use both schedulers in the same cluster*.

## TODO

- Need to list pods, get state, and if is completed, cancel the job id.
- Keep track of state of all pods in group, when all of pods are completed, then issue cancel.
- Calculate on the fly - on the update event we want to loop through pods, if ALL completed, then delete the podid for fluence.

## Getting started

For instructions on how to start Fluence on a K8s cluster, see [examples](examples/). Documentation and instructions for reproducing our CANOPIE-2022 paper (citation below) can be found in the [canopie22-artifacts branch](https://github.com/flux-framework/flux-k8s/tree/canopie22-artifacts).
Expand Down
4 changes: 2 additions & 2 deletions examples/pod-group/lammps/lammps2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ spec:
command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
resources:
limits:
cpu: 2
cpu: 10
requests:
cpu: 2
cpu: 10
4 changes: 2 additions & 2 deletions examples/pod-group/lammps/lammps4-2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ spec:
command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
resources:
limits:
cpu: 2
cpu: 10
requests:
cpu: 2
cpu: 10
4 changes: 2 additions & 2 deletions examples/pod-group/lammps/lammps4-3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ spec:
command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
resources:
limits:
cpu: 2
cpu: 10
requests:
cpu: 2
cpu: 10
4 changes: 2 additions & 2 deletions examples/pod-group/lammps/lammps4.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ spec:
command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
resources:
limits:
cpu: 2
cpu: 10
requests:
cpu: 2
cpu: 10
4 changes: 2 additions & 2 deletions examples/pod-group/lammps/lammps5.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ spec:
command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
resources:
limits:
cpu: 2
cpu: 10
requests:
cpu: 2
cpu: 10
4 changes: 2 additions & 2 deletions examples/pod-group/lammps/lammps6.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ spec:
command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
resources:
limits:
cpu: 2
cpu: 10
requests:
cpu: 2
cpu: 10
2 changes: 1 addition & 1 deletion examples/test_example/fluence-sized-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ spec:
containers:
- name: fluence-job
image: busybox
command: [echo, potato]
command: [sleep, "20"]
restartPolicy: Never
backoffLimit: 4
5 changes: 2 additions & 3 deletions sig-scheduler-plugins/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
"sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
"sigs.k8s.io/scheduler-plugins/pkg/fluence"
"sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead"
"sigs.k8s.io/scheduler-plugins/pkg/networkaware/topologicalsort"
"sigs.k8s.io/scheduler-plugins/pkg/noderesources"
Expand All @@ -36,7 +37,7 @@ import (
"sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing"
"sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment"
"sigs.k8s.io/scheduler-plugins/pkg/trimaran/targetloadpacking"
"sigs.k8s.io/scheduler-plugins/pkg/fluence"

// Ensure scheme package is initialized.
_ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"
)
Expand All @@ -56,8 +57,6 @@ func main() {
app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New),
app.WithPlugin(targetloadpacking.Name, targetloadpacking.New),
app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New),
// Sample plugins below.
// app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
app.WithPlugin(podstate.Name, podstate.New),
app.WithPlugin(qos.Name, qos.New),
app.WithPlugin(fluence.Name, fluence.New),
Expand Down
36 changes: 21 additions & 15 deletions sig-scheduler-plugins/pkg/controllers/podgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
log.Error(err, fmt.Sprintf("Unable to retrieve pod group %s", req.NamespacedName))
return ctrl.Result{}, err
}
log.Info("REFERENCES", "Reconciler", pg.ObjectMeta.OwnerReferences)

// Grab all statuses (and groups of them) we are interested in
schedulingOrPending := (pg.Status.Phase == schedv1alpha1.PodGroupScheduling || pg.Status.Phase == schedv1alpha1.PodGroupPending)
Expand Down Expand Up @@ -175,6 +176,7 @@ func (r *PodGroupReconciler) updateStatus(
pods []v1.Pod,
) (ctrl.Result, error) {

log := log.FromContext(ctx)
patch := client.MergeFrom(pg.DeepCopy())

switch pg.Status.Phase {
Expand All @@ -186,24 +188,24 @@ func (r *PodGroupReconciler) updateStatus(
}

case schedv1alpha1.PodGroupPending:
result, err := r.updateOwnerReferences(ctx, pg, pods)
if result.Requeue || err != nil {
return result, err
}
if len(pods) >= int(pg.Spec.MinMember) {
pg.Status.Phase = schedv1alpha1.PodGroupScheduling
result, err := r.updateOwnerReferences(ctx, pg, pods)
if result.Requeue || err != nil {
return result, err
}
}
default:

// Get updated counts of running, succeeded, and failed pods
running, succeeded, failed := getCurrentPodStats(pods)

// If for some reason we weren't pending and now have fewer than min required, flip back to pending
if len(pods) < int(pg.Spec.MinMember) {
pg.Status.Phase = schedv1alpha1.PodGroupPending
break
}

// Get updated counts of running, succeeded, and failed pods
running, succeeded, failed := getCurrentPodStats(pods)

// A pod with succeeded + running STILL less than the minimum required is scheduling
if succeeded+running < pg.Spec.MinMember {
pg.Status.Phase = schedv1alpha1.PodGroupScheduling
Expand Down Expand Up @@ -232,16 +234,16 @@ func (r *PodGroupReconciler) updateStatus(
}

// Apply the patch to update, or delete if finished
// TODO would be better if owner references took here, so delete on owner deletion
// TODO deletion is not currently handled for Deployment, ReplicaSet, StatefulSet
// as they are expected to persist. You can delete / lose and bring up again
var err error
if pg.Status.Phase == schedv1alpha1.PodGroupFinished || pg.Status.Phase == schedv1alpha1.PodGroupFailed {
err = r.Delete(ctx, pg)
} else {
r.Status().Update(ctx, pg)
err = r.Patch(ctx, pg, patch)
log.Info("PodGroup", "Status", "Finished", "Owners", pg.OwnerReferences)

// Update but don't requeue
_, err := r.updateOwnerReferences(ctx, pg, pods)
return ctrl.Result{}, err
}
r.Status().Update(ctx, pg)
err = r.Patch(ctx, pg, patch)
return ctrl.Result{Requeue: true}, err
}

Expand Down Expand Up @@ -366,21 +368,25 @@ func (r *PodGroupReconciler) updateOwnerReferences(
return result, nil
}

// Collect owner references for pod group
// Collect current owner references for pod group,
// We want to ensure we add unique ones across the pod
owners := []metav1.OwnerReference{}
var refs []string
for _, ownerRef := range pod.OwnerReferences {
refs = append(refs, fmt.Sprintf("%s/%s", pod.Namespace, ownerRef.Name))
owners = append(owners, ownerRef)
}

patch := client.MergeFrom(pg.DeepCopy())
if len(refs) != 0 {
sort.Strings(refs)
pg.Status.OccupiedBy = strings.Join(refs, ",")
}
// If we have owners, collapose into list
if len(owners) > 0 {
pg.ObjectMeta.OwnerReferences = owners
}

// Apply the patch to update the size
r.Status().Update(ctx, pg)
err := r.Patch(ctx, pg, patch)
Expand Down
29 changes: 0 additions & 29 deletions sig-scheduler-plugins/pkg/fluence/README.md

This file was deleted.

Loading

0 comments on commit af71f88

Please sign in to comment.