From dbb28fab8754dffe31e7ddacb01c7c69d4964ef1 Mon Sep 17 00:00:00 2001 From: Praveenrajmani Date: Wed, 18 Oct 2023 13:29:43 +0530 Subject: [PATCH] Support moving data from one drive to another within the same node --- cmd/directpv/controller.go | 46 ++++ cmd/directpv/copy.go | 234 +++++++++++++++++++ cmd/directpv/main.go | 1 + cmd/kubectl-directpv/clean.go | 4 + cmd/kubectl-directpv/flags.go | 65 +++++- cmd/kubectl-directpv/list.go | 7 +- cmd/kubectl-directpv/list_drives.go | 5 + cmd/kubectl-directpv/list_jobs.go | 168 ++++++++++++++ cmd/kubectl-directpv/list_volumes.go | 5 + cmd/kubectl-directpv/main.go | 1 + cmd/kubectl-directpv/move.go | 149 +++++++++--- cmd/kubectl-directpv/purge.go | 49 ++++ cmd/kubectl-directpv/purge_jobs.go | 131 +++++++++++ cmd/kubectl-directpv/remove.go | 4 + cmd/kubectl-directpv/utils.go | 26 +++ docs/tools/replace.sh | 4 +- go.mod | 7 +- go.sum | 8 + pkg/apis/directpv.min.io/types/label.go | 12 + pkg/apis/directpv.min.io/types/types.go | 1 + pkg/apis/directpv.min.io/v1beta1/drive.go | 46 +++- pkg/consts/consts.go | 25 ++ pkg/consts/consts.go.in | 25 ++ pkg/controller/controller.go | 10 +- pkg/csi/controller/server.go | 4 + pkg/csi/node/publish_unpublish.go | 4 + pkg/csi/node/stage_unstage.go | 5 + pkg/drive/event.go | 6 +- pkg/installer/consts.go | 6 +- pkg/installer/daemonset.go | 68 +++--- pkg/installer/deployment.go | 12 +- pkg/installer/namespace.go | 12 +- pkg/installer/psp.go | 4 +- pkg/installer/rbac.go | 1 + pkg/installer/utils.go | 25 -- pkg/installer/vars.go | 2 +- pkg/jobs/copy.go | 147 ++++++++++++ pkg/jobs/event.go | 205 +++++++++++++++++ pkg/jobs/lister.go | 264 ++++++++++++++++++++++ pkg/k8s/k8s.go | 27 +++ pkg/volume/event.go | 3 + 41 files changed, 1690 insertions(+), 138 deletions(-) create mode 100644 cmd/directpv/copy.go create mode 100644 cmd/kubectl-directpv/list_jobs.go create mode 100644 cmd/kubectl-directpv/purge.go create mode 100644 cmd/kubectl-directpv/purge_jobs.go create mode 100644 pkg/jobs/copy.go create mode 100644 pkg/jobs/event.go create mode 100644 pkg/jobs/lister.go diff --git a/cmd/directpv/controller.go b/cmd/directpv/controller.go index e0ae6daf..98c81da5 100644 --- a/cmd/directpv/controller.go +++ b/cmd/directpv/controller.go @@ -18,12 +18,19 @@ package main import ( "context" + "os" + "time" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/minio/directpv/pkg/consts" "github.com/minio/directpv/pkg/csi/controller" pkgidentity "github.com/minio/directpv/pkg/csi/identity" + "github.com/minio/directpv/pkg/jobs" + "github.com/minio/directpv/pkg/k8s" "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" ) @@ -75,5 +82,44 @@ func startController(ctx context.Context) error { } }() + go func() { + runJobsController(ctx) + }() + return <-errCh } + +func runJobsController(ctx context.Context) { + podName := os.Getenv("HOSTNAME") + if podName == "" { + klog.V(5).Info("unable to get the pod name from env; defaulting to pod name: directpv-controller") + podName = "directpv-controller" + } + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: consts.AppName + "-jobs-controller", + Namespace: consts.AppNamespace, + }, + Client: k8s.KubeClient().CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: podName, + }, + } + // start the leader election code loop + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + ReleaseOnCancel: true, + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.Info("started leading") + jobs.StartController(ctx) + }, + OnStoppedLeading: func() { + klog.Infof("leader lost") + }, + }, + }) +} diff --git a/cmd/directpv/copy.go b/cmd/directpv/copy.go new file mode 100644 index 00000000..cc22b771 --- /dev/null +++ b/cmd/directpv/copy.go @@ -0,0 +1,234 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2023 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/dustin/go-humanize" + "github.com/minio/directpv/pkg/client" + "github.com/minio/directpv/pkg/sys" + "github.com/minio/directpv/pkg/types" + "github.com/minio/directpv/pkg/xfs" + xfilepath "github.com/minio/filepath" + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" +) + +var ( + volumeID string + dryRunFlag bool +) + +var copyCmd = &cobra.Command{ + Use: "copy SRC-DRIVE DEST-DRIVE --volume-id VOLUME-ID", + Short: "copy the volume data from source drive to destination drive", + Aliases: []string{"cp"}, + SilenceUsage: true, + SilenceErrors: true, + RunE: func(c *cobra.Command, args []string) error { + switch len(args) { + case 0: + return errors.New("Source and destination DRIVE-IDs should be provided") + case 1: + return errors.New("Both the source and destination DRIVE-IDs should be provided") + case 2: + default: + return errors.New("invalid syntax") + } + if volumeID == "" { + return errors.New("--volume-id should be provided") + } + if args[0] == args[1] { + return errors.New("Both the source and destination DRIVE-IDs are same") + } + + ctx := c.Context() + srcDrive, err := client.DriveClient().Get(ctx, args[0], metav1.GetOptions{ + TypeMeta: types.NewDriveTypeMeta(), + }) + if err != nil { + return err + } + destDrive, err := client.DriveClient().Get(ctx, args[1], metav1.GetOptions{ + TypeMeta: types.NewDriveTypeMeta(), + }) + if err != nil { + return err + } + volume, err := client.VolumeClient().Get(ctx, volumeID, metav1.GetOptions{ + TypeMeta: types.NewVolumeTypeMeta(), + }) + if err != nil { + return err + } + if !destDrive.VolumeExist(volumeID) { + return errors.New("volume finalizer not found on the destination drive") + } + if volume.GetNodeID() != nodeID { + return errors.New("the nodeID in the volume doesn't match") + } + if err := checkDrive(srcDrive); err != nil { + klog.ErrorS(err, "unable to check the source drive", "driveID", srcDrive.Name) + return err + } + if err := checkDrive(destDrive); err != nil { + klog.ErrorS(err, "unable to check the destination drive", "driveID", destDrive.Name) + return err + } + err = startCopy(ctx, srcDrive, destDrive, volume) + if err != nil { + klog.ErrorS(err, "unable to copy", "source", srcDrive.Name, "destination", destDrive.Name) + } + return err + }, +} + +func init() { + copyCmd.PersistentFlags().StringVar(&volumeID, "volume-id", volumeID, "volumeID of the volume to be copied") + copyCmd.PersistentFlags().BoolVar(&dryRunFlag, "dry-run", dryRunFlag, "Enable dry-run mode") +} + +func checkDrive(drive *types.Drive) error { + if drive.GetNodeID() != nodeID { + return errors.New("the nodeID in the drive doesn't match") + } + if _, err := os.Lstat(types.GetVolumeRootDir(drive.Status.FSUUID)); err != nil { + return fmt.Errorf("unable to stat the volume root directory; %v", err) + } + if _, err := sys.GetDeviceByFSUUID(drive.Status.FSUUID); err != nil { + return fmt.Errorf("unable to find device by its FSUUID; %v", err) + } + return nil +} + +func startCopy(ctx context.Context, srcDrive, destDrive *types.Drive, volume *types.Volume) error { + if dryRunFlag { + return nil + } + + sourcePath := types.GetVolumeDir(srcDrive.Status.FSUUID, volume.Name) + destPath := types.GetVolumeDir(destDrive.Status.FSUUID, volume.Name) + + if _, err := os.Lstat(sourcePath); err != nil { + return fmt.Errorf("unable to stat the sourcePath %v; %v", sourcePath, err) + } + if err := sys.Mkdir(destPath, 0o755); err != nil && !errors.Is(err, os.ErrExist) { + return fmt.Errorf("unable to create the targetPath %v; %v", destPath, err) + } + + quota := xfs.Quota{ + HardLimit: uint64(volume.Status.TotalCapacity), + SoftLimit: uint64(volume.Status.TotalCapacity), + } + if err := xfs.SetQuota(ctx, "/dev/"+string(destDrive.GetDriveName()), destPath, volume.Name, quota, false); err != nil { + return fmt.Errorf("unable to set quota on volume data path; %w", err) + } + + ctxWitCancel, cancel := context.WithCancel(ctx) + defer func() { + cancel() + printProgress(ctx, srcDrive, destDrive, volume) + }() + go func() { + logProgress(ctxWitCancel, srcDrive, destDrive, volume) + }() + + return copyData(sourcePath, destPath) +} + +func printProgress(ctx context.Context, srcDrive, destDrive *types.Drive, volume *types.Volume) { + sourceQ, err := xfs.GetQuota(ctx, "/dev/"+string(srcDrive.GetDriveName()), volume.Name) + if err != nil { + klog.ErrorS(err, "unable to get quota of the source drive", "source drive", srcDrive.GetDriveName(), "volume", volume.Name) + return + } + destQ, err := xfs.GetQuota(ctx, "/dev/"+string(destDrive.GetDriveName()), volume.Name) + if err != nil { + klog.ErrorS(err, "unable to get quota of the destination drive", "destination drive", destDrive.GetDriveName(), "volume", volume.Name) + return + } + fmt.Printf("\nCopied %v/%v", humanize.IBytes(destQ.CurrentSpace), humanize.IBytes(sourceQ.CurrentSpace)) +} + +func logProgress(ctx context.Context, srcDrive, destDrive *types.Drive, volume *types.Volume) { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + printProgress(ctx, srcDrive, destDrive, volume) + } + } +} + +func copyData(source, destination string) error { + visitFn := func(f string, fi os.FileInfo, _ error) error { + targetPath := filepath.Join(destination, strings.TrimPrefix(f, source)) + switch { + case fi.Mode()&os.ModeDir != 0: + return os.MkdirAll(targetPath, fi.Mode().Perm()) + case fi.Mode()&os.ModeType == 0: + if targetFi, err := os.Lstat(targetPath); err == nil { + if targetFi.ModTime().Equal(fi.ModTime()) && targetFi.Size() == fi.Size() { + return nil + } + } + reader, err := os.Open(f) + if err != nil { + return err + } + writer, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE, 0o755) + if err != nil { + return err + } + if _, err := io.CopyN(writer, reader, fi.Size()); err != nil { + return err + } + stat, ok := fi.Sys().(*syscall.Stat_t) + if !ok { + return fmt.Errorf("unable to get the stat information for %v", f) + } + if err := os.Chown(targetPath, int(stat.Uid), int(stat.Gid)); err != nil { + return fmt.Errorf("unable to set UID and GID to path %v; %v", targetPath, err) + } + if err := os.Chmod(targetPath, fi.Mode().Perm()); err != nil { + return fmt.Errorf("unable to chmod on path %v; %v", targetPath, err) + } + return os.Chtimes(targetPath, fi.ModTime(), fi.ModTime()) + case fi.Mode()&os.ModeSymlink != 0: + // ToDo: Handle symlink + return nil + default: + // unsupported modes + return nil + } + } + return xfilepath.Walk(source, visitFn) +} diff --git a/cmd/directpv/main.go b/cmd/directpv/main.go index 60902ad2..022f62df 100644 --- a/cmd/directpv/main.go +++ b/cmd/directpv/main.go @@ -128,6 +128,7 @@ func init() { mainCmd.AddCommand(legacyControllerCmd) mainCmd.AddCommand(legacyNodeServerCmd) mainCmd.AddCommand(nodeControllerCmd) + mainCmd.AddCommand(copyCmd) } func main() { diff --git a/cmd/kubectl-directpv/clean.go b/cmd/kubectl-directpv/clean.go index 4afd69d3..87ca64b8 100644 --- a/cmd/kubectl-directpv/clean.go +++ b/cmd/kubectl-directpv/clean.go @@ -23,6 +23,7 @@ import ( "os" "strings" + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/client" "github.com/minio/directpv/pkg/consts" "github.com/minio/directpv/pkg/k8s" @@ -152,6 +153,9 @@ func cleanMain(ctx context.Context) { List(ctx) matchFunc := func(volume *types.Volume) bool { + if volume.Status.Status == directpvtypes.VolumeStatusCopying { + return false + } pv, err := k8s.KubeClient().CoreV1().PersistentVolumes().Get(ctx, volume.Name, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { diff --git a/cmd/kubectl-directpv/flags.go b/cmd/kubectl-directpv/flags.go index ffede0a2..bae1425a 100644 --- a/cmd/kubectl-directpv/flags.go +++ b/cmd/kubectl-directpv/flags.go @@ -24,6 +24,7 @@ import ( "github.com/minio/directpv/pkg/apis/directpv.min.io/types" directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/ellipsis" + "github.com/minio/directpv/pkg/jobs" "github.com/minio/directpv/pkg/utils" "github.com/spf13/cobra" ) @@ -43,6 +44,16 @@ var volumeStatusValues = []string{ strings.ToLower(string(directpvtypes.VolumeStatusReady)), } +var jobStatusValues = []string{ + string(jobs.JobStatusActive), + string(jobs.JobStatusFailed), + string(jobs.JobStatusSucceeded), +} + +var jobTypeValues = []string{ + string(jobs.JobTypeCopy), +} + var ( kubeconfig string // --kubeconfig flag quietFlag bool // --quiet flag @@ -55,7 +66,9 @@ var ( driveIDArgs []string // --drive-id flag podNameArgs []string // --pod-name flag podNSArgs []string // --pod-namespace flag - volumeStatusArgs []string // --status flag of volumes + volumeStatusArgs []string // --status flag for volumes + jobStatusArgs []string // --status flag for jobs + jobTypeArgs []string // --status flag for jobs pvcFlag bool // --pvc flag dryRunFlag bool // --dry-run flag idArgs []string // --id flag @@ -108,6 +121,14 @@ func addVolumeStatusFlag(cmd *cobra.Command, usage string) { cmd.PersistentFlags().StringSliceVar(&volumeStatusArgs, "status", volumeStatusArgs, fmt.Sprintf("%v; one of: %v", usage, strings.Join(volumeStatusValues, "|"))) } +func addJobsStatusFlag(cmd *cobra.Command, usage string) { + cmd.PersistentFlags().StringSliceVar(&jobStatusArgs, "status", jobStatusArgs, fmt.Sprintf("%v; one of: %v", usage, strings.Join(jobStatusValues, "|"))) +} + +func addJobsTypeFlag(cmd *cobra.Command, usage string) { + cmd.PersistentFlags().StringSliceVar(&jobTypeArgs, "type", jobTypeArgs, fmt.Sprintf("%v; one of: %v", usage, strings.Join(jobTypeValues, "|"))) +} + func addIDFlag(cmd *cobra.Command, usage string) { cmd.PersistentFlags().StringSliceVar(&idArgs, "ids", idArgs, usage) } @@ -136,6 +157,8 @@ var ( driveIDSelectors []directpvtypes.DriveID volumeStatusSelectors []directpvtypes.VolumeStatus labelSelectors map[directpvtypes.LabelKey]directpvtypes.LabelValue + jobStatusSelectors []jobs.JobStatus + jobTypeSelectors []jobs.JobType dryRunPrinter func(interface{}) ) @@ -243,15 +266,23 @@ func validatePodNSArgs() error { } func validateVolumeNameArgs() error { - for i := range volumeNameArgs { - volumeNameArgs[i] = strings.TrimSpace(volumeNameArgs[i]) - if volumeNameArgs[i] == "" { - return fmt.Errorf("empty volume name") + return validateNameArgs(volumeNameArgs) +} + +func validateNameArgs(args []string) error { + for i := range args { + args[i] = strings.TrimSpace(args[i]) + if args[i] == "" { + return fmt.Errorf("empty name") } } return nil } +func validateJobNameArgs() error { + return validateNameArgs(jobNameArgs) +} + func validateVolumeStatusArgs() error { for i := range volumeStatusArgs { volumeStatusArgs[i] = strings.TrimSpace(volumeStatusArgs[i]) @@ -264,6 +295,30 @@ func validateVolumeStatusArgs() error { return nil } +func validateJobStatusArgs() error { + for i := range jobStatusArgs { + jobStatusArgs[i] = strings.TrimSpace(jobStatusArgs[i]) + status, err := jobs.ToStatus(jobStatusArgs[i]) + if err != nil { + return err + } + jobStatusSelectors = append(jobStatusSelectors, status) + } + return nil +} + +func validateJobTypeArgs() error { + for i := range jobTypeArgs { + jobTypeArgs[i] = strings.ToLower(strings.TrimSpace(jobTypeArgs[i])) + jobType, err := jobs.ToType(jobTypeArgs[i]) + if err != nil { + return err + } + jobTypeSelectors = append(jobTypeSelectors, jobType) + } + return nil +} + func validateLabelArgs() error { if labelSelectors == nil { labelSelectors = make(map[directpvtypes.LabelKey]directpvtypes.LabelValue) diff --git a/cmd/kubectl-directpv/list.go b/cmd/kubectl-directpv/list.go index fe0756e2..74dacb87 100644 --- a/cmd/kubectl-directpv/list.go +++ b/cmd/kubectl-directpv/list.go @@ -27,7 +27,7 @@ import ( var listCmd = &cobra.Command{ Use: "list", - Short: "List drives and volumes", + Short: fmt.Sprintf("List %s resources", consts.AppPrettyName), PersistentPreRunE: func(cmd *cobra.Command, args []string) error { if parent := cmd.Parent(); parent != nil { parent.PersistentPreRunE(parent, args) @@ -40,12 +40,12 @@ func init() { setFlagOpts(listCmd) addNodesFlag(listCmd, "Filter output by nodes") - addDrivesFlag(listCmd, "Filter output by drive names") addOutputFormatFlag(listCmd, "Output format. One of: json|yaml|wide") listCmd.PersistentFlags().BoolVar(&noHeaders, "no-headers", noHeaders, "When using the default or custom-column output format, don't print headers (default print headers)") listCmd.AddCommand(listDrivesCmd) listCmd.AddCommand(listVolumesCmd) + listCmd.AddCommand(listJobsCmd) } func validateListCmd() error { @@ -55,9 +55,6 @@ func validateListCmd() error { if err := validateNodeArgs(); err != nil { return err } - if err := validateDriveNameArgs(); err != nil { - return err - } return validateLabelArgs() } diff --git a/cmd/kubectl-directpv/list_drives.go b/cmd/kubectl-directpv/list_drives.go index 5f6dcd9a..6cea755d 100644 --- a/cmd/kubectl-directpv/list_drives.go +++ b/cmd/kubectl-directpv/list_drives.go @@ -79,6 +79,7 @@ var listDrivesCmd = &cobra.Command{ func init() { setFlagOpts(listDrivesCmd) + addDrivesFlag(listDrivesCmd, "Filter output by drive names") addDriveStatusFlag(listDrivesCmd, "Filter output by drive status") addShowLabelsFlag(listDrivesCmd) addLabelsFlag(listDrivesCmd, "Filter output by drive labels") @@ -86,6 +87,10 @@ func init() { } func validateListDrivesArgs() error { + if err := validateDriveNameArgs(); err != nil { + return err + } + if err := validateDriveStatusArgs(); err != nil { return err } diff --git a/cmd/kubectl-directpv/list_jobs.go b/cmd/kubectl-directpv/list_jobs.go new file mode 100644 index 00000000..41afe800 --- /dev/null +++ b/cmd/kubectl-directpv/list_jobs.go @@ -0,0 +1,168 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2023 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "context" + "os" + "strings" + + "github.com/jedib0t/go-pretty/v6/table" + "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/jobs" + "github.com/minio/directpv/pkg/utils" + "github.com/spf13/cobra" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var jobNameArgs []string + +var listJobsCmd = &cobra.Command{ + Use: "jobs [JOB ...]", + Short: "List jobs", + SilenceUsage: true, + SilenceErrors: true, + Example: strings.ReplaceAll( + `1. List all jobs + $ kubectl {PLUGIN_NAME} list jobs + +2. List jobs by a node + $ kubectl {PLUGIN_NAME} list jobs --nodes=node1 + +3. List jobs by type + $ kubectl {PLUGIN_NAME} list jobs --type=copy + +3. List jobs filtered by labels + $ kubectl {PLUGIN_NAME} list jobs --labels type=copy`, + `{PLUGIN_NAME}`, + consts.AppName, + ), + Run: func(c *cobra.Command, args []string) { + jobNameArgs = args + if err := validateListJobsArgs(); err != nil { + utils.Eprintf(quietFlag, true, "%v\n", err) + os.Exit(-1) + } + + listJobsMain(c.Context()) + }, +} + +func init() { + setFlagOpts(listJobsCmd) + + addJobsTypeFlag(listJobsCmd, "Filter output by job type") + addJobsStatusFlag(listJobsCmd, "Filter output by job status") + addShowLabelsFlag(listJobsCmd) + addLabelsFlag(listJobsCmd, "Filter output by job labels") +} + +func validateListJobsArgs() error { + if err := validateJobNameArgs(); err != nil { + return err + } + + if err := validateJobStatusArgs(); err != nil { + return err + } + + return validateJobTypeArgs() +} + +func listJobsMain(ctx context.Context) { + jobObjects, err := jobs.NewLister(). + JobNameSelector(jobNameArgs). + NodeSelector(toLabelValues(nodesArgs)). + StatusSelector(jobStatusSelectors). + TypeSelector(jobTypeSelectors). + LabelSelector(labelSelectors). + Get(ctx) + if err != nil { + utils.Eprintf(quietFlag, true, "%v\n", err) + os.Exit(1) + } + + if dryRunPrinter != nil { + jobList := batchv1.JobList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + Items: jobObjects, + } + dryRunPrinter(jobList) + return + } + + headers := table.Row{ + "JOB", + "TYPE", + "NODE", + "STATUS", + } + if showLabels { + headers = append(headers, "LABELS") + } + writer := newTableWriter( + headers, + []table.SortBy{ + { + Name: "JOB", + Mode: table.Asc, + }, + { + Name: "NODE", + Mode: table.Asc, + }, + { + Name: "STATUS", + Mode: table.Asc, + }, + { + Name: "TYPE", + Mode: table.Asc, + }, + }, + noHeaders) + + for _, job := range jobObjects { + row := []interface{}{ + job.Name, + jobs.GetType(job), + printableString(jobs.GetNode(job)), + jobs.GetStatus(job), + } + if showLabels { + row = append(row, labelsToString(job.GetLabels())) + } + writer.AppendRow(row) + } + + if writer.Length() > 0 { + writer.Render() + return + } + + if allFlag { + utils.Eprintf(quietFlag, false, "No resources found\n") + } else { + utils.Eprintf(quietFlag, false, "No matching resources found\n") + } + + os.Exit(1) +} diff --git a/cmd/kubectl-directpv/list_volumes.go b/cmd/kubectl-directpv/list_volumes.go index b86bd694..b9478a09 100644 --- a/cmd/kubectl-directpv/list_volumes.go +++ b/cmd/kubectl-directpv/list_volumes.go @@ -87,6 +87,7 @@ var listVolumesCmd = &cobra.Command{ func init() { setFlagOpts(listVolumesCmd) + addDrivesFlag(listVolumesCmd, "Filter output by drive names") addDriveIDFlag(listVolumesCmd, "Filter output by drive IDs") addPodNameFlag(listVolumesCmd, "Filter output by pod names") addPodNSFlag(listVolumesCmd, "Filter output by pod namespaces") @@ -98,6 +99,10 @@ func init() { } func validateListVolumesArgs() error { + if err := validateDriveNameArgs(); err != nil { + return err + } + if err := validateDriveIDArgs(); err != nil { return err } diff --git a/cmd/kubectl-directpv/main.go b/cmd/kubectl-directpv/main.go index 26a47025..af3c57fe 100644 --- a/cmd/kubectl-directpv/main.go +++ b/cmd/kubectl-directpv/main.go @@ -154,6 +154,7 @@ Use "{{.CommandPath}} [command] --help" for more information about this command. mainCmd.AddCommand(suspendCmd) mainCmd.AddCommand(resumeCmd) mainCmd.AddCommand(removeCmd) + mainCmd.AddCommand(purgeCmd) mainCmd.AddCommand(uninstallCmd) mainCmd.SetHelpCommand(&cobra.Command{ Hidden: true, diff --git a/cmd/kubectl-directpv/move.go b/cmd/kubectl-directpv/move.go index a1f7eea7..f1bbb3ff 100644 --- a/cmd/kubectl-directpv/move.go +++ b/cmd/kubectl-directpv/move.go @@ -22,48 +22,67 @@ import ( "os" "strings" + "github.com/fatih/color" directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/client" "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/jobs" "github.com/minio/directpv/pkg/types" "github.com/minio/directpv/pkg/utils" "github.com/minio/directpv/pkg/volume" "github.com/spf13/cobra" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" ) -var moveCmd = &cobra.Command{ - Use: "move SRC-DRIVE DEST-DRIVE", - Aliases: []string{"mv"}, - SilenceUsage: true, - SilenceErrors: true, - Short: "Move volumes excluding data from source drive to destination drive on a same node", - Example: strings.ReplaceAll( - `1. Move volumes from drive af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 to drive 834e8f4c-14f4-49b9-9b77-e8ac854108d5 - $ kubectl {PLUGIN_NAME} drives move af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 834e8f4c-14f4-49b9-9b77-e8ac854108d5`, - `{PLUGIN_NAME}`, - consts.AppName, - ), - Run: func(c *cobra.Command, args []string) { - if len(args) != 2 { - utils.Eprintf(quietFlag, true, "only one source and one destination drive must be provided\n") - os.Exit(-1) - } +var ( + withData, overwrite bool + moveCmd = &cobra.Command{ + Use: "move SRC-DRIVE DEST-DRIVE", + Aliases: []string{"mv"}, + SilenceUsage: true, + SilenceErrors: true, + Short: "Move volumes excluding data from source drive to destination drive on a same node", + Example: strings.ReplaceAll( + `1. Move volumes from drive af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 to drive 834e8f4c-14f4-49b9-9b77-e8ac854108d5 + $ kubectl {PLUGIN_NAME} drives move af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 834e8f4c-14f4-49b9-9b77-e8ac854108d5 - src := strings.TrimSpace(args[0]) - if src == "" { - utils.Eprintf(quietFlag, true, "empty source drive\n") - os.Exit(-1) - } +2. Move volumes from drive af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 to drive 834e8f4c-14f4-49b9-9b77-e8ac854108d5 with data +$ kubectl {PLUGIN_NAME} drives move af3b8b4c-73b4-4a74-84b7-1ec30492a6f0 834e8f4c-14f4-49b9-9b77-e8ac854108d5 --with-data`, + `{PLUGIN_NAME}`, + consts.AppName, + ), + Run: func(c *cobra.Command, args []string) { + if len(args) != 2 { + utils.Eprintf(quietFlag, true, "only one source and one destination drive must be provided\n") + os.Exit(-1) + } - dest := strings.TrimSpace(args[1]) - if dest == "" { - utils.Eprintf(quietFlag, true, "empty destination drive\n") - os.Exit(-1) - } + src := strings.TrimSpace(args[0]) + if src == "" { + utils.Eprintf(quietFlag, true, "empty source drive\n") + os.Exit(-1) + } + + dest := strings.TrimSpace(args[1]) + if dest == "" { + utils.Eprintf(quietFlag, true, "empty destination drive\n") + os.Exit(-1) + } + + if overwrite && !withData { + utils.Eprintf(quietFlag, true, "'--overwrite' flag must be set only when '--with-data' flag is set") + } + + moveMain(c.Context(), src, dest) + }, + } +) - moveMain(c.Context(), src, dest) - }, +func init() { + moveCmd.PersistentFlags().BoolVar(&withData, "with-data", withData, "move the volume with data") + moveCmd.PersistentFlags().BoolVar(&overwrite, "overwrite", overwrite, "overwrite any duplicate volume copy job if present") } func moveMain(ctx context.Context, src, dest string) { @@ -133,7 +152,10 @@ func moveMain(ctx context.Context, src, dest string) { os.Exit(1) } - if destDrive.Status.Status != directpvtypes.DriveStatusReady { + switch { + case destDrive.Status.Status == directpvtypes.DriveStatusReady: + case destDrive.Status.Status == directpvtypes.DriveStatusMoving && withData: + default: utils.Eprintf(quietFlag, true, "destination drive is not in ready state\n") os.Exit(1) } @@ -160,12 +182,51 @@ func moveMain(ctx context.Context, src, dest string) { os.Exit(1) } + var jobParams jobs.ContainerParams + if withData { + jobParams.Image, jobParams.ImagePullSecrets, jobParams.Tolerations, err = getContainerParams(ctx) + if err != nil { + utils.Eprintf(quietFlag, true, "unable to get container params; %v", err) + os.Exit(1) + } + srcDrive.AddCopyProtectionFinalizer() + } + + var processed bool for _, volume := range volumes { + if withData { + if err := jobs.CreateCopyJob(ctx, jobs.CopyOpts{ + SourceDriveID: srcDrive.GetDriveID(), + DestinationDriveID: destDrive.GetDriveID(), + VolumeID: volume.Name, + NodeID: srcDrive.GetNodeID(), + }, jobParams, overwrite); err != nil { + if apierrors.IsAlreadyExists(err) && !overwrite { + utils.Eprintf(quietFlag, false, "duplicate job found for %v; Please use `--overwrite` for this volume to be moved", volume.Name) + } + utils.Eprintf(quietFlag, true, "unable to create copy job for volume %v; %v", volume.Name, err) + continue + } + if err := setVolumeStatusCopying(ctx, volume.Name); err != nil { + utils.Eprintf(quietFlag, true, "unable to set volume status moving; %v", err) + os.Exit(1) + } + } + processed = true + if !quietFlag { + fmt.Println("Moving volume ", volume.Name) + } if destDrive.AddVolumeFinalizer(volume.Name) { destDrive.Status.FreeCapacity -= volume.Status.TotalCapacity destDrive.Status.AllocatedCapacity += volume.Status.TotalCapacity } + srcDrive.RemoveVolumeFinalizer(volume.Name) } + + if !processed { + os.Exit(1) + } + destDrive.Status.Status = directpvtypes.DriveStatusMoving _, err = client.DriveClient().Update( ctx, destDrive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()}, @@ -175,13 +236,6 @@ func moveMain(ctx context.Context, src, dest string) { os.Exit(1) } - for _, volume := range volumes { - if !quietFlag { - fmt.Println("Moving volume", volume.Name) - } - } - - srcDrive.ResetFinalizers() _, err = client.DriveClient().Update( ctx, srcDrive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()}, ) @@ -189,4 +243,25 @@ func moveMain(ctx context.Context, src, dest string) { utils.Eprintf(quietFlag, true, "unable to remove volume references in source drive; %v\n", err) os.Exit(1) } + + if withData && !quietFlag { + color.HiGreen("Jobs created successfully to copy the volume data. Please uncordon the node to get the jobs scheduled") + } +} + +func setVolumeStatusCopying(ctx context.Context, volumeName string) error { + updateFunc := func() error { + volume, err := client.VolumeClient().Get(ctx, volumeName, metav1.GetOptions{ + TypeMeta: types.NewVolumeTypeMeta(), + }) + if err != nil { + return err + } + volume.Status.Status = directpvtypes.VolumeStatusCopying + _, err = client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{ + TypeMeta: types.NewVolumeTypeMeta(), + }) + return err + } + return retry.RetryOnConflict(retry.DefaultRetry, updateFunc) } diff --git a/cmd/kubectl-directpv/purge.go b/cmd/kubectl-directpv/purge.go new file mode 100644 index 00000000..ad369378 --- /dev/null +++ b/cmd/kubectl-directpv/purge.go @@ -0,0 +1,49 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2023 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "fmt" + + "github.com/minio/directpv/pkg/consts" + "github.com/spf13/cobra" +) + +var purgeCmd = &cobra.Command{ + Use: "purge", + Short: fmt.Sprintf("purge %v resources", consts.AppPrettyName), + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + if parent := cmd.Parent(); parent != nil { + parent.PersistentPreRunE(parent, args) + } + return validatePurgeCmd() + }, +} + +func init() { + setFlagOpts(purgeCmd) + + addNodesFlag(purgeCmd, "If present, filter objects from given nodes") + addAllFlag(purgeCmd, "If present, select all objects") + addDryRunFlag(purgeCmd, "Run in dry run mode") + + purgeCmd.AddCommand(purgeJobsCmd) +} + +func validatePurgeCmd() error { + return validateNodeArgs() +} diff --git a/cmd/kubectl-directpv/purge_jobs.go b/cmd/kubectl-directpv/purge_jobs.go new file mode 100644 index 00000000..470990ae --- /dev/null +++ b/cmd/kubectl-directpv/purge_jobs.go @@ -0,0 +1,131 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2023 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "context" + "errors" + "fmt" + "os" + "strings" + + "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/jobs" + "github.com/minio/directpv/pkg/k8s" + "github.com/minio/directpv/pkg/utils" + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var purgeJobsCmd = &cobra.Command{ + Use: "jobs [JOB ...]", + Short: "Purge jobs", + SilenceUsage: true, + SilenceErrors: true, + Example: strings.ReplaceAll( + `1. Purge all jobs + $ kubectl {PLUGIN_NAME} purge jobs --all + +2. Purge jobs from a node + $ kubectl {PLUGIN_NAME} purge jobs --nodes=node1 + +3. Purge jobs by type + $ kubectl {PLUGIN_NAME} purge jobs --type=copy + +3. Purge jobs filtered by labels + $ kubectl {PLUGIN_NAME} purge jobs --labels type=copy`, + `{PLUGIN_NAME}`, + consts.AppName, + ), + Run: func(c *cobra.Command, args []string) { + jobNameArgs = args + if err := validatePurgeJobsArgs(); err != nil { + utils.Eprintf(quietFlag, true, "%v\n", err) + os.Exit(-1) + } + + purgeJobsMain(c.Context()) + }, +} + +func init() { + setFlagOpts(purgeJobsCmd) + + addJobsTypeFlag(purgeJobsCmd, "Filter output by job type") + addJobsStatusFlag(purgeJobsCmd, "Filter output by job status") + addLabelsFlag(purgeJobsCmd, "Filter output by job labels") + addDangerousFlag(purgeJobsCmd, "Set dangerous flag to forcefully purge active jobs") +} + +func validatePurgeJobsArgs() error { + if err := validateListJobsArgs(); err != nil { + return err + } + if err := validateLabelArgs(); err != nil { + return err + } + switch { + case allFlag: + case len(nodesArgs) != 0: + case len(jobNameArgs) != 0: + case len(jobStatusArgs) != 0: + case len(jobTypeArgs) != 0: + case len(labelArgs) != 0: + default: + return errors.New("no jobs selected to purge") + } + if allFlag { + nodesArgs = nil + jobNameArgs = nil + jobStatusSelectors = nil + jobTypeSelectors = nil + labelSelectors = nil + } + return nil +} + +func purgeJobsMain(ctx context.Context) { + resultCh := jobs.NewLister(). + JobNameSelector(jobNameArgs). + NodeSelector(toLabelValues(nodesArgs)). + StatusSelector(jobStatusSelectors). + TypeSelector(jobTypeSelectors). + LabelSelector(labelSelectors). + List(ctx) + for result := range resultCh { + if result.Err != nil { + utils.Eprintf(quietFlag, true, "%v\n", result.Err) + os.Exit(1) + } + switch jobs.GetStatus(result.Job) { + case jobs.JobStatusActive: + if !dangerousFlag { + utils.Eprintf(quietFlag, true, "Purging the active job may lead to partial data; Please use `--dangerous` to purge the job %v", result.Job.Name) + continue + } + case jobs.JobStatusSucceeded, jobs.JobStatusFailed: + } + if !dryRunFlag { + if err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Delete(ctx, result.Job.Name, metav1.DeleteOptions{}); err != nil { + utils.Eprintf(quietFlag, true, "unable to delete job %v: %v\n", result.Job.Name, err) + } + } + if !quietFlag { + fmt.Printf("Job '%s' purged successfully \n", result.Job.Name) + } + } +} diff --git a/cmd/kubectl-directpv/remove.go b/cmd/kubectl-directpv/remove.go index 39c9d69e..0c54139d 100644 --- a/cmd/kubectl-directpv/remove.go +++ b/cmd/kubectl-directpv/remove.go @@ -134,6 +134,10 @@ func removeMain(ctx context.Context) { os.Exit(1) } + if result.Drive.IsCopyProtected() { + continue + } + processed = true switch result.Drive.Status.Status { case directpvtypes.DriveStatusRemoved: diff --git a/cmd/kubectl-directpv/utils.go b/cmd/kubectl-directpv/utils.go index d5363dbd..5cfa5769 100644 --- a/cmd/kubectl-directpv/utils.go +++ b/cmd/kubectl-directpv/utils.go @@ -31,8 +31,10 @@ import ( "github.com/minio/directpv/pkg/k8s" "github.com/minio/directpv/pkg/utils" "github.com/mitchellh/go-homedir" + corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" storagev1beta1 "k8s.io/api/storage/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" @@ -186,3 +188,27 @@ func getCSINodes(ctx context.Context) (nodes []string, err error) { return nodes, err } + +func getContainerParams(ctx context.Context) (string, []corev1.LocalObjectReference, []corev1.Toleration, error) { + daemonSet, err := k8s.KubeClient().AppsV1().DaemonSets(consts.AppName).Get( + ctx, consts.NodeServerName, metav1.GetOptions{}, + ) + + if err != nil && !apierrors.IsNotFound(err) { + return "", nil, nil, err + } + + if daemonSet == nil || daemonSet.UID == "" { + return "", nil, nil, fmt.Errorf("invalid daemonset found") + } + + var containerImage string + for _, container := range daemonSet.Spec.Template.Spec.Containers { + if container.Name == consts.NodeServerName { + containerImage = container.Image + break + } + } + + return containerImage, daemonSet.Spec.Template.Spec.ImagePullSecrets, daemonSet.Spec.Template.Spec.Tolerations, nil +} diff --git a/docs/tools/replace.sh b/docs/tools/replace.sh index 8fe117b4..2527e0e7 100755 --- a/docs/tools/replace.sh +++ b/docs/tools/replace.sh @@ -126,6 +126,8 @@ function main() { dest_drive="${2#/dev/}" node="${3}" + shift 3 + if [ "${src_drive}" == "${dest_drive}" ]; then echo "the source and destination drives are same" exit 255 @@ -201,7 +203,7 @@ function main() { fi # Run move command - kubectl directpv move "${src_drive_id}" "${dest_drive_id}" + kubectl directpv move "${src_drive_id}" "${dest_drive_id}" "$@" # Uncordon destination drive kubectl directpv uncordon "${dest_drive_id}" diff --git a/go.mod b/go.mod index 133accff..0de12a41 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/viper v1.16.0 golang.org/x/time v0.3.0 - google.golang.org/grpc v1.58.2 + google.golang.org/grpc v1.59.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.28.2 k8s.io/apiextensions-apiserver v0.28.2 @@ -66,6 +66,7 @@ require ( github.com/mattn/go-localereader v0.0.1 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/minio/filepath v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -85,13 +86,13 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/oauth2 v0.10.0 // indirect + golang.org/x/oauth2 v0.11.0 // indirect golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/term v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index a7b1cfbf..483eb762 100644 --- a/go.sum +++ b/go.sum @@ -231,6 +231,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/minio/filepath v1.0.0 h1:fvkJu1+6X+ECRA6G3+JJETj4QeAYO9sV43I79H8ubDY= +github.com/minio/filepath v1.0.0/go.mod h1:/nRZA2ldl5z6jT9/KQuvZcQlxZIMQoFFQPvEXx9T/Bw= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= @@ -409,6 +411,8 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8= golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI= +golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU= +golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -608,6 +612,8 @@ google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -626,6 +632,8 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/apis/directpv.min.io/types/label.go b/pkg/apis/directpv.min.io/types/label.go index c4388b5f..14538523 100644 --- a/pkg/apis/directpv.min.io/types/label.go +++ b/pkg/apis/directpv.min.io/types/label.go @@ -88,6 +88,18 @@ const ( // ClaimIDLabelKey label key to denote the claim id of the volumes ClaimIDLabelKey LabelKey = consts.GroupName + "/claim-id" + + // JobTypeLabelKey denotes the type of the job + JobTypeLabelKey LabelKey = consts.GroupName + "/job-type" + + // SourceDriveLabelKey denotes the source drive id + SourceDriveLabelKey LabelKey = consts.GroupName + "/source-drive" + + // DestinationDriveLabelKey denotes the destination drive id + DestinationDriveLabelKey LabelKey = consts.GroupName + "/destination-drive" + + // VolumeLabelKey denotes the volume name + VolumeLabelKey LabelKey = consts.GroupName + "/volume" ) // LabelValue is a type definition for label value diff --git a/pkg/apis/directpv.min.io/types/types.go b/pkg/apis/directpv.min.io/types/types.go index 5c1d06f1..751df127 100644 --- a/pkg/apis/directpv.min.io/types/types.go +++ b/pkg/apis/directpv.min.io/types/types.go @@ -69,6 +69,7 @@ type VolumeStatus string const ( VolumeStatusPending VolumeStatus = "Pending" VolumeStatusReady VolumeStatus = "Ready" + VolumeStatusCopying VolumeStatus = "Copying" ) // ToVolumeStatus converts string value to VolumeStatus. diff --git a/pkg/apis/directpv.min.io/v1beta1/drive.go b/pkg/apis/directpv.min.io/v1beta1/drive.go index ab0cc753..bd32dc9d 100644 --- a/pkg/apis/directpv.min.io/v1beta1/drive.go +++ b/pkg/apis/directpv.min.io/v1beta1/drive.go @@ -122,8 +122,13 @@ func (drive DirectPVDrive) GetDriveID() types.DriveID { } // GetVolumeCount returns number of volumes on this drive. -func (drive DirectPVDrive) GetVolumeCount() int { - return len(drive.Finalizers) - 1 +func (drive DirectPVDrive) GetVolumeCount() (count int) { + for _, finalizer := range drive.Finalizers { + if strings.HasPrefix(finalizer, driveFinalizerVolumePrefix) { + count++ + } + } + return } // VolumeExist returns whether given volume is on this drive or not. @@ -157,20 +162,49 @@ func (drive *DirectPVDrive) RemoveFinalizers() bool { // AddVolumeFinalizer adds volume to this drive's finalizer. func (drive *DirectPVDrive) AddVolumeFinalizer(volume string) (added bool) { - value := driveFinalizerVolumePrefix + volume + return drive.addFinalizer(driveFinalizerVolumePrefix + volume) +} + +func (drive *DirectPVDrive) addFinalizer(value string) (added bool) { for _, finalizer := range drive.Finalizers { if finalizer == value { return false } } - drive.Finalizers = append(drive.Finalizers, value) return true } +// AddCopyProtectionFinalizer adds a finalizer to protect drive deletions +// while the drive is being copied. +func (drive *DirectPVDrive) AddCopyProtectionFinalizer() (added bool) { + return drive.addFinalizer(consts.CopyProtectionFinalizer) +} + +// IsCopyProtected checks if copy protection finalizer is +// present on the drive. +func (drive *DirectPVDrive) IsCopyProtected() (added bool) { + value := consts.CopyProtectionFinalizer + for _, finalizer := range drive.Finalizers { + if finalizer == value { + return true + } + } + return false +} + +// RemoveCopyProtectionFinalizer removes the copy protection finalizer +// present on the drive +func (drive *DirectPVDrive) RemoveCopyProtectionFinalizer() (found bool) { + return drive.removeFinalizer(consts.CopyProtectionFinalizer) +} + // RemoveVolumeFinalizer remove volume from this drive's finalizer. func (drive *DirectPVDrive) RemoveVolumeFinalizer(volume string) (found bool) { - value := driveFinalizerVolumePrefix + volume + return drive.removeFinalizer(driveFinalizerVolumePrefix + volume) +} + +func (drive *DirectPVDrive) removeFinalizer(value string) (found bool) { finalizers := []string{} for _, finalizer := range drive.Finalizers { if finalizer == value { @@ -179,11 +213,9 @@ func (drive *DirectPVDrive) RemoveVolumeFinalizer(volume string) (found bool) { finalizers = append(finalizers, finalizer) } } - if found { drive.Finalizers = finalizers } - return } diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 9bee57e0..6f2f5013 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -22,6 +22,9 @@ const ( // AppName denotes application/library/plugin/tool name AppName = "directpv" + // AppNamespace denotes the namespace + AppNamespace = "directpv" + // AppPrettyName denotes application/library/plugin/tool pretty name AppPrettyName = "DirectPV" @@ -97,4 +100,26 @@ const ( // TmpFS mount TmpMountDir = AppRootDir + "/tmp" + + // LegacyAppRootDir is legacy application root directory. + LegacyAppRootDir = "/var/lib/direct-csi" + + AppRootDirVolumeName = AppName + "-common-root" + AppRootDirVolumePath = AppRootDir + "/" + + LegacyAppRootDirVolumeName = "direct-csi-common-root" + LegacyAppRootDirVolumePath = LegacyAppRootDir + "/" + + SysDirVolumeName = "sysfs" + SysDirVolumePath = "/sys" + + DevDirVolumeName = "devfs" + DevDirVolumePath = "/dev" + + RunUdevDataVolumeName = "run-udev-data-dir" + RunUdevDataVolumePath = UdevDataDir + + KubeNodeNameEnvVarName = "KUBE_NODE_NAME" + + CopyProtectionFinalizer = GroupName + "/copy-protection" ) diff --git a/pkg/consts/consts.go.in b/pkg/consts/consts.go.in index bf50b159..f7095263 100644 --- a/pkg/consts/consts.go.in +++ b/pkg/consts/consts.go.in @@ -20,6 +20,9 @@ const ( // AppName denotes application/library/plugin/tool name AppName = "directpv" + // AppNamespace denotes the namespace + AppNamespace = "directpv" + // AppPrettyName denotes application/library/plugin/tool pretty name AppPrettyName = "DirectPV" @@ -95,4 +98,26 @@ const ( // TmpFS mount TmpMountDir = AppRootDir + "/tmp" + + // LegacyAppRootDir is legacy application root directory. + LegacyAppRootDir = "/var/lib/direct-csi" + + AppRootDirVolumeName = AppName + "-common-root" + AppRootDirVolumePath = AppRootDir + "/" + + LegacyAppRootDirVolumeName = "direct-csi-common-root" + LegacyAppRootDirVolumePath = LegacyAppRootDir + "/" + + SysDirVolumeName = "sysfs" + SysDirVolumePath = "/sys" + + DevDirVolumeName = "devfs" + DevDirVolumePath = "/dev" + + RunUdevDataVolumeName = "run-udev-data-dir" + RunUdevDataVolumePath = UdevDataDir + + KubeNodeNameEnvVarName = "KUBE_NODE_NAME" + + CopyProtectionFinalizer = GroupName + "/copy-protection" ) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index f15722fb..55298b25 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -79,7 +79,15 @@ func New(name string, handler EventHandler, workers int, resyncPeriod time.Durat handler.ListerWatcher(), handler.ObjectType(), resyncPeriod, - cache.Indexers{}, + cache.Indexers{ + "objectname": func(obj interface{}) ([]string, error) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + return nil, err + } + return []string{key}, nil + }, + }, ) queue := workqueue.NewRateLimitingQueue( diff --git a/pkg/csi/controller/server.go b/pkg/csi/controller/server.go index 12bfd7bb..f710e013 100644 --- a/pkg/csi/controller/server.go +++ b/pkg/csi/controller/server.go @@ -270,6 +270,10 @@ func (c *Server) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) return nil, status.Errorf(codes.Internal, "unable to get volume %v; %v", volumeID, err) } + if volume.Status.Status == directpvtypes.VolumeStatusCopying { + return nil, status.Errorf(codes.FailedPrecondition, "volume %s is busy copying the data", volumeID) + } + if volume.IsStaged() || volume.IsPublished() { return nil, status.Errorf(codes.FailedPrecondition, "volume %v is not yet unstaged for deletion", volumeID) } diff --git a/pkg/csi/node/publish_unpublish.go b/pkg/csi/node/publish_unpublish.go index 46b55dea..d89a687f 100644 --- a/pkg/csi/node/publish_unpublish.go +++ b/pkg/csi/node/publish_unpublish.go @@ -120,6 +120,10 @@ func (server *Server) NodePublishVolume(ctx context.Context, req *csi.NodePublis return nil, status.Errorf(codes.FailedPrecondition, "volume %v is not yet staged, but requested with %v", volume.Name, req.GetStagingTargetPath()) } + if volume.Status.Status == directpvtypes.VolumeStatusCopying { + return nil, status.Error(codes.FailedPrecondition, "volume is busy; copying the data from the source") + } + if err := server.publishVolume(req, isSuspended); err != nil { klog.Errorf("unable to publish volume %s; %v", volume.Name, err) return nil, status.Errorf(codes.Internal, "unable to publish volume; %v", err) diff --git a/pkg/csi/node/stage_unstage.go b/pkg/csi/node/stage_unstage.go index 059945ac..1da57686 100644 --- a/pkg/csi/node/stage_unstage.go +++ b/pkg/csi/node/stage_unstage.go @@ -20,6 +20,7 @@ import ( "context" "github.com/container-storage-interface/spec/lib/go/csi" + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/client" "github.com/minio/directpv/pkg/drive" "github.com/minio/directpv/pkg/types" @@ -59,6 +60,10 @@ func (server *Server) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return &csi.NodeStageVolumeResponse{}, nil } + if volume.Status.Status == directpvtypes.VolumeStatusCopying { + return nil, status.Error(codes.FailedPrecondition, "volume is busy; copying the data from the source") + } + code, err := drive.StageVolume( ctx, volume, diff --git a/pkg/drive/event.go b/pkg/drive/event.go index 6c106346..d903a622 100644 --- a/pkg/drive/event.go +++ b/pkg/drive/event.go @@ -140,7 +140,9 @@ func StageVolume( volume.Status.DataPath = volumeDir volume.Status.StagingTargetPath = stagingTargetPath - volume.Status.Status = directpvtypes.VolumeStatusReady + if volume.Status.Status != directpvtypes.VolumeStatusCopying { + volume.Status.Status = directpvtypes.VolumeStatusReady + } if _, err := client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{ TypeMeta: types.NewVolumeTypeMeta(), }); err != nil { @@ -316,7 +318,7 @@ func (handler *driveEventHandler) move(ctx context.Context, drive *types.Drive) "stagingTargetPath", volume.Status.StagingTargetPath, ) } - } else { + } else if volume.Status.Status != directpvtypes.VolumeStatusCopying { volume.Status.Status = directpvtypes.VolumeStatusPending } diff --git a/pkg/installer/consts.go b/pkg/installer/consts.go index 31a18369..12a285af 100644 --- a/pkg/installer/consts.go +++ b/pkg/installer/consts.go @@ -25,11 +25,9 @@ const ( namespace = consts.AppName healthZContainerPortName = "healthz" healthZContainerPort = 9898 - volumePathSysDir = "/sys" - volumeNameSocketDir = "socket-dir" - socketDir = "/csi" + csiDirVolumeName = "socket-dir" + csiDirVolumePath = "/csi" selectorKey = "selector." + consts.GroupName - kubeNodeNameEnvVarName = "KUBE_NODE_NAME" csiEndpointEnvVarName = "CSI_ENDPOINT" pluginName = "kubectl-" + consts.AppName selectorValueEnabled = "enabled" diff --git a/pkg/installer/daemonset.go b/pkg/installer/daemonset.go index 8e067711..1deba0d3 100644 --- a/pkg/installer/daemonset.go +++ b/pkg/installer/daemonset.go @@ -31,20 +31,11 @@ import ( ) const ( - volumeNameMountpointDir = "mountpoint-dir" - volumeNameRegistrationDir = "registration-dir" - volumeNamePluginDir = "plugins-dir" - volumeNameAppRootDir = consts.AppName + "-common-root" - volumeNameLegacyAppRootDir = "direct-csi-common-root" - appRootDir = consts.AppRootDir + "/" - legacyAppRootDir = "/var/lib/direct-csi/" - volumeNameSysDir = "sysfs" - volumeNameDevDir = "devfs" - volumePathDevDir = "/dev" - volumeNameRunUdevData = "run-udev-data-dir" - volumePathRunUdevData = consts.UdevDataDir - socketFile = "/csi.sock" - totalDaemonsetSteps = 2 + kubeletPodsDirVolumeName = "mountpoint-dir" + registrationDirVolumeName = "registration-dir" + kubeletPluginsDirVolumeName = "plugins-dir" + socketFile = "/csi.sock" + totalDaemonsetSteps = 2 ) type daemonsetTask struct{} @@ -94,25 +85,25 @@ func newSecurityContext(seccompProfile string) *corev1.SecurityContext { func getVolumesAndMounts(pluginSocketDir string) (volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) { volumes = []corev1.Volume{ - newHostPathVolume(volumeNameSocketDir, pluginSocketDir), - newHostPathVolume(volumeNameMountpointDir, kubeletDirPath+"/pods"), - newHostPathVolume(volumeNameRegistrationDir, kubeletDirPath+"/plugins_registry"), - newHostPathVolume(volumeNamePluginDir, kubeletDirPath+"/plugins"), - newHostPathVolume(volumeNameAppRootDir, appRootDir), - newHostPathVolume(volumeNameSysDir, volumePathSysDir), - newHostPathVolume(volumeNameDevDir, volumePathDevDir), - newHostPathVolume(volumeNameRunUdevData, volumePathRunUdevData), - newHostPathVolume(volumeNameLegacyAppRootDir, legacyAppRootDir), + k8s.NewHostPathVolume(csiDirVolumeName, pluginSocketDir), + k8s.NewHostPathVolume(kubeletPodsDirVolumeName, kubeletDirPath+"/pods"), + k8s.NewHostPathVolume(registrationDirVolumeName, kubeletDirPath+"/plugins_registry"), + k8s.NewHostPathVolume(kubeletPluginsDirVolumeName, kubeletDirPath+"/plugins"), + k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath), + k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath), + k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath), + k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath), + k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath), } volumeMounts = []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), - newVolumeMount(volumeNameMountpointDir, kubeletDirPath+"/pods", corev1.MountPropagationBidirectional, false), - newVolumeMount(volumeNamePluginDir, kubeletDirPath+"/plugins", corev1.MountPropagationBidirectional, false), - newVolumeMount(volumeNameAppRootDir, appRootDir, corev1.MountPropagationBidirectional, false), - newVolumeMount(volumeNameSysDir, volumePathSysDir, corev1.MountPropagationBidirectional, false), - newVolumeMount(volumeNameDevDir, volumePathDevDir, corev1.MountPropagationHostToContainer, true), - newVolumeMount(volumeNameRunUdevData, volumePathRunUdevData, corev1.MountPropagationBidirectional, true), - newVolumeMount(volumeNameLegacyAppRootDir, legacyAppRootDir, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(kubeletPodsDirVolumeName, kubeletDirPath+"/pods", corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(kubeletPluginsDirVolumeName, kubeletDirPath+"/plugins", corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true), + k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true), } return @@ -129,8 +120,8 @@ func nodeDriverRegistrarContainer(image, pluginSocketDir string) corev1.Containe }, Env: []corev1.EnvVar{kubeNodeNameEnvVar}, VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), - newVolumeMount(volumeNameRegistrationDir, "/registration", corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(registrationDirVolumeName, "/registration", corev1.MountPropagationNone, false), }, TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, TerminationMessagePath: "/var/log/driver-registrar-termination-log", @@ -192,13 +183,13 @@ func livenessProbeContainer(image string) corev1.Container { Name: "liveness-probe", Image: image, Args: []string{ - fmt.Sprintf("--csi-address=%v%v", socketDir, socketFile), + fmt.Sprintf("--csi-address=%v%v", csiDirVolumePath, socketFile), fmt.Sprintf("--health-port=%v", healthZContainerPort), }, TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, TerminationMessagePath: "/var/log/driver-liveness-termination-log", VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), }, } } @@ -254,14 +245,15 @@ func doCreateDaemonset(ctx context.Context, args *Args) (err error) { fmt.Sprintf("-v=%d", logLevel), fmt.Sprintf("--identity=%s", consts.Identity), fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName), - fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName), + fmt.Sprintf("--kube-node-name=$(%s)", consts.KubeNodeNameEnvVarName), fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort), fmt.Sprintf("--metrics-port=%d", consts.MetricsPort), } + nodeNameEnvVarName := "KUBE_NODE_NAME" nodeControllerArgs := []string{ consts.NodeControllerName, fmt.Sprintf("-v=%d", logLevel), - fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName), + fmt.Sprintf("--kube-node-name=$(%s)", nodeNameEnvVarName), } podSpec := corev1.PodSpec{ @@ -324,7 +316,7 @@ func doCreateLegacyDaemonset(ctx context.Context, args *Args) (err error) { consts.LegacyNodeServerName, fmt.Sprintf("-v=%d", logLevel), fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName), - fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName), + fmt.Sprintf("--kube-node-name=$(%s)", consts.KubeNodeNameEnvVarName), fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort), } diff --git a/pkg/installer/deployment.go b/pkg/installer/deployment.go index af1b8aea..a57ede52 100644 --- a/pkg/installer/deployment.go +++ b/pkg/installer/deployment.go @@ -84,7 +84,7 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int) []string{ fmt.Sprintf("-v=%d", logLevel), fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName), - fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName), + fmt.Sprintf("--kube-node-name=$(%s)", consts.KubeNodeNameEnvVarName), fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort), }..., ) @@ -93,8 +93,8 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int) podSpec := corev1.PodSpec{ ServiceAccountName: consts.Identity, Volumes: []corev1.Volume{ - newHostPathVolume( - volumeNameSocketDir, + k8s.NewHostPathVolume( + csiDirVolumeName, newPluginsSocketDir(kubeletDirPath, fmt.Sprintf("%s-controller", consts.ControllerServerName)), ), }, @@ -113,7 +113,7 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int) }, Env: []corev1.EnvVar{csiEndpointEnvVar}, VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), }, TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, TerminationMessagePath: "/var/log/controller-provisioner-termination-log", @@ -145,7 +145,7 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int) }, Env: []corev1.EnvVar{csiEndpointEnvVar}, VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), }, TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, TerminationMessagePath: "/var/log/controller-csi-resizer-termination-log", @@ -170,7 +170,7 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int) }, Env: []corev1.EnvVar{kubeNodeNameEnvVar, csiEndpointEnvVar}, VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + k8s.NewVolumeMount(csiDirVolumeName, csiDirVolumePath, corev1.MountPropagationNone, false), }, }, }, diff --git a/pkg/installer/namespace.go b/pkg/installer/namespace.go index e0e81a05..1ea8bc20 100644 --- a/pkg/installer/namespace.go +++ b/pkg/installer/namespace.go @@ -19,6 +19,7 @@ package installer import ( "context" + "github.com/minio/directpv/pkg/jobs" "github.com/minio/directpv/pkg/k8s" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -106,8 +107,17 @@ func createNamespace(ctx context.Context, args *Args) (err error) { } func deleteNamespace(ctx context.Context) error { + jobObjects, err := jobs.NewLister().IgnoreNotFound(true).Get(ctx) + if err != nil { + return err + } + if len(jobObjects) > 0 { + // Do not delete the namespace if there + // are jobs in directpv namespace + return nil + } propagationPolicy := metav1.DeletePropagationForeground - err := k8s.KubeClient().CoreV1().Namespaces().Delete( + err = k8s.KubeClient().CoreV1().Namespaces().Delete( ctx, namespace, metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}, ) if err != nil { diff --git a/pkg/installer/psp.go b/pkg/installer/psp.go index 43000981..016ec948 100644 --- a/pkg/installer/psp.go +++ b/pkg/installer/psp.go @@ -146,10 +146,10 @@ func createPodSecurityPolicy(ctx context.Context, args *Args) (err error) { Volumes: []policy.FSType{policy.HostPath}, AllowedHostPaths: []policy.AllowedHostPath{ {PathPrefix: "/proc", ReadOnly: true}, - {PathPrefix: volumePathSysDir}, + {PathPrefix: consts.SysDirVolumePath}, {PathPrefix: consts.UdevDataDir, ReadOnly: true}, {PathPrefix: consts.AppRootDir}, - {PathPrefix: socketDir}, + {PathPrefix: csiDirVolumePath}, {PathPrefix: kubeletDirPath}, }, SELinux: policy.SELinuxStrategyOptions{ diff --git a/pkg/installer/rbac.go b/pkg/installer/rbac.go index ea34ff3e..0b98848a 100644 --- a/pkg/installer/rbac.go +++ b/pkg/installer/rbac.go @@ -166,6 +166,7 @@ func createClusterRole(ctx context.Context, args *Args) (err error) { ), newPolicyRule([]string{"pods", "pod"}, nil, getVerb, listVerb, watchVerb), newPolicyRule([]string{"secrets", "secret"}, nil, getVerb, listVerb, watchVerb), + newPolicyRule([]string{"jobs"}, []string{"batch"}, createVerb, deleteVerb, getVerb, listVerb, updateVerb, watchVerb), }, AggregationRule: nil, } diff --git a/pkg/installer/utils.go b/pkg/installer/utils.go index c37ecaab..8bc82d7c 100644 --- a/pkg/installer/utils.go +++ b/pkg/installer/utils.go @@ -24,38 +24,13 @@ import ( "strings" "github.com/minio/directpv/pkg/k8s" - corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" ) -func newHostPathVolume(name, path string) corev1.Volume { - hostPathType := corev1.HostPathDirectoryOrCreate - volumeSource := corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: path, - Type: &hostPathType, - }, - } - - return corev1.Volume{ - Name: name, - VolumeSource: volumeSource, - } -} - func newPluginsSocketDir(kubeletDir, name string) string { return path.Join(kubeletDir, "plugins", k8s.SanitizeResourceName(name)) } -func newVolumeMount(name, path string, mountPropogation corev1.MountPropagationMode, readOnly bool) corev1.VolumeMount { - return corev1.VolumeMount{ - Name: name, - ReadOnly: readOnly, - MountPath: path, - MountPropagation: &mountPropogation, - } -} - func getRandSuffix() string { b := make([]byte, 5) if _, err := rand.Read(b); err != nil { diff --git a/pkg/installer/vars.go b/pkg/installer/vars.go index 4abf5c2b..6fc3b074 100644 --- a/pkg/installer/vars.go +++ b/pkg/installer/vars.go @@ -38,7 +38,7 @@ var ( } kubeNodeNameEnvVar = corev1.EnvVar{ - Name: kubeNodeNameEnvVarName, + Name: consts.KubeNodeNameEnvVarName, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ APIVersion: "v1", diff --git a/pkg/jobs/copy.go b/pkg/jobs/copy.go new file mode 100644 index 00000000..0c63c84e --- /dev/null +++ b/pkg/jobs/copy.go @@ -0,0 +1,147 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2023 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package jobs + +import ( + "context" + "fmt" + + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" + "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/k8s" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// CopyOpts defines the options for copying +type CopyOpts struct { + SourceDriveID directpvtypes.DriveID + DestinationDriveID directpvtypes.DriveID + VolumeID string + NodeID directpvtypes.NodeID +} + +// ContainerParams represents the container parameters +type ContainerParams struct { + Image string + ImagePullSecrets []corev1.LocalObjectReference + Tolerations []corev1.Toleration +} + +// CreateCopyJob creates a new job instance for copying the volume. +func CreateCopyJob(ctx context.Context, opts CopyOpts, params ContainerParams, overwrite bool) error { + labels := map[string]string{ + string(directpvtypes.JobTypeLabelKey): string(JobTypeCopy), + string(directpvtypes.SourceDriveLabelKey): string(opts.SourceDriveID), + string(directpvtypes.DestinationDriveLabelKey): string(opts.DestinationDriveID), + string(directpvtypes.NodeLabelKey): string(opts.NodeID), + string(directpvtypes.VolumeLabelKey): opts.VolumeID, + } + for k, v := range defaultLabels { + labels[k] = v + } + objectMeta := metav1.ObjectMeta{ + Name: "copy-" + opts.VolumeID, + Namespace: consts.AppNamespace, + Labels: labels, + Finalizers: []string{ + consts.CopyProtectionFinalizer, + }, + } + privileged := true + var backoffLimit int32 = 3 + job := &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: objectMeta, + Spec: batchv1.JobSpec{ + BackoffLimit: &backoffLimit, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + NodeSelector: map[string]string{string(directpvtypes.NodeLabelKey): string(opts.NodeID)}, + ServiceAccountName: consts.Identity, + Tolerations: params.Tolerations, + ImagePullSecrets: params.ImagePullSecrets, + Volumes: []corev1.Volume{ + k8s.NewHostPathVolume(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath), + k8s.NewHostPathVolume(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath), + k8s.NewHostPathVolume(consts.SysDirVolumeName, consts.SysDirVolumePath), + k8s.NewHostPathVolume(consts.DevDirVolumeName, consts.DevDirVolumePath), + k8s.NewHostPathVolume(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath), + }, + Containers: []corev1.Container{ + { + Name: "copy-job", + Image: params.Image, + Args: []string{ + "copy", + string(opts.SourceDriveID), + string(opts.DestinationDriveID), + "--volume-id=" + opts.VolumeID, + fmt.Sprintf("--kube-node-name=$(%s)", consts.KubeNodeNameEnvVarName), + }, + Env: []corev1.EnvVar{ + { + Name: consts.KubeNodeNameEnvVarName, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "spec.nodeName", + }, + }, + }, + }, + TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, + TerminationMessagePath: "/var/log/copy-termination-log", + VolumeMounts: []corev1.VolumeMount{ + k8s.NewVolumeMount(consts.AppRootDirVolumeName, consts.AppRootDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.LegacyAppRootDirVolumeName, consts.LegacyAppRootDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.SysDirVolumeName, consts.SysDirVolumePath, corev1.MountPropagationBidirectional, false), + k8s.NewVolumeMount(consts.DevDirVolumeName, consts.DevDirVolumePath, corev1.MountPropagationHostToContainer, true), + k8s.NewVolumeMount(consts.RunUdevDataVolumeName, consts.RunUdevDataVolumePath, corev1.MountPropagationBidirectional, true), + }, + SecurityContext: &corev1.SecurityContext{ + Privileged: &privileged, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + } + + if _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Create(ctx, job, metav1.CreateOptions{}); err != nil { + if apierrors.IsAlreadyExists(err) && overwrite { + return deleteAndCreate(ctx, job) + } + return err + } + return nil +} + +func deleteAndCreate(ctx context.Context, job *batchv1.Job) error { + if err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Delete(ctx, job.Name, metav1.DeleteOptions{}); err != nil { + return err + } + _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Create(ctx, job, metav1.CreateOptions{}) + return err +} diff --git a/pkg/jobs/event.go b/pkg/jobs/event.go new file mode 100644 index 00000000..9fb88160 --- /dev/null +++ b/pkg/jobs/event.go @@ -0,0 +1,205 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2023 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package jobs + +import ( + "context" + "fmt" + "time" + + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" + "github.com/minio/directpv/pkg/client" + "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/controller" + "github.com/minio/directpv/pkg/k8s" + v1 "k8s.io/api/batch/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +// JobStatus represents the status of the Job +type JobStatus string + +// JobType represents the type of the Job +type JobType string + +const ( + // JobTypeCopy represents the mirror job + JobTypeCopy JobType = "copy" + // JobTypeUnknown represents unknown job type + JobTypeUnknown JobType = "unknown" + // JobStatusActive represents the active job status + JobStatusActive JobStatus = "active" + // JobStatusFailed represents the failed job status + JobStatusFailed JobStatus = "failed" + // JobStatusSucceeded represents the succeeded job status + JobStatusSucceeded JobStatus = "succeeded" +) + +var defaultLabels = map[string]string{ + "application-name": consts.GroupName, + "application-type": "CSIDriver", + string(directpvtypes.CreatedByLabelKey): "controller", + string(directpvtypes.VersionLabelKey): consts.LatestAPIVersion, +} + +const ( + workerThreads = 10 + resyncPeriod = 10 * time.Minute +) + +type jobsEventHandler struct{} + +func newJobsEventHandler() *jobsEventHandler { + return &jobsEventHandler{} +} + +func (handler *jobsEventHandler) ListerWatcher() cache.ListerWatcher { + return &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Watch(context.TODO(), options) + }, + } +} + +func (handler *jobsEventHandler) ObjectType() runtime.Object { + return &v1.Job{} +} + +func (handler *jobsEventHandler) Handle(ctx context.Context, eventType controller.EventType, object runtime.Object) error { + job := object.(*v1.Job) + if !job.GetDeletionTimestamp().IsZero() { + return handleDelete(ctx, job) + } + if eventType == controller.UpdateEvent { + return handleUpdate(ctx, object.(*v1.Job)) + } + return nil +} + +func handleDelete(ctx context.Context, job *v1.Job) error { + jobType, err := getJobType(job) + if err != nil { + return err + } + switch jobType { + case JobTypeCopy: + return handleCopyJobDeletion(ctx, job) + default: + return fmt.Errorf("Invalid jobType: %v", jobType) + } +} + +func handleCopyJobDeletion(ctx context.Context, job *v1.Job) error { + if err := updateOnCopyJobCompletion(ctx, job); err != nil { + return err + } + finalizers := []string{} + for _, finalizer := range job.ObjectMeta.GetFinalizers() { + if finalizer == consts.CopyProtectionFinalizer { + continue + } + finalizers = append(finalizers, finalizer) + } + job.ObjectMeta.SetFinalizers(finalizers) + _, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Update(ctx, job, metav1.UpdateOptions{}) + return err +} + +func handleUpdate(ctx context.Context, job *v1.Job) error { + if job.Status.CompletionTime == nil || job.Status.Succeeded == 0 { + return nil + } + jobType, err := getJobType(job) + if err != nil { + return err + } + switch jobType { + case JobTypeCopy: + return updateOnCopyJobCompletion(ctx, job) + default: + return fmt.Errorf("Invalid jobType: %v", jobType) + } +} + +func getJobType(job *v1.Job) (JobType, error) { + labels := job.ObjectMeta.GetLabels() + if labels == nil { + return JobTypeUnknown, fmt.Errorf("No labels present in the job: %v", job.Name) + } + value, ok := labels[string(directpvtypes.JobTypeLabelKey)] + if !ok { + return JobTypeUnknown, fmt.Errorf("Unable to identify the job: %v; Missing JobType", job.Name) + } + jobType, err := ToType(value) + if err != nil { + return JobTypeUnknown, err + } + return jobType, nil +} + +func updateOnCopyJobCompletion(ctx context.Context, job *v1.Job) error { + labels := job.ObjectMeta.GetLabels() + + // Update volume + volumeName := labels[string(directpvtypes.VolumeLabelKey)] + if volumeName == "" { + return fmt.Errorf("No volumeID present in the copy job: %v", job.Name) + } + volume, err := client.VolumeClient().Get(ctx, volumeName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + volume.Status.Status = directpvtypes.VolumeStatusReady + if !volume.IsStaged() { + volume.Status.Status = directpvtypes.VolumeStatusPending + } + if _, err = client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{}); err != nil { + return err + } + + // Update source drive + sourceDriveID := labels[string(directpvtypes.SourceDriveLabelKey)] + if sourceDriveID == "" { + return fmt.Errorf("No source drive ID present in the copy job: %v", job.Name) + } + sourceDrive, err := client.DriveClient().Get(ctx, sourceDriveID, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + sourceDrive.RemoveCopyProtectionFinalizer() + _, err = client.DriveClient().Update(ctx, sourceDrive, metav1.UpdateOptions{}) + return err +} + +// StartController starts volume controller. +func StartController(ctx context.Context) { + ctrl := controller.New("jobs", newJobsEventHandler(), workerThreads, resyncPeriod) + ctrl.Run(ctx) +} diff --git a/pkg/jobs/lister.go b/pkg/jobs/lister.go new file mode 100644 index 00000000..e08b6a2d --- /dev/null +++ b/pkg/jobs/lister.go @@ -0,0 +1,264 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2023 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package jobs + +import ( + "context" + "fmt" + "strings" + + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" + "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/k8s" + "github.com/minio/directpv/pkg/utils" + batchv1 "k8s.io/api/batch/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ListJobResult denotes list of job result. +type ListJobResult struct { + Job batchv1.Job + Err error +} + +// Lister is job lister. +type Lister struct { + nodes []directpvtypes.LabelValue + statusList []JobStatus + typeList []JobType + jobNames []string + labels map[directpvtypes.LabelKey]directpvtypes.LabelValue + maxObjects int64 + ignoreNotFound bool +} + +// NewLister creates new job lister. +func NewLister() *Lister { + return &Lister{ + maxObjects: k8s.MaxThreadCount, + } +} + +// ToStatus converts a value to job status. +func ToStatus(value string) (JobStatus, error) { + status := JobStatus(strings.ToLower(value)) + switch status { + case JobStatusActive, JobStatusFailed, JobStatusSucceeded: + default: + return status, fmt.Errorf("invalid job status: %v", value) + } + return status, nil +} + +// ToType converts a value to job type. +func ToType(value string) (JobType, error) { + status := JobType(strings.ToLower(value)) + switch status { + case JobTypeCopy: + default: + return status, fmt.Errorf("invalid job type: %v", value) + } + return status, nil +} + +// NodeSelector adds filter listing by nodes. +func (lister *Lister) NodeSelector(nodes []directpvtypes.LabelValue) *Lister { + lister.nodes = nodes + return lister +} + +// StatusSelector adds filter listing by job status. +func (lister *Lister) StatusSelector(statusList []JobStatus) *Lister { + lister.statusList = statusList + return lister +} + +// TypeSelector adds filter listing by job status. +func (lister *Lister) TypeSelector(typeList []JobType) *Lister { + lister.typeList = typeList + return lister +} + +// JobNameSelector adds filter listing by job names. +func (lister *Lister) JobNameSelector(jobNames []string) *Lister { + lister.jobNames = jobNames + return lister +} + +// LabelSelector adds filter listing by labels. +func (lister *Lister) LabelSelector(labels map[directpvtypes.LabelKey]directpvtypes.LabelValue) *Lister { + lister.labels = labels + return lister +} + +// MaxObjects controls number of items to be fetched in every iteration. +func (lister *Lister) MaxObjects(n int64) *Lister { + lister.maxObjects = n + return lister +} + +// IgnoreNotFound controls listing to ignore job not found error. +func (lister *Lister) IgnoreNotFound(b bool) *Lister { + lister.ignoreNotFound = b + return lister +} + +// GetStatus gets the job status. +func GetStatus(job batchv1.Job) JobStatus { + if job.Status.Active > 0 { + return JobStatusActive + } + if job.Status.CompletionTime != nil && job.Status.Succeeded > 0 { + return JobStatusSucceeded + } + return JobStatusFailed +} + +// GetType gets the job type +func GetType(job batchv1.Job) JobType { + labels := job.GetLabels() + if v, ok := labels[string(directpvtypes.JobTypeLabelKey)]; ok { + jobType, err := ToType(v) + if err == nil { + return jobType + } + } + return JobTypeUnknown +} + +// GetNode returns the node name of the job +func GetNode(job batchv1.Job) string { + labels := job.GetLabels() + if v, ok := labels[string(directpvtypes.NodeLabelKey)]; ok { + return v + } + return "" +} + +// List returns channel to loop through job items. +func (lister *Lister) List(ctx context.Context) <-chan ListJobResult { + getOnly := len(lister.nodes) == 0 && + len(lister.statusList) == 0 && + len(lister.labels) == 0 && + len(lister.typeList) == 0 && + len(lister.jobNames) != 0 + + labelMap := map[directpvtypes.LabelKey][]directpvtypes.LabelValue{ + directpvtypes.NodeLabelKey: lister.nodes, + } + for k, v := range lister.labels { + labelMap[k] = []directpvtypes.LabelValue{v} + } + labelSelector := directpvtypes.ToLabelSelector(labelMap) + + resultCh := make(chan ListJobResult) + go func() { + defer close(resultCh) + + send := func(result ListJobResult) bool { + select { + case <-ctx.Done(): + return false + case resultCh <- result: + return true + } + } + + if !getOnly { + options := metav1.ListOptions{ + Limit: lister.maxObjects, + LabelSelector: labelSelector, + } + + for { + result, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).List(ctx, options) + if err != nil { + if apierrors.IsNotFound(err) && lister.ignoreNotFound { + break + } + + send(ListJobResult{Err: err}) + return + } + + for _, item := range result.Items { + var found bool + var values []string + for i := range lister.jobNames { + if lister.jobNames[i] == item.Name { + found = true + } else { + values = append(values, lister.jobNames[i]) + } + } + lister.jobNames = values + + switch { + case found || (len(lister.statusList) == 0 && len(lister.typeList) == 0): + case len(lister.statusList) > 0 && utils.Contains(lister.statusList, GetStatus(item)): + case len(lister.typeList) > 0 && utils.Contains(lister.typeList, GetType(item)): + default: + continue + } + + if !send(ListJobResult{Job: item}) { + return + } + } + + if result.Continue == "" { + break + } + + options.Continue = result.Continue + } + } + + for _, jobName := range lister.jobNames { + job, err := k8s.KubeClient().BatchV1().Jobs(consts.AppNamespace).Get(ctx, jobName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) && lister.ignoreNotFound { + continue + } + send(ListJobResult{Err: err}) + return + } + if !send(ListJobResult{Job: *job}) { + return + } + } + }() + + return resultCh +} + +// Get returns list of jobs. +func (lister *Lister) Get(ctx context.Context) ([]batchv1.Job, error) { + ctx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + + jobList := []batchv1.Job{} + for result := range lister.List(ctx) { + if result.Err != nil { + return jobList, result.Err + } + jobList = append(jobList, result.Job) + } + + return jobList, nil +} diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index b33d360b..0f2e02d1 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -23,6 +23,7 @@ import ( "github.com/minio/directpv/pkg/utils" "github.com/spf13/viper" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes/scheme" @@ -190,3 +191,29 @@ func SanitizeResourceName(name string) string { return string(result) } + +// NewHostPathVolume - creates volume for given name and host path. +func NewHostPathVolume(name, path string) corev1.Volume { + hostPathType := corev1.HostPathDirectoryOrCreate + volumeSource := corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: path, + Type: &hostPathType, + }, + } + + return corev1.Volume{ + Name: name, + VolumeSource: volumeSource, + } +} + +// NewVolumeMount - creates volume mount for given name, path, mount propagation and read only flag. +func NewVolumeMount(name, path string, mountPropogation corev1.MountPropagationMode, readOnly bool) corev1.VolumeMount { + return corev1.VolumeMount{ + Name: name, + ReadOnly: readOnly, + MountPath: path, + MountPropagation: &mountPropogation, + } +} diff --git a/pkg/volume/event.go b/pkg/volume/event.go index 7881d749..e7ab406f 100644 --- a/pkg/volume/event.go +++ b/pkg/volume/event.go @@ -113,6 +113,9 @@ func (handler *volumeEventHandler) delete(ctx context.Context, volume *types.Vol if !volume.IsReleased() { return fmt.Errorf("volume %v must be released before cleaning up", volume.Name) } + if volume.Status.Status == directpvtypes.VolumeStatusCopying { + return fmt.Errorf("volume %v is busy copying data", volume.Name) + } if volume.Status.TargetPath != "" { if err := handler.unmount(volume.Status.TargetPath); err != nil {