From d0c4ab0cdef5ebf1c5d0e73d80d764e7e820d57f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20M=C3=A4nnchen?= Date: Sun, 17 May 2020 16:47:02 +0100 Subject: [PATCH] Snapshot Support --- docs/contributing/setup.md | 18 +++ examples/dynamic/sc.yaml | 1 + go.mod | 1 + pkg/driver/controller.go | 250 ++++++++++++++++++++++++++++++++----- pkg/util/common.go | 51 ++++++++ 5 files changed, 287 insertions(+), 34 deletions(-) diff --git a/docs/contributing/setup.md b/docs/contributing/setup.md index 4144364..512feae 100644 --- a/docs/contributing/setup.md +++ b/docs/contributing/setup.md @@ -18,10 +18,16 @@ - Login to `gcloud` (`gcloud auth login`) * Google Cloud Project - Create Test Project (`gcloud projects create [PROJECT_ID] --name=[PROJECT_NAME]`) +* Enable Storage Transfer API + - Visit https://console.developers.google.com/apis/library/storagetransfer.googleapis.com + - Enable API + - Setup Access (https://cloud.google.com/storage-transfer/docs/configure-access) + - `gcloud projects add-iam-policy-binding [PROJECT_ID] --member=serviceAccount:project-[PROJECT_NUMBER]@storage-transfer-service.iam.gserviceaccount.com --role=roles/storage.admin` * Google Cloud Service Account - Create (`gcloud iam service-accounts create [ACCOUNT_NAME] --display-name="Test Account" --description="Test Account for GCS CSI" --project=[PROJECT_ID]`) - Create Key (`gcloud iam service-accounts keys create service-account.json --iam-account=[ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com --project=[PROJECT_ID]`) - Give Storage Admin Permission (`gcloud projects add-iam-policy-binding [PROJECT_ID] --member=serviceAccount:[ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com --role=roles/storage.admin`) + - Give Transfer User Permission (`gcloud projects add-iam-policy-binding [PROJECT_ID] --member=serviceAccount:[ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com --role=roles/storagetransfer.user`) * Create Secret - `kubectl create secret generic csi-gcs-secret --from-file=key=service-account.json` * Pull Needed Images @@ -94,6 +100,18 @@ ControllerValidateVolumeCapabilitiesSecret: projectId: [Google Cloud Project ID] key: | [Storage Admin Key JSON] +ControllerExpandVolumeSecret: + projectId: [Google Cloud Project ID] + key: | + [Storage Admin Key JSON] +CreateSnapshotSecret: + projectId: [Google Cloud Project ID] + key: | + [Storage Admin Key JSON] +DeleteSnapshotSecret: + projectId: [Google Cloud Project ID] + key: | + [Storage Admin Key JSON] ``` ## Develop inside Docker diff --git a/examples/dynamic/sc.yaml b/examples/dynamic/sc.yaml index b90e18b..386acec 100644 --- a/examples/dynamic/sc.yaml +++ b/examples/dynamic/sc.yaml @@ -13,3 +13,4 @@ parameters: csi.storage.k8s.io/provisioner-secret-namespace: default csi.storage.k8s.io/controller-expand-secret-name: csi-gcs-secret-creator csi.storage.k8s.io/controller-expand-secret-namespace: default + gcs.csi.ofek.dev/project-id: joshmartin-csi-gcs-test diff --git a/go.mod b/go.mod index 551ffdf..6a03d6d 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.13 require ( cloud.google.com/go v0.38.0 github.com/container-storage-interface/spec v1.2.0 + github.com/golang/protobuf v1.3.2 github.com/kubernetes-csi/csi-lib-utils v0.7.0 github.com/kubernetes-csi/csi-test v2.2.0+incompatible // indirect github.com/kubernetes-csi/csi-test/v3 v3.1.0 diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 455feaa..01d557e 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -3,8 +3,10 @@ package driver import ( "context" "fmt" + "strconv" "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/protobuf/ptypes" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "github.com/ofek/csi-gcs/pkg/flags" "github.com/ofek/csi-gcs/pkg/util" @@ -77,6 +79,8 @@ func (d *GCSDriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque } defer util.CleanupKey(keyFile, KeyStoragePath) + newCapacity := int64(req.GetCapacityRange().GetRequiredBytes()) + // Creates a client. client, err := storage.NewClient(ctx, option.WithCredentialsFile(keyFile)) if err != nil { @@ -86,43 +90,84 @@ func (d *GCSDriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque // Creates a Bucket instance. bucket := client.Bucket(options[flags.FLAG_BUCKET]) + projectId, projectIdExists := options[flags.FLAG_PROJECT_ID] + if !projectIdExists { + return nil, status.Errorf(codes.InvalidArgument, "Project Id not provided, snapshot can't be restored: %s", options[flags.FLAG_BUCKET]) + } + // Check if Bucket Exists - _, err = bucket.Attrs(ctx) + bucketAttrs, err := bucket.Attrs(ctx) if err == nil { klog.V(2).Infof("Bucket '%s' exists", options[flags.FLAG_BUCKET]) - } else { - klog.V(2).Infof("Bucket '%s' does not exist, creating", options[flags.FLAG_BUCKET]) - projectId, projectIdExists := options[flags.FLAG_PROJECT_ID] - if !projectIdExists { - return nil, status.Errorf(codes.InvalidArgument, "Project Id not provided, bucket can't be created: %s", options[flags.FLAG_BUCKET]) + if !util.BucketIs(bucketAttrs, "volume") { + return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume") + } + + existingCapacity, err := util.BucketCapacity(bucketAttrs) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to get bucket capacity: %v", err) + } + + // Check / Set Capacity + if existingCapacity == 0 { + _, err = util.SetBucketCapacity(ctx, bucket, newCapacity) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to set bucket capacity: %v", err) + } + } else if existingCapacity < newCapacity { + return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", options[flags.FLAG_BUCKET])) } + } else { + klog.V(2).Infof("Bucket '%s' does not exist, creating", options[flags.FLAG_BUCKET]) - if err := bucket.Create(ctx, projectId, &storage.BucketAttrs{Location: options[flags.FLAG_LOCATION]}); err != nil { + if err := bucket.Create(ctx, projectId, &storage.BucketAttrs{Location: options[flags.FLAG_LOCATION], Labels: map[string]string{ + "capacity": strconv.FormatInt(newCapacity, 10), + "type": "volume", + }}); err != nil { return nil, status.Errorf(codes.Internal, "Failed to create bucket: %v", err) } } - // Get Capacity - bucketAttrs, err := bucket.Attrs(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "Failed to get bucket attrs: %v", err) - } + if snapshot := req.GetVolumeContentSource().GetSnapshot(); snapshot != nil { + // Creates a Bucket instance. + snapshotBucket := client.Bucket(snapshot.SnapshotId) - existingCapacity, err := util.BucketCapacity(bucketAttrs) - if err != nil { - return nil, status.Errorf(codes.Internal, "Failed to get bucket capacity: %v", err) - } + // Check if Bucket Exists + snapshotBucketAttrs, err := snapshotBucket.Attrs(ctx) + if err == nil { - // Check / Set Capacity - newCapacity := int64(req.GetCapacityRange().GetRequiredBytes()) - if existingCapacity == 0 { - _, err = util.SetBucketCapacity(ctx, bucket, newCapacity) - if err != nil { - return nil, status.Errorf(codes.Internal, "Failed to set bucket capacity: %v", err) + if !util.BucketIs(snapshotBucketAttrs, "snapshot") { + return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Snapshot") + } + + err = util.CopyBucketContent(ctx, keyFile, projectId, snapshot.SnapshotId, options[flags.FLAG_BUCKET]) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to restore snapshot: %v", err) + } + } else { + return nil, status.Errorf(codes.NotFound, "Snapshot does not exist") + } + } + if cloneVolume := req.GetVolumeContentSource().GetVolume(); cloneVolume != nil { + // Creates a Bucket instance. + cloneVolumeBucket := client.Bucket(cloneVolume.VolumeId) + + // Check if Bucket Exists + cloneVolumeBucketAttrs, err := cloneVolumeBucket.Attrs(ctx) + if err == nil { + + if !util.BucketIs(cloneVolumeBucketAttrs, "volume") { + return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume") + } + + err = util.CopyBucketContent(ctx, keyFile, projectId, cloneVolume.VolumeId, options[flags.FLAG_BUCKET]) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to restore snapshot: %v", err) + } + } else { + return nil, status.Errorf(codes.NotFound, "Volume does not exist") } - } else if existingCapacity < newCapacity { - return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", options[flags.FLAG_BUCKET])) } return &csi.CreateVolumeResponse{ @@ -156,8 +201,12 @@ func (d *GCSDriver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque // Creates a Bucket instance. bucket := client.Bucket(req.VolumeId) - _, err = bucket.Attrs(ctx) + bucketAttrs, err := bucket.Attrs(ctx) if err == nil { + if !util.BucketIs(bucketAttrs, "volume") { + return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume") + } + if err := bucket.Delete(ctx); err != nil { return nil, status.Errorf(codes.Internal, "Error deleting bucket %s, %v", req.VolumeId, err) } @@ -187,6 +236,20 @@ func (d *GCSDriver) ControllerGetCapabilities(ctx context.Context, req *csi.Cont }, }, }, + { + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, + }, + }, + }, + { + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: csi.ControllerServiceCapability_RPC_CLONE_VOLUME, + }, + }, + }, }, }, nil } @@ -218,12 +281,16 @@ func (d *GCSDriver) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val // Creates a Bucket instance. bucket := client.Bucket(bucketName) - _, err = bucket.Attrs(ctx) + bucketAttrs, err := bucket.Attrs(ctx) if err != nil { return nil, status.Error(codes.NotFound, "volume does not exist") } + if !util.BucketIs(bucketAttrs, "volume") { + return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume") + } + for _, capability := range req.GetVolumeCapabilities() { if capability.GetMount() != nil && capability.GetBlock() == nil { continue @@ -268,13 +335,129 @@ func (d *GCSDriver) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest func (d *GCSDriver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { klog.V(4).Infof("Method CreateSnapshot called with: %s", protosanitizer.StripSecrets(req)) - return nil, status.Error(codes.Unimplemented, "") + if req.SourceVolumeId == "" { + return nil, status.Errorf(codes.InvalidArgument, "SourceVolumeId cannot be empty") + } + if req.Name == "" { + return nil, status.Errorf(codes.InvalidArgument, "Name field cannot be empty") + } + + // Default Options + var options = map[string]string{ + "bucket": util.BucketName(req.Name), + "location": "US", + } + + // Merge Secret Options + options = flags.MergeSecret(options, req.Secrets) + + // Merge Context + if req.Parameters != nil { + options = flags.MergeAnnotations(options, req.Parameters) + } + + // Retrieve Key Secret + keyFile, err := util.GetKey(req.Secrets, KeyStoragePath) + if err != nil { + return nil, err + } + defer util.CleanupKey(keyFile, KeyStoragePath) + + // Creates a client. + client, err := storage.NewClient(ctx, option.WithCredentialsFile(keyFile)) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to create client: %v", err) + } + + // Creates a Bucket instance. + bucket := client.Bucket(options[flags.FLAG_BUCKET]) + + projectId, projectIdExists := options[flags.FLAG_PROJECT_ID] + if !projectIdExists { + return nil, status.Errorf(codes.InvalidArgument, "Project Id not provided, bucket can't be created: %s", options[flags.FLAG_BUCKET]) + } + + // Check if Bucket Exists + bucketAttrs, err := bucket.Attrs(ctx) + if bucketAttrs != nil && err == nil { + klog.V(2).Infof("Bucket '%s' exists", options[flags.FLAG_BUCKET]) + + if !util.BucketIs(bucketAttrs, "snapshot") { + return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Snapshot") + } + + for labelName, labelValue := range bucketAttrs.Labels { + if labelName == "volume" && labelValue != req.SourceVolumeId { + return nil, status.Errorf(codes.AlreadyExists, "Bucket is for another volume") + } + } + } else { + klog.V(2).Infof("Bucket '%s' does not exist, creating", options[flags.FLAG_BUCKET]) + + if err := bucket.Create(ctx, projectId, &storage.BucketAttrs{Location: options[flags.FLAG_LOCATION], Labels: map[string]string{ + "volume": req.SourceVolumeId, + "type": "snapshot", + }}); err != nil { + return nil, status.Errorf(codes.Internal, "Failed to create bucket: %v", err) + } + } + + creationTime := ptypes.TimestampNow() + + err = util.CopyBucketContent(ctx, keyFile, projectId, req.SourceVolumeId, options[flags.FLAG_BUCKET]) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to restore snapshot: %v", err) + } + + return &csi.CreateSnapshotResponse{ + Snapshot: &csi.Snapshot{ + SnapshotId: options[flags.FLAG_BUCKET], + SourceVolumeId: req.SourceVolumeId, + CreationTime: creationTime, + ReadyToUse: true, + }, + }, nil } func (d *GCSDriver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { klog.V(4).Infof("Method DeleteSnapshot called with: %s", protosanitizer.StripSecrets(req)) - return nil, status.Error(codes.Unimplemented, "") + if req.SnapshotId == "" { + return nil, status.Errorf(codes.InvalidArgument, "SnapshotId is invalid (empty string)") + } + + // Retrieve Key Secret + keyFile, err := util.GetKey(req.Secrets, KeyStoragePath) + if err != nil { + return nil, err + } + defer util.CleanupKey(keyFile, KeyStoragePath) + + // Creates a client. + client, err := storage.NewClient(ctx, option.WithCredentialsFile(keyFile)) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to create client: %v", err) + } + + // Creates a Bucket instance. + bucket := client.Bucket(req.SnapshotId) + + // Check if Bucket Exists + bucketAttrs, err := bucket.Attrs(ctx) + if err == nil { + + if !util.BucketIs(bucketAttrs, "snapshot") { + return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Snapshot") + } + + if err := bucket.Delete(ctx); err != nil { + return nil, status.Errorf(codes.Internal, "Error deleting bucket %s, %v", req.SnapshotId, err) + } + } else { + klog.V(2).Infof("Bucket '%s' does not exist, not deleting", req.SnapshotId) + } + + return &csi.DeleteSnapshotResponse{}, nil } func (d *GCSDriver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { @@ -307,19 +490,18 @@ func (d *GCSDriver) ControllerExpandVolume(ctx context.Context, req *csi.Control bucket := client.Bucket(req.VolumeId) // Check if Bucket Exists - _, err = bucket.Attrs(ctx) + bucketAttrs, err := bucket.Attrs(ctx) if err == nil { klog.V(2).Infof("Bucket '%s' exists", req.VolumeId) + + if !util.BucketIs(bucketAttrs, "volume") { + return nil, status.Errorf(codes.FailedPrecondition, "Bucket is not a Volume") + } } else { return nil, status.Errorf(codes.NotFound, "Bucket '%s' does not exist", req.VolumeId) } // Get Capacity - bucketAttrs, err := bucket.Attrs(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "Failed to get bucket attrs: %v", err) - } - existingCapacity, err := util.BucketCapacity(bucketAttrs) if err != nil { return nil, status.Errorf(codes.Internal, "Failed to get bucket capacity: %v", err) diff --git a/pkg/util/common.go b/pkg/util/common.go index 20f140a..4a57600 100644 --- a/pkg/util/common.go +++ b/pkg/util/common.go @@ -11,11 +11,14 @@ import ( "path/filepath" "strconv" "strings" + "time" "cloud.google.com/go/storage" "github.com/ofek/csi-gcs/pkg/apis/published-volume/v1beta1" gcs "github.com/ofek/csi-gcs/pkg/client/clientset/clientset" "google.golang.org/api/iterator" + "google.golang.org/api/option" + "google.golang.org/api/storagetransfer/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -142,6 +145,16 @@ func BucketCapacity(attrs *storage.BucketAttrs) (int64, error) { return 0, nil } +func BucketIs(attrs *storage.BucketAttrs, bucketType string) bool { + for labelName, labelValue := range attrs.Labels { + if labelName == "type" && labelValue != bucketType { + return false + } + } + + return true +} + func SetBucketCapacity(ctx context.Context, bucket *storage.BucketHandle, capacity int64) (attrs *storage.BucketAttrs, err error) { var uattrs = storage.BucketAttrsToUpdate{} @@ -282,3 +295,41 @@ func UnregisterMount(volumeID string, targetPath string, node string) (err error return nil } + +func CopyBucketContent(ctx context.Context, keyFile string, projectID string, source string, destination string) (err error) { + storageTransferClient, err := storagetransfer.NewService(ctx, option.WithCredentialsFile(keyFile)) + if err != nil { + return status.Errorf(codes.Internal, "Failed to create client: %v", err) + } + + loc, _ := time.LoadLocation("UTC") + now := time.Now().In(loc) + today := &storagetransfer.Date{ + Day: int64(now.Day()), + Month: int64(now.Month()), + Year: int64(now.Year()), + } + + _, err = storageTransferClient.TransferJobs.Create(&storagetransfer.TransferJob{ + ProjectId: projectID, + Status: "ENABLED", + Schedule: &storagetransfer.Schedule{ + ScheduleStartDate: today, + ScheduleEndDate: today, + }, + TransferSpec: &storagetransfer.TransferSpec{ + GcsDataSource: &storagetransfer.GcsData{ + BucketName: source, + }, + GcsDataSink: &storagetransfer.GcsData{ + BucketName: destination, + }, + }, + }).Context(ctx).Do() + + if err != nil { + return status.Errorf(codes.Internal, "Failed to copy data: %v", err) + } + + return nil +}