From 080c2ce146f8e4a95306e380fcaefc0ae956dfb3 Mon Sep 17 00:00:00 2001 From: Anton Myagkov Date: Wed, 16 Oct 2024 14:31:52 +0200 Subject: [PATCH] Implement (un)stage/(un)publish volume according csi spec for mount mode (#2195) --- cloud/blockstore/tests/csi_driver/test.py | 57 +++- .../tools/csi_driver/internal/driver/node.go | 319 ++++++++++++++++-- .../csi_driver/internal/driver/node_test.go | 63 ++-- .../stage-publish-unpublish-unstage-flow.md | 61 ++++ 4 files changed, 444 insertions(+), 56 deletions(-) create mode 100644 cloud/blockstore/tools/csi_driver/stage-publish-unpublish-unstage-flow.md diff --git a/cloud/blockstore/tests/csi_driver/test.py b/cloud/blockstore/tests/csi_driver/test.py index d22d92e3fc..9581d80f0c 100644 --- a/cloud/blockstore/tests/csi_driver/test.py +++ b/cloud/blockstore/tests/csi_driver/test.py @@ -465,8 +465,9 @@ def test_node_volume_expand(fs_type): cleanup_after_test(env) -def test_publish_volume_twice_on_the_same_node(): - env, run = init(vm_mode=True) +@pytest.mark.parametrize('vm_mode', [True, False]) +def test_publish_volume_twice_on_the_same_node(vm_mode): + env, run = init(vm_mode=vm_mode) try: volume_name = "example-disk" volume_size = 1024 ** 3 @@ -491,3 +492,55 @@ def test_publish_volume_twice_on_the_same_node(): with called_process_error_logged(): env.csi.delete_volume(volume_name) cleanup_after_test(env) + + +def test_restart_kubelet_with_old_format_endpoint(): + env, run = init() + try: + volume_name = "example-disk" + volume_size = 1024 ** 3 + pod_name1 = "example-pod-1" + pod_id1 = "deadbeef1" + env.csi.create_volume(name=volume_name, size=volume_size) + # skip stage to create endpoint with old format + env.csi.publish_volume(pod_id1, volume_name, pod_name1) + # run stage/publish again to simulate kubelet restart + env.csi.stage_volume(volume_name) + env.csi.publish_volume(pod_id1, volume_name, pod_name1) + except subprocess.CalledProcessError as e: + log_called_process_error(e) + raise + finally: + with called_process_error_logged(): + env.csi.unpublish_volume(pod_id1, volume_name) + with called_process_error_logged(): + env.csi.unstage_volume(volume_name) + with called_process_error_logged(): + env.csi.delete_volume(volume_name) + cleanup_after_test(env) + + +def test_restart_kubelet_with_new_format_endpoint(): + env, run = init() + try: + volume_name = "example-disk" + volume_size = 1024 ** 3 + pod_name1 = "example-pod-1" + pod_id1 = "deadbeef1" + env.csi.create_volume(name=volume_name, size=volume_size) + env.csi.stage_volume(volume_name) + env.csi.publish_volume(pod_id1, volume_name, pod_name1) + # run stage/publish again to simulate kubelet restart + env.csi.stage_volume(volume_name) + env.csi.publish_volume(pod_id1, volume_name, pod_name1) + except subprocess.CalledProcessError as e: + log_called_process_error(e) + raise + finally: + with called_process_error_logged(): + env.csi.unpublish_volume(pod_id1, volume_name) + with called_process_error_logged(): + env.csi.unstage_volume(volume_name) + with called_process_error_logged(): + env.csi.delete_volume(volume_name) + cleanup_after_test(env) diff --git a/cloud/blockstore/tools/csi_driver/internal/driver/node.go b/cloud/blockstore/tools/csi_driver/internal/driver/node.go index 5920fbbdc4..94acf1f953 100644 --- a/cloud/blockstore/tools/csi_driver/internal/driver/node.go +++ b/cloud/blockstore/tools/csi_driver/internal/driver/node.go @@ -154,37 +154,71 @@ func (s *nodeService) NodeStageVolume( "VolumeCapability is missing in NodeStageVolumeRequest") } - if s.vmMode { - nfsBackend := 
(req.VolumeContext[backendVolumeContextKey] == "nfs") + accessMode := req.VolumeCapability.AccessMode + if accessMode == nil { + return nil, s.statusError( + codes.InvalidArgument, + "AccessMode is missing in NodePublishVolumeRequest") + } - var err error - if instanceID := req.VolumeContext[instanceIDKey]; instanceID != "" { - stageRecordPath := filepath.Join(req.StagingTargetPath, req.VolumeId+".json") - // Backend can be empty for old disks, in this case we use NBS - backend := "nbs" - if nfsBackend { - backend = "nfs" - } - if err = s.writeStageData(stageRecordPath, &StageData{ - Backend: backend, - InstanceId: instanceID, - RealStagePath: s.getEndpointDir(instanceID, req.VolumeId), - }); err != nil { - return nil, s.statusErrorf(codes.Internal, - "Failed to write stage record: %v", err) - } + nfsBackend := (req.VolumeContext[backendVolumeContextKey] == "nfs") + if !nfsBackend && accessMode.GetMode() == + csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER { + return nil, s.statusError( + codes.InvalidArgument, + "ReadWriteMany access mode is supported only with nfs backend") + } + + var err error + switch req.VolumeCapability.GetAccessType().(type) { + case *csi.VolumeCapability_Mount: + if s.vmMode { + nfsBackend := (req.VolumeContext[backendVolumeContextKey] == "nfs") + + var err error + if instanceID := req.VolumeContext[instanceIDKey]; instanceID != "" { + stageRecordPath := filepath.Join(req.StagingTargetPath, req.VolumeId+".json") + // Backend can be empty for old disks, in this case we use NBS + backend := "nbs" + if nfsBackend { + backend = "nfs" + } + if err = s.writeStageData(stageRecordPath, &StageData{ + Backend: backend, + InstanceId: instanceID, + RealStagePath: s.getEndpointDir(instanceID, req.VolumeId), + }); err != nil { + return nil, s.statusErrorf(codes.Internal, + "Failed to write stage record: %v", err) + } + if nfsBackend { + err = s.nodeStageFileStoreAsVhostSocket(ctx, instanceID, req.VolumeId) + } else { + err = s.nodeStageDiskAsVhostSocket(ctx, instanceID, req.VolumeId, req.VolumeContext) + } + + if err != nil { + return nil, s.statusErrorf(codes.Internal, + "Failed to stage volume: %v", err) + } + } + } else { if nfsBackend { - err = s.nodeStageFileStoreAsVhostSocket(ctx, instanceID, req.VolumeId) + return nil, s.statusError(codes.InvalidArgument, + "NFS mounts are only supported in VM mode") } else { - err = s.nodeStageDiskAsVhostSocket(ctx, instanceID, req.VolumeId, req.VolumeContext) - } - - if err != nil { - return nil, s.statusErrorf(codes.Internal, - "Failed to stage volume: %v", err) + err = s.nodeStageDiskAsFilesystem(ctx, req) } } + case *csi.VolumeCapability_Block: + default: + return nil, s.statusError(codes.InvalidArgument, "Unknown access type") + } + + if err != nil { + return nil, s.statusErrorf(codes.Internal, + "Failed to stage volume: %v", err) } return &csi.NodeStageVolumeResponse{}, nil @@ -217,6 +251,12 @@ func (s *nodeService) NodeUnstageVolume( } ignoreError(os.Remove(stageRecordPath)) } + } else { + if err := s.nodeUnstageVolume(ctx, req); err != nil { + return nil, s.statusErrorf( + codes.InvalidArgument, + "Failed to unstage volume: %v", err) + } } return &csi.NodeUnstageVolumeResponse{}, nil @@ -505,7 +545,7 @@ func (s *nodeService) nodeStageDiskAsVhostSocket( return s.createDummyImgFile(endpointDir) } -func (s *nodeService) nodePublishDiskAsFilesystem( +func (s *nodeService) nodePublishDiskAsFilesystemDeprecated( ctx context.Context, req *csi.NodePublishVolumeRequest) error { @@ -572,6 +612,165 @@ func (s *nodeService) 
nodePublishDiskAsFilesystem( return nil } +func (s *nodeService) nodePublishDiskAsFilesystem( + ctx context.Context, + req *csi.NodePublishVolumeRequest) error { + + // Fallback to previous implementation for already mounted volumes + // Must be removed after migration of all endpoints to the new format + mounted, _ := s.mounter.IsMountPoint(req.StagingTargetPath) + if !mounted { + return s.nodePublishDiskAsFilesystemDeprecated(ctx, req) + } + + targetPerm := os.FileMode(0775) + if err := os.MkdirAll(req.TargetPath, targetPerm); err != nil { + return fmt.Errorf("failed to create target directory: %w", err) + } + + mountOptions := []string{"bind"} + mnt := req.VolumeCapability.GetMount() + if mnt != nil { + for _, flag := range mnt.MountFlags { + mountOptions = append(mountOptions, flag) + } + } + + err := s.mountIfNeeded( + req.VolumeId, + req.StagingTargetPath, + req.TargetPath, + "", + mountOptions) + if err != nil { + return err + } + + if mnt != nil && mnt.VolumeMountGroup != "" { + cmd := exec.Command("chown", "-R", ":"+mnt.VolumeMountGroup, req.TargetPath) + if out, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("failed to chown %s to %q: %w, output %q", + mnt.VolumeMountGroup, req.TargetPath, err, out) + } + } + + return nil +} + +func (s *nodeService) IsMountConflictError(err error) bool { + if err != nil { + var clientErr *nbsclient.ClientError + if errors.As(err, &clientErr) { + if clientErr.Code == nbsclient.E_MOUNT_CONFLICT { + return true + } + } + } + + return false +} + +func (s *nodeService) hasLocalEndpoint( + ctx context.Context, + volumeId string) (bool, error) { + + listEndpointsResp, err := s.nbsClient.ListEndpoints( + ctx, &nbsapi.TListEndpointsRequest{}, + ) + if err != nil { + log.Printf("List endpoints failed %v", err) + return false, err + } + + if len(listEndpointsResp.Endpoints) == 0 { + return false, nil + } + + for _, endpoint := range listEndpointsResp.Endpoints { + if endpoint.DiskId == volumeId { + return true, nil + } + } + + return false, nil +} + +func (s *nodeService) nodeStageDiskAsFilesystem( + ctx context.Context, + req *csi.NodeStageVolumeRequest) error { + + resp, err := s.startNbsEndpointForNBD(ctx, "", req.VolumeId, req.VolumeContext) + if err != nil { + if s.IsMountConflictError(err) { + localEndpoint, err := s.hasLocalEndpoint(ctx, req.VolumeId) + if err != nil { + return err + } + if localEndpoint { + return nil + } + } + return fmt.Errorf("failed to start NBS endpoint: %w", err) + } + + if resp.NbdDeviceFile == "" { + return fmt.Errorf("NbdDeviceFile shouldn't be empty") + } + + logVolume(req.VolumeId, "endpoint started with device: %q", resp.NbdDeviceFile) + + mnt := req.VolumeCapability.GetMount() + + fsType := req.VolumeContext["fsType"] + if mnt != nil && mnt.FsType != "" { + fsType = mnt.FsType + } + if fsType == "" { + fsType = "ext4" + } + + err = s.makeFilesystemIfNeeded(req.VolumeId, resp.NbdDeviceFile, fsType) + if err != nil { + return err + } + + targetPerm := os.FileMode(0775) + if err := os.MkdirAll(req.StagingTargetPath, targetPerm); err != nil { + return fmt.Errorf("failed to create staging directory: %w", err) + } + + mountOptions := []string{} + if mnt != nil { + for _, flag := range mnt.MountFlags { + mountOptions = append(mountOptions, flag) + } + } + + err = s.mountIfNeeded( + req.VolumeId, + resp.NbdDeviceFile, + req.StagingTargetPath, + fsType, + mountOptions) + if err != nil { + return err + } + + if mnt != nil && mnt.VolumeMountGroup != "" { + cmd := exec.Command("chown", "-R", ":"+mnt.VolumeMountGroup, 
req.StagingTargetPath) + if out, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("failed to chown %s to %q: %w, output %q", + mnt.VolumeMountGroup, req.StagingTargetPath, err, out) + } + } + + if err := os.Chmod(req.StagingTargetPath, targetPerm); err != nil { + return fmt.Errorf("failed to chmod target path: %w", err) + } + + return nil +} + func (s *nodeService) nodePublishDiskAsBlockDevice( ctx context.Context, req *csi.NodePublishVolumeRequest) error { @@ -591,11 +790,11 @@ func (s *nodeService) nodePublishDiskAsBlockDevice( func (s *nodeService) startNbsEndpointForNBD( ctx context.Context, - podId string, + instanceId string, volumeId string, volumeContext map[string]string) (*nbsapi.TStartEndpointResponse, error) { - endpointDir := s.getEndpointDir(podId, volumeId) + endpointDir := s.getEndpointDir(instanceId, volumeId) if err := os.MkdirAll(endpointDir, os.FileMode(0755)); err != nil { return nil, err } @@ -605,12 +804,17 @@ func (s *nodeService) startNbsEndpointForNBD( deviceName = volumeId } + nbsInstanceId := instanceId + if nbsInstanceId == "" { + nbsInstanceId = s.nodeID + } + hostType := nbsapi.EHostType_HOST_TYPE_DEFAULT return s.nbsClient.StartEndpoint(ctx, &nbsapi.TStartEndpointRequest{ UnixSocketPath: filepath.Join(endpointDir, nbsSocketName), DiskId: volumeId, - InstanceId: podId, - ClientId: fmt.Sprintf("%s-%s", s.clientID, podId), + InstanceId: nbsInstanceId, + ClientId: fmt.Sprintf("%s-%s", s.clientID, nbsInstanceId), DeviceName: deviceName, IpcType: nbdIpc, VhostQueuesCount: 8, @@ -697,6 +901,35 @@ func (s *nodeService) nodePublishStagedVhostSocket(req *csi.NodePublishVolumeReq return s.mountSocketDir(endpointDir, req) } +func (s *nodeService) nodeUnstageVolume( + ctx context.Context, + req *csi.NodeUnstageVolumeRequest) error { + + mounted, _ := s.mounter.IsMountPoint(req.StagingTargetPath) + if !mounted { + // Fallback to previous implementation for already mounted volumes to + // stop endpoint in nodeUnpublishVolume + // Must be removed after migration of all endpoints to the new format + return nil + } + + if err := s.mounter.CleanupMountPoint(req.StagingTargetPath); err != nil { + return err + } + + endpointDir := s.getEndpointDir("", req.VolumeId) + if s.nbsClient != nil { + _, err := s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{ + UnixSocketPath: filepath.Join(endpointDir, nbsSocketName), + }) + if err != nil { + return fmt.Errorf("failed to stop nbs endpoint: %w", err) + } + } + + return nil +} + func (s *nodeService) nodeUnpublishVolume( ctx context.Context, req *csi.NodeUnpublishVolumeRequest) error { @@ -728,6 +961,9 @@ func (s *nodeService) nodeUnpublishVolume( // because the endpoint's backend service is unknown here. // When we miss we get S_FALSE/S_ALREADY code (err == nil). + // Fallback to previous implementation for already mounted volumes to + // stop endpoint in nodeUnpublishVolume. 
+ // Must be removed after migration of all endpoints to the new format if s.nbsClient != nil { _, err := s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{ UnixSocketPath: filepath.Join(endpointDir, nbsSocketName), @@ -755,8 +991,8 @@ func (s *nodeService) nodeUnpublishVolume( return nil } -func (s *nodeService) getEndpointDir(instanceOrPodId string, volumeId string) string { - return filepath.Join(s.socketsDir, instanceOrPodId, volumeId) +func (s *nodeService) getEndpointDir(instanceId string, volumeId string) string { + return filepath.Join(s.socketsDir, instanceId, volumeId) } func (s *nodeService) mountSocketDir(sourcePath string, req *csi.NodePublishVolumeRequest) error { @@ -1126,8 +1362,11 @@ func (s *nodeService) NodeExpandVolume( return nil, err } - endpointDir := s.getEndpointDir(podId, req.VolumeId) - unixSocketPath := filepath.Join(endpointDir, nbsSocketName) + endpointDirOld := s.getEndpointDir(podId, req.VolumeId) + unixSocketPathOld := filepath.Join(endpointDirOld, nbsSocketName) + + endpointDirNew := s.getEndpointDir("", req.VolumeId) + unixSocketPathNew := filepath.Join(endpointDirNew, nbsSocketName) listEndpointsResp, err := s.nbsClient.ListEndpoints( ctx, &nbsapi.TListEndpointsRequest{}, @@ -1138,9 +1377,19 @@ func (s *nodeService) NodeExpandVolume( } nbdDevicePath := "" + unixSocketPath := "" for _, endpoint := range listEndpointsResp.Endpoints { - if endpoint.UnixSocketPath == unixSocketPath { + // Fallback to previous implementation for already mounted volumes + // Must be removed after migration of all endpoints to the new format + if endpoint.UnixSocketPath == unixSocketPathOld { + nbdDevicePath = endpoint.GetNbdDeviceFile() + unixSocketPath = unixSocketPathOld + break + } + + if endpoint.UnixSocketPath == unixSocketPathNew { nbdDevicePath = endpoint.GetNbdDeviceFile() + unixSocketPath = unixSocketPathNew break } } diff --git a/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go b/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go index 0618a4f2b0..fe0a5be6e2 100644 --- a/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go +++ b/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go @@ -398,13 +398,14 @@ func TestPublishUnpublishDiskForInfrakuber(t *testing.T) { clientID := "testClientId" podID := "test-pod-id-13" diskID := "test-disk-id-42" - actualClientId := "testClientId-test-pod-id-13" + actualClientId := "testClientId-testNodeId" targetPath := filepath.Join(tempDir, "pods", podID, "volumes", diskID, "mount") targetFsPathPattern := filepath.Join(tempDir, "pods/([a-z0-9-]+)/volumes/([a-z0-9-]+)/mount") stagingTargetPath := "testStagingTargetPath" socketsDir := filepath.Join(tempDir, "sockets") - sourcePath := filepath.Join(socketsDir, podID, diskID) - socketPath := filepath.Join(socketsDir, podID, diskID, "nbs.sock") + sourcePath := filepath.Join(socketsDir, diskID) + socketPath := filepath.Join(socketsDir, diskID, "nbs.sock") + deprecatedSocketPath := filepath.Join(socketsDir, podID, diskID, "nbs.sock") nodeService := newNodeService( nodeID, @@ -431,19 +432,11 @@ func TestPublishUnpublishDiskForInfrakuber(t *testing.T) { volumeContext := map[string]string{} - _, err = nodeService.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ - VolumeId: diskID, - StagingTargetPath: stagingTargetPath, - VolumeCapability: &volumeCapability, - VolumeContext: volumeContext, - }) - require.NoError(t, err) - hostType := nbs.EHostType_HOST_TYPE_DEFAULT nbsClient.On("StartEndpoint", ctx, &nbs.TStartEndpointRequest{ UnixSocketPath: 
socketPath, DiskId: diskID, - InstanceId: podID, + InstanceId: nodeID, ClientId: actualClientId, DeviceName: diskID, IpcType: ipcType, @@ -465,9 +458,24 @@ func TestPublishUnpublishDiskForInfrakuber(t *testing.T) { mounter.On("MakeFilesystem", nbdDeviceFile, "ext4").Return([]byte{}, nil) + mockCallIsMountPoint := mounter.On("IsMountPoint", stagingTargetPath).Return(false, nil) + + mounter.On("Mount", nbdDeviceFile, stagingTargetPath, "ext4", []string{}).Return(nil) + + _, err = nodeService.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ + VolumeId: diskID, + StagingTargetPath: stagingTargetPath, + VolumeCapability: &volumeCapability, + VolumeContext: volumeContext, + }) + require.NoError(t, err) + + mockCallIsMountPoint.Unset() + + mounter.On("IsMountPoint", stagingTargetPath).Return(true, nil) mounter.On("IsMountPoint", targetPath).Return(false, nil) - mounter.On("Mount", nbdDeviceFile, targetPath, "ext4", []string{}).Return(nil) + mounter.On("Mount", stagingTargetPath, targetPath, "", []string{"bind"}).Return(nil) _, err = nodeService.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{ VolumeId: diskID, @@ -494,10 +502,10 @@ func TestPublishUnpublishDiskForInfrakuber(t *testing.T) { fields := strings.Fields(string(output)) assert.Equal(t, groupId, fields[3]) - mounter.On("CleanupMountPoint", targetPath).Return(nil) + mockCallCleanupMountPoint := mounter.On("CleanupMountPoint", targetPath).Return(nil) - nbsClient.On("StopEndpoint", ctx, &nbs.TStopEndpointRequest{ - UnixSocketPath: socketPath, + mockCallStopEndpoint := nbsClient.On("StopEndpoint", ctx, &nbs.TStopEndpointRequest{ + UnixSocketPath: deprecatedSocketPath, }).Return(&nbs.TStopEndpointResponse{}, nil) _, err = nodeService.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{ @@ -506,9 +514,18 @@ func TestPublishUnpublishDiskForInfrakuber(t *testing.T) { }) require.NoError(t, err) + mockCallStopEndpoint.Unset() + mockCallCleanupMountPoint.Unset() + _, err = os.Stat(filepath.Join(socketsDir, podID)) assert.True(t, os.IsNotExist(err)) + mounter.On("CleanupMountPoint", stagingTargetPath).Return(nil) + + nbsClient.On("StopEndpoint", ctx, &nbs.TStopEndpointRequest{ + UnixSocketPath: socketPath, + }).Return(&nbs.TStopEndpointResponse{}, nil) + _, err = nodeService.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{ VolumeId: diskID, StagingTargetPath: stagingTargetPath, @@ -593,7 +610,7 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) { NbdDeviceFile: nbdDeviceFile, }, nil) - mounter.On("IsMountPoint", targetPath).Return(false, nil) + mockCallIsMountPoint := mounter.On("IsMountPoint", targetPath).Return(false, nil) mounter.On("Mount", nbdDeviceFile, targetPath, "", []string{"bind"}).Return(nil) @@ -633,9 +650,13 @@ func TestPublishUnpublishDeviceForInfrakuber(t *testing.T) { }) require.NoError(t, err) + mockCallIsMountPoint.Unset() + _, err = os.Stat(filepath.Join(socketsDir, podID)) assert.True(t, os.IsNotExist(err)) + mounter.On("IsMountPoint", stagingTargetPath).Return(false, nil) + _, err = nodeService.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{ VolumeId: diskID, StagingTargetPath: stagingTargetPath, @@ -792,9 +813,13 @@ func TestPublishDeviceWithReadWriteManyModeIsNotSupportedWithNBS(t *testing.T) { _, err := nodeService.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ VolumeId: diskID, StagingTargetPath: "testStagingTargetPath", - VolumeCapability: &csi.VolumeCapability{}, + VolumeCapability: &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Block{ + Block: 
&csi.VolumeCapability_BlockVolume{},
+			},
+		},
 	})
-	require.NoError(t, err)
+	require.Error(t, err)
 
 	// NodePublishVolume without access mode should fail
 	_, err = nodeService.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
diff --git a/cloud/blockstore/tools/csi_driver/stage-publish-unpublish-unstage-flow.md b/cloud/blockstore/tools/csi_driver/stage-publish-unpublish-unstage-flow.md
new file mode 100644
index 0000000000..5655ed5aea
--- /dev/null
+++ b/cloud/blockstore/tools/csi_driver/stage-publish-unpublish-unstage-flow.md
@@ -0,0 +1,61 @@
+The current CSI driver implementation violates the CSI specification for the stage/publish/unpublish/unstage volume flow.
+At the moment the StageVolume step is ignored entirely: starting the endpoint and mounting the volume both happen in the PublishVolume step.
+As a result the CSI driver does not support the ReadWriteOnce access mode correctly: only one pod on a node can mount the volume,
+even though it should be possible to mount the same volume into multiple pods on the same node.
+
+According to the CSI specification:
+
+- NodeStageVolume should mount the volume to the staging path
+- NodePublishVolume should mount the volume to the target path
+- NodeUnpublishVolume should unmount the volume from the target path
+- NodeUnstageVolume should unmount the volume from the staging path
+
+Since the current CSI driver implementation is already running in production clusters, we need to handle migration
+from the existing way of mounting volumes (only NodePublishVolume/NodeUnpublishVolume is implemented)
+to the new implementation.
+
+The tricky part is that different UnixSocketPath/InstanceId/ClientId values are used
+for already mounted volumes and "new" volumes.
+
+Current format of UnixSocketPath: socketsDir/podId/volumeId
+New format of UnixSocketPath: socketsDir/volumeId
+
+Current format of InstanceId: podId
+New format of InstanceId: nodeId
+
+Current format of ClientId: clientID-podId
+New format of ClientId: clientID-nodeId
+
+Possible scenarios:
+
+--------
+1. Volume was staged and published
+2. CSI Driver was updated
+3. Volume was unpublished and unstaged <- here we have to handle unpublish with the old unix socket path
+--------
+1. Volume was staged and published
+2. CSI Driver was updated
+3. Kubelet restart happened
+4. CSI Driver received stage and publish for the same volume again <- here we have to handle stage/publish with the old unix socket path
+--------
+1. CSI Driver was updated
+2. Volume was staged and published
+3. The endpoint should be started with the new unix socket path
+4. Volume was unpublished and unstaged
+5. UnstageVolume should stop the endpoint with the new unix socket path
+--------
+1. CSI Driver was updated
+2. Volume was staged and published on node #1 with RWO access mode
+3. The volume is staged on node #2
+4. StageVolume on node #2 should return an error
+
+Migration is split into different modes:
+VM mode: https://github.com/ydb-platform/nbs/pull/1982
+Mount mode: https://github.com/ydb-platform/nbs/pull/2195
+Block mode: https://github.com/ydb-platform/nbs/pull/2269
+
+After all volumes have been migrated to the new endpoints we can remove backward compatibility
+with the old endpoint format.
+
+External links/Documentation:
+https://github.com/container-storage-interface/spec/blob/master/spec.md#node-service-rpc
\ No newline at end of file
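
To make the flow described in stage-publish-unpublish-unstage-flow.md concrete, here is a minimal, hedged sketch (in Go, like the driver) of what stage and publish do in mount mode: NodeStageVolume formats the NBD device if needed and mounts it under the per-volume staging path, and NodePublishVolume then bind-mounts that staging path into each pod's target path. This is an illustration only, not the driver's code; the device path, directories, permissions, and the use of the mkfs/mount CLIs are assumptions for the example.

```go
// Sketch only: mirrors the stage -> publish split described in
// stage-publish-unpublish-unstage-flow.md. All paths are hypothetical.
package main

import (
	"fmt"
	"os"
	"os/exec"
)

func run(name string, args ...string) error {
	out, err := exec.Command(name, args...).CombinedOutput()
	if err != nil {
		return fmt.Errorf("%s %v failed: %w (output: %s)", name, args, err, out)
	}
	return nil
}

// stage corresponds to NodeStageVolume: one filesystem mount per volume per node.
func stage(nbdDevice, stagingPath, fsType string) error {
	if err := os.MkdirAll(stagingPath, 0o775); err != nil {
		return err
	}
	// The real driver only formats the device when it has no filesystem yet
	// (makeFilesystemIfNeeded); here we format unconditionally for brevity.
	if err := run("mkfs."+fsType, nbdDevice); err != nil {
		return err
	}
	return run("mount", "-t", fsType, nbdDevice, stagingPath)
}

// publish corresponds to NodePublishVolume: one bind mount per pod, which is
// what allows several pods on the same node to share a ReadWriteOnce volume.
func publish(stagingPath, targetPath string) error {
	if err := os.MkdirAll(targetPath, 0o775); err != nil {
		return err
	}
	return run("mount", "--bind", stagingPath, targetPath)
}

func main() {
	// Needs root and an attached NBD device to actually run.
	if err := stage("/dev/nbd0", "/var/lib/kubelet/staging/example-disk", "ext4"); err != nil {
		fmt.Println("stage:", err)
		return
	}
	if err := publish("/var/lib/kubelet/staging/example-disk",
		"/var/lib/kubelet/pods/deadbeef1/volumes/example-disk/mount"); err != nil {
		fmt.Println("publish:", err)
	}
}
```

Unpublish and unstage are the mirror image: unmount the target path per pod, then unmount the staging path and stop the NBS endpoint once per volume.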
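A second sketch, limited to what the migration notes say about path layouts: during migration the driver has to consider both the old (socketsDir/podId/volumeId) and the new (socketsDir/volumeId) unix socket locations, for example when resolving an endpoint in NodeUnstageVolume or NodeExpandVolume. The socketsDir value, the socket name, and the sample IDs below are assumed values for illustration.

```go
// Sketch of the old vs. new endpoint socket layout from the migration notes.
package main

import (
	"fmt"
	"path/filepath"
)

const nbsSocketName = "nbs.sock"

// endpointSocketPath builds the unix socket path for a volume.
// With a non-empty podID it yields the old, per-pod layout; with an empty
// podID it yields the new, per-volume layout (filepath.Join skips empty parts).
func endpointSocketPath(socketsDir, podID, volumeID string) string {
	return filepath.Join(socketsDir, podID, volumeID, nbsSocketName)
}

func main() {
	socketsDir := "/run/nbs-sockets" // assumed location, for illustration only
	volumeID := "example-disk"
	podID := "deadbeef1"

	oldPath := endpointSocketPath(socketsDir, podID, volumeID)
	newPath := endpointSocketPath(socketsDir, "", volumeID)

	// During migration both candidates have to be checked against ListEndpoints.
	fmt.Println("old format:", oldPath) // /run/nbs-sockets/deadbeef1/example-disk/nbs.sock
	fmt.Println("new format:", newPath) // /run/nbs-sockets/example-disk/nbs.sock
}
```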