Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(CSI-256): avoid multiple mounts to same filesystem on same mountpoint #331

Merged
merged 5 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/wekafs/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type AnyMounter interface {

type mountsMapPerFs map[string]AnyMount
type mountsMap map[string]mountsMapPerFs
type nfsMountsMap map[string]int // we only follow the mountPath and number of references

type UnmountFunc func()

Expand All @@ -42,4 +43,5 @@ type AnyMount interface {
getMountPoint() string
getMountOptions() MountOptions
getLastUsed() time.Time
locateMountIP() error // used only for NFS
}
90 changes: 75 additions & 15 deletions pkg/wekafs/nfsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

type nfsMount struct {
mounter *nfsMounter
fsName string
mountPoint string
kMounter mount.Interface
Expand All @@ -26,7 +27,7 @@ type nfsMount struct {
}

func (m *nfsMount) getMountPoint() string {
return m.mountPoint
return fmt.Sprintf("%s-%s", m.mountPoint, m.mountIpAddress)
}

func (m *nfsMount) getRefCount() int {
Expand All @@ -46,27 +47,86 @@ func (m *nfsMount) isInDevMode() bool {
}

func (m *nfsMount) isMounted() bool {
return PathExists(m.mountPoint) && PathIsWekaMount(context.Background(), m.mountPoint)
return PathExists(m.getMountPoint()) && PathIsWekaMount(context.Background(), m.mountPoint)
}

func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) error {
if err := m.doMount(ctx, apiClient, m.mountOptions); err != nil {
return err
logger := log.Ctx(ctx)
if m.mounter == nil {
logger.Error().Msg("Mounter is nil")
return errors.New("mounter is nil")
}
m.mounter.lock.Lock()
defer m.mounter.lock.Unlock()
refCount, ok := m.mounter.mountMap[m.getMountPoint()]
if !ok {
refCount = 0
}
if refCount == 0 {
if err := m.doMount(ctx, apiClient, m.getMountOptions()); err != nil {
return err
}
} else if !m.isMounted() {
logger.Warn().Str("mount_point", m.getMountPoint()).Int("refcount", refCount).Msg("Mount not exists although should!")
if err := m.doMount(ctx, apiClient, m.getMountOptions()); err != nil {
return err
}

}
refCount++
m.mounter.mountMap[m.getMountPoint()] = refCount

logger.Trace().Int("refcount", refCount).Strs("mount_options", m.getMountOptions().Strings()).Str("filesystem_name", m.fsName).Msg("RefCount increased")
return nil
}

func (m *nfsMount) decRef(ctx context.Context) error {
if err := m.doUnmount(ctx); err != nil {
return err
logger := log.Ctx(ctx)
if m.mounter == nil {
logger.Error().Msg("Mounter is nil")
return errors.New("mounter is nil")
}
m.mounter.lock.Lock()
defer m.mounter.lock.Unlock()
refCount, ok := m.mounter.mountMap[m.getMountPoint()]
defer func() {
if refCount == 0 {
delete(m.mounter.mountMap, m.getMountPoint())
} else {
m.mounter.mountMap[m.getMountPoint()] = refCount
}
}()
if !ok {
refCount = 0
}
if refCount < 0 {
logger.Error().Int("refcount", refCount).Msg("During decRef negative refcount encountered")
refCount = 0 // to make sure that we don't have negative refcount later
}
if refCount == 1 {
if err := m.doUnmount(ctx); err != nil {
return err
}
refCount--
}
return nil
}

func (m *nfsMount) locateMountIP() error {
if m.mountIpAddress == "" {
ipAddr, err := GetMountIpFromActualMountPoint(m.mountPoint)
if err != nil {
return err
}
m.mountIpAddress = ipAddr
}
return nil
}

func (m *nfsMount) doUnmount(ctx context.Context) error {
logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger()
logger.Trace().Strs("mount_options", m.mountOptions.Strings()).Msg("Performing umount via k8s native mounter")
err := m.kMounter.Unmount(m.mountPoint)
logger := log.Ctx(ctx).With().Str("mount_point", m.getMountPoint()).Str("filesystem", m.fsName).Logger()
logger.Trace().Strs("mount_options", m.getMountOptions().Strings()).Msg("Performing umount via k8s native mounter")
err := m.kMounter.Unmount(m.getMountPoint())
if err != nil {
logger.Error().Err(err).Msg("Failed to unmount")
} else {
Expand All @@ -87,9 +147,9 @@ func (m *nfsMount) ensureMountIpAddress(ctx context.Context, apiClient *apiclien
}

func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, mountOptions MountOptions) error {
logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger()
logger := log.Ctx(ctx).With().Str("mount_point", m.getMountPoint()).Str("filesystem", m.fsName).Logger()
var mountOptionsSensitive []string
if err := os.MkdirAll(m.mountPoint, DefaultVolumePermissions); err != nil {
if err := os.MkdirAll(m.getMountPoint(), DefaultVolumePermissions); err != nil {
return err
}
if !m.isInDevMode() {
Expand Down Expand Up @@ -117,12 +177,12 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,

mountTarget := m.mountIpAddress + ":/" + m.fsName
logger.Trace().
Strs("mount_options", m.mountOptions.Strings()).
Strs("mount_options", m.getMountOptions().Strings()).
Str("mount_target", mountTarget).
Str("mount_ip_address", m.mountIpAddress).
Msg("Performing mount")

err = m.kMounter.MountSensitive(mountTarget, m.mountPoint, "nfs", mountOptions.Strings(), mountOptionsSensitive)
err = m.kMounter.MountSensitive(mountTarget, m.getMountPoint(), "nfs", mountOptions.Strings(), mountOptionsSensitive)
if err != nil {
if os.IsNotExist(err) {
logger.Error().Err(err).Msg("Mount target not found")
Expand All @@ -144,8 +204,8 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,
if err := os.MkdirAll(fakePath, DefaultVolumePermissions); err != nil {
Die(fmt.Sprintf("Failed to create directory %s, while running in debug mode", fakePath))
}
logger.Trace().Strs("mount_options", m.mountOptions.Strings()).Str("debug_path", m.debugPath).Msg("Performing mount")
logger.Trace().Strs("mount_options", m.getMountOptions().Strings()).Str("debug_path", m.debugPath).Msg("Performing mount")

return m.kMounter.Mount(fakePath, m.mountPoint, "", []string{"bind"})
return m.kMounter.Mount(fakePath, m.getMountPoint(), "", []string{"bind"})
}
}
12 changes: 11 additions & 1 deletion pkg/wekafs/nfsmounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"github.com/rs/zerolog/log"
"github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient"
"k8s.io/mount-utils"
"sync"
"time"
)

type nfsMounter struct {
mountMap nfsMountsMap
lock sync.Mutex
kMounter mount.Interface
debugPath string
selinuxSupport *bool
Expand All @@ -28,7 +31,7 @@ func newNfsMounter(driver *WekaFsDriver) *nfsMounter {
log.Debug().Msg("SELinux support is forced")
selinuxSupport = &[]bool{true}[0]
}
mounter := &nfsMounter{debugPath: driver.debugPath, selinuxSupport: selinuxSupport, exclusiveMountOptions: driver.config.mutuallyExclusiveOptions}
mounter := &nfsMounter{mountMap: make(nfsMountsMap), debugPath: driver.debugPath, selinuxSupport: selinuxSupport, exclusiveMountOptions: driver.config.mutuallyExclusiveOptions}
mounter.gc = initInnerPathVolumeGc(mounter)
mounter.schedulePeriodicMountGc()
mounter.interfaceGroupName = &driver.config.interfaceGroupName
Expand All @@ -43,6 +46,7 @@ func (m *nfsMounter) NewMount(fsName string, options MountOptions) AnyMount {
}
uniqueId := getStringSha1AsB32(fsName + ":" + options.String())
wMount := &nfsMount{
mounter: m,
kMounter: m.kMounter,
fsName: fsName,
debugPath: m.debugPath,
Expand Down Expand Up @@ -91,6 +95,12 @@ func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, opti
options = options.AsNfs()
options.Merge(options, m.exclusiveMountOptions)
mnt := m.NewMount(fsName, options)
// since we are not aware of the IP address of the mount, we need to find the mount point by listing the mounts
err := mnt.locateMountIP()
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Failed to locate mount IP")
return err
}

log.Ctx(ctx).Trace().Strs("mount_options", opts.Strings()).Str("filesystem", fsName).Msg("Received an unmount request")
return mnt.decRef(ctx)
Expand Down
18 changes: 18 additions & 0 deletions pkg/wekafs/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,24 @@ func PathIsWekaMount(ctx context.Context, path string) bool {
return false
}

func GetMountIpFromActualMountPoint(mountPointBase string) (string, error) {
file, err := os.Open("/proc/mounts")
if err != nil {
return "", errors.New("failed to open /proc/mounts")
}
defer func() { _ = file.Close() }()
var actualMountPoint string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
fields := strings.Fields(scanner.Text())
if len(fields) >= 3 && strings.HasPrefix(fields[1], fmt.Sprintf("%s-", mountPointBase)) {
actualMountPoint = fields[1]
return strings.TrimLeft(actualMountPoint, mountPointBase+"-"), nil
}
}
return "", errors.New("mount point not found")
}

func validateVolumeId(volumeId string) error {
// Volume New format:
// VolID format is as following:
Expand Down
36 changes: 20 additions & 16 deletions pkg/wekafs/wekafsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,30 @@ func (m *wekafsMount) isInDevMode() bool {
}

func (m *wekafsMount) isMounted() bool {
return PathExists(m.mountPoint) && PathIsWekaMount(context.Background(), m.mountPoint)
return PathExists(m.getMountPoint()) && PathIsWekaMount(context.Background(), m.getMountPoint())
}

func (m *wekafsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) error {
logger := log.Ctx(ctx)
m.lock.Lock()
defer m.lock.Unlock()
if m.refCount < 0 {
logger.Error().Str("mount_point", m.mountPoint).Int("refcount", m.refCount).Msg("During incRef negative refcount encountered")
logger.Error().Str("mount_point", m.getMountPoint()).Int("refcount", m.refCount).Msg("During incRef negative refcount encountered")
m.refCount = 0 // to make sure that we don't have negative refcount later
}
if m.refCount == 0 {
if err := m.doMount(ctx, apiClient, m.mountOptions); err != nil {
if err := m.doMount(ctx, apiClient, m.getMountOptions()); err != nil {
return err
}
} else if !m.isMounted() {
logger.Warn().Str("mount_point", m.mountPoint).Int("refcount", m.refCount).Msg("Mount not exists although should!")
if err := m.doMount(ctx, apiClient, m.mountOptions); err != nil {
logger.Warn().Str("mount_point", m.getMountPoint()).Int("refcount", m.refCount).Msg("Mount not exists although should!")
if err := m.doMount(ctx, apiClient, m.getMountOptions()); err != nil {
return err
}

}
m.refCount++
logger.Trace().Int("refcount", m.refCount).Strs("mount_options", m.mountOptions.Strings()).Str("filesystem_name", m.fsName).Msg("RefCount increased")
logger.Trace().Int("refcount", m.refCount).Strs("mount_options", m.getMountOptions().Strings()).Str("filesystem_name", m.fsName).Msg("RefCount increased")
return nil
}

Expand All @@ -78,7 +78,7 @@ func (m *wekafsMount) decRef(ctx context.Context) error {
defer m.lock.Unlock()
m.refCount--
m.lastUsed = time.Now()
logger.Trace().Int("refcount", m.refCount).Strs("mount_options", m.mountOptions.Strings()).Str("filesystem_name", m.fsName).Msg("RefCount decreased")
logger.Trace().Int("refcount", m.refCount).Strs("mount_options", m.getMountOptions().Strings()).Str("filesystem_name", m.fsName).Msg("RefCount decreased")
if m.refCount < 0 {
logger.Error().Int("refcount", m.refCount).Msg("During decRef negative refcount encountered")
m.refCount = 0 // to make sure that we don't have negative refcount later
Expand All @@ -92,9 +92,9 @@ func (m *wekafsMount) decRef(ctx context.Context) error {
}

func (m *wekafsMount) doUnmount(ctx context.Context) error {
logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger()
logger.Trace().Strs("mount_options", m.mountOptions.Strings()).Msg("Performing umount via k8s native mounter")
err := m.kMounter.Unmount(m.mountPoint)
logger := log.Ctx(ctx).With().Str("mount_point", m.getMountPoint()).Str("filesystem", m.fsName).Logger()
logger.Trace().Strs("mount_options", m.getMountOptions().Strings()).Msg("Performing umount via k8s native mounter")
err := m.kMounter.Unmount(m.getMountPoint())
if err != nil {
logger.Error().Err(err).Msg("Failed to unmount")
} else {
Expand All @@ -104,11 +104,11 @@ func (m *wekafsMount) doUnmount(ctx context.Context) error {
}

func (m *wekafsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, mountOptions MountOptions) error {
logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger()
logger := log.Ctx(ctx).With().Str("mount_point", m.getMountPoint()).Str("filesystem", m.fsName).Logger()
mountToken := ""
var mountOptionsSensitive []string
var localContainerName string
if err := os.MkdirAll(m.mountPoint, DefaultVolumePermissions); err != nil {
if err := os.MkdirAll(m.getMountPoint(), DefaultVolumePermissions); err != nil {
return err
}
if !m.isInDevMode() {
Expand Down Expand Up @@ -166,16 +166,20 @@ func (m *wekafsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClien
}
}

logger.Trace().Strs("mount_options", m.mountOptions.Strings()).
logger.Trace().Strs("mount_options", m.getMountOptions().Strings()).
Fields(mountOptions).Msg("Performing mount")
return m.kMounter.MountSensitive(m.fsName, m.mountPoint, "wekafs", mountOptions.Strings(), mountOptionsSensitive)
return m.kMounter.MountSensitive(m.fsName, m.getMountPoint(), "wekafs", mountOptions.Strings(), mountOptionsSensitive)
} else {
fakePath := filepath.Join(m.debugPath, m.fsName)
if err := os.MkdirAll(fakePath, DefaultVolumePermissions); err != nil {
Die(fmt.Sprintf("Failed to create directory %s, while running in debug mode", fakePath))
}
logger.Trace().Strs("mount_options", m.mountOptions.Strings()).Str("debug_path", m.debugPath).Msg("Performing mount")
logger.Trace().Strs("mount_options", m.getMountOptions().Strings()).Str("debug_path", m.debugPath).Msg("Performing mount")

return m.kMounter.Mount(fakePath, m.mountPoint, "", []string{"bind"})
return m.kMounter.Mount(fakePath, m.getMountPoint(), "", []string{"bind"})
}
}

func (m *wekafsMount) locateMountIP() error {
return nil
}