Skip to content

Commit

Permalink
Merge pull request #73 from msau42/cleanup
Browse files Browse the repository at this point in the history
Make dataRoot optional, cleanup
  • Loading branch information
k8s-ci-robot authored Jul 17, 2019
2 parents 82936aa + 08dfcb7 commit 4b2c735
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 133 deletions.
162 changes: 35 additions & 127 deletions pkg/hostpath/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"

"github.com/golang/protobuf/ptypes"

Expand All @@ -33,14 +33,11 @@ import (
"google.golang.org/grpc/status"

"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
utilexec "k8s.io/utils/exec"
)

const (
deviceID = "deviceID"
provisionRoot = "/csi-data-dir"
snapshotRoot = "/csi-data-dir"
maxStorageCapacity = tib
)

Expand Down Expand Up @@ -138,114 +135,46 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
},
}, nil
}
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with different size already exist", req.GetName()))
return nil, status.Errorf(codes.AlreadyExists, "Volume with the same name: %s but with different size already exist", req.GetName())
}

volumeID := uuid.NewUUID().String()
path := getVolumePath(volumeID)

if requestedAccessType == blockAccess {
executor := utilexec.New()
size := fmt.Sprintf("%dM", capacity/mib)
// Create a block file.
out, err := executor.Command("fallocate", "-l", size, path).CombinedOutput()
if err != nil {
glog.V(3).Infof("failed to create block device: %v", string(out))
return nil, err
}

// Associate block file with the loop device.
volPathHandler := volumepathhandler.VolumePathHandler{}
_, err = volPathHandler.AttachFileDevice(path)
if err != nil {
glog.Errorf("failed to attach device: %v", err)
// Remove the block file because it'll no longer be used again.
if err2 := os.Remove(path); err != nil {
glog.Errorf("failed to cleanup block file %s: %v", path, err2)
}
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to attach device: %v", err))
}
}

vol, err := createHostpathVolume(volumeID, req.GetName(), capacity, requestedAccessType, false /* ephemeral */)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to create volume: %s", err))
return nil, status.Errorf(codes.Internal, "failed to create volume %v: %v", volumeID, err)
}
glog.V(4).Infof("created volume %s at path %s", vol.VolID, vol.VolPath)

if req.GetVolumeContentSource() != nil {
contentSource := req.GetVolumeContentSource()
if contentSource.GetSnapshot() != nil {
snapshotId := contentSource.GetSnapshot().GetSnapshotId()
snapshot, ok := hostPathVolumeSnapshots[snapshotId]
if !ok {
deleteHostpathVolume(volumeID)
return nil, status.Errorf(codes.NotFound, "cannot find snapshot %v", snapshotId)
}
if snapshot.ReadyToUse != true {
deleteHostpathVolume(volumeID)
return nil, status.Errorf(codes.Internal, "Snapshot %v is not yet ready to use.", snapshotId)
}
snapshotPath := snapshot.Path
args := []string{"zxvf", snapshotPath, "-C", path}
executor := utilexec.New()
out, err := executor.Command("tar", args...).CombinedOutput()
if err != nil {
deleteHostpathVolume(volumeID)
return nil, status.Error(codes.Internal, fmt.Sprintf("failed pre-populate data for volume: %v: %s", err, out))
}
if snapshot := contentSource.GetSnapshot(); snapshot != nil {
err = loadFromSnapshot(snapshot.GetSnapshotId(), path)
}
if srcVolume := contentSource.GetVolume(); srcVolume != nil {
srcVolumeID := srcVolume.GetVolumeId()
hostPathVolume, ok := hostPathVolumes[srcVolumeID]
if !ok {
deleteHostpathVolume(volumeID)
return nil, status.Error(codes.NotFound, "source volumeID does not exist, are source/destination in the same storage class?")
}
srcPath := hostPathVolume.VolPath
isEmpty, err := hostPathIsEmpty(srcPath)
if err != nil {
deleteHostpathVolume(volumeID)
return nil, status.Error(codes.Internal, fmt.Sprintf("failed verification check of source hostpath volume: %s: %v", srcVolumeID, err))
}

// If the source hostpath volume is empty it's a noop and we just move along, otherwise the cp call will fail with a a file stat error DNE
if !isEmpty {
args := []string{"-a", srcPath + "/*", path + "/"}
executor := utilexec.New()
out, err := executor.Command("cp", args...).CombinedOutput()
if err != nil {
deleteHostpathVolume(volumeID)
return nil, status.Error(codes.Internal, fmt.Sprintf("failed pre-populate data (clone) for volume: %s: %s", volumeID, out))
}
err = loadFromVolume(srcVolume.GetVolumeId(), path)
}
if err != nil {
if delErr := deleteHostpathVolume(volumeID); delErr != nil {
glog.V(2).Infof("deleting hostpath volume %v failed: %v", volumeID, delErr)
}
return nil, err
}
glog.V(4).Infof("successfully populated volume %s", vol.VolID)
}

createVolumeResponse := &csi.CreateVolumeResponse{}
if req.GetVolumeContentSource() != nil {
createVolumeResponse = &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: req.GetParameters(),
ContentSource: req.GetVolumeContentSource(),
},
}
} else {
createVolumeResponse = &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: req.GetParameters(),
},
}
}
return createVolumeResponse, nil
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: req.GetParameters(),
ContentSource: req.GetVolumeContentSource(),
},
}, nil
}

