diff --git a/pkg/device_plugin/gfd.go b/pkg/device_plugin/gfd.go index 9d4b2111..74ff4472 100644 --- a/pkg/device_plugin/gfd.go +++ b/pkg/device_plugin/gfd.go @@ -105,14 +105,20 @@ func runGFD() { return } - // 3. Create the gfd pod + // 3. Create the gfd pod and delete when its done gfdPod := createGFDPod(clientset, nodeName, namespace, gfdImage) err = LaunchPodWithRetries(clientset, gfdPod, namespace) if err != nil { log.Printf("Error creating GFD pod: %v", err.Error()) return } + err = CheckAndDeleteCompletedPod(clientset, gfdPod.Name, namespace) + if err != nil { + log.Printf("Error reaping GFD pod: %v", err.Error()) + return + } + log.Println("GFD pod launched and cleaned up successfully.") return } @@ -227,6 +233,18 @@ func getNodeLabel(clientset *kubernetes.Clientset, nodeName, labelKey string) (b // LaunchPodWithRetries creates the pod object with exponential backoff based retries func LaunchPodWithRetries(clientset *kubernetes.Clientset, pod *corev1.Pod, namespace string) error { + + // attempt a delete if the pod already exists + existingPod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), pod.Name, metav1.GetOptions{}) + if err == nil { + err = clientset.CoreV1().Pods(namespace).Delete(context.Background(), existingPod.Name, metav1.DeleteOptions{}) + if err != nil { + fmt.Printf("Error deleting existing GFD pod: %v", err.Error()) + return err + } + } + + // launch the new GFD pod backoff := wait.Backoff{ Duration: 1 * time.Second, // Initial delay Factor: 1.5, // Multiply delay by this factor each step @@ -236,7 +254,7 @@ func LaunchPodWithRetries(clientset *kubernetes.Clientset, pod *corev1.Pod, name } // 2. Execute the retry logic - err := wait.ExponentialBackoff(backoff, func() (bool, error) { + err = wait.ExponentialBackoff(backoff, func() (bool, error) { ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) defer cancel() @@ -296,6 +314,52 @@ func WaitForKataRuntime(clientset *kubernetes.Clientset, nodeName string) error return err } +// CheckAndDeleteCompletedPod checks if a pod is 'Succeeded' (Completed) and deletes it. +func CheckAndDeleteCompletedPod(clientset *kubernetes.Clientset, name, namespace string) error { + // 1. Define the backoff parameters + backoff := wait.Backoff{ + Duration: 2 * time.Second, // Initial delay + Factor: 1.5, // Multiplier + Jitter: 0.1, // Adds randomness to prevent "thundering herd" + Steps: 30, // Total number of attempts + Cap: 1 * time.Minute, // Maximum delay between attempts + } + + // 2. Execute the retry loop + err := wait.ExponentialBackoffWithContext(context.Background(), backoff, func(innerCtx context.Context) (bool, error) { + pod, err := clientset.CoreV1().Pods(namespace).Get(innerCtx, name, metav1.GetOptions{}) + if err != nil { + // If the pod is gone, we stop retrying and return an error + return false, fmt.Errorf("error fetching GFD pod: %w", err) + } + + log.Printf("Pod %s phase: %s", name, pod.Status.Phase) + + // Check if the pod is 'Succeeded' + if pod.Status.Phase == "Succeeded" { + log.Printf("GFD Pod %s completed. Deleting...", name) + + err := clientset.CoreV1().Pods(namespace).Delete(innerCtx, name, metav1.DeleteOptions{}) + if err != nil { + // If deletion fails, return false to retry deletion on the next step + return false, nil + } + + // Return true to signal the loop is finished successfully + return true, nil + } + + // Return false to signal we should wait and try again + return false, nil + }) + + if err != nil { + return fmt.Errorf("[GFD cleanup] failed after backoff: %v", err) + } + + return nil +} + func getGPUDeviceName() string { for deviceID, _ := range deviceMap { // Determine device name - skip nvswitch