Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sync.Map to replace map and avoid concurrent map read and map write #17

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 42 additions & 17 deletions pkg/hostpath/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, maxStorageCapacity)
}
volumeID := uuid.NewUUID().String()
hostPathVolumes.Store(volumeID, nil)
defer func() {
if volumeInf, ok := hostPathVolumes.Load(volumeID); ok && volumeInf == nil {
hostPathVolumes.Delete(volumeID)
}
}()
path := provisionRoot + volumeID
err := os.MkdirAll(path, 0777)
if err != nil {
Expand All @@ -105,10 +111,11 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
contentSource := req.GetVolumeContentSource()
if contentSource.GetSnapshot() != nil {
snapshotId := contentSource.GetSnapshot().GetSnapshotId()
snapshot, ok := hostPathVolumeSnapshots[snapshotId]
snapshotInf, ok := hostPathVolumeSnapshots.Load(snapshotId)
if !ok {
return nil, status.Errorf(codes.NotFound, "cannot find snapshot %v", snapshotId)
}
snapshot := snapshotInf.(hostPathSnapshot)
if snapshot.ReadyToUse != true {
return nil, status.Errorf(codes.Internal, "Snapshot %v is not yet ready to use.", snapshotId)
}
Expand All @@ -127,7 +134,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
hostPathVol.VolID = volumeID
hostPathVol.VolSize = capacity
hostPathVol.VolPath = path
hostPathVolumes[volumeID] = hostPathVol
hostPathVolumes.Store(volumeID, hostPathVol)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
Expand All @@ -152,7 +159,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
glog.V(4).Infof("deleting volume %s", volumeID)
path := provisionRoot + volumeID
os.RemoveAll(path)
delete(hostPathVolumes, volumeID)
hostPathVolumes.Delete(volumeID)
return &csi.DeleteVolumeResponse{}, nil
}

Expand Down Expand Up @@ -247,12 +254,20 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
}

volumeID := req.GetSourceVolumeId()
hostPathVolume, ok := hostPathVolumes[volumeID]
hostPathVolumeInf, ok := hostPathVolumes.Load(volumeID)
if !ok {
return nil, status.Error(codes.Internal, "volumeID is not exist")
}

hostPathVolume := hostPathVolumeInf.(hostPathVolume)
snapshotID := uuid.NewUUID().String()

hostPathVolumeSnapshots.Store(snapshotID, nil)
defer func() {
if snapShotInf, ok := hostPathVolumeSnapshots.Load(volumeID); ok && snapShotInf == nil {
hostPathVolumeSnapshots.Delete(volumeID)
}
}()

creationTime := ptypes.TimestampNow()
volPath := hostPathVolume.VolPath
file := snapshotRoot + snapshotID + ".tgz"
Expand All @@ -273,7 +288,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
snapshot.SizeBytes = hostPathVolume.VolSize
snapshot.ReadyToUse = true

hostPathVolumeSnapshots[snapshotID] = snapshot
hostPathVolumeSnapshots.Store(snapshotID, snapshot)

return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
Expand All @@ -300,7 +315,7 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
glog.V(4).Infof("deleting volume %s", snapshotID)
path := snapshotRoot + snapshotID + ".tgz"
os.RemoveAll(path)
delete(hostPathVolumeSnapshots, snapshotID)
hostPathVolumeSnapshots.Delete(snapshotID)
return &csi.DeleteSnapshotResponse{}, nil
}

