From 0bb711678134c8746a86bc82de8584acf6dab586 Mon Sep 17 00:00:00 2001 From: Michelle Au Date: Fri, 12 Jul 2019 10:25:16 -0700 Subject: [PATCH 1/2] Refactor: Move block, snapshot and cloning logic into separate functions --- pkg/hostpath/controllerserver.go | 145 ++++++------------------------- pkg/hostpath/hostpath.go | 98 ++++++++++++++++++++- 2 files changed, 122 insertions(+), 121 deletions(-) diff --git a/pkg/hostpath/controllerserver.go b/pkg/hostpath/controllerserver.go index e05e31dee..b18c9e593 100644 --- a/pkg/hostpath/controllerserver.go +++ b/pkg/hostpath/controllerserver.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/status" "github.com/container-storage-interface/spec/lib/go/csi" - "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" utilexec "k8s.io/utils/exec" ) @@ -138,114 +137,46 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol }, }, nil } - return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with different size already exist", req.GetName())) + return nil, status.Errorf(codes.AlreadyExists, "Volume with the same name: %s but with different size already exist", req.GetName()) } volumeID := uuid.NewUUID().String() path := getVolumePath(volumeID) - if requestedAccessType == blockAccess { - executor := utilexec.New() - size := fmt.Sprintf("%dM", capacity/mib) - // Create a block file. - out, err := executor.Command("fallocate", "-l", size, path).CombinedOutput() - if err != nil { - glog.V(3).Infof("failed to create block device: %v", string(out)) - return nil, err - } - - // Associate block file with the loop device. - volPathHandler := volumepathhandler.VolumePathHandler{} - _, err = volPathHandler.AttachFileDevice(path) - if err != nil { - glog.Errorf("failed to attach device: %v", err) - // Remove the block file because it'll no longer be used again. - if err2 := os.Remove(path); err != nil { - glog.Errorf("failed to cleanup block file %s: %v", path, err2) - } - return nil, status.Error(codes.Internal, fmt.Sprintf("failed to attach device: %v", err)) - } - } - vol, err := createHostpathVolume(volumeID, req.GetName(), capacity, requestedAccessType, false /* ephemeral */) if err != nil { - return nil, status.Error(codes.Internal, fmt.Sprintf("failed to create volume: %s", err)) + return nil, status.Errorf(codes.Internal, "failed to create volume %v: %v", volumeID, err) } glog.V(4).Infof("created volume %s at path %s", vol.VolID, vol.VolPath) if req.GetVolumeContentSource() != nil { contentSource := req.GetVolumeContentSource() - if contentSource.GetSnapshot() != nil { - snapshotId := contentSource.GetSnapshot().GetSnapshotId() - snapshot, ok := hostPathVolumeSnapshots[snapshotId] - if !ok { - deleteHostpathVolume(volumeID) - return nil, status.Errorf(codes.NotFound, "cannot find snapshot %v", snapshotId) - } - if snapshot.ReadyToUse != true { - deleteHostpathVolume(volumeID) - return nil, status.Errorf(codes.Internal, "Snapshot %v is not yet ready to use.", snapshotId) - } - snapshotPath := snapshot.Path - args := []string{"zxvf", snapshotPath, "-C", path} - executor := utilexec.New() - out, err := executor.Command("tar", args...).CombinedOutput() - if err != nil { - deleteHostpathVolume(volumeID) - return nil, status.Error(codes.Internal, fmt.Sprintf("failed pre-populate data for volume: %v: %s", err, out)) - } + if snapshot := contentSource.GetSnapshot(); snapshot != nil { + err = loadFromSnapshot(snapshot.GetSnapshotId(), path) } if srcVolume := contentSource.GetVolume(); srcVolume != nil { - srcVolumeID := srcVolume.GetVolumeId() - hostPathVolume, ok := hostPathVolumes[srcVolumeID] - if !ok { - deleteHostpathVolume(volumeID) - return nil, status.Error(codes.NotFound, "source volumeID does not exist, are source/destination in the same storage class?") - } - srcPath := hostPathVolume.VolPath - isEmpty, err := hostPathIsEmpty(srcPath) - if err != nil { - deleteHostpathVolume(volumeID) - return nil, status.Error(codes.Internal, fmt.Sprintf("failed verification check of source hostpath volume: %s: %v", srcVolumeID, err)) - } - - // If the source hostpath volume is empty it's a noop and we just move along, otherwise the cp call will fail with a a file stat error DNE - if !isEmpty { - args := []string{"-a", srcPath + "/*", path + "/"} - executor := utilexec.New() - out, err := executor.Command("cp", args...).CombinedOutput() - if err != nil { - deleteHostpathVolume(volumeID) - return nil, status.Error(codes.Internal, fmt.Sprintf("failed pre-populate data (clone) for volume: %s: %s", volumeID, out)) - } + err = loadFromVolume(srcVolume.GetVolumeId(), path) + } + if err != nil { + if delErr := deleteHostpathVolume(volumeID); delErr != nil { + glog.V(2).Infof("deleting hostpath volume %v failed: %v", volumeID, delErr) } + return nil, err } + glog.V(4).Infof("successfully populated volume %s", vol.VolID) } - createVolumeResponse := &csi.CreateVolumeResponse{} - if req.GetVolumeContentSource() != nil { - createVolumeResponse = &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - VolumeId: volumeID, - CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), - VolumeContext: req.GetParameters(), - ContentSource: req.GetVolumeContentSource(), - }, - } - } else { - createVolumeResponse = &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - VolumeId: volumeID, - CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), - VolumeContext: req.GetParameters(), - }, - } - } - return createVolumeResponse, nil + return &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + VolumeId: volumeID, + CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), + VolumeContext: req.GetParameters(), + ContentSource: req.GetVolumeContentSource(), + }, + }, nil } func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { - // Check arguments if len(req.GetVolumeId()) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") @@ -256,36 +187,12 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, err } - vol, err := getVolumeByID(req.GetVolumeId()) - if err != nil { - // Return OK if the volume is not found. - return &csi.DeleteVolumeResponse{}, nil - } - glog.V(4).Infof("deleting volume %s", vol.VolID) - - if vol.VolAccessType == blockAccess { - - volPathHandler := volumepathhandler.VolumePathHandler{} - // Get the associated loop device. - device, err := volPathHandler.GetLoopDevice(getVolumePath(vol.VolID)) - if err != nil { - return nil, status.Error(codes.Internal, fmt.Sprintf("failed to get the loop device: %v", err)) - } - - if device != "" { - // Remove any associated loop device. - glog.V(4).Infof("deleting loop device %s", device) - if err := volPathHandler.RemoveLoopDevice(device); err != nil { - return nil, status.Error(codes.Internal, fmt.Sprintf("failed to remove loop device: %v", err)) - } - } - } - - if err := deleteHostpathVolume(vol.VolID); err != nil && !os.IsNotExist(err) { - return nil, status.Error(codes.Internal, fmt.Sprintf("failed to delete volume: %s", err)) + volId := req.GetVolumeId() + if err := deleteHostpathVolume(volId); err != nil { + return nil, status.Errorf(codes.Internal, "failed to delete volume %v: %v", volId, err) } - glog.V(4).Infof("volume deleted ok: %s", vol.VolID) + glog.V(4).Infof("volume %v successfully deleted", volId) return &csi.DeleteVolumeResponse{}, nil } @@ -377,7 +284,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS }, }, nil } - return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("snapshot with the same name: %s but with different SourceVolumeId already exist", req.GetName())) + return nil, status.Errorf(codes.AlreadyExists, "snapshot with the same name: %s but with different SourceVolumeId already exist", req.GetName()) } volumeID := req.GetSourceVolumeId() @@ -402,7 +309,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS executor := utilexec.New() out, err := executor.Command("tar", args...).CombinedOutput() if err != nil { - return nil, status.Error(codes.Internal, fmt.Sprintf("failed create snapshot: %v: %s", err, out)) + return nil, status.Errorf(codes.Internal, "failed create snapshot: %v: %s", err, out) } glog.V(4).Infof("create volume snapshot %s", file) @@ -579,7 +486,7 @@ func (cs *controllerServer) validateControllerServiceRequest(c csi.ControllerSer return nil } } - return status.Error(codes.InvalidArgument, fmt.Sprintf("%s", c)) + return status.Errorf(codes.InvalidArgument, "unsupported capability %s", c) } func getControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) []*csi.ControllerServiceCapability { diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index ea54a333f..5f95b2a01 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -22,6 +22,10 @@ import ( "os" "github.com/golang/glog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" + utilexec "k8s.io/utils/exec" timestamp "github.com/golang/protobuf/ptypes/timestamp" ) @@ -151,11 +155,34 @@ func getVolumePath(volID string) string { // It returns the volume path or err if one occurs. func createHostpathVolume(volID, name string, cap int64, volAccessType accessType, ephemeral bool) (*hostPathVolume, error) { path := getVolumePath(volID) - if volAccessType == mountAccess { + + switch volAccessType { + case mountAccess: err := os.MkdirAll(path, 0777) if err != nil { return nil, err } + case blockAccess: + executor := utilexec.New() + size := fmt.Sprintf("%dM", cap/mib) + // Create a block file. + out, err := executor.Command("fallocate", "-l", size, path).CombinedOutput() + if err != nil { + return nil, fmt.Errorf("failed to create block device: %v, %v", err, string(out)) + } + + // Associate block file with the loop device. + volPathHandler := volumepathhandler.VolumePathHandler{} + _, err = volPathHandler.AttachFileDevice(path) + if err != nil { + // Remove the block file because it'll no longer be used again. + if err2 := os.Remove(path); err2 != nil { + glog.Errorf("failed to cleanup block file %s: %v", path, err2) + } + return nil, fmt.Errorf("failed to attach device %v: %v", path, err) + } + default: + return nil, fmt.Errorf("unsupported access type %v", volAccessType) } hostpathVol := hostPathVolume{ @@ -173,8 +200,32 @@ func createHostpathVolume(volID, name string, cap int64, volAccessType accessTyp // deleteVolume deletes the directory for the hostpath volume. func deleteHostpathVolume(volID string) error { glog.V(4).Infof("deleting hostpath volume: %s", volID) + + vol, err := getVolumeByID(volID) + if err != nil { + // Return OK if the volume is not found. + return nil + } + + if vol.VolAccessType == blockAccess { + volPathHandler := volumepathhandler.VolumePathHandler{} + // Get the associated loop device. + device, err := volPathHandler.GetLoopDevice(getVolumePath(volID)) + if err != nil { + return fmt.Errorf("failed to get the loop device: %v", err) + } + + if device != "" { + // Remove any associated loop device. + glog.V(4).Infof("deleting loop device %s", device) + if err := volPathHandler.RemoveLoopDevice(device); err != nil { + return fmt.Errorf("failed to remove loop device %v: %v", device, err) + } + } + } + path := getVolumePath(volID) - if err := os.RemoveAll(path); err != nil { + if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) { return err } delete(hostPathVolumes, volID) @@ -196,3 +247,46 @@ func hostPathIsEmpty(p string) (bool, error) { } return false, err } + +// loadFromSnapshot populates the given destPath with data from the snapshotID +func loadFromSnapshot(snapshotId, destPath string) error { + snapshot, ok := hostPathVolumeSnapshots[snapshotId] + if !ok { + return status.Errorf(codes.NotFound, "cannot find snapshot %v", snapshotId) + } + if snapshot.ReadyToUse != true { + return status.Errorf(codes.Internal, "snapshot %v is not yet ready to use.", snapshotId) + } + snapshotPath := snapshot.Path + args := []string{"zxvf", snapshotPath, "-C", destPath} + executor := utilexec.New() + out, err := executor.Command("tar", args...).CombinedOutput() + if err != nil { + return status.Errorf(codes.Internal, "failed pre-populate data from snapshot %v: %v: %s", snapshotId, err, out) + } + return nil +} + +// loadfromVolume populates the given destPath with data from the srcVolumeID +func loadFromVolume(srcVolumeId, destPath string) error { + hostPathVolume, ok := hostPathVolumes[srcVolumeId] + if !ok { + return status.Error(codes.NotFound, "source volumeId does not exist, are source/destination in the same storage class?") + } + srcPath := hostPathVolume.VolPath + isEmpty, err := hostPathIsEmpty(srcPath) + if err != nil { + return status.Errorf(codes.Internal, "failed verification check of source hostpath volume: %s: %v", srcVolumeId, err) + } + + // If the source hostpath volume is empty it's a noop and we just move along, otherwise the cp call will fail with a a file stat error DNE + if !isEmpty { + args := []string{"-a", srcPath + "/*", destPath + "/"} + executor := utilexec.New() + out, err := executor.Command("cp", args...).CombinedOutput() + if err != nil { + return status.Errorf(codes.Internal, "failed pre-populate data from volume %v: %v: %s", srcVolumeId, err, out) + } + } + return nil +} From 08dfcb7062a2549a621b4c5322ba1b74da181bda Mon Sep 17 00:00:00 2001 From: Michelle Au Date: Fri, 12 Jul 2019 10:52:51 -0700 Subject: [PATCH 2/2] Create dataRoot if it doesn't exist, consolidate provisionRoot and snapshotRoot --- pkg/hostpath/controllerserver.go | 17 +++++++++-------- pkg/hostpath/hostpath.go | 20 ++++++++++++++++---- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/pkg/hostpath/controllerserver.go b/pkg/hostpath/controllerserver.go index b18c9e593..0744363d8 100644 --- a/pkg/hostpath/controllerserver.go +++ b/pkg/hostpath/controllerserver.go @@ -20,9 +20,9 @@ import ( "fmt" "math" "os" + "path/filepath" "sort" "strconv" - "strings" "github.com/golang/protobuf/ptypes" @@ -38,8 +38,6 @@ import ( const ( deviceID = "deviceID" - provisionRoot = "/csi-data-dir" - snapshotRoot = "/csi-data-dir" maxStorageCapacity = tib ) @@ -251,6 +249,11 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume return nil, status.Error(codes.Unimplemented, "") } +// getSnapshotPath returns the full path to where the snapshot is stored +func getSnapshotPath(snapshotId string) string { + return filepath.Join(dataRoot, fmt.Sprintf("%s.tgz", snapshotId)) +} + // CreateSnapshot uses tar command to create snapshot for hostpath volume. The tar command can quickly create // archives of entire directories. The host image must have "tar" binaries in /bin, /usr/sbin, or /usr/bin. func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { @@ -296,8 +299,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS snapshotID := uuid.NewUUID().String() creationTime := ptypes.TimestampNow() volPath := hostPathVolume.VolPath - filePath := []string{snapshotRoot, "/", snapshotID, ".tgz"} - file := strings.Join(filePath, "") + file := getSnapshotPath(snapshotID) args := []string{} if hostPathVolume.VolAccessType == blockAccess { glog.V(4).Infof("Creating snapshot of Raw Block Mode Volume") @@ -346,9 +348,8 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS return nil, err } snapshotID := req.GetSnapshotId() - glog.V(4).Infof("deleting volume %s", snapshotID) - pathSlice := []string{snapshotRoot, "/", snapshotID, ".tgz"} - path := strings.Join(pathSlice, "") + glog.V(4).Infof("deleting snapshot %s", snapshotID) + path := getSnapshotPath(snapshotID) os.RemoveAll(path) delete(hostPathVolumeSnapshots, snapshotID) return &csi.DeleteSnapshotResponse{}, nil diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index 5f95b2a01..6742e8c80 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "os" + "path/filepath" "github.com/golang/glog" "google.golang.org/grpc/codes" @@ -70,11 +71,18 @@ type hostPathSnapshot struct { ReadyToUse bool `json:"readyToUse"` } -var hostPathVolumes map[string]hostPathVolume -var hostPathVolumeSnapshots map[string]hostPathSnapshot - var ( vendorVersion = "dev" + + hostPathVolumes map[string]hostPathVolume + hostPathVolumeSnapshots map[string]hostPathSnapshot +) + +const ( + // Directory where data for volumes and snapshots are persisted. + // This can be ephemeral within the container or persisted if + // backed by a Pod volume. + dataRoot = "/csi-data-dir" ) func init() { @@ -98,6 +106,10 @@ func NewHostPathDriver(driverName, nodeID, endpoint, version string, ephemeral b vendorVersion = version } + if err := os.MkdirAll(dataRoot, 0750); err != nil { + return nil, fmt.Errorf("failed to create dataRoot: %v", err) + } + glog.Infof("Driver: %v ", driverName) glog.Infof("Version: %s", vendorVersion) @@ -148,7 +160,7 @@ func getSnapshotByName(name string) (hostPathSnapshot, error) { // getVolumePath returs the canonical path for hostpath volume func getVolumePath(volID string) string { - return fmt.Sprintf("%s/%s", provisionRoot, volID) + return filepath.Join(dataRoot, volID) } // createVolume create the directory for the hostpath volume.