Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions pkg/device_plugin/gfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,20 @@ func runGFD() {
return
}

// 3. Create the gfd pod
// 3. Create the gfd pod and delete when its done
gfdPod := createGFDPod(clientset, nodeName, namespace, gfdImage)
err = LaunchPodWithRetries(clientset, gfdPod, namespace)
if err != nil {
log.Printf("Error creating GFD pod: %v", err.Error())
return
}
err = CheckAndDeleteCompletedPod(clientset, gfdPod.Name, namespace)
if err != nil {
log.Printf("Error reaping GFD pod: %v", err.Error())
return
}

log.Println("GFD pod launched and cleaned up successfully.")
return
}

Expand Down Expand Up @@ -227,6 +233,18 @@ func getNodeLabel(clientset *kubernetes.Clientset, nodeName, labelKey string) (b

// LaunchPodWithRetries creates the pod object with exponential backoff based retries
func LaunchPodWithRetries(clientset *kubernetes.Clientset, pod *corev1.Pod, namespace string) error {

// attempt a delete if the pod already exists
existingPod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), pod.Name, metav1.GetOptions{})
if err == nil {
err = clientset.CoreV1().Pods(namespace).Delete(context.Background(), existingPod.Name, metav1.DeleteOptions{})
if err != nil {
fmt.Printf("Error deleting existing GFD pod: %v", err.Error())
return err
}
}

// launch the new GFD pod
backoff := wait.Backoff{
Duration: 1 * time.Second, // Initial delay
Factor: 1.5, // Multiply delay by this factor each step
Expand All @@ -236,7 +254,7 @@ func LaunchPodWithRetries(clientset *kubernetes.Clientset, pod *corev1.Pod, name
}

// 2. Execute the retry logic
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
err = wait.ExponentialBackoff(backoff, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

Expand Down Expand Up @@ -296,6 +314,52 @@ func WaitForKataRuntime(clientset *kubernetes.Clientset, nodeName string) error
return err
}

// CheckAndDeleteCompletedPod checks if a pod is 'Succeeded' (Completed) and deletes it.
func CheckAndDeleteCompletedPod(clientset *kubernetes.Clientset, name, namespace string) error {
// 1. Define the backoff parameters
backoff := wait.Backoff{
Duration: 2 * time.Second, // Initial delay
Factor: 1.5, // Multiplier
Jitter: 0.1, // Adds randomness to prevent "thundering herd"
Steps: 30, // Total number of attempts
Cap: 1 * time.Minute, // Maximum delay between attempts
}

// 2. Execute the retry loop
err := wait.ExponentialBackoffWithContext(context.Background(), backoff, func(innerCtx context.Context) (bool, error) {
pod, err := clientset.CoreV1().Pods(namespace).Get(innerCtx, name, metav1.GetOptions{})
if err != nil {
// If the pod is gone, we stop retrying and return an error
return false, fmt.Errorf("error fetching GFD pod: %w", err)
}

log.Printf("Pod %s phase: %s", name, pod.Status.Phase)

// Check if the pod is 'Succeeded'
if pod.Status.Phase == "Succeeded" {
log.Printf("GFD Pod %s completed. Deleting...", name)

err := clientset.CoreV1().Pods(namespace).Delete(innerCtx, name, metav1.DeleteOptions{})
if err != nil {
// If deletion fails, return false to retry deletion on the next step
return false, nil
}

// Return true to signal the loop is finished successfully
return true, nil
}

// Return false to signal we should wait and try again
return false, nil
})

if err != nil {
return fmt.Errorf("[GFD cleanup] failed after backoff: %v", err)
}

return nil
}

func getGPUDeviceName() string {
for deviceID, _ := range deviceMap {
// Determine device name - skip nvswitch
Expand Down