Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: unable to properly clean mount points #153

Merged
merged 1 commit into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 42 additions & 26 deletions pkg/driver/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ type NodeServer struct {

var _ = csi.NodeServer(&NodeServer{})

func (ns *NodeServer) getVolume(volumeID string) *Volume {
if volume, ok := ns.volumes.Load(volumeID); ok {
return volume.(*Volume)
}
return nil
}

func (ns *NodeServer) setVolume(volumeID string, volume *Volume) {
ns.volumes.Store(volumeID, volume)
}

func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
// mount the fs here
Expand All @@ -51,24 +62,28 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
volumeMutex.Lock()
defer volumeMutex.Unlock()

volume := ns.getVolume(volumeID)
if volume == nil {
volume = NewVolume(volumeID)
}
// The volume has been publish.
if _, ok := ns.volumes.Load(volumeID); ok {
glog.Infof("volume %s has been already published", volumeID)
return &csi.NodePublishVolumeResponse{}, nil
for _, volumePath := range volume.volumePaths {
if volumePath.path == targetPath {
glog.Infof("volume %s has been already published", volumeID)
return &csi.NodePublishVolumeResponse{}, nil
}
}

volContext := req.GetVolumeContext()
readOnly := isVolumeReadOnly(req)

mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
if err != nil {
// node publish is unsuccessfull
volumePath := &VolumePath{path: targetPath, volumeId: volumeID}
if mounter, err := newMounter(volumeID, isVolumeReadOnly(req), ns.Driver, req.GetVolumeContext()); err != nil {
ns.removeVolumeMutex(volumeID)
return nil, err
} else {
volumePath.mounter = mounter
volume.volumePaths = append(volume.volumePaths, volumePath)
}

volume := NewVolume(volumeID, mounter)
if err := volume.Publish(targetPath); err != nil {
if err := volume.Publish(volumePath); err != nil {
// node publish is unsuccessfull
ns.removeVolumeMutex(volumeID)

Expand All @@ -89,7 +104,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, err
}

ns.volumes.Store(volumeID, volume)
ns.setVolume(volumeID, volume)
glog.Infof("volume %s successfully publish to %s", volumeID, targetPath)

return &csi.NodePublishVolumeResponse{}, nil
Expand All @@ -113,18 +128,19 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
volumeMutex.Lock()
defer volumeMutex.Unlock()

volume, ok := ns.volumes.Load(volumeID)
if !ok {
if volume := ns.getVolume(volumeID); volume != nil {
if err := volume.Unpublish(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else {
if len(volume.volumePaths) == 0 {
ns.volumes.Delete(volumeID)
}
}
} else {
glog.Warningf("volume %s hasn't been published", volumeID)

// make sure there is no any garbage
_ = mount.CleanupMountPoint(targetPath, mountutil, true)
} else {
if err := volume.(*Volume).Unpublish(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else {
ns.volumes.Delete(volumeID)
}
}

// remove mutex on successfull unpublish
Expand Down Expand Up @@ -180,8 +196,8 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
volumeMutex.Lock()
defer volumeMutex.Unlock()

if volume, ok := ns.volumes.Load(volumeID); ok {
if err := volume.(*Volume).Quota(requiredBytes); err != nil {
if volume := ns.getVolume(volumeID); volume != nil {
if err := volume.Quota(requiredBytes); err != nil {
return nil, err
}
}
Expand All @@ -192,11 +208,11 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
func (ns *NodeServer) NodeCleanup() {
ns.volumes.Range(func(_, vol any) bool {
v := vol.(*Volume)
if len(v.TargetPath) > 0 {
glog.Infof("cleaning up volume %s at %s", v.VolumeId, v.TargetPath)
err := v.Unpublish(v.TargetPath)
for _, volumePath := range v.volumePaths {
glog.Infof("cleaning up volume %s at %s", v.VolumeId, volumePath.path)
err := v.Unpublish(volumePath.path)
if err != nil {
glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, v.TargetPath, err)
glog.Warningf("error cleaning up volume %s at %s, err: %v", v.VolumeId, volumePath.path, err)
}
}
return true
Expand Down
73 changes: 33 additions & 40 deletions pkg/driver/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,48 @@ package driver
import (
"context"
"fmt"
"os"

"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/mount-utils"
)

type Volume struct {
VolumeId string
TargetPath string

type VolumePath struct {
path string
mounter Mounter
unmounter Unmounter
volumeId string
}

type Volume struct {
VolumeId string

// unix socket used to manage volume
localSocket string
volumePaths []*VolumePath
}

func NewVolume(volumeID string, mounter Mounter) *Volume {
func NewVolume(volumeID string) *Volume {
return &Volume{
VolumeId: volumeID,
mounter: mounter,
localSocket: GetLocalSocket(volumeID),
}
}

func (vol *Volume) Publish(targetPath string) error {
func (vol *Volume) Publish(volumePath *VolumePath) error {
// check whether it can be mounted
if isMnt, err := checkMount(targetPath); err != nil {
if isMnt, err := checkMount(volumePath.path); err != nil {
return err
} else if isMnt {
// try to unmount before mounting again
_ = mountutil.Unmount(targetPath)
_ = mountutil.Unmount(volumePath.path)
}

if u, err := vol.mounter.Mount(targetPath); err == nil {
if vol.TargetPath != "" {
if vol.TargetPath == targetPath {
glog.Warningf("target path is already set to %s for volume %s", vol.TargetPath, vol.VolumeId)
} else {
glog.Warningf("target path is already set to %s and differs from %s for volume %s", vol.TargetPath, targetPath, vol.VolumeId)
}
}
vol.TargetPath = targetPath
vol.unmounter = u
if unmounter, err := volumePath.mounter.Mount(volumePath.path); err == nil {
volumePath.unmounter = unmounter
vol.volumePaths = append(vol.volumePaths, volumePath)
return nil
} else {
return err
Expand All @@ -74,25 +70,22 @@ func (vol *Volume) Quota(sizeByte int64) error {

func (vol *Volume) Unpublish(targetPath string) error {
glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, targetPath)

if vol.unmounter == nil {
glog.Errorf("volume is not mounted: %s, path: %s", vol.VolumeId, targetPath)
return nil
}

if targetPath != vol.TargetPath {
glog.Warningf("staging path %s differs for volume %s at %s", targetPath, vol.VolumeId, vol.TargetPath)
}

if err := vol.unmounter.Unmount(); err != nil {
glog.Errorf("error unmounting volume during unstage: %s, err: %v", targetPath, err)
return err
}

if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) {
glog.Errorf("error removing staging path for volume %s at %s, err: %v", vol.VolumeId, targetPath, err)
return err
for index, volumePath := range vol.volumePaths {
if volumePath.path == targetPath {
vol.volumePaths = append(vol.volumePaths[:index], vol.volumePaths[index+1:]...)
if volumePath.unmounter != nil {
err := volumePath.unmounter.Unmount()
if err != nil {
glog.Errorf("error unmounting volume during unstage: %s, err: %v", targetPath, err)
} else { // unmount success
return nil
}
} else {
glog.Errorf("volume %s is no mounter, path: %s", vol.VolumeId, targetPath)
}
break
}
}

return nil
glog.Warningf("volume %s cannot use unmounter, use default cleanup mount point %s", targetPath, vol.VolumeId)
return mount.CleanupMountPoint(targetPath, mountutil, true)
}
Loading