Expand All @@ -313,30 +328,40 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
// case 1: SnapshotId is not empty, return snapshots that match the snapshot id.
if len(req.GetSnapshotId()) != 0 {
snapshotID := req.SnapshotId
if snapshot, ok := hostPathVolumeSnapshots[snapshotID]; ok {
return convertSnapshot(snapshot), nil
if snapshotInf, ok := hostPathVolumeSnapshots.Load(snapshotID); ok {
return convertSnapshot(snapshotInf.(hostPathSnapshot)), nil
}
}

// case 2: SourceVolumeId is not empty, return snapshots that match the source volume id.
if len(req.GetSourceVolumeId()) != 0 {
for _, snapshot := range hostPathVolumeSnapshots {
if snapshot.VolID == req.SourceVolumeId {
return convertSnapshot(snapshot), nil

var snapshot hostPathSnapshot
hostPathVolumeSnapshots.Range(func(key, value interface{}) bool {
if value.(hostPathSnapshot).VolID == req.SourceVolumeId {
snapshot = value.(hostPathSnapshot)
return false
} else {
return true
}
}
},
)
return convertSnapshot(snapshot), nil

}

var snapshots []csi.Snapshot
// case 3: no parameter is set, so we return all the snapshots.
sortedKeys := make([]string, 0)
for k := range hostPathVolumeSnapshots {
sortedKeys = append(sortedKeys, k)
}
hostPathVolumeSnapshots.Range(func(key, value interface{}) bool {
sortedKeys = append(sortedKeys, key.(string))
return true
})
sort.Strings(sortedKeys)

for _, key := range sortedKeys {
snap := hostPathVolumeSnapshots[key]
snapInf, _ := hostPathVolumeSnapshots.Load(key)
snap := snapInf.(hostPathSnapshot)
snapshot := csi.Snapshot{
SnapshotId: snap.Id,
SourceVolumeId: snap.VolID,
Expand Down
54 changes: 41 additions & 13 deletions pkg/hostpath/hostpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package hostpath

import (
"fmt"
"sync"

"github.com/golang/glog"

timestamp "github.com/golang/protobuf/ptypes/timestamp"
"github.com/golang/protobuf/ptypes/timestamp"
)

const (
Expand Down Expand Up @@ -61,16 +62,16 @@ type hostPathSnapshot struct {
ReadyToUse bool `json:"readyToUse"`
}

var hostPathVolumes map[string]hostPathVolume
var hostPathVolumeSnapshots map[string]hostPathSnapshot
var hostPathVolumes = new(sync.Map)
var hostPathVolumeSnapshots = new(sync.Map)

var (
vendorVersion = "dev"
)

func init() {
hostPathVolumes = map[string]hostPathVolume{}
hostPathVolumeSnapshots = map[string]hostPathSnapshot{}
/*hostPathVolumes = map[string]hostPathVolume{}
hostPathVolumeSnapshots = map[string]hostPathSnapshot{}*/
}

func NewHostPathDriver(driverName, nodeID, endpoint string) (*hostPath, error) {
Expand Down Expand Up @@ -110,26 +111,53 @@ func (hp *hostPath) Run() {
}

func getVolumeByID(volumeID string) (hostPathVolume, error) {
if hostPathVol, ok := hostPathVolumes[volumeID]; ok {
return hostPathVol, nil
if hostPathVol, ok := hostPathVolumes.Load(volumeID); ok {
return hostPathVol.(hostPathVolume), nil
}
return hostPathVolume{}, fmt.Errorf("volume id %s does not exit in the volumes list", volumeID)
}

func getVolumeByName(volName string) (hostPathVolume, error) {
for _, hostPathVol := range hostPathVolumes {
if hostPathVol.VolName == volName {
return hostPathVol, nil

var volume hostPathVolume

hostPathVolumes.Range(func(key, value interface{}) bool {
if value != nil {
if value.(hostPathVolume).VolName == volName {
volume = value.(hostPathVolume)
return false
} else {
return true
}
} else {
return true
}
})
if len(volume.VolName) != 0 {
return volume, nil
}

return hostPathVolume{}, fmt.Errorf("volume name %s does not exit in the volumes list", volName)
}

func getSnapshotByName(name string) (hostPathSnapshot, error) {
for _, snapshot := range hostPathVolumeSnapshots {
if snapshot.Name == name {
return snapshot, nil

var snapshot hostPathSnapshot

hostPathVolumeSnapshots.Range(func(key, value interface{}) bool {
if value != nil {
if value.(hostPathSnapshot).Name == name {
snapshot = value.(hostPathSnapshot)
return false
} else {
return true
}
} else {
return true
}
})
if len(snapshot.Name) != 0 {
return snapshot, nil
}
return hostPathSnapshot{}, fmt.Errorf("snapshot name %s does not exit in the snapshots list", name)
}