diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml
index a461e49..9eaa5ff 100644
--- a/.github/workflows/build-test.yaml
+++ b/.github/workflows/build-test.yaml
@@ -16,7 +16,8 @@ jobs:
       - name: Set up Go
         uses: actions/setup-go@v5
         with:
-          go-version: 1.22
+          go-version-file: go.mod
+          cache-dependency-path: go.sum
 
       - name: Test
         run: make test
@@ -29,7 +30,8 @@ jobs:
       - name: Set up Go
         uses: actions/setup-go@v5
         with:
-          go-version: 1.22
+          go-version-file: go.mod
+          cache-dependency-path: go.sum
 
       - name: Build
         run: make build
@@ -46,6 +48,32 @@ jobs:
       - name: Validate Cluster End State
         run: ./testing/validate.sh
 
+  integration-test-presync:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Go
+        uses: actions/setup-go@v5
+        with:
+          go-version-file: go.mod
+          cache-dependency-path: go.sum
+
+      - name: Build
+        run: make build
+
+      - name: Create k8s Kind Cluster
+        uses: helm/kind-action@v1.10.0
+
+      - name: Create Testdata in Cluster
+        run: ./testing/init.sh
+
+      - name: Run PVMigrate
+        run: ./bin/pvmigrate --source-sc int-source --dest-sc int-dest --pre-sync-mode
+
+      - name: Validate Cluster End State
+        run: ./testing/validate.sh
+
   integration-test-incluster:
     runs-on: ubuntu-latest
     steps:
@@ -54,7 +82,8 @@ jobs:
       - name: Set up Go
         uses: actions/setup-go@v5
         with:
-          go-version: 1.22
+          go-version-file: go.mod
+          cache-dependency-path: go.sum
 
       - name: Build and push
         uses: docker/build-push-action@v6
@@ -74,3 +103,31 @@ jobs:
 
       - name: Validate Cluster End State
         run: ./testing/validate.sh
+
+  # this job will validate that all the tests passed
+  # it is used for the github branch protection rule
+  validate-success:
+    name: Validate success
+    runs-on: ubuntu-24.04
+    needs:
+      - unit-test
+      - integration-test
+      - integration-test-presync
+      - integration-test-incluster
+    if: always()
+    steps:
+      # https://docs.github.com/en/actions/learn-github-actions/contexts#needs-context
+      - name: fail if unit-test job was not successful
+        if: needs.unit-test.result != 'success'
+        run: exit 1
+      - name: fail if integration-test job was not successful
+        if: needs.integration-test.result != 'success'
+        run: exit 1
+      - name: fail if integration-test-presync job was not successful
+        if: needs.integration-test-presync.result != 'success'
+        run: exit 1
+      - name: fail if integration-test-incluster job was not successful
+        if: needs.integration-test-incluster.result != 'success'
+        run: exit 1
+      - name: succeed if everything else passed
+        run: echo "Validation succeeded"
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index e6a51cf..039abae 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -15,7 +15,8 @@ jobs:
       - name: Set up Go
         uses: actions/setup-go@v5
         with:
-          go-version: 1.22
+          go-version-file: go.mod
+          cache-dependency-path: go.sum
 
       - name: Test
         run: make test
@@ -28,7 +29,8 @@ jobs:
      - name: Set up Go
        uses: actions/setup-go@v5
        with:
-          go-version: 1.22
+          go-version-file: go.mod
+          cache-dependency-path: go.sum
 
       - name: Build
         run: make build
@@ -45,6 +47,32 @@ jobs:
       - name: Validate Cluster End State
         run: ./testing/validate.sh
 
+  integration-test-presync:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Go
+        uses: actions/setup-go@v5
+        with:
+          go-version-file: go.mod
+          cache-dependency-path: go.sum
+
+      - name: Build
+        run: make build
+
+      - name: Create k8s Kind Cluster
+        uses: helm/kind-action@v1.10.0
+
+      - name: Create Testdata in Cluster
+        run: ./testing/init.sh
+
+      - name: Run PVMigrate
+        run: ./bin/pvmigrate --source-sc int-source --dest-sc int-dest --pre-sync-mode
+
+      - name: Validate Cluster End State
+        run: ./testing/validate.sh
+
   integration-test-incluster:
     runs-on: ubuntu-latest
     steps:
@@ -53,7 +81,8 @@ jobs:
       - name: Set up Go
         uses: actions/setup-go@v5
         with:
-          go-version: 1.22
+          go-version-file: go.mod
+          cache-dependency-path: go.sum
 
       - name: Build and push
         uses: docker/build-push-action@v6
@@ -80,6 +109,7 @@
       - unit-test
       - integration-test
       - integration-test-incluster
+      - integration-test-presync
     if: startsWith(github.ref, 'refs/tags/v')
     steps:
       - name: Checkout
@@ -90,7 +120,8 @@
 
       - uses: actions/setup-go@v5
         with:
-          go-version: 1.22
+          go-version-file: go.mod
+          cache-dependency-path: go.sum
 
       - name: Run GoReleaser
         uses: goreleaser/goreleaser-action@v6
diff --git a/cmd/main.go b/cmd/main.go
index d305e2e..f4b7d54 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -39,7 +39,9 @@ func main() {
 	flag.StringVar(&options.Namespace, "namespace", "", "only migrate PVCs within this namespace")
 	flag.BoolVar(&options.SetDefaults, "set-defaults", false, "change default storage class from source to dest")
 	flag.BoolVar(&options.VerboseCopy, "verbose-copy", false, "show output from the rsync command used to copy data between PVCs")
+	flag.BoolVar(&options.PreSyncMode, "pre-sync-mode", false, "create the new PVCs and copy the data, then scale down, run another copy, and finally swap the PVCs")
 	flag.BoolVar(&options.SkipSourceValidation, "skip-source-validation", false, "migrate from PVCs using a particular StorageClass name, even if that StorageClass does not exist")
+	flag.IntVar(&options.MaxPVs, "max-pvs", 0, "maximum number of PVs to process, defaults to 0 (unlimited)")
 	flag.IntVar(&podReadyTimeout, "pod-ready-timeout", 60, "length of time to wait (in seconds) for validation pod(s) to go into Ready phase")
 	flag.IntVar(&deletePVTimeout, "delete-pv-timeout", 300, "length of time to wait (in seconds) for backing PV to be removed when temporary PVC is deleted")
 	flag.BoolVar(&skipPreflightValidation, "skip-preflight-validation", false, "skip preflight migration validation on the destination storage provider")
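Note: the two new flags map one-to-one onto the new `Options` fields (`PreSyncMode`, `MaxPVs`), so the same behavior should be reachable when pvmigrate is driven as a library. A minimal sketch, assuming the repo's `github.com/replicatedhq/pvmigrate/pkg/migrate` import path, that the `k8sclient.Interface` in the signature is client-go's `kubernetes.Interface`, an in-cluster kubeconfig, and an illustrative rsync image (none of these are pinned down by this diff):

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/replicatedhq/pvmigrate/pkg/migrate"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// assumes the process runs inside the cluster; use clientcmd for kubeconfig files
	cfg, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	opts := migrate.Options{
		SourceSCName: "int-source",
		DestSCName:   "int-dest",
		RsyncImage:   "eeacms/rsync:2.3", // illustrative; any image providing rsync
		PreSyncMode:  true,               // live pre-copy before pods are scaled down
		MaxPVs:       5,                  // process at most five PVs; 0 means unlimited
	}
	if err := migrate.Migrate(context.Background(), log.New(os.Stdout, "", 0), clientset, opts); err != nil {
		log.Fatal(err)
	}
}
```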
diff --git a/pkg/k8sutil/truncate.go b/pkg/k8sutil/truncate.go
index 9f4cfca..567520f 100644
--- a/pkg/k8sutil/truncate.go
+++ b/pkg/k8sutil/truncate.go
@@ -2,14 +2,14 @@ package k8sutil
 
 import "fmt"
 
-const nameSuffix = "-pvcmigrate"
+const PVCNameSuffix = "-pvcmigrate"
 
 // if the length after adding the suffix is more than 253 characters, we need to reduce that to fit within k8s limits
 // pruning from the end runs the risk of dropping the '0'/'1'/etc of a statefulset's PVC name
 // pruning from the front runs the risk of making a-replica-... and b-replica-... collide
 // so this removes characters from the middle of the string
 func NewPvcName(originalName string) string {
-	candidate := originalName + nameSuffix
+	candidate := originalName + PVCNameSuffix
 	if len(candidate) <= 253 {
 		return candidate
 	}
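A quick illustration of the now-exported pieces, assuming the repo's module path; the exact split point of the middle-truncation is an implementation detail of `NewPvcName`, so the comments below only assert the properties the doc comment promises:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/replicatedhq/pvmigrate/pkg/k8sutil"
)

func main() {
	// short names simply get the suffix appended
	fmt.Println(k8sutil.NewPvcName("datadir-db-0")) // datadir-db-0-pvcmigrate

	// very long names are shortened from the middle, so both the
	// distinguishing prefix and the trailing ordinal+suffix survive
	long := "a-replica-" + strings.Repeat("x", 240) + "-0"
	short := k8sutil.NewPvcName(long)
	fmt.Println(len(short) <= 253)                               // true
	fmt.Println(strings.HasPrefix(short, "a-replica-"))          // true
	fmt.Println(strings.HasSuffix(short, k8sutil.PVCNameSuffix)) // true
}
```

Exporting the suffix as `PVCNameSuffix` is what lets `getPVCs` (below) recognize destination PVCs that pvmigrate itself created.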
diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go
index 2576db3..106fc99 100644
--- a/pkg/migrate/migrate.go
+++ b/pkg/migrate/migrate.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"log"
 	"strconv"
+	"strings"
 	"text/tabwriter"
 	"time"
 
@@ -50,6 +51,8 @@ type Options struct {
 	Namespace            string
 	SetDefaults          bool
 	VerboseCopy          bool
+	PreSyncMode          bool
+	MaxPVs               int
 	SkipSourceValidation bool
 	PodReadyTimeout      time.Duration
 	DeletePVTimeout      time.Duration
@@ -62,17 +65,28 @@ func Migrate(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 		return err
 	}
 
-	matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.Namespace)
+	if options.PreSyncMode {
+		w.Println("\nRunning in pre-sync-mode: we first copy each PVC live, without scaling down pods. Once that pre-sync has completed, we scale down, run another copy/sync, and finally swap the PVCs.")
+	}
+
+	matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, &options)
 	if err != nil {
 		return err
 	}
 
-	updatedMatchingPVCs, err := scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5)
+	if options.PreSyncMode {
+		err = copyAllPVCs(ctx, w, clientset, &options, matchingPVCs, 1*time.Second)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5)
 	if err != nil {
 		return fmt.Errorf("failed to scale down pods: %w", err)
 	}
 
-	err = copyAllPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.RsyncImage, updatedMatchingPVCs, options.VerboseCopy, time.Second, options.RsyncFlags)
+	err = copyAllPVCs(ctx, w, clientset, &options, matchingPVCs, 1*time.Second)
 	if err != nil {
 		return err
 	}
@@ -169,15 +183,15 @@ func swapDefaultStorageClasses(ctx context.Context, w *log.Logger, clientset k8s
 	return nil
 }
 
-func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName string, destSCName string, rsyncImage string, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, verboseCopy bool, waitTime time.Duration, rsyncFlags []string) error {
+func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, options *Options, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, waitTime time.Duration) error {
 	// create a pod for each PVC migration, and wait for it to finish
-	w.Printf("\nCopying data from %s PVCs to %s PVCs\n", sourceSCName, destSCName)
+	w.Printf("\nCopying data from %s PVCs to %s PVCs\n", options.SourceSCName, options.DestSCName)
 	for ns, nsPvcs := range matchingPVCs {
 		for _, nsPvc := range nsPvcs {
 			sourcePvcName, destPvcName := nsPvc.Name, k8sutil.NewPvcName(nsPvc.Name)
 			w.Printf("Copying data from %s (%s) to %s in %s\n", sourcePvcName, nsPvc.Spec.VolumeName, destPvcName, ns)
 
-			err := copyOnePVC(ctx, w, clientset, ns, sourcePvcName, destPvcName, rsyncImage, verboseCopy, waitTime, rsyncFlags)
+			err := copyOnePVC(ctx, w, clientset, ns, sourcePvcName, destPvcName, options, waitTime)
 			if err != nil {
 				return fmt.Errorf("failed to copy PVC %s in %s: %w", nsPvc.Name, ns, err)
 			}
@@ -186,7 +200,7 @@ func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interfa
 	return nil
 }
 
-func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, ns string, sourcePvcName string, destPvcName string, rsyncImage string, verboseCopy bool, waitTime time.Duration, rsyncFlags []string) error {
+func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, ns string, sourcePvcName string, destPvcName string, options *Options, waitTime time.Duration) error {
 	w.Printf("Determining the node to migrate PVC %s on\n", sourcePvcName)
 	nodeName, err := getDesiredNode(ctx, clientset, ns, sourcePvcName)
 	if err != nil {
@@ -194,7 +208,7 @@ func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interfac
 	}
 
 	w.Printf("Creating pvc migrator pod on node %s\n", nodeName)
-	createdPod, err := createMigrationPod(ctx, clientset, ns, sourcePvcName, destPvcName, rsyncImage, nodeName, rsyncFlags)
+	createdPod, err := createMigrationPod(ctx, clientset, ns, sourcePvcName, destPvcName, options.RsyncImage, nodeName, options.RsyncFlags)
 	if err != nil {
 		return err
 	}
@@ -278,13 +292,13 @@ func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interfac
 				w.Printf("failed to read pod logs: %v\n", err)
 				break
 			}
-			if verboseCopy {
+			if options.VerboseCopy {
 				w.Printf("    %s\n", line)
 			} else {
 				_, _ = fmt.Fprintf(w.Writer(), ".") // one dot per line of output
 			}
 		}
-		if !verboseCopy {
+		if !options.VerboseCopy {
 			_, _ = fmt.Fprintf(w.Writer(), "done!\n") // add a newline at the end of the dots if not showing pod logs
 		}
 
@@ -416,7 +430,7 @@ func createMigrationPod(ctx context.Context, clientset k8sclient.Interface, ns s
 // a map of namespaces to arrays of original PVCs
 // an array of namespaces that the PVCs were found within
 // an error, if one was encountered
-func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName, destSCName string, Namespace string) (map[string][]*corev1.PersistentVolumeClaim, []string, error) {
+func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, opts *Options) (map[string][]*corev1.PersistentVolumeClaim, []string, error) {
 	// get PVs using the specified storage provider
 	pvs, err := clientset.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
 	if err != nil {
@@ -425,26 +439,38 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 	matchingPVs := []corev1.PersistentVolume{}
 	pvsByName := map[string]corev1.PersistentVolume{}
 	for _, pv := range pvs.Items {
-		if pv.Spec.StorageClassName == sourceSCName {
+		if pv.Spec.StorageClassName == opts.SourceSCName {
 			matchingPVs = append(matchingPVs, pv)
 			pvsByName[pv.Name] = pv
 		} else {
-			w.Printf("PV %s does not match source SC %s, not migrating\n", pv.Name, sourceSCName)
+			w.Printf("PV %s does not match source SC %s, not migrating\n", pv.Name, opts.SourceSCName)
 		}
 	}
 
 	// get PVCs using specified PVs
+	matchingPVCsCount := 0
 	matchingPVCs := map[string][]*corev1.PersistentVolumeClaim{}
 	for _, pv := range matchingPVs {
+		if opts.MaxPVs > 0 && matchingPVCsCount >= opts.MaxPVs {
+			break
+		}
 		if pv.Spec.ClaimRef != nil {
+			if len(opts.Namespace) > 0 && pv.Spec.ClaimRef.Namespace != opts.Namespace {
+				continue // early continue, to prevent logging info regarding PV/PVCs in other namespaces
+			}
+
+			if strings.HasSuffix(pv.Spec.ClaimRef.Name, k8sutil.PVCNameSuffix) {
+				w.Printf("Skipping PV %s as the claiming PVC has the %s suffix\n", pv.Name, k8sutil.PVCNameSuffix)
+				continue
+			}
+
 			pvc, err := clientset.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(ctx, pv.Spec.ClaimRef.Name, metav1.GetOptions{})
 			if err != nil {
 				return nil, nil, fmt.Errorf("failed to get PVC for PV %s in %s: %w", pv.Spec.ClaimRef.Name, pv.Spec.ClaimRef.Namespace, err)
 			}
-			if pv.Spec.ClaimRef.Namespace == Namespace || Namespace == "" {
-				matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], pvc)
-			}
+
+			matchingPVCsCount++
+			matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], pvc)
 		} else {
 			return nil, nil, fmt.Errorf("PV %s does not have an associated PVC - resolve this before rerunning", pv.Name)
@@ -456,7 +482,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 		pvcNamespaces = append(pvcNamespaces, idx)
 	}
 
-	w.Printf("\nFound %d matching PVCs to migrate across %d namespaces:\n", len(matchingPVCs), len(pvcNamespaces))
+	w.Printf("\nFound %d matching PVCs to migrate across %d namespaces:\n", matchingPVCsCount, len(pvcNamespaces))
 	tw := tabwriter.NewWriter(w.Writer(), 2, 2, 1, ' ', 0)
 	_, _ = fmt.Fprintf(tw, "namespace:\tpvc:\tpv:\tsize:\t\n")
 	for ns, nsPvcs := range matchingPVCs {
@@ -471,7 +497,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 	}
 
 	// create new PVCs for each matching PVC
-	w.Printf("\nCreating new PVCs to migrate data to using the %s StorageClass\n", destSCName)
+	w.Printf("\nCreating new PVCs to migrate data to using the %s StorageClass\n", opts.DestSCName)
 	for ns, nsPvcs := range matchingPVCs {
 		for _, nsPvc := range nsPvcs {
 			newName := k8sutil.NewPvcName(nsPvc.Name)
@@ -493,7 +519,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 					return nil, nil, fmt.Errorf("failed to find existing PVC: %w", err)
 				}
 			} else if existingPVC != nil {
-				if existingPVC.Spec.StorageClassName != nil && *existingPVC.Spec.StorageClassName == destSCName {
+				if existingPVC.Spec.StorageClassName != nil && *existingPVC.Spec.StorageClassName == opts.DestSCName {
 					existingSize := existingPVC.Spec.Resources.Requests.Storage().String()
 
 					if existingSize == desiredPvStorage.String() {
@@ -525,7 +551,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 					},
 				},
 				Spec: corev1.PersistentVolumeClaimSpec{
-					StorageClassName: &destSCName,
+					StorageClassName: &opts.DestSCName,
 					Resources: corev1.VolumeResourceRequirements{
 						Requests: map[corev1.ResourceName]resource.Quantity{
 							corev1.ResourceStorage: desiredPvStorage,
@@ -675,9 +701,7 @@ func mutateSC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 // if a pod is not created by pvmigrate, and is not controlled by a statefulset/deployment, this function will return an error.
 // if waitForCleanup is true, after scaling down deployments/statefulsets it will wait for all pods to be deleted.
-// It returns a map of namespace to PVCs and any errors encountered.
-func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, checkInterval time.Duration) (map[string][]*corev1.PersistentVolumeClaim, error) {
-	// build new map with complete pvcCtx
-	updatedPVCs := matchingPVCs
+// It returns an error, if one was encountered.
+func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]*corev1.PersistentVolumeClaim, checkInterval time.Duration) error {
 
 	// get pods using specified PVCs
 	matchingPods := map[string][]corev1.Pod{}
@@ -685,7 +709,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 	for ns, nsPvcs := range matchingPVCs {
 		nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
 		if err != nil {
-			return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
+			return fmt.Errorf("failed to get pods in %s: %w", ns, err)
 		}
 		for _, nsPod := range nsPods.Items {
 		perPodLoop:
@@ -706,7 +730,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 	for ns, nsPvcs := range matchingPVCs {
 		nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
 		if err != nil {
-			return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
+			return fmt.Errorf("failed to get pods in %s: %w", ns, err)
 		}
 		for _, nsPod := range nsPods.Items {
 			for _, podVol := range nsPod.Spec.Volumes {
@@ -728,7 +752,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 						return volume.Annotations[sourceNodeAnnotation] == nsPod.Spec.NodeName
 					})
 					if err != nil {
-						return nil, fmt.Errorf("failed to annotate pv %s (backs pvc %s) with node name %s: %w", nsPvClaim.Spec.VolumeName, nsPvClaim.ObjectMeta.Name, nsPod.Spec.NodeName, err)
+						return fmt.Errorf("failed to annotate pv %s (backs pvc %s) with node name %s: %w", nsPvClaim.Spec.VolumeName, nsPvClaim.ObjectMeta.Name, nsPod.Spec.NodeName, err)
 					}
 				}
 			}
@@ -747,7 +771,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 	}
 	err := tw.Flush()
 	if err != nil {
-		return nil, fmt.Errorf("failed to print Pods: %w", err)
+		return fmt.Errorf("failed to print Pods: %w", err)
 	}
 
 	// get owners controlling specified pods
@@ -771,11 +795,11 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 				// this pod was created by pvmigrate, so it can be deleted by pvmigrate
 				err := clientset.CoreV1().Pods(ns).Delete(ctx, nsPod.Name, metav1.DeleteOptions{})
 				if err != nil {
-					return nil, fmt.Errorf("migration pod %s in %s leftover from a previous run failed to delete, please delete it before retrying: %w", nsPod.Name, ns, err)
+					return fmt.Errorf("migration pod %s in %s leftover from a previous run failed to delete, please delete it before retrying: %w", nsPod.Name, ns, err)
 				}
 			} else {
 				// TODO: handle properly
-				return nil, fmt.Errorf("pod %s in %s did not have any owners!\nPlease delete it before retrying", nsPod.Name, ns)
+				return fmt.Errorf("pod %s in %s did not have any owners!\nPlease delete it before retrying", nsPod.Name, ns)
 			}
 		}
 	}
@@ -791,7 +815,7 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 		case "StatefulSet":
 			ss, err := clientset.AppsV1().StatefulSets(ns).Get(ctx, ownerName, metav1.GetOptions{})
 			if err != nil {
-				return nil, fmt.Errorf("failed to get statefulset %s scale in %s: %w", ownerName, ns, err)
+				return fmt.Errorf("failed to get statefulset %s scale in %s: %w", ownerName, ns, err)
 			}
 
 			formerScale := int32(1)
@@ -812,24 +836,24 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 			w.Printf("scaling StatefulSet %s from %d to 0 in %s\n", ownerName, formerScale, ns)
 			_, err = clientset.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
 			if err != nil {
-				return nil, fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
+				return fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
 			}
 		case "ReplicaSet":
 			rs, err := clientset.AppsV1().ReplicaSets(ns).Get(ctx, ownerName, metav1.GetOptions{})
 			if err != nil {
-				return nil, fmt.Errorf("failed to get replicaset %s in %s: %w", ownerName, ns, err)
+				return fmt.Errorf("failed to get replicaset %s in %s: %w", ownerName, ns, err)
 			}
 
 			if len(rs.OwnerReferences) != 1 {
-				return nil, fmt.Errorf("expected 1 owner for replicaset %s in %s, found %d instead", ownerName, ns, len(rs.OwnerReferences))
+				return fmt.Errorf("expected 1 owner for replicaset %s in %s, found %d instead", ownerName, ns, len(rs.OwnerReferences))
 			}
 			if rs.OwnerReferences[0].Kind != "Deployment" {
-				return nil, fmt.Errorf("expected owner for replicaset %s in %s to be a deployment, found %s of kind %s instead", ownerName, ns, rs.OwnerReferences[0].Name, rs.OwnerReferences[0].Kind)
+				return fmt.Errorf("expected owner for replicaset %s in %s to be a deployment, found %s of kind %s instead", ownerName, ns, rs.OwnerReferences[0].Name, rs.OwnerReferences[0].Kind)
 			}
 
 			dep, err := clientset.AppsV1().Deployments(ns).Get(ctx, rs.OwnerReferences[0].Name, metav1.GetOptions{})
 			if err != nil {
-				return nil, fmt.Errorf("failed to get deployment %s scale in %s: %w", ownerName, ns, err)
+				return fmt.Errorf("failed to get deployment %s scale in %s: %w", ownerName, ns, err)
 			}
 
 			formerScale := int32(1)
@@ -850,10 +874,10 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 			w.Printf("scaling Deployment %s from %d to 0 in %s\n", ownerName, formerScale, ns)
 			_, err = clientset.AppsV1().Deployments(ns).Update(ctx, dep, metav1.UpdateOptions{})
 			if err != nil {
-				return nil, fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
+				return fmt.Errorf("failed to scale deployment %s to zero in %s: %w", ownerName, ns, err)
 			}
 		default:
-			return nil, fmt.Errorf("scaling pods controlled by a %s is not supported, please delete the pods controlled by %s in %s before retrying", ownerKind, ownerKind, ns)
+			return fmt.Errorf("scaling pods controlled by a %s is not supported, please delete the pods controlled by %s in %s before retrying", ownerKind, ownerKind, ns)
 		}
 	}
 }
@@ -867,7 +891,7 @@ checkPvcPodLoop:
 	for ns, nsPvcs := range matchingPVCs {
 		nsPods, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
 		if err != nil {
-			return nil, fmt.Errorf("failed to get pods in %s: %w", ns, err)
+			return fmt.Errorf("failed to get pods in %s: %w", ns, err)
 		}
 		for _, nsPod := range nsPods.Items {
 			for _, podVol := range nsPod.Spec.Volumes {
@@ -875,7 +899,7 @@ checkPvcPodLoop:
 				for _, nsClaim := range nsPvcs {
 					if podVol.PersistentVolumeClaim.ClaimName == nsClaim.Name {
 						if nsPod.CreationTimestamp.After(migrationStartTime) {
-							return nil, fmt.Errorf("pod %s in %s mounting %s was created at %s, after scale-down started at %s. It is likely that there is some other operator scaling this back up", nsPod.Name, ns, nsClaim.Name, nsPod.CreationTimestamp.Format(time.RFC3339), migrationStartTime.Format(time.RFC3339))
+							return fmt.Errorf("pod %s in %s mounting %s was created at %s, after scale-down started at %s. It is likely that there is some other operator scaling this back up", nsPod.Name, ns, nsClaim.Name, nsPod.CreationTimestamp.Format(time.RFC3339), migrationStartTime.Format(time.RFC3339))
 						}
 
 						w.Printf("Found pod %s in %s mounting to-be-migrated PVC %s, waiting\n", nsPod.Name, ns, nsClaim.Name)
@@ -892,7 +916,7 @@ checkPvcPodLoop:
 	}
 
 	w.Printf("All pods removed successfully\n")
-	return updatedPVCs, nil
+	return nil
 }
 
 func scaleUpPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, namespaces []string) error {
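Side note on the new suffix check in getPVCs: skipping PVs whose claiming PVC already ends in `-pvcmigrate` keeps a rerun, or a partially completed earlier migration, from ever selecting pvmigrate's own destination PVCs as migration sources. A self-contained sketch of just that guard, using a hypothetical `shouldMigrate` helper (the real check lives inline in `getPVCs`):

```go
package main

import (
	"fmt"
	"strings"
)

// mirrors k8sutil.PVCNameSuffix from this PR
const pvcNameSuffix = "-pvcmigrate"

// shouldMigrate is a hypothetical helper mirroring the inline guard in
// getPVCs: a claim that already carries the suffix was created by pvmigrate
// itself and must not be treated as a migration source.
func shouldMigrate(claimName string) bool {
	return !strings.HasSuffix(claimName, pvcNameSuffix)
}

func main() {
	for _, claim := range []string{"datadir-db-0", "datadir-db-0-pvcmigrate"} {
		fmt.Printf("%-25s migrate=%v\n", claim, shouldMigrate(claim))
	}
}
```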
diff --git a/pkg/migrate/migrate_test.go b/pkg/migrate/migrate_test.go
index 3563972..11e3fce 100644
--- a/pkg/migrate/migrate_test.go
+++ b/pkg/migrate/migrate_test.go
@@ -895,7 +895,8 @@ func TestGetPVCs(t *testing.T) {
 			req := require.New(t)
 			clientset := fake.NewSimpleClientset(test.resources...)
 			testlog := log.New(testWriter{t: t}, "", 0)
-			originalPVCs, nses, err := getPVCs(context.Background(), testlog, clientset, test.sourceScName, test.destScName, test.namespace)
+			opts := Options{SourceSCName: test.sourceScName, DestSCName: test.destScName, Namespace: test.namespace}
+			originalPVCs, nses, err := getPVCs(context.Background(), testlog, clientset, &opts)
 			if !test.wantErr {
 				req.NoError(err)
 			} else {
@@ -2769,7 +2770,7 @@ func Test_scaleDownPods(t *testing.T) {
 			if tt.backgroundFunc != nil {
 				go tt.backgroundFunc(testCtx, testlog, clientset)
 			}
-			actualMatchingPVCs, err := scaleDownPods(testCtx, testlog, clientset, tt.matchingPVCs, time.Second/20)
+			err := scaleDownPods(testCtx, testlog, clientset, tt.matchingPVCs, time.Second/20)
 			if tt.wantErr {
 				req.Error(err)
 				testlog.Printf("got expected error %q", err.Error())
@@ -2796,7 +2797,6 @@ func Test_scaleDownPods(t *testing.T) {
 			req.Equal(tt.wantPods, actualPods)
 			req.Equal(tt.wantDeployments, actualDeployments)
 			req.Equal(tt.wantSS, actualSS)
-			req.Equal(tt.wantMatchingPVCs, actualMatchingPVCs)
 
 			actualPVs, err := clientset.CoreV1().PersistentVolumes().List(testCtx, metav1.ListOptions{})
 			req.NoError(err)
@@ -3487,7 +3487,15 @@ func Test_copyAllPVCs(t *testing.T) {
 				}
 			}(testCtx, testlog, clientset, tt.events)
 
-			err := copyAllPVCs(testCtx, testlog, clientset, "sourcesc", "destsc", "testrsyncimage", tt.matchingPVCs, false, time.Millisecond*10, nil)
+			options := Options{
+				SourceSCName: "sourcesc",
+				DestSCName:   "destsc",
+				RsyncImage:   "testrsyncimage",
+				RsyncFlags:   nil,
+				VerboseCopy:  false,
+			}
+
+			err := copyAllPVCs(testCtx, testlog, clientset, &options, tt.matchingPVCs, time.Millisecond*10)
 			if tt.wantErr {
 				req.Error(err)
 				testlog.Printf("got expected error %q", err.Error())