diff --git a/README.md b/README.md index 4f33cd9..8556dd1 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,11 @@ Fluence enables HPC-grade pod scheduling in Kubernetes via the [Kubernetes Sched ## Getting started For instructions on how to start Fluence on a K8s cluster, see [examples](examples/). Documentation and instructions for reproducing our CANOPIE-2022 paper (citation below) can be found in the [canopie22-artifacts branch](https://github.com/flux-framework/flux-k8s/tree/canopie22-artifacts). -For background on the Flux framework and the Fluxion scheduler, you can take a look at our award-winning R&D100 submission: https://ipo.llnl.gov/sites/default/files/2022-02/Flux_RD100_Final.pdf. For next steps: +For background on the Flux framework and the Fluxion scheduler, you can take a look at our award-winning [R&D100 submission](https://ipo.llnl.gov/sites/default/files/2022-02/Flux_RD100_Final.pdf). For next steps: - To understand how it works, see [Design](#design) - To deploy our pre-built images, go to [Deploy](#deploy) - - To build your own images, go to [Setup](#setup) + - To build your own images, go to [Build](#build) - To learn about repository organization, see [Developer](#developer) ### Design @@ -21,19 +21,47 @@ For background on the Flux framework and the Fluxion scheduler, you can take a l Fluence is a custom scheduler plugin that you can specify to use with two directive in your pod spec - - Asking for `fluence` as the scheduler name -- On either a job or a single or group of pods: - - Defining a named group of pods with the `fluence.flux-framework.org/pod-group` label. - - Defining the group size with the `fluence.flux-framework.org/group-size` label. + +Note that any abstraction with pods (or a single pod) marked for fluence will automatically have the group name +and nodes derived. However, if you want to customize this metadata (for example, define the size of the pod group explicitly you can use +the following labels): + + - A named group of pods with the `scheduling.x-k8s.io/pod-group` label. + - Defining the group size with the `fluence.group-size` label. + +We expect to define more labels to customize the scheduling logic. The way it works: -1. We have a mutating admission webhook that looks for jobs and pods, and ensures there are fluence labels. -2. A PodGroup reconciler is watching for these same objects. When they are created (this is not done yet): +1. We have a mutating admission webhook that looks for jobs and pods, and ensures there are fluence labels (likely we will add more abstractions). +2. A PodGroup reconciler is watching for these same objects. When they are created: a. We find the labels and create the pod group object. - b. The pod group object has a timestamp for creation. + b. The pod group object has a timestamp for creation in milliseconds. 3. When the pod is then given to fluence for scheduling, it already has the PodGroup created with name/size and can properly sort. -Another strategy I'm considering (if the above runs into trouble) is to watch a [channel](https://book-v1.book.kubebuilder.io/beyond_basics/controller_watches). An example is shown below for an indexed job, which will create multiple pods. 
+Here is an example of a Job intended for Fluence: + +```yaml +apiVersion: batch/v1 +kind: Job +metadata: + name: fluence-job +spec: + completions: 10 + parallelism: 10 + completionMode: Indexed + template: + spec: + schedulerName: fluence + containers: + - name: fluence-job + image: busybox + command: [echo, potato] + restartPolicy: Never + backoffLimit: 4 +``` + +And you can imagine if you want to group pods from different abstractions together, or declare a different size than what is represented in the Job: ```yaml apiVersion: batch/v1 @@ -41,8 +69,8 @@ kind: Job metadata: name: fluence-job labels: - fluence.flux-framework.org/pod-group: my-pods - fluence.flux-framework.org/group-size: 10 + scheduling.x-k8s.io/pod-group: min-size-group + fluence.group-size: 5 spec: completions: 10 parallelism: 10 @@ -58,8 +86,7 @@ spec: backoffLimit: 4 ``` -The group size might be different than, for example, your higher level abstraction (e.g., the IndexedJob) as there is no reason -pods with different names cannot be part of the same group that needs to be scheduled together. +There is no reason pods with different names or under different abstractions cannot be part of the same group that needs to be scheduled together. ### Deploy @@ -88,7 +115,7 @@ helm install \ And that's it! See the [testing install](#testing-install) section for a basic example to schedule pods using Fluence. -### Setup +### Build To build and test Fluence, you will need: @@ -96,9 +123,7 @@ To build and test Fluence, you will need: - [helm](https://helm.sh/docs/intro/install/) to install charts for scheduler plugins. - A Kubernetes cluster for testing, e.g., you can deploy one with [kind](https://kind.sigs.k8s.io/docs/user/quick-start/) -### Building Fluence - -There are two images we will be building: +There are three images we will be building: - the scheduler sidecar: built from the repository here - the scheduler: built (and modified) from [this branch of scheduler-plugins](https://github.com/openshift-psap/scheduler-plugins/blob/fluence/build/scheduler/Dockerfile) @@ -111,7 +136,7 @@ There are two images we will be building: This will run the full builds for all containers in one step, which includes: 1. Building the fluence sidecar from source code in [src](src) -2. Cloning the upstream kubernetes-sigs/plugin-schedulers respository to ./upstream +2. Cloning the upstream kubernetes-sigs/plugin-schedulers repository to ./upstream 3. Building the scheduler and controller containers From the root here: @@ -128,26 +153,18 @@ make REGISTRY=vanessa SCHEDULER_IMAGE=fluence SIDECAR_IMAGE=fluence-sidecar CONT As an alternative, you can look at the Makefile to do each of the steps separately. - -Whatever build approach you use, you'll want to push to your registry for later discovery! - -```bash -docker push docker.io/vanessa/fluence -docker push docker.io/vanessa/fluence-sidecar -docker push docker.io/vanessa/fluence-controller -``` - -### Prepare Cluster +#### Prepare Cluster > Prepare a cluster and install the Kubernetes scheduling plugins framework -These steps will require a Kubernetes cluster to install to, and having pushed the plugin container to a registry. If you aren't using a cloud provider, you can create a local one with `kind`: +These steps will require a Kubernetes cluster to install to, and having pushed the plugin container to a registry OR loading +them into the local cluster and setting the image pull policy to `Never`. 
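+For the second option, a rough sketch (assuming the example image names from the build step above, e.g., `REGISTRY=vanessa`, and a `kind` cluster like the one created below) would be to side-load the images and then set the pull policies (`pullPolicy`, `sidecarPullPolicy`) to `Never` in the Helm values:
+
+```bash
+# Sketch: load locally built images into a kind cluster so no registry push is needed
+# (image names here are the example ones from the build step above)
+kind load docker-image docker.io/vanessa/fluence
+kind load docker-image docker.io/vanessa/fluence-sidecar
+kind load docker-image docker.io/vanessa/fluence-controller
+```
+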
If you aren't using a cloud provider, you can create a local one with `kind`: ```bash kind create cluster --config ./examples/kind-config.yaml ``` -And install the certificate manager: +And again install the certificate manager: ```bash kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.1/cert-manager.yaml @@ -155,7 +172,7 @@ kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/ **Important** if you are developing or testing fluence, note that custom scheduler plugins don't seem to work out of the box with MiniKube (but everything works with kind). Likely there are extensions or similar that need to be configured with MiniKube (that we have not looked into). -### Install Fluence +#### Install Fluence For some background, the [Scheduling Framework](https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/) provided by Kubernetes means that our container is going to provide specific endpoints to allow for custom scheduling. At this point you can follow the instructions @@ -184,19 +201,26 @@ helm show values as-a-second-scheduler/ scheduler: name: fluence - image: registry.k8s.io/scheduler-plugins/kube-scheduler:v0.27.8 + image: ghcr.io/flux-framework/fluence:latest replicaCount: 1 leaderElect: false sidecarimage: ghcr.io/flux-framework/fluence-sidecar:latest policy: lonode pullPolicy: Always sidecarPullPolicy: Always + loggingLevel: "9" + + # Port is for GRPC, and enabling the external service will also + # create the service and ingress to it, along with adding + # additional API endpoints for our TBA kubectl plugin + enableExternalService: false + port: 4242 controller: name: scheduler-plugins-controller image: ghcr.io/flux-framework/fluence-controller:latest replicaCount: 1 - pullPolicy: IfNotPresent + pullPolicy: Always # LoadVariationRiskBalancing and TargetLoadPacking are not enabled by default # as they need extra RBAC privileges on metrics.k8s.io. @@ -217,6 +241,15 @@ pluginConfig: # args: # scoringStrategy: # type: MostAllocated # default is LeastAllocated + +enableCertManager: true +kubernetesClusterDomain: cluster.local +webhookService: + ports: + - port: 9443 + protocol: TCP + targetPort: 9443 + type: ClusterIP ``` @@ -239,9 +272,15 @@ If you need to uninstall (e.g., to redo something): helm uninstall fluence ``` +Or see the name you used: + +```bash +helm list +``` + Next you can move down to testing the install. -### Testing Install +#### Testing Install The installation process will run one scheduler and one controller pod for the Scheduler Plugin Framework in the default namespace. You can double check that everything is running as follows: @@ -284,35 +323,40 @@ kubectl logs fluence-6bbcbc6bbf-xjfx6 -c scheduler-plugins-scheduler If you haven't done anything, you'll likely just see health checks. -### Deploy Pods +#### Testing Pods and Jobs -Let's now run a simple example! Change directory into this directory: +You can test deploying pods and jobs. ```bash -# This is from the root of flux-k8s -cd examples/simple_example +kubectl apply -f examples/simple_example/fluence-scheduler-pod.yaml ``` +or a job: -And then we want to deploy two pods, one assigned to the `default-scheduler` and the other -`fluence`. 
For FYI, we do this via setting `schedulerName` in the spec: +```bash +# size 3 +kubectl apply -f examples/test_example/fluence-sized-job.yaml + +# size 1 +kubectl apply -f examples/test_example/fluence-job.yaml +``` + +Note that all of these have (in their spec) a designation of the fluence scheduler. ```yaml spec: schedulerName: fluence ``` -Here is how to create the pods: +Once it was created, aside from checking that it ran OK, you can verify by looking at the scheduler logs again: ```bash -kubectl apply -f default-scheduler-pod.yaml -kubectl apply -f fluence-scheduler-pod.yaml +kubectl logs fluence-6bbcbc6bbf-xjfx6 ``` -Once it was created, aside from checking that it ran OK, I could verify by looking at the scheduler logs again: +
+ +Scheduler Logs -```bash -kubectl logs fluence-6bbcbc6bbf-xjfx6 -``` ```bash Defaulted container "sidecar" out of: sidecar, scheduler-plugins-scheduler This is the fluxion grpc server @@ -361,6 +405,8 @@ FINAL NODE RESULT: [GRPCServer] Response podID:"fluence-scheduled-pod" nodelist:{nodeID:"kind-control-plane" tasks:1} jobID:1 ``` +
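+Aside from the scheduler logs, a quick sanity check (plain kubectl, nothing fluence specific) is to ask for the wide output of the example pod above, which includes the node it was bound to:
+
+```bash
+kubectl get pod fluence-scheduled-pod -o wide
+```
+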
+ I was trying to look for a way to see the assignment, and maybe we can see it here (this is the best I could come up with!) ```bash @@ -385,7 +431,6 @@ pod/fluence-scheduled-pod spec.containers{fluence-scheduled-container} kubelet For the above, I found [this page](https://kubernetes.io/docs/tasks/extend-kubernetes/configure-multiple-schedulers/#enable-leader-election) very helpful. -Finally, note that we also have a more appropriate example with jobs under [examples/test_example](examples/test_example). It's slightly more sane because it uses Job, and jobs are expected to complete (whereas pods are not and will get into crash loop backoffs, etc). For example of how to programmatically interact with the job pods and check states, events, see the [test.sh](.github/test.sh) script. ### Developer @@ -397,9 +442,10 @@ If you are looking to develop: - [src](src): includes source code for fluence. You'll find logs for this code in the `sidecar` container of the fluence pod. - [sig-scheduler-plugins](sig-scheduler-plugins): includes assets (manifests and Go files) that are intended to be added to the kubernetes-sigs/scheduler-plugins upstream repository before build. You'll find logs for this container in the `scheduler-plugins-scheduler` container of the pod. + - [apis](sig-scheduler-plugins/apis): customized PodGroup to define the status scheduled time in micro seconds - [manifests](sig-scheduler-plugins/manifests): manifests for helm and Kubernetes - [pkg](sig-scheduler-plugins/pkg): the main fluence module to add to upstream - - [cmd](sig-scheduler-plugins/cmd): the main.go to replace in upstream + - [cmd](sig-scheduler-plugins/cmd): the main.go to replace in upstream - *upstream*: the default name this upstream is cloned to when you do a make build command. Note that the clone of the repository and copying of files to the correct locations is all automated through the [Makefile](Makefile). Additional commands provided include the following: @@ -447,7 +493,7 @@ I was having trouble developing this easily because it's a lot of steps to build The last step ensures we use the images we loaded! You can basically just do: ```bash -./hack/quick-build.sh +/bin/bash ./hack/quick-build.sh ``` This sped up my development time immensely. If you want to manually do the steps, see that script for instructions. @@ -474,60 +520,18 @@ kind create cluster --config ./kind-config.yaml #### TODO - Try what [kueue does](https://github.com/kubernetes-sigs/kueue/blob/6d57813a52066dab412735deeeb60ebb0cdb8e8e/cmd/kueue/main.go#L146-L155) to not require cert-manager. - -#### Vanessa Thinking - -> Updated February 15, 2024 - -What I think might be happening (and not always, sometimes) - -- New pod group, no node list -- Fluence assigns nodes -- Nodes get assigned to pods 1:1 -- POD group is deleted -- Some pod is sent back to queue (kubelet rejects, etc) -- POD group does not exist and is recreated, no node list -- Fluence asks again, but still has the first job. Not enough resources, asks forever. - -The above would not happen with the persistent pod group (if it wasn't cleaned up until the deletion of the job) and wouldn't happen if there are just enough resources to account for the overlap. - -- Does Fluence allocate resources for itself? -- It would be nice to be able to inspect the state of Fluence. 
-- At some point we want to be using the TBA fluxion-go instead of the one off branch we currently have (but we don't need to be blocked for that) -- We should (I think) restore pod group (it's in the controller here) and have our own container built. That way we have total control over the custom resource, and we don't risk it going away. - - As a part of that, we can add add a mutating webhook that emulates what we are doing in fluence now to find the label, but instead we will create the CRD to hold state instead of trying to hold in the operator. -- It could then also be investigated that we can more flexibly change the size of the group, within some min/max size (also determined by labels?) to help with scheduling. -- Note that kueue has added a Pod Group object, so probably addresses the static case here. + - Add other abstraction types to be intercepted (and labeled with sizes) #### Components - [FluxStateData](sig-scheduler-plugins/pkg/fluence/core/core.go): is given to the [framework.CycleState](https://github.com/kubernetes/kubernetes/blob/242b41b36a20032f99e8a059ca0a5d764105217b/pkg/scheduler/framework/cycle_state.go#L48) and serves as a vehicle to store a cache of node name assignment. -#### Helm - -The install commands are shown above, but often you want to uninstall! - -> What is the name of the installed plugin again? - -```bash - helm list -NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION -fluence default 1 2024-01-08 12:04:58.558612156 -0700 MST deployed scheduler-plugins-0.27.80.27.8 -``` - -And then uninstall: - -```bash -$ helm uninstall fluence -release "fluence" uninstalled -``` - - ## Papers You can find details of Fluence architecture, implementation, experiments, and improvements to the Kubeflow MPI operator in our collaboration's papers: -``` + +```bibtex @INPROCEEDINGS{10029991, author={Milroy, Daniel J. and Misale, Claudia and Georgakoudis, Giorgis and Elengikal, Tonia and Sarkar, Abhik and Drocco, Maurizio and Patki, Tapasya and Yeom, Jae-Seung and Gutierrez, Carlos Eduardo Arango and Ahn, Dong H. and Park, Yoonho}, booktitle={2022 IEEE/ACM 4th International Workshop on Containers and New Orchestration Paradigms for Isolated Environments in HPC (CANOPIE-HPC)}, @@ -539,7 +543,7 @@ You can find details of Fluence architecture, implementation, experiments, and i doi={10.1109/CANOPIE-HPC56864.2022.00011} } ``` -``` +```bibtex @INPROCEEDINGS{9652595, author={Misale, Claudia and Drocco, Maurizio and Milroy, Daniel J. and Gutierrez, Carlos Eduardo Arango and Herbein, Stephen and Ahn, Dong H. and Park, Yoonho}, booktitle={2021 3rd International Workshop on Containers and New Orchestration Paradigms for Isolated Environments in HPC (CANOPIE-HPC)}, @@ -551,7 +555,7 @@ You can find details of Fluence architecture, implementation, experiments, and i doi={10.1109/CANOPIEHPC54579.2021.00006} } ``` -``` +```bibtex @inproceedings{10.1007/978-3-030-96498-6_18, address = {Cham}, author = {Misale, Claudia and Milroy, Daniel J. and Gutierrez, Carlos Eduardo Arango and Drocco, Maurizio and Herbein, Stephen and Ahn, Dong H. 
and Kaiser, Zvonko and Park, Yoonho}, diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..155ffc8 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,25 @@ +# Development Notes + +## Thinking + +> Updated February 15, 2024 + +What I think might be happening (and not always, sometimes) + +- New pod group, no node list +- Fluence assigns nodes +- Nodes get assigned to pods 1:1 +- POD group is deleted +- Some pod is sent back to queue (kubelet rejects, etc) +- POD group does not exist and is recreated, no node list +- Fluence asks again, but still has the first job. Not enough resources, asks forever. + +The above would not happen with the persistent pod group (if it wasn't cleaned up until the deletion of the job) and wouldn't happen if there are just enough resources to account for the overlap. + +- Does Fluence allocate resources for itself? +- It would be nice to be able to inspect the state of Fluence. +- At some point we want to be using the TBA fluxion-go instead of the one off branch we currently have (but we don't need to be blocked for that) +- We should (I think) restore pod group (it's in the controller here) and have our own container built. That way we have total control over the custom resource, and we don't risk it going away. + - As a part of that, we can add add a mutating webhook that emulates what we are doing in fluence now to find the label, but instead we will create the CRD to hold state instead of trying to hold in the operator. +- It could then also be investigated that we can more flexibly change the size of the group, within some min/max size (also determined by labels?) to help with scheduling. +- Note that kueue has added a Pod Group object, so probably addresses the static case here. diff --git a/examples/simple_example/fluence-scheduler-pod.yaml b/examples/simple_example/fluence-scheduler-pod.yaml index 71a8463..b09c714 100644 --- a/examples/simple_example/fluence-scheduler-pod.yaml +++ b/examples/simple_example/fluence-scheduler-pod.yaml @@ -1,11 +1,11 @@ apiVersion: v1 kind: Pod metadata: - name: fluence-scheduled-pod-1 + name: fluence-scheduled-pod labels: name: scheduler-example spec: schedulerName: fluence containers: - name: fluence-scheduled-container - image: registry.k8s.io/pause:2.0 \ No newline at end of file + image: registry.k8s.io/pause:2.0 diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go index 53a627e..a3f4531 100644 --- a/sig-scheduler-plugins/pkg/fluence/core/core.go +++ b/sig-scheduler-plugins/pkg/fluence/core/core.go @@ -3,10 +3,7 @@ package core import ( "fmt" - v1 "k8s.io/api/core/v1" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/klog/v2" + klog "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" @@ -26,13 +23,9 @@ func (s *FluxStateData) Clone() framework.StateData { return &FluxStateData{NodeCache: s.NodeCache} } -// NewFluxState creates an entry for the CycleState with the minimum that we might need -func NewFluxState(nodeName string, groupName string, size int32) *FluxStateData { - cache := NodeCache{ - NodeName: nodeName, - GroupName: groupName, - MinGroupSize: size, - } +// NewFluxState creates an entry for the CycleState with the node and group name +func NewFluxState(nodeName string, groupName string) *FluxStateData { + cache := NodeCache{NodeName: nodeName} return &FluxStateData{NodeCache: cache} } @@ -42,162 +35,127 @@ func NewFluxState(nodeName string, groupName 
string, size int32) *FluxStateData type NodeCache struct { NodeName string - // This is derived from tasks, where - // task is an allocation to some node - // High level it is most often akin to the - // number of pods on the node. I'm not sure that I understand this - // https://github.com/flux-framework/flux-k8s/blob/9f24f36752e3cced1b1112d93bfa366fb58b3c84/src/fluence/fluxion/fluxion.go#L94-L97 - // How does that relate to a single pod? It is called "Count" in other places - Tasks int + // Tie assignment back to PodGroup, which can be used to get size and time created + GroupName string - // These fields are primarily for the FluxStateData - // Without a PodGroup CRD we keep min size here - MinGroupSize int32 - GroupName string + // Assigned tasks (often pods) to nodes + // https://github.com/flux-framework/flux-k8s/blob/9f24f36752e3cced1b1112d93bfa366fb58b3c84/src/fluence/fluxion/fluxion.go#L94-L97 + AssignedTasks int } // A pod group cache holds a list of nodes for an allocation, where each has some number of tasks // along with the expected group size. This is intended to replace PodGroup // given the group name, size (derived from annotations) and timestamp type PodGroupCache struct { + GroupName string // This is a cache of nodes for pods Nodes []NodeCache - Size int32 - Name string - - // Keep track of when the group was initially created! - // This is like, the main thing we need. - TimeCreated metav1.MicroTime } -// Memory cache of pod group name to pod group cache, above -var podGroupCache map[string]*PodGroupCache +// PodGroups seen by fluence +var groupsSeen map[string]*PodGroupCache -// Init populates the podGroupCache +// Init populates the groupsSeen cache func Init() { - podGroupCache = map[string]*PodGroupCache{} -} - -// RegisterPodGroup ensures that the PodGroup exists in the cache -// This is an experimental replacement for an actual PodGroup -// We take a timestampo, which if called from Less (during sorting) is tiem.Time -// if called later (an individual pod) we go for its creation timestamp -func RegisterPodGroup(pod *v1.Pod, groupName string, groupSize int32) error { - entry, ok := podGroupCache[groupName] - - if !ok { - - // Assume we create the group with the timestamp - // of the first pod seen. There might be imperfections - // by the second, but as long as we sort them via millisecond - // this should prevent interleaving - nodes := []NodeCache{} - - // Create the new entry for the pod group - entry = &PodGroupCache{ - Name: groupName, - Size: groupSize, - Nodes: nodes, - TimeCreated: metav1.NowMicro(), - } - - // Tell the user when it was created - klog.Infof("[Fluence] Pod group %s was created at %s\n", entry.Name, entry.TimeCreated) - } - - // If the size has changed, we currently do not allow updating it. - // We issue a warning. In the future this could be supported with a grow command. - if entry.Size != groupSize { - klog.Infof("[Fluence] Pod group %s request to change size from %s to %s is not yet supported\n", groupName, entry.Size, groupSize) - // entry.GroupSize = groupSize - } - podGroupCache[groupName] = entry - return nil + groupsSeen = map[string]*PodGroupCache{} } -// GetPodGroup gets a pod group in the cache by name -func GetPodGroup(groupName string) *PodGroupCache { - entry, _ := podGroupCache[groupName] +// GetFluenceCache determines if a group has been seen. 
+// Yes -> we return the PodGroupCache entry +// No -> the entry is nil / does not exist +func GetFluenceCache(groupName string) *PodGroupCache { + entry, _ := groupsSeen[groupName] return entry } // DeletePodGroup deletes a pod from the group cache func DeletePodGroup(groupName string) { - delete(podGroupCache, groupName) -} - -// ListGroups lists groups, primarily for debugging -func ListGroups() { - for name, pg := range podGroupCache { - fmt.Printf(" %s: size %s, created at %s\n", name, pg.Size, &pg.TimeCreated) - } + delete(groupsSeen, groupName) } // CreateNodePodsList creates a list of node pod caches -func CreateNodePodsList(nodelist []*pb.NodeAlloc, groupName string) (nodepods []NodeCache) { +func CreateNodeList(nodelist []*pb.NodeAlloc, groupName string) (nodepods []NodeCache) { // Create a pod cache for each node nodepods = make([]NodeCache, len(nodelist)) + // TODO: should we be integrating topology information here? Could it be the + // case that some nodes (pods) in the group should be closer? for i, v := range nodelist { nodepods[i] = NodeCache{ - NodeName: v.GetNodeID(), - Tasks: int(v.GetTasks()), + NodeName: v.GetNodeID(), + AssignedTasks: int(v.GetTasks()), + GroupName: groupName, } } - // Update the pods in the PodGraphCache - updatePodGroupNodes(groupName, nodepods) - klog.Infof("[Fluence] Pod group cache updated with nodes\n", podGroupCache) + // Update the pods in the PodGroupCache (groupsSeen) + updatePodGroupCache(groupName, nodepods) return nodepods } // updatePodGroupList updates the PodGroupCache with a listing of nodes -func updatePodGroupNodes(groupName string, nodes []NodeCache) { - group := podGroupCache[groupName] - group.Nodes = nodes - podGroupCache[groupName] = group +func updatePodGroupCache(groupName string, nodes []NodeCache) { + cache := PodGroupCache{ + Nodes: nodes, + GroupName: groupName, + } + groupsSeen[groupName] = &cache } -// HavePodNodes returns true if the listing of pods is not empty -// This should be all pods that are needed - the allocation will not -// be successful otherwise, so we just check > 0 -func (p *PodGroupCache) HavePodNodes() bool { - return len(p.Nodes) > 0 -} +// GetNextNode gets the next node in the PodGroupCache +func (p *PodGroupCache) GetNextNode() (string, error) { -// CancelAllocation resets the node cache and allocation status -func (p *PodGroupCache) CancelAllocation() { - p.Nodes = []NodeCache{} -} + nextnode := "" -// GetNextNode gets the next available node we can allocate for a group -func GetNextNode(groupName string) (string, error) { - entry, ok := podGroupCache[groupName] - if !ok { - return "", fmt.Errorf("[Fluence] Map is empty\n") - } - if len(entry.Nodes) == 0 { - return "", fmt.Errorf("[Fluence] Error while getting a node\n") + // Quick failure state - we ran out of nodes + if len(p.Nodes) == 0 { + return nextnode, fmt.Errorf("[Fluence] PodGroup %s ran out of nodes.", p.GroupName) } - nodename := entry.Nodes[0].NodeName - klog.Infof("[Fluence] Next node for group %s is %s", groupName, nodename) + // The next is the 0th in the list + nextnode = p.Nodes[0].NodeName + klog.Infof("[Fluence] Next node for group %s is %s", p.GroupName, nextnode) - if entry.Nodes[0].Tasks == 1 { - klog.Infof("[Fluence] First node has one task") - slice := entry.Nodes[1:] + // If there is only one task left, we are going to use it (and remove the node) + if p.Nodes[0].AssignedTasks == 1 { + klog.Infof("[Fluence] First node has one remaining task slot") + slice := p.Nodes[1:] + + // If after we remove the node there are no 
nodes left...
+		// Note that I'm not deleting the node from the cache because that is the
+		// only way fluence knows it has already assigned work (presence of the key)
 		if len(slice) == 0 {
-			klog.Infof("[Fluence] After this node, the slice is empty, deleting group %s from cache\n", groupName)
-			delete(podGroupCache, groupName)
-			return nodename, nil
+			klog.Infof("[Fluence] Assigning node %s. There are NO remaining nodes for group %s\n", nextnode, p.GroupName)
+			// delete(podGroupCache, groupName)
+			return nextnode, nil
 		}
-		klog.Infof("[Fluence] After this node, the slide still has nodes")
-		updatePodGroupNodes(groupName, slice)
-		return nodename, nil
+
+		klog.Infof("[Fluence] Assigning node %s. There are nodes left for group %s", nextnode, p.GroupName)
+		updatePodGroupCache(p.GroupName, slice)
+		return nextnode, nil
+	}
+
+	// If we get here the first node had >1 assigned tasks
+	klog.Infof("[Fluence] Assigning node %s for group %s. There are still task assignments available for this node.", nextnode, p.GroupName)
+	p.Nodes[0].AssignedTasks = p.Nodes[0].AssignedTasks - 1
+	return nextnode, nil
+}
+
+// GetNextNode gets the next available node we can allocate for a group
+// TODO this should be able to take and pass forward a number of tasks.
+// It is implicitly 1 now, but doesn't have to be.
+func GetNextNode(groupName string) (string, error) {
+
+	// Get our entry from the groupsSeen cache
+	klog.Infof("[Fluence] groups seen %s", groupsSeen)
+	entry, ok := groupsSeen[groupName]
+
+	// This case should not happen
+	if !ok {
+		return "", fmt.Errorf("[Fluence] group %s is not in the groups seen cache", groupName)
 	}
-	klog.Infof("[Fluence] Subtracting one task from first node")
-	entry.Nodes[0].Tasks = entry.Nodes[0].Tasks - 1
-	return nodename, nil
+	// Get the next node from the PodGroupCache
+	return entry.GetNextNode()
 }
diff --git a/sig-scheduler-plugins/pkg/fluence/events.go b/sig-scheduler-plugins/pkg/fluence/events.go
new file mode 100644
index 0000000..bc265f7
--- /dev/null
+++ b/sig-scheduler-plugins/pkg/fluence/events.go
@@ -0,0 +1,150 @@
+package fluence
+
+import (
+	"context"
+	"time"
+
+	"google.golang.org/grpc"
+	v1 "k8s.io/api/core/v1"
+	klog "k8s.io/klog/v2"
+
+	pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc"
+)
+
+// Events are associated with informers, typically on pods, e.g.,
+// delete: deletion of a pod
+// update: update of a pod!
+// For both of the above, there are cases to cancel the flux job
+// associated with the group id
+
+// cancelFluxJob cancels the flux job for a group.
+// We assume that the cancelled job also means deleting the pod group +func (f *Fluence) cancelFluxJob(groupName string) error { + + jobid, ok := f.groupToJobId[groupName] + + // The job was already cancelled by another pod + if !ok { + klog.Infof("[Fluence] Request for cancel of group %s is already complete.", groupName) + return nil + } + klog.Infof("[Fluence] Cancel flux job: %v for group %s", jobid, groupName) + + // This first error is about connecting to the server + conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) + if err != nil { + klog.Errorf("[Fluence] Error connecting to server: %v", err) + return err + } + defer conn.Close() + + grpcclient := pb.NewFluxcliServiceClient(conn) + _, cancel := context.WithTimeout(context.Background(), 200*time.Second) + defer cancel() + + // This error reflects the success or failure of the cancel request + request := &pb.CancelRequest{JobID: int64(jobid)} + res, err := grpcclient.Cancel(context.Background(), request) + if err != nil { + klog.Errorf("[Fluence] did not receive any cancel response: %v", err) + return err + } + klog.Infof("[Fluence] Job cancellation for group %s result: %d", groupName, res.Error) + + // And this error is if the cancel was successful or not + if res.Error == 0 { + klog.Infof("[Fluence] Successful cancel of flux job: %d for group %s", jobid, groupName) + delete(f.groupToJobId, groupName) + } else { + klog.Warningf("[Fluence] Failed to cancel flux job %d for group %s", jobid, groupName) + } + return nil +} + +// updatePod is called on an update, and the old and new object are presented +func (f *Fluence) updatePod(oldObj, newObj interface{}) { + + oldPod := oldObj.(*v1.Pod) + newPod := newObj.(*v1.Pod) + + // a pod is updated, get the group + // TODO should we be checking group / size for old vs new? + groupName, _ := f.pgMgr.GetPodGroup(context.TODO(), oldPod) + + klog.Infof("[Fluence] Processing event for pod %s in group %s from %s to %s", newPod.Name, groupName, newPod.Status.Phase, oldPod.Status.Phase) + + switch newPod.Status.Phase { + case v1.PodPending: + // in this state we don't know if a pod is going to be running, thus we don't need to update job map + case v1.PodRunning: + // if a pod is start running, we can add it state to the delta graph if it is scheduled by other scheduler + case v1.PodSucceeded: + klog.Infof("[Fluence] Pod %s succeeded, Fluence needs to free the resources", newPod.Name) + + f.mutex.Lock() + defer f.mutex.Unlock() + + // Do we have the group id in our cache? 
If yes, we haven't deleted the jobid yet
+		// I am worried here that if some pods are succeeded and others pending, this could
+		// be a mistake - fluence would schedule it again
+		_, ok := f.groupToJobId[groupName]
+		if ok {
+			f.cancelFluxJob(groupName)
+		} else {
+			klog.Infof("[Fluence] Succeeded pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName)
+		}
+
+	case v1.PodFailed:
+
+		// a corner case needs to be tested: the pod exit code is not 0, can be created with segmentation fault pi test
+		klog.Warningf("[Fluence] Pod %s in group %s failed, Fluence needs to free the resources", newPod.Name, groupName)
+
+		f.mutex.Lock()
+		defer f.mutex.Unlock()
+
+		_, ok := f.groupToJobId[groupName]
+		if ok {
+			f.cancelFluxJob(groupName)
+		} else {
+			klog.Errorf("[Fluence] Failed pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName)
+		}
+	case v1.PodUnknown:
+		// don't know how to deal with it as it's unknown phase
+	default:
+		// shouldn't enter this branch
+	}
+}
+
+// deletePod handles the delete event
+func (f *Fluence) deletePod(podObj interface{}) {
+	klog.Info("[Fluence] Delete Pod event handler")
+	pod := podObj.(*v1.Pod)
+	groupName, _ := f.pgMgr.GetPodGroup(context.TODO(), pod)
+
+	klog.Infof("[Fluence] Delete pod %s in group %s has status %s", pod.Name, groupName, pod.Status.Phase)
+	switch pod.Status.Phase {
+	case v1.PodSucceeded:
+	case v1.PodPending:
+		klog.Infof("[Fluence] Pod %s completed and is Pending termination, Fluence needs to free the resources", pod.Name)
+
+		f.mutex.Lock()
+		defer f.mutex.Unlock()
+
+		_, ok := f.groupToJobId[groupName]
+		if ok {
+			f.cancelFluxJob(groupName)
+		} else {
+			klog.Infof("[Fluence] Terminating pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName)
+		}
+	case v1.PodRunning:
+		f.mutex.Lock()
+		defer f.mutex.Unlock()
+
+		_, ok := f.groupToJobId[groupName]
+		if ok {
+			f.cancelFluxJob(groupName)
+		} else {
+			klog.Infof("[Fluence] Deleted pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName)
+		}
+	}
+}
diff --git a/sig-scheduler-plugins/pkg/fluence/fluence.go b/sig-scheduler-plugins/pkg/fluence/fluence.go
index 26282e5..0e8ec21 100644
--- a/sig-scheduler-plugins/pkg/fluence/fluence.go
+++ b/sig-scheduler-plugins/pkg/fluence/fluence.go
@@ -1,19 +1,3 @@
-/*
-Copyright 2022 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
 package fluence
 
 import (
@@ -32,7 +16,7 @@ import (
 	clientscheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/tools/cache"
 	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
-	"k8s.io/klog/v2"
+	klog "k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -49,12 +33,9 @@ type Fluence struct {
 	handle framework.Handle
 	client client.Client
 
-	// Important: I tested moving this into the group, but it's a bad idea because
-	// we need to delete the group after the last allocation is given, and then we
-	// no longer have the ID.
It might be a better approach to delete it elsewhere - // (but I'm not sure where that elsewhere could be) - podNameToJobId map[string]uint64 - pgMgr coschedulingcore.Manager + // Store jobid on the level of a group (which can be a single pod) + groupToJobId map[string]uint64 + pgMgr coschedulingcore.Manager } // Name is the name of the plugin used in the Registry and configurations. @@ -79,7 +60,7 @@ func (f *Fluence) Name() string { // https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/coscheduling/coscheduling.go#L63 func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { - f := &Fluence{handle: handle, podNameToJobId: make(map[string]uint64)} + f := &Fluence{handle: handle, groupToJobId: make(map[string]uint64)} ctx := context.TODO() fcore.Init() @@ -106,7 +87,7 @@ func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { fieldSelector, err := fields.ParseSelector(",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed)) if err != nil { - klog.ErrorS(err, "ParseSelector failed") + klog.Errorf("ParseSelector failed %s", err) os.Exit(1) } @@ -116,6 +97,7 @@ func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { podInformer := informerFactory.Core().V1().Pods() scheduleTimeDuration := time.Duration(500) * time.Second + // https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/coscheduling/core/core.go#L84 pgMgr := coschedulingcore.NewPodGroupManager( k8scli, handle.SnapshotSharedLister(), @@ -141,34 +123,27 @@ func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { // Less is used to sort pods in the scheduling queue in the following order. // 1. Compare the priorities of Pods. -// 2. Compare the initialization timestamps of fluence pod groups -// 3. Fall back, sort by namespace/name -// See https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/ -// Less is part of Sort, which is the earliest we can see a pod unless we use gate -// IMPORTANT: Less sometimes is not called for smaller sizes, not sure why. -// To get around this we call it during PreFilter too. +// 2. Compare the initialization timestamps of PodGroups or Pods. +// 3. Compare the keys of PodGroups/Pods: /. func (f *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { - klog.Infof("[Fluence] Ordering pods in Less") - - // ensure we have a PodGroup no matter what - klog.Infof("[Fluence] Comparing %s and %s", podInfo1.Pod.Name, podInfo2.Pod.Name) - podGroup1 := fgroup.EnsureFluenceGroup(podInfo1.Pod) - podGroup2 := fgroup.EnsureFluenceGroup(podInfo2.Pod) - - // First preference to priority, but only if they are different + klog.Infof("ordering pods in fluence scheduler plugin") prio1 := corev1helpers.PodPriority(podInfo1.Pod) prio2 := corev1helpers.PodPriority(podInfo2.Pod) - - // ...and only allow this to sort if they aren't the same - // The assumption here is that pods with priority are ignored by fluence if prio1 != prio2 { return prio1 > prio2 } + // Important: this GetPodGroup returns the first name as the Namespaced one, + // which is what fluence needs to distinguish between namespaces. Just the + // name could be replicated between different namespaces + ctx := context.TODO() + name1, podGroup1 := f.pgMgr.GetPodGroup(ctx, podInfo1.Pod) + name2, podGroup2 := f.pgMgr.GetPodGroup(ctx, podInfo2.Pod) + // Fluence can only compare if we have two known groups. 
// This tries for that first, and falls back to the initial attempt timestamp - creationTime1 := fgroup.GetCreationTimestamp(podGroup1, podInfo1) - creationTime2 := fgroup.GetCreationTimestamp(podGroup2, podInfo2) + creationTime1 := fgroup.GetCreationTimestamp(name1, podGroup1, podInfo1) + creationTime2 := fgroup.GetCreationTimestamp(name2, podGroup2, podInfo2) // If they are the same, fall back to sorting by name. if creationTime1.Equal(&creationTime2) { @@ -178,7 +153,7 @@ func (f *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { } // PreFilter checks info about the Pod / checks conditions that the cluster or the Pod must meet. -// This still comes after sort +// This comes after sort func (f *Fluence) PreFilter( ctx context.Context, state *framework.CycleState, @@ -189,31 +164,46 @@ func (f *Fluence) PreFilter( // groupName will be named according to the single pod namespace / pod if there wasn't // a user defined group. This is a size 1 group we handle equivalently. - pg := fgroup.GetPodsGroup(pod) + groupName, pg := f.pgMgr.GetPodGroup(ctx, pod) + klog.Infof("[Fluence] Pod %s is in group %s with minimum members %d", pod.Name, groupName, pg.Spec.MinMember) - klog.Infof("[Fluence] Pod %s group size %d", pod.Name, pg.Size) - klog.Infof("[Fluence] Pod %s group name is %s", pod.Name, pg.Name) + // Has this podgroup been seen by fluence yet? If yes, we will have it in the cache + cache := fcore.GetFluenceCache(groupName) + klog.Infof("[Fluence] cache %s", cache) - // Note that it is always the case we have a group - // We have not yet derived a node list - if !pg.HavePodNodes() { - klog.Infof("[Fluence] Does not have nodes yet, asking Fluxion") - err := f.AskFlux(ctx, pod, int(pg.Size)) + // Fluence has never seen this before, we need to schedule an allocation + // It also could have been seen, but was not able to get one. + if cache == nil { + klog.Infof("[Fluence] Does not have nodes for %s yet, asking Fluxion", groupName) + + // groupName is the namespaced name / + err := f.AskFlux(ctx, pod, pg, groupName) if err != nil { klog.Infof("[Fluence] Fluxion returned an error %s, not schedulable", err.Error()) return nil, framework.NewStatus(framework.Unschedulable, err.Error()) } } - nodename, err := fcore.GetNextNode(pg.Name) - klog.Infof("Node Selected %s (%s:%s)", nodename, pod.Name, pg.Name) + + // We can only get here if an allocation is done (and there is no error above) + // The cache would only originally be nil if we didn't do that yet. It should + // always be defined (not nil) when we get here + cache = fcore.GetFluenceCache(groupName) + + // This is the next node in the list + nodename, err := fcore.GetNextNode(groupName) if err != nil { return nil, framework.NewStatus(framework.Unschedulable, err.Error()) } - - // Create a fluxState (CycleState) with things that might be useful/ - klog.Info("Node Selected: ", nodename) - cache := fcore.NodeCache{NodeName: nodename} - state.Write(framework.StateKey(pod.Name), &fcore.FluxStateData{NodeCache: cache}) + klog.Infof("Node Selected %s (pod %s:group %s)", nodename, pod.Name, groupName) + + // Create a fluxState (CycleState) with things that might be useful + // This isn't a PodGroupCache, but a single node cache, which also + // has group information, but just is for one node. 
Note that assigned + // tasks is hard coded to 1 but this isn't necessarily the case - we should + // eventually be able to GetNextNode for a number of tasks, for example + // (unless task == pod in which case it is always 1) + nodeCache := fcore.NodeCache{NodeName: nodename, GroupName: groupName, AssignedTasks: 1} + state.Write(framework.StateKey(pod.Name), &fcore.FluxStateData{NodeCache: nodeCache}) return nil, framework.NewStatus(framework.Success, "") } @@ -226,8 +216,16 @@ func (f *Fluence) Filter( ) *framework.Status { klog.Info("Filtering input node ", nodeInfo.Node().Name) - if v, e := cycleState.Read(framework.StateKey(pod.Name)); e == nil { - if value, ok := v.(*fcore.FluxStateData); ok && value.NodeCache.NodeName != nodeInfo.Node().Name { + state, err := cycleState.Read(framework.StateKey(pod.Name)) + + // No error means we retrieved the state + if err == nil { + + // Try to convert the state to FluxStateDate + value, ok := state.(*fcore.FluxStateData) + + // If we have state data that isn't equal to the current assignment, no go + if ok && value.NodeCache.NodeName != nodeInfo.Node().Name { return framework.NewStatus(framework.Unschedulable, "pod is not permitted") } else { klog.Infof("Filter: node %s selected for %s\n", value.NodeCache.NodeName, pod.Name) @@ -243,24 +241,33 @@ func (f *Fluence) PreFilterExtensions() framework.PreFilterExtensions { } // AskFlux will ask flux for an allocation for nodes for the pod group. -func (f *Fluence) AskFlux(ctx context.Context, pod *v1.Pod, count int) error { +func (f *Fluence) AskFlux( + ctx context.Context, + pod *v1.Pod, + pg *sched.PodGroup, + groupName string, +) error { + // clean up previous match if a pod has already allocated previously f.mutex.Lock() - _, isPodAllocated := f.podNameToJobId[pod.Name] + _, isAllocated := f.groupToJobId[groupName] f.mutex.Unlock() - if isPodAllocated { - klog.Infof("[Fluence] Pod %s is allocated, cleaning up previous allocation\n", pod.Name) - f.mutex.Lock() - f.cancelFluxJobForPod(pod) - f.mutex.Unlock() + // Not allowing cancel for now - not sure how or why we could do this, need to better + // understand the case. This function should ONLY be successful on a new match allocate, + // otherwise the calling logic does not make sense. + if isAllocated { + return fmt.Errorf("[Fluence] Pod %s in group %s is allocated and calling AskFlux, should we be here?\n", pod.Name, groupName) } - // Does the task name here matter? We are naming the entire group for the pod - jobspec := utils.InspectPodInfo(pod) + // IMPORTANT: this is a JobSpec for *one* pod, assuming they are all the same. + // This obviously may not be true if we have a hetereogenous PodGroup. + // We name it based on the group, since it will represent the group + jobspec := utils.PreparePodJobSpec(pod, groupName) klog.Infof("[Fluence] Inspect pod info, jobspec: %s\n", jobspec) conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) + // TODO change this to just return fmt.Errorf if err != nil { klog.Errorf("[Fluence] Error connecting to server: %v\n", err) return err @@ -274,154 +281,34 @@ func (f *Fluence) AskFlux(ctx context.Context, pod *v1.Pod, count int) error { request := &pb.MatchRequest{ Ps: jobspec, Request: "allocate", - Count: int32(count)} + Count: pg.Spec.MinMember, + } - // Question from vsoch; Why return err instead of err2 here? 
- // err would return a nil value, but we need to return non nil, - // otherwise it's going to try to use the allocation (but there is none) + // An error here is an error with making the request r, err := grpcclient.Match(context.Background(), request) if err != nil { klog.Errorf("[Fluence] did not receive any match response: %v\n", err) return err } - klog.Infof("[Fluence] response podID %s\n", r.GetPodID()) - - // Presence of a podGroup is indicated by a groupName - // Flag that the group is allocated (yes we also have the job id, testing for now) - pg := fgroup.GetPodsGroup(pod) + // TODO GetPodID should be renamed, because it will reflect the group + klog.Infof("[Fluence] Match response ID %s\n", r.GetPodID()) // Get the nodelist and inspect nodes := r.GetNodelist() klog.Infof("[Fluence] Nodelist returned from Fluxion: %s\n", nodes) - nodelist := fcore.CreateNodePodsList(nodes, pg.Name) - klog.Infof("[Fluence] parsed node pods list %s\n", nodelist) + // Assign the nodelist - this sets the group name in the groupSeen cache + // at this point, we can retrieve the cache and get nodes + nodelist := fcore.CreateNodeList(nodes, groupName) + jobid := uint64(r.GetJobID()) + klog.Infof("[Fluence] parsed node pods list %s for job id %d\n", nodelist, jobid) + // TODO would be nice to actually be able to ask flux jobs -a to fluence + // That way we can verify assignments, etc. f.mutex.Lock() - f.podNameToJobId[pod.Name] = jobid - klog.Infof("[Fluence] Check job assignment: %s\n", f.podNameToJobId) + f.groupToJobId[groupName] = jobid f.mutex.Unlock() return nil } - -// cancelFluxJobForPod cancels the flux job for a pod. -// We assume that the cancelled job also means deleting the pod group -func (f *Fluence) cancelFluxJobForPod(pod *v1.Pod) error { - jobid := f.podNameToJobId[pod.Name] - - klog.Infof("[Fluence] Cancel flux job: %v for pod %s", jobid, pod.Name) - - conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) - - if err != nil { - klog.Errorf("[Fluence] Error connecting to server: %v", err) - return err - } - defer conn.Close() - - grpcclient := pb.NewFluxcliServiceClient(conn) - _, cancel := context.WithTimeout(context.Background(), 200*time.Second) - defer cancel() - - // I think this error reflects the success or failure of the cancel request - request := &pb.CancelRequest{JobID: int64(jobid)} - res, err := grpcclient.Cancel(context.Background(), request) - if err != nil { - klog.Errorf("[Fluence] did not receive any cancel response: %v", err) - return err - } - klog.Infof("[Fluence] Job cancellation for pod %s result: %d", pod.Name, res.Error) - - // And this error is if the cancel was successful or not - if res.Error == 0 { - klog.Infof("[Fluence] Successful cancel of flux job: %v for pod %s", jobid, pod.Name) - delete(f.podNameToJobId, pod.Name) - - // If we are successful, clear the group allocated nodes - fgroup.DeleteFluenceGroup(pod) - } else { - klog.Warningf("[Fluence] Failed to cancel flux job %v for pod %s", jobid, pod.Name) - } - return nil -} - -// EventHandlers updatePod handles cleaning up resources -func (f *Fluence) updatePod(oldObj, newObj interface{}) { - - oldPod := oldObj.(*v1.Pod) - newPod := newObj.(*v1.Pod) - - klog.Infof("[Fluence] Processing event for pod %s from %s to %s", newPod.Name, newPod.Status.Phase, oldPod.Status.Phase) - - switch newPod.Status.Phase { - case v1.PodPending: - // in this state we don't know if a pod is going to be running, thus we don't need to update job map - case v1.PodRunning: - // if a pod is start running, we can add it 
state to the delta graph if it is scheduled by other scheduler - case v1.PodSucceeded: - klog.Infof("[Fluence] Pod %s succeeded, Fluence needs to free the resources", newPod.Name) - - f.mutex.Lock() - defer f.mutex.Unlock() - - if _, ok := f.podNameToJobId[newPod.Name]; ok { - f.cancelFluxJobForPod(newPod) - } else { - klog.Infof("[Fluence] Succeeded pod %s/%s doesn't have flux jobid", newPod.Namespace, newPod.Name) - } - case v1.PodFailed: - // a corner case need to be tested, the pod exit code is not 0, can be created with segmentation fault pi test - klog.Warningf("[Fluence] Pod %s failed, Fluence needs to free the resources", newPod.Name) - - f.mutex.Lock() - defer f.mutex.Unlock() - - if _, ok := f.podNameToJobId[newPod.Name]; ok { - f.cancelFluxJobForPod(newPod) - } else { - klog.Errorf("[Fluence] Failed pod %s/%s doesn't have flux jobid", newPod.Namespace, newPod.Name) - } - case v1.PodUnknown: - // don't know how to deal with it as it's unknown phase - default: - // shouldn't enter this branch - } -} - -// deletePod handles the delete event handler -// TODO when should we clear group from the cache? -func (f *Fluence) deletePod(podObj interface{}) { - klog.Info("[Fluence] Delete Pod event handler") - - pod := podObj.(*v1.Pod) - klog.Infof("[Fluence] Delete pod has status %s", pod.Status.Phase) - switch pod.Status.Phase { - case v1.PodSucceeded: - case v1.PodPending: - klog.Infof("[Fluence] Pod %s completed and is Pending termination, Fluence needs to free the resources", pod.Name) - - f.mutex.Lock() - defer f.mutex.Unlock() - - if _, ok := f.podNameToJobId[pod.Name]; ok { - f.cancelFluxJobForPod(pod) - } else { - klog.Infof("[Fluence] Terminating pod %s/%s doesn't have flux jobid", pod.Namespace, pod.Name) - } - case v1.PodRunning: - f.mutex.Lock() - defer f.mutex.Unlock() - - if _, ok := f.podNameToJobId[pod.Name]; ok { - f.cancelFluxJobForPod(pod) - } else { - klog.Infof("[Fluence] Deleted pod %s/%s doesn't have flux jobid", pod.Namespace, pod.Name) - } - } - - // We assume that a request to delete one pod means all of them. - // We have to take an all or nothing approach for now - fgroup.DeleteFluenceGroup(pod) -} diff --git a/sig-scheduler-plugins/pkg/fluence/group/group.go b/sig-scheduler-plugins/pkg/fluence/group/group.go index 4af84e2..455b9e5 100644 --- a/sig-scheduler-plugins/pkg/fluence/group/group.go +++ b/sig-scheduler-plugins/pkg/fluence/group/group.go @@ -1,103 +1,23 @@ package group import ( - "fmt" - "strconv" - - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/klog/v2" + klog "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" - fcore "sigs.k8s.io/scheduler-plugins/pkg/fluence/core" - "sigs.k8s.io/scheduler-plugins/pkg/fluence/labels" + sched "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" ) -// getDefaultGroupName returns a group name based on the pod namespace and name -// We could do this for pods that are not labeled, and treat them as a size 1 group -func getDefaultGroupName(pod *v1.Pod) string { - return fmt.Sprintf("%s-%s", pod.Namespace, pod.Name) -} - -// getPodsGroup gets the pods group, if it exists. 
-func GetPodsGroup(pod *v1.Pod) *fcore.PodGroupCache { - groupName := EnsureFluenceGroup(pod) - return fcore.GetPodGroup(groupName) -} - -// GetGroup is a courtesy wrapper around fcore.GetPodGroup -func GetGroup(groupName string) *fcore.PodGroupCache { - return fcore.GetPodGroup(groupName) -} - -// ensureFluenceGroup ensure that a podGroup is created for the named fluence group -// Preference goes to the traditional PodGroup (created by the user) -// and falls back to having one created by fluence. If there is no PodGroup -// created and no fluence annotation, we do not create the group. -// Likely for fluence we'd want a cleanup function somehow too, -// for now assume groups are unique by name. -func EnsureFluenceGroup(pod *v1.Pod) string { - - // Get the group name and size from the fluence labels - groupName := getFluenceGroupName(pod) - groupSize := getFluenceGroupSize(pod) - - // If there isn't a group, make a single node sized group - // This is so we can always treat the cases equally - if groupName == "" { - klog.Infof("[Fluence] Group annotation missing for pod %s", pod.Name) - groupName = getDefaultGroupName(pod) - } - klog.Infof("[Fluence] Group name for %s is %s", pod.Name, groupName) - klog.Infof("[Fluence] Group size for %s is %d", pod.Name, groupSize) - - // Register the pod group (with the pod) in our cache - fcore.RegisterPodGroup(pod, groupName, groupSize) - return groupName -} - -// deleteFluenceGroup ensures the pod group is deleted, if it exists -func DeleteFluenceGroup(pod *v1.Pod) { - // Get the group name and size from the fluence labels - pg := GetPodsGroup(pod) - fcore.DeletePodGroup(pg.Name) - klog.Infof("[Fluence] known groups are:\n") - fcore.ListGroups() -} - -// getFluenceGroupName looks for the group to indicate a fluence group, and returns it -func getFluenceGroupName(pod *v1.Pod) string { - groupName, _ := pod.Labels[labels.PodGroupLabel] - return groupName -} - -// getFluenceGroupSize gets the size of the fluence group -func getFluenceGroupSize(pod *v1.Pod) int32 { - size, _ := pod.Labels[labels.PodGroupSizeLabel] - - // Default size of 1 if the label is not set (but name is) - if size == "" { - return 1 - } - - // We don't want the scheduler to fail if someone puts a value for size - // that doesn't convert nicely. They can find this in the logs. - intSize, err := strconv.ParseUint(size, 10, 32) - if err != nil { - klog.Error(" [Fluence] Parsing integer size for pod group") - } - return int32(intSize) -} - // GetCreationTimestamp first tries the fluence group, then falls back to the initial attempt timestamp -func GetCreationTimestamp(groupName string, podInfo *framework.QueuedPodInfo) metav1.MicroTime { - pg := fcore.GetPodGroup(groupName) +// This is the only update we have made to the upstream PodGroupManager, because we are expecting +// a MicroTime and not a time.Time. +func GetCreationTimestamp(groupName string, pg *sched.PodGroup, podInfo *framework.QueuedPodInfo) metav1.MicroTime { // IsZero is an indicator if this was actually set // If the group label was present and we have a group, this will be true - if !pg.TimeCreated.IsZero() { - klog.Infof(" [Fluence] Pod group %s was created at %s\n", groupName, pg.TimeCreated) - return pg.TimeCreated + if !pg.Status.ScheduleStartTime.IsZero() { + klog.Infof(" [Fluence] Pod group %s was created at %s\n", groupName, pg.Status.ScheduleStartTime) + return pg.Status.ScheduleStartTime } // We should actually never get here. 
klog.Errorf(" [Fluence] Pod group %s time IsZero, we should not have reached here", groupName) diff --git a/sig-scheduler-plugins/pkg/fluence/utils/utils.go b/sig-scheduler-plugins/pkg/fluence/utils/utils.go index e384669..f2969d2 100644 --- a/sig-scheduler-plugins/pkg/fluence/utils/utils.go +++ b/sig-scheduler-plugins/pkg/fluence/utils/utils.go @@ -21,7 +21,7 @@ import ( "strings" v1 "k8s.io/api/core/v1" - "k8s.io/klog/v2" + klog "k8s.io/klog/v2" pb "sigs.k8s.io/scheduler-plugins/pkg/fluence/fluxcli-grpc" ) @@ -39,12 +39,14 @@ func getPodJobspecLabels(pod *v1.Pod) []string { return labels } -// InspectPodInfo takes a pod object and returns the pod.spec -// Note from vsoch - I updated this to calculate containers across the pod -// if that's wrong we can change it back. -func InspectPodInfo(pod *v1.Pod) *pb.PodSpec { +// PreparePodJobSpec takes a pod object and returns the jobspec +// The jobspec is based on the pod, and assumes it will be duplicated +// for a MatchAllocate request (representing all pods). We name the +// jobspec based on the group and not the individual ID. +// This calculates across containers in the od +func PreparePodJobSpec(pod *v1.Pod, groupName string) *pb.PodSpec { ps := new(pb.PodSpec) - ps.Id = pod.Name + ps.Id = groupName // Note from vsoch - there was an if check here to see if we had labels, // I don't think there is risk to adding an empty list but we can add diff --git a/src/fluence/fluxion/fluxion.go b/src/fluence/fluxion/fluxion.go index 5775199..05e94fa 100644 --- a/src/fluence/fluxion/fluxion.go +++ b/src/fluence/fluxion/fluxion.go @@ -8,7 +8,7 @@ import ( "github.com/flux-framework/flux-k8s/flux-plugin/fluence/jobspec" "github.com/flux-framework/flux-k8s/flux-plugin/fluence/utils" "github.com/flux-framework/fluxion-go/pkg/fluxcli" - "k8s.io/klog/v2" + klog "k8s.io/klog/v2" "context" "errors"