Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot & VolumeContentSource Support #47

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/contributing/setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
- Login to `gcloud` (`gcloud auth login`)
* Google Cloud Project
- Create Test Project (`gcloud projects create [PROJECT_ID] --name=[PROJECT_NAME]`)
* Enable Storage Transfer API
- Visit https://console.developers.google.com/apis/library/storagetransfer.googleapis.com
- Enable API
- Setup Access (https://cloud.google.com/storage-transfer/docs/configure-access)
- `gcloud projects add-iam-policy-binding [PROJECT_ID] --member=serviceAccount:project-[PROJECT_NUMBER]@storage-transfer-service.iam.gserviceaccount.com --role=roles/storage.admin`
* Google Cloud Service Account
- Create (`gcloud iam service-accounts create [ACCOUNT_NAME] --display-name="Test Account" --description="Test Account for GCS CSI" --project=[PROJECT_ID]`)
- Create Key (`gcloud iam service-accounts keys create service-account.json --iam-account=[ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com --project=[PROJECT_ID]`)
- Give Storage Admin Permission (`gcloud projects add-iam-policy-binding [PROJECT_ID] --member=serviceAccount:[ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com --role=roles/storage.admin`)
- Give Transfer User Permission (`gcloud projects add-iam-policy-binding [PROJECT_ID] --member=serviceAccount:[ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com --role=roles/storagetransfer.user`)
* Create Secret
- `kubectl create secret generic csi-gcs-secret --from-file=key=service-account.json`
* Pull Needed Images
Expand Down Expand Up @@ -94,6 +100,18 @@ ControllerValidateVolumeCapabilitiesSecret:
projectId: [Google Cloud Project ID]
key: |
[Storage Admin Key JSON]
ControllerExpandVolumeSecret:
projectId: [Google Cloud Project ID]
key: |
[Storage Admin Key JSON]
CreateSnapshotSecret:
projectId: [Google Cloud Project ID]
key: |
[Storage Admin Key JSON]
DeleteSnapshotSecret:
projectId: [Google Cloud Project ID]
key: |
[Storage Admin Key JSON]
```

## Develop inside Docker
Expand Down
1 change: 1 addition & 0 deletions examples/dynamic/sc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ parameters:
csi.storage.k8s.io/provisioner-secret-namespace: default
csi.storage.k8s.io/controller-expand-secret-name: csi-gcs-secret-creator
csi.storage.k8s.io/controller-expand-secret-namespace: default
gcs.csi.ofek.dev/project-id: joshmartin-csi-gcs-test
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.13
require (
cloud.google.com/go v0.38.0
github.com/container-storage-interface/spec v1.2.0
github.com/golang/protobuf v1.3.2
github.com/kubernetes-csi/csi-lib-utils v0.7.0
github.com/kubernetes-csi/csi-test v2.2.0+incompatible // indirect
github.com/kubernetes-csi/csi-test/v3 v3.1.0
Expand Down
250 changes: 216 additions & 34 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package driver
import (
"context"
"fmt"
"strconv"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"github.com/ofek/csi-gcs/pkg/flags"
"github.com/ofek/csi-gcs/pkg/util"
Expand Down Expand Up @@ -77,6 +79,8 @@ func (d *GCSDriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
}
defer util.CleanupKey(keyFile, KeyStoragePath)

newCapacity := int64(req.GetCapacityRange().GetRequiredBytes())

// Creates a client.
client, err := storage.NewClient(ctx, option.WithCredentialsFile(keyFile))
if err != nil {
Expand All @@ -86,43 +90,84 @@ func (d *GCSDriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
// Creates a Bucket instance.
bucket := client.Bucket(options[flags.FLAG_BUCKET])

projectId, projectIdExists := options[flags.FLAG_PROJECT_ID]
if !projectIdExists {
return nil, status.Errorf(codes.InvalidArgument, "Project Id not provided, snapshot can't be restored: %s", options[flags.FLAG_BUCKET])
}

// Check if Bucket Exists
_, err = bucket.Attrs(ctx)
bucketAttrs, err := bucket.Attrs(ctx)
if err == nil {
klog.V(2).Infof("Bucket '%s' exists", options[flags.FLAG_BUCKET])
} else {
klog.V(2).Infof("Bucket '%s' does not exist, creating", options[flags.FLAG_BUCKET])

projectId, projectIdExists := options[flags.FLAG_PROJECT_ID]
if !projectIdExists {
return nil, status.Errorf(codes.InvalidArgument, "Project Id not provided, bucket can't be created: %s", options[flags.FLAG_BUCKET])
if !util.BucketIs(bucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}

existingCapacity, err := util.BucketCapacity(bucketAttrs)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket capacity: %v", err)
}

// Check / Set Capacity
if existingCapacity == 0 {
_, err = util.SetBucketCapacity(ctx, bucket, newCapacity)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to set bucket capacity: %v", err)
}
} else if existingCapacity < newCapacity {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", options[flags.FLAG_BUCKET]))
}
} else {
klog.V(2).Infof("Bucket '%s' does not exist, creating", options[flags.FLAG_BUCKET])

if err := bucket.Create(ctx, projectId, &storage.BucketAttrs{Location: options[flags.FLAG_LOCATION]}); err != nil {
if err := bucket.Create(ctx, projectId, &storage.BucketAttrs{Location: options[flags.FLAG_LOCATION], Labels: map[string]string{
"capacity": strconv.FormatInt(newCapacity, 10),
"type": "volume",
}}); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create bucket: %v", err)
}
}

// Get Capacity
bucketAttrs, err := bucket.Attrs(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket attrs: %v", err)
}
if snapshot := req.GetVolumeContentSource().GetSnapshot(); snapshot != nil {
// Creates a Bucket instance.
snapshotBucket := client.Bucket(snapshot.SnapshotId)

existingCapacity, err := util.BucketCapacity(bucketAttrs)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket capacity: %v", err)
}
// Check if Bucket Exists
snapshotBucketAttrs, err := snapshotBucket.Attrs(ctx)
if err == nil {

// Check / Set Capacity
newCapacity := int64(req.GetCapacityRange().GetRequiredBytes())
if existingCapacity == 0 {
_, err = util.SetBucketCapacity(ctx, bucket, newCapacity)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to set bucket capacity: %v", err)
if !util.BucketIs(snapshotBucketAttrs, "snapshot") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Snapshot")
}

err = util.CopyBucketContent(ctx, keyFile, projectId, snapshot.SnapshotId, options[flags.FLAG_BUCKET])
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to restore snapshot: %v", err)
}
} else {
return nil, status.Errorf(codes.NotFound, "Snapshot does not exist")
}
}
if cloneVolume := req.GetVolumeContentSource().GetVolume(); cloneVolume != nil {
// Creates a Bucket instance.
cloneVolumeBucket := client.Bucket(cloneVolume.VolumeId)

// Check if Bucket Exists
cloneVolumeBucketAttrs, err := cloneVolumeBucket.Attrs(ctx)
if err == nil {

if !util.BucketIs(cloneVolumeBucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}

err = util.CopyBucketContent(ctx, keyFile, projectId, cloneVolume.VolumeId, options[flags.FLAG_BUCKET])
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to restore snapshot: %v", err)
}
} else {
return nil, status.Errorf(codes.NotFound, "Volume does not exist")
}
} else if existingCapacity < newCapacity {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", options[flags.FLAG_BUCKET]))
}

return &csi.CreateVolumeResponse{
Expand Down Expand Up @@ -156,8 +201,12 @@ func (d *GCSDriver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque
// Creates a Bucket instance.
bucket := client.Bucket(req.VolumeId)

_, err = bucket.Attrs(ctx)
bucketAttrs, err := bucket.Attrs(ctx)
if err == nil {
if !util.BucketIs(bucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}

if err := bucket.Delete(ctx); err != nil {
return nil, status.Errorf(codes.Internal, "Error deleting bucket %s, %v", req.VolumeId, err)
}
Expand Down Expand Up @@ -187,6 +236,20 @@ func (d *GCSDriver) ControllerGetCapabilities(ctx context.Context, req *csi.Cont
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
},
},
},
},
}, nil
}
Expand Down Expand Up @@ -218,12 +281,16 @@ func (d *GCSDriver) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val
// Creates a Bucket instance.
bucket := client.Bucket(bucketName)

_, err = bucket.Attrs(ctx)
bucketAttrs, err := bucket.Attrs(ctx)

if err != nil {
return nil, status.Error(codes.NotFound, "volume does not exist")
}

if !util.BucketIs(bucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}

for _, capability := range req.GetVolumeCapabilities() {
if capability.GetMount() != nil && capability.GetBlock() == nil {
continue
Expand Down Expand Up @@ -268,13 +335,129 @@ func (d *GCSDriver) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest
func (d *GCSDriver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
klog.V(4).Infof("Method CreateSnapshot called with: %s", protosanitizer.StripSecrets(req))

return nil, status.Error(codes.Unimplemented, "")
if req.SourceVolumeId == "" {
return nil, status.Errorf(codes.InvalidArgument, "SourceVolumeId cannot be empty")
}
if req.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "Name field cannot be empty")
}

// Default Options
var options = map[string]string{
"bucket": util.BucketName(req.Name),
"location": "US",
}

// Merge Secret Options
options = flags.MergeSecret(options, req.Secrets)

// Merge Context
if req.Parameters != nil {
options = flags.MergeAnnotations(options, req.Parameters)
}

// Retrieve Key Secret
keyFile, err := util.GetKey(req.Secrets, KeyStoragePath)
if err != nil {
return nil, err
}
defer util.CleanupKey(keyFile, KeyStoragePath)

// Creates a client.
client, err := storage.NewClient(ctx, option.WithCredentialsFile(keyFile))
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create client: %v", err)
}

// Creates a Bucket instance.
bucket := client.Bucket(options[flags.FLAG_BUCKET])

projectId, projectIdExists := options[flags.FLAG_PROJECT_ID]
if !projectIdExists {
return nil, status.Errorf(codes.InvalidArgument, "Project Id not provided, bucket can't be created: %s", options[flags.FLAG_BUCKET])
}

// Check if Bucket Exists
bucketAttrs, err := bucket.Attrs(ctx)
if bucketAttrs != nil && err == nil {
klog.V(2).Infof("Bucket '%s' exists", options[flags.FLAG_BUCKET])

if !util.BucketIs(bucketAttrs, "snapshot") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Snapshot")
}

for labelName, labelValue := range bucketAttrs.Labels {
if labelName == "volume" && labelValue != req.SourceVolumeId {
return nil, status.Errorf(codes.AlreadyExists, "Bucket is for another volume")
}
}
} else {
klog.V(2).Infof("Bucket '%s' does not exist, creating", options[flags.FLAG_BUCKET])

if err := bucket.Create(ctx, projectId, &storage.BucketAttrs{Location: options[flags.FLAG_LOCATION], Labels: map[string]string{
"volume": req.SourceVolumeId,
"type": "snapshot",
}}); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create bucket: %v", err)
}
}

creationTime := ptypes.TimestampNow()

err = util.CopyBucketContent(ctx, keyFile, projectId, req.SourceVolumeId, options[flags.FLAG_BUCKET])
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to restore snapshot: %v", err)
}

return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SnapshotId: options[flags.FLAG_BUCKET],
SourceVolumeId: req.SourceVolumeId,
CreationTime: creationTime,
ReadyToUse: true,
},
}, nil
}

func (d *GCSDriver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
klog.V(4).Infof("Method DeleteSnapshot called with: %s", protosanitizer.StripSecrets(req))

return nil, status.Error(codes.Unimplemented, "")
if req.SnapshotId == "" {
return nil, status.Errorf(codes.InvalidArgument, "SnapshotId is invalid (empty string)")
}

// Retrieve Key Secret
keyFile, err := util.GetKey(req.Secrets, KeyStoragePath)
if err != nil {
return nil, err
}
defer util.CleanupKey(keyFile, KeyStoragePath)

// Creates a client.
client, err := storage.NewClient(ctx, option.WithCredentialsFile(keyFile))
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create client: %v", err)
}

// Creates a Bucket instance.
bucket := client.Bucket(req.SnapshotId)

// Check if Bucket Exists
bucketAttrs, err := bucket.Attrs(ctx)
if err == nil {

if !util.BucketIs(bucketAttrs, "snapshot") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Snapshot")
}

if err := bucket.Delete(ctx); err != nil {
return nil, status.Errorf(codes.Internal, "Error deleting bucket %s, %v", req.SnapshotId, err)
}
} else {
klog.V(2).Infof("Bucket '%s' does not exist, not deleting", req.SnapshotId)
}

return &csi.DeleteSnapshotResponse{}, nil
}

func (d *GCSDriver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
Expand Down Expand Up @@ -307,19 +490,18 @@ func (d *GCSDriver) ControllerExpandVolume(ctx context.Context, req *csi.Control
bucket := client.Bucket(req.VolumeId)

// Check if Bucket Exists
_, err = bucket.Attrs(ctx)
bucketAttrs, err := bucket.Attrs(ctx)
if err == nil {
klog.V(2).Infof("Bucket '%s' exists", req.VolumeId)

if !util.BucketIs(bucketAttrs, "volume") {
return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume")
}
} else {
return nil, status.Errorf(codes.NotFound, "Bucket '%s' does not exist", req.VolumeId)
}

// Get Capacity
bucketAttrs, err := bucket.Attrs(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket attrs: %v", err)
}

existingCapacity, err := util.BucketCapacity(bucketAttrs)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to get bucket capacity: %v", err)
Expand Down
Loading