diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index b6c400ac..e1ef8fdf 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -63,6 +63,8 @@ type controller struct { k8sNodeInformer cache.SharedIndexInformer zfsNodeInformer cache.SharedIndexInformer + + volumeLock *volumeLock } // NewController returns a new instance @@ -71,6 +73,7 @@ func NewController(d *CSIDriver) csi.ControllerServer { ctrl := &controller{ driver: d, capabilities: newControllerCapabilities(), + volumeLock: newVolumeLock(), } if err := ctrl.init(); err != nil { klog.Fatalf("init controller: %v", err) @@ -450,6 +453,9 @@ func (cs *controller) CreateVolume( contentSource := req.GetVolumeContentSource() pvcName := helpers.GetInsensitiveParameter(¶meters, "csi.storage.k8s.io/pvc/name") + unlock := cs.volumeLock.LockVolume(volName) + defer unlock() + if contentSource != nil && contentSource.GetSnapshot() != nil { snapshotID := contentSource.GetSnapshot().GetSnapshotId() @@ -493,6 +499,8 @@ func (cs *controller) DeleteVolume( } volumeID := strings.ToLower(req.GetVolumeId()) + unlock := cs.volumeLock.LockVolume(volumeID) + defer unlock() // verify if the volume has already been deleted vol, err := zfs.GetVolume(volumeID) @@ -611,6 +619,8 @@ func (cs *controller) ControllerExpandVolume( "ControllerExpandVolume: no volumeID provided", ) } + unlock := cs.volumeLock.LockVolume(volumeID) + defer unlock() /* round off the new size */ updatedSize := getRoundedCapacity(req.GetCapacityRange().GetRequiredBytes()) @@ -707,6 +717,8 @@ func (cs *controller) CreateSnapshot( if err != nil { return nil, err } + unlock := cs.volumeLock.LockVolumeWithSnapshot(volumeID, snapName) + defer unlock() snapTimeStamp := time.Now().Unix() var state string @@ -803,6 +815,8 @@ func (cs *controller) DeleteSnapshot( // should succeed when an invalid snapshot id is used return &csi.DeleteSnapshotResponse{}, nil } + unlock := cs.volumeLock.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1]) + defer unlock() if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil { return nil, status.Errorf( codes.Internal, diff --git a/pkg/driver/volume_lock.go b/pkg/driver/volume_lock.go new file mode 100644 index 00000000..c3b140a8 --- /dev/null +++ b/pkg/driver/volume_lock.go @@ -0,0 +1,49 @@ +package driver + +import ( + "sync" +) + +type volumeLock struct { + cond sync.Cond + locked map[string]struct{} +} + +func newVolumeLock() *volumeLock { + return &volumeLock{ + cond: *sync.NewCond(&sync.Mutex{}), + locked: map[string]struct{}{}, + } +} + +func (l *volumeLock) LockVolume(volume string) func() { + l.cond.L.Lock() + defer l.cond.L.Unlock() + + for { + if _, locked := l.locked[volume]; !locked { + break + } + + l.cond.Wait() + } + + l.locked[volume] = struct{}{} + + return func() { + l.cond.L.Lock() + defer l.cond.L.Unlock() + + delete(l.locked, volume) + l.cond.Broadcast() + } +} + +func (l *volumeLock) LockVolumeWithSnapshot(volume string, snapshot string) func() { + unlockVol := l.LockVolume(volume) + unlockSnap := l.LockVolume(snapshot) + return func() { + unlockVol() + unlockSnap() + } +}