Merge pull request #22 from xyloid/yilin/flux-job-cancellation
Adds job cancellation of flux jobs
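The change records, in a mutex-guarded podNameToJobId map, the Flux job id obtained when PreFilter allocates a pod, and cancels that job when the pod succeeds or fails (detected through a pod informer update handler) or is re-matched. A minimal sketch of the cancellation path, condensed from the diff below (cancelIfAllocated is an illustrative name, not a function added by this PR; fluxcli is this repository's Go binding to the Flux resource API):

func (kf *KubeFlux) cancelIfAllocated(podName string) {
    kf.mutex.Lock()
    defer kf.mutex.Unlock()
    if jobid, ok := kf.podNameToJobId[podName]; ok {
        // ReapiCliCancel returns 0 on success
        if fluxcli.ReapiCliCancel(kf.fluxctx, int64(jobid), false) == 0 {
            delete(kf.podNameToJobId, podName)
        }
    }
}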
cmisale authored Nov 4, 2021
2 parents cb5a897 + 406d0b9 commit 0a111e6
Showing 2 changed files with 149 additions and 23 deletions.
156 changes: 138 additions & 18 deletions flux-plugin/kubeflux/kubeflux.go
@@ -19,22 +19,29 @@ package kubeflux
import (
"context"
"errors"
"fluxcli"
"fmt"
"io/ioutil"
"k8s.io/klog/v2"
"time"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"fluxcli"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"sigs.k8s.io/scheduler-plugins/pkg/kubeflux/utils"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"sigs.k8s.io/scheduler-plugins/pkg/kubeflux/jobspec"
"sigs.k8s.io/scheduler-plugins/pkg/kubeflux/utils"
"sync"
"time"
)

type KubeFlux struct {
mutex sync.Mutex
handle framework.Handle
fluxctx *fluxcli.ReapiCtx
podNameToJobId map[string]uint64
}

var _ framework.PreFilterPlugin = &KubeFlux{}
@@ -64,17 +71,18 @@ func (kf *KubeFlux) PreFilter(ctx context.Context, state *framework.CycleState,
klog.Infof("Examining the pod")

fluxjbs := jobspec.InspectPodInfo(pod)
filename := "/home/data/jobspecs/jobspec.yaml"
currenttime := time.Now()
filename := fmt.Sprintf("/home/data/jobspecs/jobspec-%s-%s.yaml", currenttime.Format(time.RFC3339Nano), pod.Name)
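// e.g. /home/data/jobspecs/jobspec-2021-11-04T16:00:00.123456789Z-my-pod.yaml (illustrative timestamp and pod name)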
jobspec.CreateJobSpecYaml(fluxjbs, filename)

nodename, err := kf.askFlux(ctx, pod)
nodename, err := kf.askFlux(ctx, pod, filename)
if err != nil {
return framework.NewStatus(framework.Unschedulable, err.Error())
}

if nodename == "NONE" {
fmt.Println("Pod cannot be scheduled by KubeFlux, nodename ", nodename)
return framework.NewStatus(framework.Unschedulable, "Pod cannot be scheduled by KubeFlux, nodename " + nodename)
return framework.NewStatus(framework.Unschedulable, "Pod cannot be scheduled by KubeFlux, nodename "+nodename)
}

fmt.Println("Node Selected: ", nodename)
@@ -101,9 +109,20 @@ func (kf *KubeFlux) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}

func (kf *KubeFlux) askFlux(ctx context.Context, pod *v1.Pod) (string, error) {
func (kf *KubeFlux) askFlux(ctx context.Context, pod *v1.Pod, filename string) (string, error) {

// clean up the previous match if the pod has already been allocated
kf.mutex.Lock()
_, isPodAllocated := kf.podNameToJobId[pod.Name]
kf.mutex.Unlock()

if isPodAllocated {
fmt.Println("Clean up previous allocation")
kf.mutex.Lock()
kf.cancelFluxJobForPod(pod.Name)
kf.mutex.Unlock()
}

filename := "/home/data/jobspecs/jobspec.yaml"
spec, err := ioutil.ReadFile(filename)
if err != nil {
// err := fmt.Errorf("Error reading jobspec file")
@@ -112,12 +131,12 @@ func (kf *KubeFlux) askFlux(ctx context.Context, pod *v1.Pod) (string, error) {
start := time.Now()
reserved, allocated, at, pre, post, overhead, jobid, fluxerr := fluxcli.ReapiCliMatchAllocate(kf.fluxctx, false, string(spec))
elapsed := metrics.SinceInSeconds(start)
fmt.Println("Time elapsed: ", elapsed)
fmt.Println("Time elapsed (Match Allocate) :", elapsed)
if fluxerr != 0 {
// err := fmt.Errorf("Error in ReapiCliMatchAllocate")
return "", errors.New("Error in ReapiCliMatchAllocate")
}

if allocated == "" {
return "NONE", nil
}
@@ -126,16 +145,60 @@ func (kf *KubeFlux) askFlux(ctx context.Context, pod *v1.Pod) (string, error) {
nodename := fluxcli.ReapiCliGetNode(kf.fluxctx)
fmt.Println("nodename ", nodename)

kf.mutex.Lock()
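// remember the flux job id so it can be cancelled when the pod completes or is re-matched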
kf.podNameToJobId[pod.Name] = jobid
fmt.Println("Check job set:")
fmt.Println(kf.podNameToJobId)
kf.mutex.Unlock()

return nodename, nil
}

func (kf *KubeFlux) cancelFluxJobForPod(podName string) {
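// cancels the flux job recorded for podName and, on success, removes the
// entry from podNameToJobId; callers are expected to hold kf.mutex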
jobid := kf.podNameToJobId[podName]

fmt.Printf("Cancel flux job: %v for pod %s\n", jobid, podName)

start := time.Now()
err := fluxcli.ReapiCliCancel(kf.fluxctx, int64(jobid), false)

if err == 0 {
delete(kf.podNameToJobId, podName)
} else {
fmt.Printf("Failed to delete pod %s from the podname-jobid map.\n", podName)
}

elapsed := metrics.SinceInSeconds(start)
fmt.Println("Time elapsed (Cancel Job) :", elapsed)

fmt.Printf("Job cancellation for pod %s result: %d\n", podName, err)
fmt.Println("Check job set: after delete")
fmt.Println(kf.podNameToJobId)
}

// New initializes and returns a new KubeFlux plugin.
func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
fmt.Println("Error getting InClusterConfig")
return nil, err
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
fmt.Println("Error getting ClientSet")
return nil, err
}

factory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := factory.Core().V1().Pods().Informer()

fctx := fluxcli.NewReapiCli()
fmt.Println("Created cli context ", fctx)
fmt.Printf("%+v\n", fctx)
filename := "/home/data/jgf/kubecluster.json"
err := utils.CreateJGF(handle, filename)
err = utils.CreateJGF(handle, filename)
if err != nil {
return nil, err
}
@@ -146,16 +209,73 @@ func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
return nil, err
}


start := time.Now()
fluxcli.ReapiCliInit(fctx, string(jgf))
elapsed := metrics.SinceInSeconds(start)
fmt.Println("Time elapsed (Cli Init with Graph) :", elapsed)

// if ret != 0 {
// fmt.Println("Error while initializing ReapiCli")
// return nil, errors.New("Error while initializing ReapiCli")
// }
klog.Infof("KubeFlux starts")

return &KubeFlux{handle: handle, fluxctx: fctx}, nil
kf := &KubeFlux{handle: handle, fluxctx: fctx, podNameToJobId: make(map[string]uint64)}

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: kf.updatePod,
})

stopPodInformer := make(chan struct{})
go podInformer.Run(stopPodInformer)
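// the informer runs for the lifetime of the scheduler; stopPodInformer is never closed here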

return kf, nil
}

// EventHandlers
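// updatePod is invoked by the pod informer on every pod update; once a pod that
// KubeFlux allocated reaches the Succeeded or Failed phase, its flux job is
// cancelled so the resources are released.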
func (kf *KubeFlux) updatePod(oldObj, newObj interface{}) {
fmt.Println("updatePod event handler")
oldPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)
fmt.Println(oldPod)
fmt.Println(newPod)
fmt.Println(oldPod.Name, oldPod.Status)
fmt.Println(newPod.Name, newPod.Status)

switch newPod.Status.Phase {
case v1.PodPending:
// in this state we don't know whether the pod will ever run, so there is no need to update the job map
case v1.PodRunning:
// if a pod starts running, we could add its state to the delta graph if it was scheduled by another scheduler
case v1.PodSucceeded:
fmt.Printf("Pod %s succeeded, kubeflux needs to free the resources\n", newPod.Name)

kf.mutex.Lock()
defer kf.mutex.Unlock()

if _, ok := kf.podNameToJobId[newPod.Name]; ok {
kf.cancelFluxJobForPod(newPod.Name)
} else {
fmt.Printf("Succeeded pod %s/%s doesn't have flux jobid\n", newPod.Namespace, newPod.Name)
}
case v1.PodFailed:
// a corner case that still needs testing: the pod exit code is not 0; it can be reproduced with a segmentation-fault pi test
fmt.Printf("Pod %s failed, kubeflux needs to free the resources\n", newPod.Name)

kf.mutex.Lock()
defer kf.mutex.Unlock()

if _, ok := kf.podNameToJobId[newPod.Name]; ok {
kf.cancelFluxJobForPod(newPod.Name)
} else {
fmt.Printf("Failed pod %s/%s doesn't have flux jobid\n", newPod.Namespace, newPod.Name)
}
case v1.PodUnknown:
// don't know how to deal with it, as the phase is unknown
default:
// shouldn't enter this branch
}

}

////// Utility functions
16 changes: 11 additions & 5 deletions flux-plugin/kubeflux/utils/utils.go
@@ -1,14 +1,17 @@
package utils

import (
"k8s.io/kubernetes/pkg/scheduler/framework"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"sigs.k8s.io/scheduler-plugins/pkg/kubeflux/jgf"
"time"
)

func CreateJGF(handle framework.Handle, filename string) error {
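// builds a JGF (JSON Graph Format) description of the cluster topology
// (cluster, rack, worker nodes, cores, memory) and writes it to filename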
start := time.Now()
ctx := context.Background()
clientset := handle.ClientSet()
nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
@@ -20,7 +23,7 @@ func CreateJGF(handle framework.Handle, filename string) error {

fluxgraph.MakeEdge(cluster, rack, "contains")
fluxgraph.MakeEdge(rack, cluster, "in")

vcores := 0
fmt.Println("Number nodes ", len(nodes.Items))
// sdnCount := 0
@@ -30,7 +33,7 @@ func CreateJGF(handle framework.Handle, filename string) error {
if !master && !cp {

// Check if subnet already exists
// Here we build subnets according to IP addresses of nodes.
// This was for GROMACS, so I commented it out and went back
// to building racks. One day, this will be customizable by users.

@@ -49,7 +52,6 @@ func CreateJGF(handle framework.Handle, filename string) error {
// }
// subnet := subnets[subnetName]


totalcpu, _ := node.Status.Capacity.Cpu().AsInt64()
totalmem, _ := node.Status.Capacity.Memory().AsInt64()
workernode := fluxgraph.MakeNode(node_index, false, node.Name)
@@ -82,10 +84,14 @@ func CreateJGF(handle framework.Handle, filename string) error {
}
}

elapsed := metrics.SinceInSeconds(start)
fmt.Println("Time elapsed (CreateJGF) :", elapsed)
err = fluxgraph.WriteJGF(filename)
if err != nil {
return err
}
elapsed_write := metrics.SinceInSeconds(start)
fmt.Println("Time elapsed (CreateJGF write) :", elapsed_write-elapsed)
return nil

}
