From af71f886b6d9a8c15aa8b6427be930d72b9a3501 Mon Sep 17 00:00:00 2001
From: vsoch
Date: Thu, 7 Mar 2024 01:33:45 -0700
Subject: [PATCH] refactor: testing idea to wrap coscheduling

This is the "skeleton" of a new idea to wrap coscheduling, adding in the
logic for fluence only where it is needed, likely in the PodGroup (in the
new fluence/core/core that wraps the same in coscheduling). This is just a
skeleton because we are deploying the sidecar with the wrapped scheduling
and absolutely no logic ported over to AskFlux yet. I think I have a sense
of where to put this, but wanted to save this vanilla/skeleton state in
case we need to go back to it.

Note that it did not work to have fluence inherit the functions from the
coscheduler, so I opted for a strategy of adding it as a helper field, and
then just using it when necessary.

Signed-off-by: vsoch
---
 README.md | 6 +
 examples/pod-group/lammps/lammps2.yaml | 4 +-
 examples/pod-group/lammps/lammps4-2.yaml | 4 +-
 examples/pod-group/lammps/lammps4-3.yaml | 4 +-
 examples/pod-group/lammps/lammps4.yaml | 4 +-
 examples/pod-group/lammps/lammps5.yaml | 4 +-
 examples/pod-group/lammps/lammps6.yaml | 4 +-
 examples/test_example/fluence-sized-job.yaml | 2 +-
 sig-scheduler-plugins/cmd/scheduler/main.go | 5 +-
 .../pkg/controllers/podgroup_controller.go | 36 +-
 sig-scheduler-plugins/pkg/fluence/README.md | 29 --
 .../pkg/fluence/core/core.go | 177 +++------
 sig-scheduler-plugins/pkg/fluence/events.go | 166 ---------
 sig-scheduler-plugins/pkg/fluence/fluence.go | 336 ++++++------
 14 files changed, 185 insertions(+), 596 deletions(-)
 delete mode 100644 sig-scheduler-plugins/pkg/fluence/README.md
 delete mode 100644 sig-scheduler-plugins/pkg/fluence/events.go

diff --git a/README.md b/README.md
index ae420fd..89f2a18 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,12 @@ Fluence enables HPC-grade pod scheduling in Kubernetes via the [Kubernetes Sched
 
 **Important** Fluence does not currently support use in conjunction with the kube-scheduler. Pods must all be scheduled by Fluence, and *you should not use both schedulers in the same cluster*.
 
+## TODO
+
+- Need to list pods, get their state, and if the group is completed, cancel the job id.
+- Keep track of the state of all pods in the group; when all pods are completed, then issue the cancel.
+- Calculate on the fly: on the update event we want to loop through pods, and if ALL are completed, then delete the podid for fluence.
+
 ## Getting started
 
 For instructions on how to start Fluence on a K8s cluster, see [examples](examples/). Documentation and instructions for reproducing our CANOPIE-2022 paper (citation below) can be found in the [canopie22-artifacts branch](https://github.com/flux-framework/flux-k8s/tree/canopie22-artifacts).
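As a sketch of the TODO above (not part of this patch): one way to run the group-completion check on an update event, assuming the coscheduling pod-group label is on each pod and a cancelFluxJob helper like the one removed from events.go is kept around. The label key and helper names are illustrative, not something this patch adds.

package fluence

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// cancelIfGroupDone lists the pods in a group and, only if every pod has
// finished (Succeeded or Failed), releases the Fluxion job id recorded for
// that group. Sketch only: the label key and cancelFluxJob are assumptions.
func (f *Fluence) cancelIfGroupDone(ctx context.Context, namespace, groupName string) error {
	pods, err := f.frameworkHandler.ClientSet().CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: "scheduling.x-k8s.io/pod-group=" + groupName,
	})
	if err != nil {
		return err
	}
	for _, pod := range pods.Items {
		if pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed {
			// At least one pod is still pending or running: keep the allocation.
			return nil
		}
	}
	// Every pod in the group is done, so cancel the Fluxion job for the group.
	f.mutex.Lock()
	defer f.mutex.Unlock()
	if _, ok := f.groupToJobId[groupName]; ok {
		return f.cancelFluxJob(groupName)
	}
	return nil
}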
diff --git a/examples/pod-group/lammps/lammps2.yaml b/examples/pod-group/lammps/lammps2.yaml index 5cc7535..5a83c97 100644 --- a/examples/pod-group/lammps/lammps2.yaml +++ b/examples/pod-group/lammps/lammps2.yaml @@ -14,6 +14,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/pod-group/lammps/lammps4-2.yaml b/examples/pod-group/lammps/lammps4-2.yaml index 777e73c..6b647bc 100644 --- a/examples/pod-group/lammps/lammps4-2.yaml +++ b/examples/pod-group/lammps/lammps4-2.yaml @@ -17,6 +17,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/pod-group/lammps/lammps4-3.yaml b/examples/pod-group/lammps/lammps4-3.yaml index 76c5ed0..b182751 100644 --- a/examples/pod-group/lammps/lammps4-3.yaml +++ b/examples/pod-group/lammps/lammps4-3.yaml @@ -17,6 +17,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/pod-group/lammps/lammps4.yaml b/examples/pod-group/lammps/lammps4.yaml index 38ae0a7..9420902 100644 --- a/examples/pod-group/lammps/lammps4.yaml +++ b/examples/pod-group/lammps/lammps4.yaml @@ -18,6 +18,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/pod-group/lammps/lammps5.yaml b/examples/pod-group/lammps/lammps5.yaml index 7546b48..e85299f 100644 --- a/examples/pod-group/lammps/lammps5.yaml +++ b/examples/pod-group/lammps/lammps5.yaml @@ -17,6 +17,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/pod-group/lammps/lammps6.yaml b/examples/pod-group/lammps/lammps6.yaml index 2030192..14ebae3 100644 --- a/examples/pod-group/lammps/lammps6.yaml +++ b/examples/pod-group/lammps/lammps6.yaml @@ -17,6 +17,6 @@ spec: command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite resources: limits: - cpu: 2 + cpu: 10 requests: - cpu: 2 \ No newline at end of file + cpu: 10 diff --git a/examples/test_example/fluence-sized-job.yaml b/examples/test_example/fluence-sized-job.yaml index a195d87..d1e7556 100644 --- a/examples/test_example/fluence-sized-job.yaml +++ b/examples/test_example/fluence-sized-job.yaml @@ -11,6 +11,6 @@ spec: containers: - name: fluence-job image: busybox - command: [echo, potato] + command: [sleep, "20"] restartPolicy: Never backoffLimit: 4 diff --git a/sig-scheduler-plugins/cmd/scheduler/main.go b/sig-scheduler-plugins/cmd/scheduler/main.go index d9a580a..2b21d28 100644 --- a/sig-scheduler-plugins/cmd/scheduler/main.go +++ b/sig-scheduler-plugins/cmd/scheduler/main.go @@ -26,6 +26,7 @@ import ( "sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling" "sigs.k8s.io/scheduler-plugins/pkg/coscheduling" + "sigs.k8s.io/scheduler-plugins/pkg/fluence" "sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead" "sigs.k8s.io/scheduler-plugins/pkg/networkaware/topologicalsort" "sigs.k8s.io/scheduler-plugins/pkg/noderesources" @@ -36,7 +37,7 @@ import ( "sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing" "sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment" 
"sigs.k8s.io/scheduler-plugins/pkg/trimaran/targetloadpacking" - "sigs.k8s.io/scheduler-plugins/pkg/fluence" + // Ensure scheme package is initialized. _ "sigs.k8s.io/scheduler-plugins/apis/config/scheme" ) @@ -56,8 +57,6 @@ func main() { app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New), app.WithPlugin(targetloadpacking.Name, targetloadpacking.New), app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New), - // Sample plugins below. - // app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New), app.WithPlugin(podstate.Name, podstate.New), app.WithPlugin(qos.Name, qos.New), app.WithPlugin(fluence.Name, fluence.New), diff --git a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go index 73b7d2d..ac417f3 100644 --- a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go +++ b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go @@ -82,6 +82,7 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c log.Error(err, fmt.Sprintf("Unable to retrieve pod group %s", req.NamespacedName)) return ctrl.Result{}, err } + log.Info("REFERENCES", "Reconciler", pg.ObjectMeta.OwnerReferences) // Grab all statuses (and groups of them) we are interested in schedulingOrPending := (pg.Status.Phase == schedv1alpha1.PodGroupScheduling || pg.Status.Phase == schedv1alpha1.PodGroupPending) @@ -175,6 +176,7 @@ func (r *PodGroupReconciler) updateStatus( pods []v1.Pod, ) (ctrl.Result, error) { + log := log.FromContext(ctx) patch := client.MergeFrom(pg.DeepCopy()) switch pg.Status.Phase { @@ -186,24 +188,24 @@ func (r *PodGroupReconciler) updateStatus( } case schedv1alpha1.PodGroupPending: + result, err := r.updateOwnerReferences(ctx, pg, pods) + if result.Requeue || err != nil { + return result, err + } if len(pods) >= int(pg.Spec.MinMember) { pg.Status.Phase = schedv1alpha1.PodGroupScheduling - result, err := r.updateOwnerReferences(ctx, pg, pods) - if result.Requeue || err != nil { - return result, err - } } default: - // Get updated counts of running, succeeded, and failed pods - running, succeeded, failed := getCurrentPodStats(pods) - // If for some reason we weren't pending and now have fewer than min required, flip back to pending if len(pods) < int(pg.Spec.MinMember) { pg.Status.Phase = schedv1alpha1.PodGroupPending break } + // Get updated counts of running, succeeded, and failed pods + running, succeeded, failed := getCurrentPodStats(pods) + // A pod with succeeded + running STILL less than the minimum required is scheduling if succeeded+running < pg.Spec.MinMember { pg.Status.Phase = schedv1alpha1.PodGroupScheduling @@ -232,16 +234,16 @@ func (r *PodGroupReconciler) updateStatus( } // Apply the patch to update, or delete if finished - // TODO would be better if owner references took here, so delete on owner deletion - // TODO deletion is not currently handled for Deployment, ReplicaSet, StatefulSet - // as they are expected to persist. 
You can delete / lose and bring up again var err error if pg.Status.Phase == schedv1alpha1.PodGroupFinished || pg.Status.Phase == schedv1alpha1.PodGroupFailed { - err = r.Delete(ctx, pg) - } else { - r.Status().Update(ctx, pg) - err = r.Patch(ctx, pg, patch) + log.Info("PodGroup", "Status", "Finished", "Owners", pg.OwnerReferences) + + // Update but don't requeue + _, err := r.updateOwnerReferences(ctx, pg, pods) + return ctrl.Result{}, err } + r.Status().Update(ctx, pg) + err = r.Patch(ctx, pg, patch) return ctrl.Result{Requeue: true}, err } @@ -366,21 +368,25 @@ func (r *PodGroupReconciler) updateOwnerReferences( return result, nil } - // Collect owner references for pod group + // Collect current owner references for pod group, + // We want to ensure we add unique ones across the pod owners := []metav1.OwnerReference{} var refs []string for _, ownerRef := range pod.OwnerReferences { refs = append(refs, fmt.Sprintf("%s/%s", pod.Namespace, ownerRef.Name)) owners = append(owners, ownerRef) } + patch := client.MergeFrom(pg.DeepCopy()) if len(refs) != 0 { sort.Strings(refs) pg.Status.OccupiedBy = strings.Join(refs, ",") } + // If we have owners, collapose into list if len(owners) > 0 { pg.ObjectMeta.OwnerReferences = owners } + // Apply the patch to update the size r.Status().Update(ctx, pg) err := r.Patch(ctx, pg, patch) diff --git a/sig-scheduler-plugins/pkg/fluence/README.md b/sig-scheduler-plugins/pkg/fluence/README.md deleted file mode 100644 index 61f4923..0000000 --- a/sig-scheduler-plugins/pkg/fluence/README.md +++ /dev/null @@ -1,29 +0,0 @@ -# Overview - -Project to manage Flux tasks needed to standardize kubernetes HPC scheduling interfaces - -## Installing the chart - -More detail will be added here about installing the chart. You will -be using the [install-as-a-second-scheduler](https://github.com/kubernetes-sigs/scheduler-plugins/tree/master/manifests/install/charts/as-a-second-scheduler) -charts. Fluence-specific values are detailed below. - -### Fluence specific values - -In `values.yaml` it is possible to customize the container image, already defaulted to the latest release, and the allocation policy -used by the scheduler. -Most common options are: - -- `lonode`: choose the nodes with lower ID first. Can be compared to packing -- `low`: choose cores with lowest IDs from multiple nodes. Can be compared to spread process-to-resource placement - -## Maturity Level - - - -- [x] Sample (for demonstrating and inspiring purpose) -- [ ] Alpha (used in companies for pilot projects) -- [ ] Beta (used in companies and developed actively) -- [ ] Stable (used in companies for production workloads) - - diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go index a3f4531..b117e56 100644 --- a/sig-scheduler-plugins/pkg/fluence/core/core.go +++ b/sig-scheduler-plugins/pkg/fluence/core/core.go @@ -1,161 +1,70 @@ package core import ( - "fmt" - - klog "k8s.io/klog/v2" + "context" + "time" + corev1 "k8s.io/api/core/v1" + informerv1 "k8s.io/client-go/informers/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" - pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" -) + "sigs.k8s.io/controller-runtime/pkg/client" -// FluxStateData is a CycleState -// It holds the PodCache for a pod, which has node assignment, group, and group size -// We also save the group name and size, and time created, in case we want to (somehow) resume scheduling -// In practice I'm not sure how CycleState objects are dumped and loaded. 
Kueue has a dumper :P -// https://github.com/kubernetes/enhancements/blob/master/keps/sig-scheduling/624-scheduling-framework/README.md#cyclestate -type FluxStateData struct { - NodeCache NodeCache -} + "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" -// Clone is required for CycleState plugins -func (s *FluxStateData) Clone() framework.StateData { - return &FluxStateData{NodeCache: s.NodeCache} -} + ccore "sigs.k8s.io/scheduler-plugins/pkg/coscheduling/core" +) -// NewFluxState creates an entry for the CycleState with the node and group name -func NewFluxState(nodeName string, groupName string) *FluxStateData { - cache := NodeCache{NodeName: nodeName} - return &FluxStateData{NodeCache: cache} +// PodGroupManager defines the scheduling operation called +type PodGroupManager struct { + core *ccore.PodGroupManager } -// NodeCache holds the node name and tasks for the node -// For the PodGroupCache, these are organized by group name, -// and there is a list of them -type NodeCache struct { - NodeName string - - // Tie assignment back to PodGroup, which can be used to get size and time created - GroupName string - - // Assigned tasks (often pods) to nodes - // https://github.com/flux-framework/flux-k8s/blob/9f24f36752e3cced1b1112d93bfa366fb58b3c84/src/fluence/fluxion/fluxion.go#L94-L97 - AssignedTasks int +// NewPodGroupManager creates a new operation object. +func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager { + pgMgr := ccore.NewPodGroupManager(client, snapshotSharedLister, scheduleTimeout, podInformer) + mgr := PodGroupManager{core: pgMgr} + return &mgr } -// A pod group cache holds a list of nodes for an allocation, where each has some number of tasks -// along with the expected group size. This is intended to replace PodGroup -// given the group name, size (derived from annotations) and timestamp -type PodGroupCache struct { - GroupName string - - // This is a cache of nodes for pods - Nodes []NodeCache +func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) { + pgMgr.core.BackoffPodGroup(pgName, backoff) } -// PodGroups seen by fluence -var groupsSeen map[string]*PodGroupCache - -// Init populates the groupsSeen cache -func Init() { - groupsSeen = map[string]*PodGroupCache{} +// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod +// in the given state, with a reserved key "kubernetes.io/pods-to-activate". +func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) { + pgMgr.core.ActivateSiblings(pod, state) } -// GetFluenceCache determines if a group has been seen. -// Yes -> we return the PodGroupCache entry -// No -> the entry is nil / does not exist -func GetFluenceCache(groupName string) *PodGroupCache { - entry, _ := groupsSeen[groupName] - return entry +// PreFilter filters out a pod if +// 1. it belongs to a podgroup that was recently denied or +// 2. the total number of pods in the podgroup is less than the minimum number of pods +// that is required to be scheduled. +func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error { + return pgMgr.core.PreFilter(ctx, pod) } -// DeletePodGroup deletes a pod from the group cache -func DeletePodGroup(groupName string) { - delete(groupsSeen, groupName) +// Permit permits a pod to run, if the minMember match, it would send a signal to chan. 
+func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) ccore.Status { + return pgMgr.core.Permit(ctx, pod) } -// CreateNodePodsList creates a list of node pod caches -func CreateNodeList(nodelist []*pb.NodeAlloc, groupName string) (nodepods []NodeCache) { - - // Create a pod cache for each node - nodepods = make([]NodeCache, len(nodelist)) - - // TODO: should we be integrating topology information here? Could it be the - // case that some nodes (pods) in the group should be closer? - for i, v := range nodelist { - nodepods[i] = NodeCache{ - NodeName: v.GetNodeID(), - AssignedTasks: int(v.GetTasks()), - GroupName: groupName, - } - } - - // Update the pods in the PodGroupCache (groupsSeen) - updatePodGroupCache(groupName, nodepods) - return nodepods +// GetCreationTimestamp returns the creation time of a podGroup or a pod. +func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time { + return pgMgr.core.GetCreationTimestamp(pod, ts) } -// updatePodGroupList updates the PodGroupCache with a listing of nodes -func updatePodGroupCache(groupName string, nodes []NodeCache) { - cache := PodGroupCache{ - Nodes: nodes, - GroupName: groupName, - } - groupsSeen[groupName] = &cache +// DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter. +func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) { + pgMgr.core.DeletePermittedPodGroup(pgFullName) } -// GetNextNode gets the next node in the PodGroupCache -func (p *PodGroupCache) GetNextNode() (string, error) { - - nextnode := "" - - // Quick failure state - we ran out of nodes - if len(p.Nodes) == 0 { - return nextnode, fmt.Errorf("[Fluence] PodGroup %s ran out of nodes.", p.GroupName) - } - - // The next is the 0th in the list - nextnode = p.Nodes[0].NodeName - klog.Infof("[Fluence] Next node for group %s is %s", p.GroupName, nextnode) - - // If there is only one task left, we are going to use it (and remove the node) - if p.Nodes[0].AssignedTasks == 1 { - klog.Infof("[Fluence] First node has one remaining task slot") - slice := p.Nodes[1:] - - // If after we remove the node there are no nodes left... - // Note that I'm not deleting the node from the cache because that is the - // only way fluence knows it has already assigned work (presence of the key) - if len(slice) == 0 { - klog.Infof("[Fluence] Assigning node %s. There are NO reamining nodes for group %s\n", nextnode, p.GroupName) - // delete(podGroupCache, groupName) - return nextnode, nil - } - - klog.Infof("[Fluence] Assigning node %s. There are nodes left for group", nextnode, p.GroupName) - updatePodGroupCache(p.GroupName, slice) - return nextnode, nil - } - - // If we get here the first node had >1 assigned tasks - klog.Infof("[Fluence] Assigning node %s for group %s. There are still task assignments available for this node.", nextnode, p.GroupName) - p.Nodes[0].AssignedTasks = p.Nodes[0].AssignedTasks - 1 - return nextnode, nil +// GetPodGroup returns the PodGroup that a Pod belongs to in cache. +func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) (string, *v1alpha1.PodGroup) { + return pgMgr.core.GetPodGroup(ctx, pod) } -// GetNextNode gets the next available node we can allocate for a group -// TODO this should be able to take and pass forward a number of tasks. -// It is implicity 1 now, but doesn't have to be. 
-func GetNextNode(groupName string) (string, error) { - - // Get our entry from the groupsSeen cache - klog.Infof("[Fluence] groups seen %s", groupsSeen) - entry, ok := groupsSeen[groupName] - - // This case should not happen - if !ok { - return "", fmt.Errorf("[Fluence] Map is empty") - } - // Get the next node from the PodGroupCache - return entry.GetNextNode() +// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound. +func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int { + return pgMgr.core.CalculateAssignedPods(podGroupName, namespace) } diff --git a/sig-scheduler-plugins/pkg/fluence/events.go b/sig-scheduler-plugins/pkg/fluence/events.go deleted file mode 100644 index b891713..0000000 --- a/sig-scheduler-plugins/pkg/fluence/events.go +++ /dev/null @@ -1,166 +0,0 @@ -package fluence - -import ( - "context" - "time" - - "google.golang.org/grpc" - v1 "k8s.io/api/core/v1" - klog "k8s.io/klog/v2" - - pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" - fgroup "sigs.k8s.io/scheduler-plugins/pkg/fluence/group" -) - -// Events are associated with inforers, typically on pods, e.g., -// delete: deletion of a pod -// update: update of a pod! -// For both of the above, there are cases to cancel the flux job -// associated with the group id - -// cancelFluxJobForPod cancels the flux job for a pod. -// We assume that the cancelled job also means deleting the pod group -func (f *Fluence) cancelFluxJob(groupName string) error { - - // TODO: it's a bit risky to store state here, because if the scheduler - // restarts we cannot look up the jobid, and then cannot cancel it. - // There is no way to request cancelling the job for a specific group - jobid, ok := f.groupToJobId[groupName] - - // The job was already cancelled by another pod - if !ok { - klog.Infof("[Fluence] Request for cancel of group %s is already complete.", groupName) - return nil - } - klog.Infof("[Fluence] Cancel flux job: %v for group %s", jobid, groupName) - - // This first error is about connecting to the server - conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) - if err != nil { - klog.Errorf("[Fluence] Error connecting to server: %v", err) - return err - } - defer conn.Close() - - grpcclient := pb.NewFluxcliServiceClient(conn) - _, cancel := context.WithTimeout(context.Background(), 200*time.Second) - defer cancel() - - // This error reflects the success or failure of the cancel request - request := &pb.CancelRequest{JobID: int64(jobid)} - res, err := grpcclient.Cancel(context.Background(), request) - if err != nil { - klog.Errorf("[Fluence] did not receive any cancel response: %v", err) - return err - } - klog.Infof("[Fluence] Job cancellation for group %s result: %d", groupName, res.Error) - - // And this error is if the cancel was successful or not - if res.Error == 0 { - klog.Infof("[Fluence] Successful cancel of flux job: %d for group %s", jobid, groupName) - delete(f.groupToJobId, groupName) - } else { - klog.Warningf("[Fluence] Failed to cancel flux job %d for group %s", jobid, groupName) - } - return nil -} - -// updatePod is called on an update, and the old and new object are presented -func (f *Fluence) updatePod(oldObj, newObj interface{}) { - - oldPod := oldObj.(*v1.Pod) - newPod := newObj.(*v1.Pod) - - // a pod is updated, get the group - // TODO should we be checking group / size for old vs new? 
- groupName, pg := f.pgMgr.GetPodGroup(context.TODO(), oldPod) - - // If PodGroup is nil, still try to look up a faux name - if pg == nil { - pg = fgroup.CreateFakeGroup(oldPod) - groupName = pg.Name - } - - klog.Infof("[Fluence] Processing event for pod %s in group %s from %s to %s", newPod.Name, groupName, newPod.Status.Phase, oldPod.Status.Phase) - - switch newPod.Status.Phase { - case v1.PodPending: - // in this state we don't know if a pod is going to be running, thus we don't need to update job map - case v1.PodRunning: - // if a pod is start running, we can add it state to the delta graph if it is scheduled by other scheduler - case v1.PodSucceeded: - klog.Infof("[Fluence] Pod %s succeeded, Fluence needs to free the resources", newPod.Name) - - f.mutex.Lock() - defer f.mutex.Unlock() - - // Do we have the group id in our cache? If yes, we haven't deleted the jobid yet - // I am worried here that if some pods are succeeded and others pending, this could - // be a mistake - fluence would schedule it again - _, ok := f.groupToJobId[groupName] - if ok { - f.cancelFluxJob(groupName) - } else { - klog.Infof("[Fluence] Succeeded pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) - } - - case v1.PodFailed: - - // a corner case need to be tested, the pod exit code is not 0, can be created with segmentation fault pi test - klog.Warningf("[Fluence] Pod %s in group %s failed, Fluence needs to free the resources", newPod.Name, groupName) - - f.mutex.Lock() - defer f.mutex.Unlock() - - _, ok := f.groupToJobId[groupName] - if ok { - f.cancelFluxJob(groupName) - } else { - klog.Errorf("[Fluence] Failed pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) - } - case v1.PodUnknown: - // don't know how to deal with it as it's unknown phase - default: - // shouldn't enter this branch - } -} - -// deletePod handles the delete event handler -func (f *Fluence) deletePod(podObj interface{}) { - klog.Info("[Fluence] Delete Pod event handler") - pod := podObj.(*v1.Pod) - groupName, pg := f.pgMgr.GetPodGroup(context.TODO(), pod) - - // If PodGroup is nil, still try to look up a faux name - if pg == nil { - pg = fgroup.CreateFakeGroup(pod) - groupName = pg.Name - } - - klog.Infof("[Fluence] Delete pod %s in group %s has status %s", pod.Status.Phase, pod.Name, groupName) - switch pod.Status.Phase { - case v1.PodSucceeded: - case v1.PodPending: - klog.Infof("[Fluence] Pod %s completed and is Pending termination, Fluence needs to free the resources", pod.Name) - - f.mutex.Lock() - defer f.mutex.Unlock() - - _, ok := f.groupToJobId[groupName] - if ok { - f.cancelFluxJob(groupName) - } else { - klog.Infof("[Fluence] Terminating pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) - } - case v1.PodRunning: - f.mutex.Lock() - defer f.mutex.Unlock() - - _, ok := f.groupToJobId[groupName] - if ok { - f.cancelFluxJob(groupName) - } else { - klog.Infof("[Fluence] Deleted pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) - } - } -} diff --git a/sig-scheduler-plugins/pkg/fluence/fluence.go b/sig-scheduler-plugins/pkg/fluence/fluence.go index 33976ae..7ec2e0e 100644 --- a/sig-scheduler-plugins/pkg/fluence/fluence.go +++ b/sig-scheduler-plugins/pkg/fluence/fluence.go @@ -3,122 +3,122 @@ package fluence import ( "context" "fmt" - "os" "sync" "time" - "google.golang.org/grpc" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" 
"k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/informers" clientscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + cosched "sigs.k8s.io/scheduler-plugins/pkg/coscheduling" + ccore "sigs.k8s.io/scheduler-plugins/pkg/coscheduling/core" + + fgroup "sigs.k8s.io/scheduler-plugins/pkg/fluence/group" + corev1helpers "k8s.io/component-helpers/scheduling/corev1" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" - "sigs.k8s.io/controller-runtime/pkg/client" - sched "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" - coschedulingcore "sigs.k8s.io/scheduler-plugins/pkg/coscheduling/core" + + "sigs.k8s.io/scheduler-plugins/apis/config" + "sigs.k8s.io/scheduler-plugins/apis/scheduling" + "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" fcore "sigs.k8s.io/scheduler-plugins/pkg/fluence/core" - pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" - fgroup "sigs.k8s.io/scheduler-plugins/pkg/fluence/group" - "sigs.k8s.io/scheduler-plugins/pkg/fluence/utils" ) +// Fluence schedules pods in a group using Fluxion as a backend +// We inherit cosched.Coscheduling to use some of the primary functions type Fluence struct { - mutex sync.Mutex - handle framework.Handle - client client.Client + cosched cosched.Coscheduling + mutex sync.Mutex + client client.Client // Store jobid on the level of a group (which can be a single pod) groupToJobId map[string]uint64 - pgMgr coschedulingcore.Manager -} -// Name is the name of the plugin used in the Registry and configurations. -// Note that this would do better as an annotation (fluence.flux-framework.org/pod-group) -// But we cannot use them as selectors then! -const ( - Name = "Fluence" -) + frameworkHandler framework.Handle + pgMgr ccore.Manager + scheduleTimeout *time.Duration + pgBackoff *time.Duration +} var ( - _ framework.QueueSortPlugin = &Fluence{} - _ framework.PreFilterPlugin = &Fluence{} - _ framework.FilterPlugin = &Fluence{} + _ framework.QueueSortPlugin = &Fluence{} + _ framework.PreFilterPlugin = &Fluence{} + _ framework.PostFilterPlugin = &Fluence{} // Here down are from coscheduling + _ framework.PermitPlugin = &Fluence{} + _ framework.ReservePlugin = &Fluence{} + _ framework.EnqueueExtensions = &Fluence{} ) -func (f *Fluence) Name() string { - return Name -} +const ( + // Name is the name of the plugin used in Registry and configurations. 
+ Name = "Fluence" +) // Initialize and return a new Fluence Custom Scheduler Plugin -// This class and functions are analogous to: -// https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/coscheduling/coscheduling.go#L63 -func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - f := &Fluence{handle: handle, groupToJobId: make(map[string]uint64)} - - ctx := context.TODO() - fcore.Init() - - fluxPodsInformer := handle.SharedInformerFactory().Core().V1().Pods().Informer() - fluxPodsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - UpdateFunc: f.updatePod, - DeleteFunc: f.deletePod, - }) - - go fluxPodsInformer.Run(ctx.Done()) + // Keep these empty for now, use defaults + args := config.CoschedulingArgs{} scheme := runtime.NewScheme() - clientscheme.AddToScheme(scheme) - v1.AddToScheme(scheme) - sched.AddToScheme(scheme) - k8scli, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme}) - if err != nil { - return nil, err - } - - // Save the kubernetes client for fluence to interact with cluster objects - f.client = k8scli + _ = clientscheme.AddToScheme(scheme) + _ = v1.AddToScheme(scheme) + _ = v1alpha1.AddToScheme(scheme) - fieldSelector, err := fields.ParseSelector(",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed)) + client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme}) if err != nil { - klog.Errorf("ParseSelector failed %s", err) - os.Exit(1) + return nil, err } - informerFactory := informers.NewSharedInformerFactoryWithOptions(handle.ClientSet(), 0, informers.WithTweakListOptions(func(opt *metav1.ListOptions) { - opt.FieldSelector = fieldSelector.String() - })) - podInformer := informerFactory.Core().V1().Pods() - scheduleTimeDuration := time.Duration(500) * time.Second + // Performance improvement when retrieving list of objects by namespace or we'll log 'index not exist' warning. + handle.SharedInformerFactory().Core().V1().Pods().Informer().AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - // https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/coscheduling/core/core.go#L84 - pgMgr := coschedulingcore.NewPodGroupManager( - k8scli, + // PermitWaitingTimeSeconds is the waiting timeout in seconds. + scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second + pgMgr := fcore.NewPodGroupManager( + client, handle.SnapshotSharedLister(), &scheduleTimeDuration, - podInformer, + // Keep the podInformer (from frameworkHandle) as the single source of Pods. + handle.SharedInformerFactory().Core().V1().Pods(), ) - f.pgMgr = pgMgr - // stopCh := make(chan struct{}) - // defer close(stopCh) - // informerFactory.Start(stopCh) - informerFactory.Start(ctx.Done()) + // The main difference here is adding the groupToJobId lookup + plugin := &Fluence{ + frameworkHandler: handle, + pgMgr: pgMgr, + scheduleTimeout: &scheduleTimeDuration, + groupToJobId: make(map[string]uint64), + } - if !cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { - err := fmt.Errorf("WaitForCacheSync failed") - klog.ErrorS(err, "Cannot sync caches") + // PodGroupBackoffSeconds: backoff time in seconds before a pod group can be scheduled again. 
+ if args.PodGroupBackoffSeconds < 0 { + err := fmt.Errorf("parse arguments failed") + klog.ErrorS(err, "PodGroupBackoffSeconds cannot be negative") return nil, err + } else if args.PodGroupBackoffSeconds > 0 { + pgBackoff := time.Duration(args.PodGroupBackoffSeconds) * time.Second + plugin.pgBackoff = &pgBackoff } + return plugin, nil +} + +func (f *Fluence) Name() string { + return Name +} - klog.Info("Fluence scheduler plugin started") - return f, nil +// Fluence has added delete, although I wonder if update includes that signal +// and it's redundant? +func (f *Fluence) EventsToRegister() []framework.ClusterEventWithHint { + // To register a custom event, follow the naming convention at: + // https://git.k8s.io/kubernetes/pkg/scheduler/eventhandlers.go#L403-L410 + pgGVK := fmt.Sprintf("podgroups.v1alpha1.%v", scheduling.GroupName) + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.Delete}}, + {Event: framework.ClusterEvent{Resource: framework.GVK(pgGVK), ActionType: framework.Add | framework.Update | framework.Delete}}, + } } // Less is used to sort pods in the scheduling queue in the following order. @@ -147,177 +147,41 @@ func (f *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { // If they are the same, fall back to sorting by name. if creationTime1.Equal(&creationTime2) { - return coschedulingcore.GetNamespacedName(podInfo1.Pod) < coschedulingcore.GetNamespacedName(podInfo2.Pod) + return ccore.GetNamespacedName(podInfo1.Pod) < ccore.GetNamespacedName(podInfo2.Pod) } return creationTime1.Before(&creationTime2) -} - -// PreFilter checks info about the Pod / checks conditions that the cluster or the Pod must meet. -// This comes after sort -func (f *Fluence) PreFilter( - ctx context.Context, - state *framework.CycleState, - pod *v1.Pod, -) (*framework.PreFilterResult, *framework.Status) { - klog.Infof("[Fluence] Examining pod %s", pod.Name) - - // groupName will be named according to the single pod namespace / pod if there wasn't - // a user defined group. This is a size 1 group we handle equivalently. - groupName, pg := f.pgMgr.GetPodGroup(ctx, pod) - - // If we don't have a pod group and it's here, it was asked to be scheduled by fluence - // but the group isn't ready. Unshedulable for now. - if pg == nil { - klog.Infof("[Fluence] Group %s/%s does not have a pod group, not schedulable yet.", pod.Namespace, pod.Name) - return nil, framework.NewStatus(framework.Unschedulable, "Missing podgroup") - } - klog.Infof("[Fluence] Pod %s is in group %s with minimum members %d", pod.Name, groupName, pg.Spec.MinMember) - - // Has this podgroup been seen by fluence yet? If yes, we will have it in the cache - cache := fcore.GetFluenceCache(groupName) - klog.Infof("[Fluence] cache %s", cache) - - // Fluence has never seen this before, we need to schedule an allocation - // It also could have been seen, but was not able to get one. 
- if cache == nil { - klog.Infof("[Fluence] Does not have nodes for %s yet, asking Fluxion", groupName) - - // groupName is the namespaced name / - err := f.AskFlux(ctx, pod, pg, groupName) - if err != nil { - klog.Infof("[Fluence] Fluxion returned an error %s, not schedulable", err.Error()) - return nil, framework.NewStatus(framework.Unschedulable, err.Error()) - } - } - - // This is the next node in the list - nodename, err := fcore.GetNextNode(groupName) - if err != nil { - return nil, framework.NewStatus(framework.Unschedulable, err.Error()) - } - klog.Infof("Node Selected %s (pod %s:group %s)", nodename, pod.Name, groupName) - - // Create a fluxState (CycleState) with things that might be useful - // This isn't a PodGroupCache, but a single node cache, which also - // has group information, but just is for one node. Note that assigned - // tasks is hard coded to 1 but this isn't necessarily the case - we should - // eventually be able to GetNextNode for a number of tasks, for example - // (unless task == pod in which case it is always 1) - nodeCache := fcore.NodeCache{NodeName: nodename, GroupName: groupName, AssignedTasks: 1} - state.Write(framework.StateKey(pod.Name), &fcore.FluxStateData{NodeCache: nodeCache}) - return nil, framework.NewStatus(framework.Success, "") } -// TODO we need to account for affinity here -func (f *Fluence) Filter( - ctx context.Context, - cycleState *framework.CycleState, - pod *v1.Pod, - nodeInfo *framework.NodeInfo, -) *framework.Status { - - klog.Info("Filtering input node ", nodeInfo.Node().Name) - state, err := cycleState.Read(framework.StateKey(pod.Name)) - - // No error means we retrieved the state - if err == nil { - - // Try to convert the state to FluxStateDate - value, ok := state.(*fcore.FluxStateData) +// PreFilter performs the following validations. +// 1. Whether the PodGroup that the Pod belongs to is on the deny list. +// 2. Whether the total number of pods in a PodGroup is less than its `minMember`. +func (f *Fluence) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + return f.cosched.PreFilter(ctx, state, pod) +} - // If we have state data that isn't equal to the current assignment, no go - if ok && value.NodeCache.NodeName != nodeInfo.Node().Name { - return framework.NewStatus(framework.Unschedulable, "pod is not permitted") - } else { - klog.Infof("Filter: node %s selected for %s\n", value.NodeCache.NodeName, pod.Name) - } - } - return framework.NewStatus(framework.Success) +// PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter. +func (f *Fluence) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, + filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { + return f.cosched.PostFilter(ctx, state, pod, filteredNodeStatusMap) } -// PreFilterExtensions allow for callbacks on filtered states -// https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/framework/interface.go#L383 +// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one. func (f *Fluence) PreFilterExtensions() framework.PreFilterExtensions { - return nil + return f.cosched.PreFilterExtensions() } -// AskFlux will ask flux for an allocation for nodes for the pod group. 
-func (f *Fluence) AskFlux( - ctx context.Context, - pod *v1.Pod, - pg *sched.PodGroup, - groupName string, -) error { - - // clean up previous match if a pod has already allocated previously - f.mutex.Lock() - _, isAllocated := f.groupToJobId[groupName] - f.mutex.Unlock() - - // This case happens when there is some reason that an initial job pods partially allocated, - // but then the job restarted, and new pods are present but fluence had assigned nodes to - // the old ones (and there aren't enough). The job would have had to complete in some way, - // and the PodGroup would have to then recreate, and have the same job id (the group name). - // This happened when I cancalled a bunch of jobs and they didn't have the chance to - // cancel in fluence. What we can do here is assume the previous pods are no longer running - // and cancel the flux job to create again. - if isAllocated { - klog.Info("Warning - group %s was previously allocated and is requesting again, so must have completed.", groupName) - f.mutex.Lock() - f.cancelFluxJob(groupName) - f.mutex.Unlock() - } - - // IMPORTANT: this is a JobSpec for *one* pod, assuming they are all the same. - // This obviously may not be true if we have a hetereogenous PodGroup. - // We name it based on the group, since it will represent the group - jobspec := utils.PreparePodJobSpec(pod, groupName) - klog.Infof("[Fluence] Inspect pod info, jobspec: %s\n", jobspec) - conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) - - // TODO change this to just return fmt.Errorf - if err != nil { - klog.Errorf("[Fluence] Error connecting to server: %v\n", err) - return err - } - defer conn.Close() - - grpcclient := pb.NewFluxcliServiceClient(conn) - _, cancel := context.WithTimeout(context.Background(), 200*time.Second) - defer cancel() - - request := &pb.MatchRequest{ - Ps: jobspec, - Request: "allocate", - Count: pg.Spec.MinMember, - } - - // An error here is an error with making the request - r, err := grpcclient.Match(context.Background(), request) - if err != nil { - klog.Errorf("[Fluence] did not receive any match response: %v\n", err) - return err - } - - // TODO GetPodID should be renamed, because it will reflect the group - klog.Infof("[Fluence] Match response ID %s\n", r.GetPodID()) - - // Get the nodelist and inspect - nodes := r.GetNodelist() - klog.Infof("[Fluence] Nodelist returned from Fluxion: %s\n", nodes) - - // Assign the nodelist - this sets the group name in the groupSeen cache - // at this point, we can retrieve the cache and get nodes - nodelist := fcore.CreateNodeList(nodes, groupName) +// Permit is the functions invoked by the framework at "Permit" extension point. +func (f *Fluence) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { + return f.cosched.Permit(ctx, state, pod, nodeName) +} - jobid := uint64(r.GetJobID()) - klog.Infof("[Fluence] parsed node pods list %s for job id %d\n", nodelist, jobid) +// Reserve is the functions invoked by the framework at "reserve" extension point. +func (f *Fluence) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + return f.cosched.Reserve(ctx, state, pod, nodeName) +} - // TODO would be nice to actually be able to ask flux jobs -a to fluence - // That way we can verify assignments, etc. 
- f.mutex.Lock() - f.groupToJobId[groupName] = jobid - f.mutex.Unlock() - return nil +// Unreserve rejects all other Pods in the PodGroup when one of the pods in the group times out. +func (f *Fluence) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { + f.cosched.Unreserve(ctx, state, pod, nodeName) }
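Looking ahead (nothing below is in this patch): because the plugin now keeps coscheduling as a helper field, the Fluxion logic can later be layered on by overriding a single extension point and delegating the rest. A rough sketch of what PreFilter might become once AskFlux is ported back; askFlux is an assumed helper that would ask Fluxion for the group allocation and store the job id in groupToJobId. It reuses the imports already in fluence.go (context, v1, framework).

// Sketch only: a possible future PreFilter that keeps the wrapped
// coscheduling checks and then asks Fluxion for a group allocation.
func (f *Fluence) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	// Run coscheduling's own gating first (deny list, minMember, etc.).
	result, status := f.cosched.PreFilter(ctx, state, pod)
	if !status.IsSuccess() {
		return result, status
	}

	groupName, pg := f.pgMgr.GetPodGroup(ctx, pod)
	if pg == nil {
		// No PodGroup: pass through, or treat as a size-1 group later.
		return result, status
	}

	// Only ask Fluxion once per group; the presence of a job id means the
	// group already has an allocation.
	f.mutex.Lock()
	_, haveAllocation := f.groupToJobId[groupName]
	f.mutex.Unlock()
	if !haveAllocation {
		if err := f.askFlux(ctx, pod, pg, groupName); err != nil {
			return nil, framework.NewStatus(framework.Unschedulable, err.Error())
		}
	}
	return result, status
}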