Skip to content

Commit

Permalink
feat(CSI-308): implement SINGLE_NODE_* capability checking on NodePub…
Browse files Browse the repository at this point in the history
…lishVolume
  • Loading branch information
sergeyberezansky committed Nov 26, 2024
1 parent 5d9fe7d commit a5f88c2
Showing 1 changed file with 105 additions and 13 deletions.
118 changes: 105 additions & 13 deletions pkg/wekafs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/wekafs/csi-wekafs/pkg/wekafs/db"
"go.opentelemetry.io/otel"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -56,6 +57,7 @@ type NodeServer struct {
config *DriverConfig
semaphores map[string]*semaphore.Weighted
bootId string
database db.Database
sync.Mutex
}

Expand Down Expand Up @@ -187,7 +189,11 @@ func getVolumeStats(volumePath string) (volumeStats *VolumeStats, err error) {
}

func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounter AnyMounter, config *DriverConfig) *NodeServer {
//goland:noinspection GoBoolExpressions
d, err := db.GetDatabase(context.Background())
if err != nil {
panic(err)
}

return &NodeServer{
caps: getNodeServiceCapabilities(
[]csi.NodeServiceCapability_RPC_Type{
Expand All @@ -202,6 +208,7 @@ func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounte
api: api,
config: config,
semaphores: make(map[string]*semaphore.Weighted),
database: d,
}
}

Expand Down Expand Up @@ -318,6 +325,26 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

fsType := req.GetVolumeCapability().GetMount().GetFsType()

// check whether the volume can be published, given other existing publishes of it and the requested access type
if response, err := ns.ensurePublishingIsAllowed(ctx, req); err != nil {
return response, err
}

// set the lock for the volume
if err := ns.SetPvcLock(ctx, volumeID, targetPath, req.GetPublishContext()[deviceID], req.GetVolumeCapability().GetAccessMode().GetMode().String()); err != nil {
return NodePublishVolumeError(ctx, codes.Internal, err.Error())
}

// if for some reason the volume fails to publish, we need to release the lock at the end
defer func() {
if result != "SUCCESS" {

if err := ns.ReleasePvcLock(ctx, volumeID, targetPath); err != nil {
logger.Error().Err(err).Msg("Failed to release PVC lock")
}
}
}()

deviceId := ""
if req.GetPublishContext() != nil {
deviceId = req.GetPublishContext()[deviceID]
Expand Down Expand Up @@ -403,6 +430,81 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return &csi.NodePublishVolumeResponse{}, nil
}

// ensurePublishingIsAllowed checks whether the volume in req may be published
// at the requested target path, given the attachments ("locks") already
// recorded for that volume and the access mode requested.
//
// It returns (nil, nil) when publishing may proceed. Otherwise it returns the
// result of NodePublishVolumeError with one of:
//   - codes.InvalidArgument: the volume context carries no "node" key;
//   - codes.Internal: existing locks could not be read, or a lock could not be
//     created or deleted;
//   - codes.FailedPrecondition: an existing lock conflicts with the request.
//
// Side effects: when no lock exists yet for the volume, one is created through
// SetPvcLock; a lock for the same volume and target path that was recorded
// under a different boot ID is deleted as stale.
func (ns *NodeServer) ensurePublishingIsAllowed(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
	logger := log.Ctx(ctx)
	volumeID := req.GetVolumeId()
	targetPath := req.GetTargetPath()

	// register the published volume in DB if it's not already there, and check for publish conflicts otherwise
	accessMode := req.GetVolumeCapability().GetAccessMode().GetMode()
	accessModeName := accessMode.String()
	logger.Debug().Str("access_mode", accessModeName).Str("volume_id", volumeID).Msg("Checking access mode")
	node, ok := req.GetVolumeContext()["node"]
	if !ok {
		return NodePublishVolumeError(ctx, codes.InvalidArgument, "Node name missing in request")
	}

	locksForVolume, err := ns.getExistingPvcAttachments(ctx, &volumeID, nil)
	if err != nil {
		logger.Error().Err(err).Msg("Failed to get existing PVC locks")
		return NodePublishVolumeError(ctx, codes.Internal, err.Error())
	}

	if len(*locksForVolume) == 0 {
		logger.Debug().Str("volume_id", volumeID).Msg("No existing locks found for volume")
		if err := ns.SetPvcLock(ctx, volumeID, targetPath, node, accessModeName); err != nil {
			logger.Error().Err(err).Msg("Failed to set PVC lock")
			return NodePublishVolumeError(ctx, codes.Internal, err.Error())
		}
		// nothing to compare against, publishing is allowed
		return nil, nil
	}

	bootID := ns.GetNodeBootId(ctx)
	for _, lock := range *locksForVolume {
		switch {

		// idempotent request, all params are same
		case lock.Matches(volumeID, targetPath, node, accessModeName, bootID):
			// no need to update the lock, as it's the same request
			logger.Debug().Str("volume_id", volumeID).Msg("Existing lock found, assuming repeated request")

		// same volume mapping, but has discrepancies in other fields
		case lock.MatchesVolumeId(volumeID) && lock.MatchesTargetPath(targetPath):
			if !lock.MatchesAccessType(accessModeName) {
				// same volume id and target path, but access type differs
				// COs SHOULD NOT call NodePublishVolume a second time with a different volume_capability.
				// If this happens, the Plugin SHOULD return FAILED_PRECONDITION.
				logger.Debug().Str("volume_id", volumeID).Str("access_mode", accessModeName).Str("existing_access_mode", lock.AccessType).Msg("Existing lock found with a different access type, rejecting publish request")
				return NodePublishVolumeError(ctx, codes.FailedPrecondition, "Volume already published with a different access type")
			}
			// same volume id, same target path, same access type
			logger.Debug().Str("volume_id", volumeID).Msg("Existing lock found, assuming repeated request")

			if !lock.MatchesBootId(bootID) {
				// same volume id, target path and access type, but boot id differs:
				// this is a stale record, just remove it
				logger.Debug().Str("volume_id", volumeID).Msg("Existing lock found with a different boot id, assuming stale lock and deleting")
				if err := ns.database.DeleteAttachment(ctx, &lock); err != nil {
					logger.Error().Err(err).Msg("Failed to remove stale PVC lock")
					return NodePublishVolumeError(ctx, codes.Internal, err.Error())
				}
			}

		// same volume id, but different target path: reject if either the new
		// request or the existing attachment is single-writer
		// NOTE(review): the boot id of the existing lock is not consulted here, so a
		// stale lock left from a previous boot can also cause a rejection - confirm
		// whether stale locks should be purged in this branch as well.
		case lock.MatchesVolumeId(volumeID) && !lock.MatchesTargetPath(targetPath):
			if accessMode == csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER {
				logger.Error().Str("volume_id", volumeID).Str("target_path", lock.TargetPath).Str("published_capability", accessModeName).Msg("Volume is already published under different target, cannot publish with single capability")
				return NodePublishVolumeError(ctx, codes.FailedPrecondition, "Volume already published with a different target path")
			}
			if lock.IsSingleWriter() {
				logger.Error().Str("volume_id", volumeID).Str("target_path", lock.TargetPath).Str("published_capability", lock.AccessType).Msg("Volume is already published under different target with single capability, cannot publish")
				// the existing attachment is exclusive, so the new publish must be refused
				return NodePublishVolumeError(ctx, codes.FailedPrecondition, "Volume already published under a different target path with single writer access")
			}
		}
	}
	return nil, nil
}

func NodeUnpublishVolumeError(ctx context.Context, errorCode codes.Code, errorMessage string) (*csi.NodeUnpublishVolumeResponse, error) {
err := status.Error(errorCode, strings.ToLower(errorMessage))
log.Ctx(ctx).Err(err).CallerSkipFrame(1).Msg("Error unpublishing volume")
Expand Down Expand Up @@ -556,11 +658,6 @@ func (ns *NodeServer) getExistingPvcAttachments(ctx context.Context, volumeId, t

func (ns *NodeServer) SetPvcLock(ctx context.Context, volumeId, targetPath, node, accessType string) error {
logger := log.Ctx(ctx)
database, err := db.GetDatabase(ctx)
if err != nil {
return err
}

attachment := &db.PvcAttachment{
VolumeId: volumeId,
Node: node,
Expand All @@ -569,7 +666,7 @@ func (ns *NodeServer) SetPvcLock(ctx context.Context, volumeId, targetPath, node
TargetPath: targetPath,
}

if err := database.CreateOrUpdateAttachment(ctx, attachment); err != nil {
if err := ns.database.CreateOrUpdateAttachment(ctx, attachment); err != nil {
logger.Error().Err(err).Str("volume_id", volumeId).Str("target_path", targetPath).Str("node", node).Str("access_type", accessType).Msg("Failed to create or update attachment")
return err
}
Expand All @@ -578,15 +675,10 @@ func (ns *NodeServer) SetPvcLock(ctx context.Context, volumeId, targetPath, node

func (ns *NodeServer) ReleasePvcLock(ctx context.Context, volumeId, targetPath string) error {
logger := log.Ctx(ctx)
database, err := db.GetDatabase(ctx)
if err != nil {
return err
}

node := ns.getNodeId()
bootId := ns.GetNodeBootId(ctx)

if err := database.DeleteAttachmentDisregardingAccessType(volumeId, targetPath, node, bootId); err != nil {
if err := ns.database.DeleteAttachmentDisregardingAccessType(volumeId, targetPath, node, bootId); err != nil {
logger.Error().Err(err).Str("volume_id", volumeId).Str("target_path", targetPath).Msg("Failed to delete attachment")
return err
}
Expand Down

0 comments on commit a5f88c2

Please sign in to comment.