func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {

// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
Expand All @@ -256,36 +185,12 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, err
}

vol, err := getVolumeByID(req.GetVolumeId())
if err != nil {
// Return OK if the volume is not found.
return &csi.DeleteVolumeResponse{}, nil
}
glog.V(4).Infof("deleting volume %s", vol.VolID)

if vol.VolAccessType == blockAccess {

volPathHandler := volumepathhandler.VolumePathHandler{}
// Get the associated loop device.
device, err := volPathHandler.GetLoopDevice(getVolumePath(vol.VolID))
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to get the loop device: %v", err))
}

if device != "" {
// Remove any associated loop device.
glog.V(4).Infof("deleting loop device %s", device)
if err := volPathHandler.RemoveLoopDevice(device); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to remove loop device: %v", err))
}
}
}

if err := deleteHostpathVolume(vol.VolID); err != nil && !os.IsNotExist(err) {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to delete volume: %s", err))
volId := req.GetVolumeId()
if err := deleteHostpathVolume(volId); err != nil {
return nil, status.Errorf(codes.Internal, "failed to delete volume %v: %v", volId, err)
}

glog.V(4).Infof("volume deleted ok: %s", vol.VolID)
glog.V(4).Infof("volume %v successfully deleted", volId)

return &csi.DeleteVolumeResponse{}, nil
}
Expand Down Expand Up @@ -344,6 +249,11 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
return nil, status.Error(codes.Unimplemented, "")
}

// getSnapshotPath returns the full path to where the snapshot is stored
func getSnapshotPath(snapshotId string) string {
return filepath.Join(dataRoot, fmt.Sprintf("%s.tgz", snapshotId))
}

// CreateSnapshot uses tar command to create snapshot for hostpath volume. The tar command can quickly create
// archives of entire directories. The host image must have "tar" binaries in /bin, /usr/sbin, or /usr/bin.
func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
Expand Down Expand Up @@ -377,7 +287,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
},
}, nil
}
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("snapshot with the same name: %s but with different SourceVolumeId already exist", req.GetName()))
return nil, status.Errorf(codes.AlreadyExists, "snapshot with the same name: %s but with different SourceVolumeId already exist", req.GetName())
}

volumeID := req.GetSourceVolumeId()
Expand All @@ -389,8 +299,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
snapshotID := uuid.NewUUID().String()
creationTime := ptypes.TimestampNow()
volPath := hostPathVolume.VolPath
filePath := []string{snapshotRoot, "/", snapshotID, ".tgz"}
file := strings.Join(filePath, "")
file := getSnapshotPath(snapshotID)
args := []string{}
if hostPathVolume.VolAccessType == blockAccess {
glog.V(4).Infof("Creating snapshot of Raw Block Mode Volume")
Expand All @@ -402,7 +311,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
executor := utilexec.New()
out, err := executor.Command("tar", args...).CombinedOutput()
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed create snapshot: %v: %s", err, out))
return nil, status.Errorf(codes.Internal, "failed create snapshot: %v: %s", err, out)
}

glog.V(4).Infof("create volume snapshot %s", file)
Expand Down Expand Up @@ -439,9 +348,8 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
return nil, err
}
snapshotID := req.GetSnapshotId()
glog.V(4).Infof("deleting volume %s", snapshotID)
pathSlice := []string{snapshotRoot, "/", snapshotID, ".tgz"}
path := strings.Join(pathSlice, "")
glog.V(4).Infof("deleting snapshot %s", snapshotID)
path := getSnapshotPath(snapshotID)
os.RemoveAll(path)
delete(hostPathVolumeSnapshots, snapshotID)
return &csi.DeleteSnapshotResponse{}, nil
Expand Down Expand Up @@ -579,7 +487,7 @@ func (cs *controllerServer) validateControllerServiceRequest(c csi.ControllerSer
return nil
}
}
return status.Error(codes.InvalidArgument, fmt.Sprintf("%s", c))
return status.Errorf(codes.InvalidArgument, "unsupported capability %s", c)
}

func getControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) []*csi.ControllerServiceCapability {
Expand Down
Loading

0 comments on commit 4b2c735

Please sign in to comment.