Skip to content

Commit

Permalink
smb restore from snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
umagnus committed Apr 6, 2024
1 parent cf1065a commit 11f9a5c
Show file tree
Hide file tree
Showing 14 changed files with 441 additions and 124 deletions.
Binary file modified charts/latest/azurefile-csi-driver-v0.0.0.tgz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ spec:
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /root/.azcopy
name: azcopy-dir
- mountPath: /etc/kubernetes/
name: azure-cred
{{- if eq .Values.linux.distro "fedora" }}
Expand All @@ -209,6 +211,8 @@ spec:
volumes:
- name: socket-dir
emptyDir: {}
- name: azcopy-dir
emptyDir: {}
- name: azure-cred
hostPath:
path: /etc/kubernetes/
Expand Down
4 changes: 4 additions & 0 deletions deploy/csi-azurefile-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ spec:
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /root/.azcopy
name: azcopy-dir
- mountPath: /etc/kubernetes/
name: azure-cred
resources:
Expand All @@ -155,6 +157,8 @@ spec:
volumes:
- name: socket-dir
emptyDir: {}
- name: azcopy-dir
emptyDir: {}
- name: azure-cred
hostPath:
path: /etc/kubernetes/
Expand Down
16 changes: 16 additions & 0 deletions deploy/example/snapshot/pvc-azurefile-snapshot-restored.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
# Example PersistentVolumeClaim whose contents are restored from an existing
# VolumeSnapshot (see the dataSource section below).
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: pvc-azurefile-snapshot-restored
spec:
accessModes:
- ReadWriteMany
# Storage class used to provision the restored volume.
storageClassName: azurefile-csi
resources:
requests:
storage: 100Gi
# The claim is populated from the VolumeSnapshot named below; that snapshot
# object is referenced here by name and is not defined in this manifest.
dataSource:
name: azurefile-volume-snapshot
kind: VolumeSnapshot
apiGroup: snapshot.storage.k8s.io
61 changes: 5 additions & 56 deletions pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"errors"
"fmt"
"net/url"
"os"
"os/exec"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -197,7 +195,10 @@ var (

retriableErrors = []string{accountNotProvisioned, tooManyRequests, shareBeingDeleted, clientThrottled}

defaultAzcopyCopyOptions = []string{"--recursive", "--check-length=false"}
// azcopyCloneVolumeOptions is used for volume cloning; --check-length is set to false because the source volume may still be changing, so the copied data may not match the current state of the source volume
azcopyCloneVolumeOptions = []string{"--recursive", "--check-length=false"}
// azcopySnapshotRestoreOptions is used for SMB snapshot restore; --check-length is set to true because snapshot data does not change
azcopySnapshotRestoreOptions = []string{"--recursive", "--check-length=true"}
)

// Driver implements all interfaces of CSI drivers
Expand Down Expand Up @@ -262,9 +263,6 @@ type Driver struct {

kubeconfig string
endpoint string

// azcopy use sas token by default
azcopyUseSasToken bool
}

// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
Expand Down Expand Up @@ -302,7 +300,6 @@ func NewDriver(options *DriverOptions) *Driver {
driver.azcopy = &fileutil.Azcopy{}
driver.kubeconfig = options.KubeConfig
driver.endpoint = options.Endpoint
driver.azcopyUseSasToken = options.AzcopyUseSasToken

var err error
getter := func(key string) (interface{}, error) { return nil, nil }
Expand Down Expand Up @@ -1012,55 +1009,7 @@ func (d *Driver) copyFileShare(ctx context.Context, req *csi.CreateVolumeRequest
srcPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, srcFileShareName, accountSASToken)
dstPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, dstFileShareName, accountSASToken)

jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
switch jobState {
case fileutil.AzcopyJobError, fileutil.AzcopyJobCompleted:
return err
case fileutil.AzcopyJobRunning:
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
case fileutil.AzcopyJobNotFound:
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
execFuncWithAuth := func() error {
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
if out, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
return nil
}
timeoutFunc := func() error {
_, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcFileShareName, dstFileShareName, percent)
}
copyErr := fileutil.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithAuth, timeoutFunc)
if accountSASToken == "" && copyErr != nil && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage File Data SMB Share Elevated Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)
var sasToken string
if sasToken, _, err = d.getAzcopyAuth(ctx, accountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true); err != nil {
return err
}
execFuncWithSasToken := func() error {
cmd := exec.Command("azcopy", "copy", srcPath+sasToken, dstPath+sasToken)
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
if out, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
return nil
}
copyErr = fileutil.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithSasToken, timeoutFunc)
}
if copyErr != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstFileShareName, copyErr)
} else {
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
}
return copyErr
}
return err
return d.copyFileShareByAzcopy(ctx, srcFileShareName, dstFileShareName, srcPath, dstPath, "", accountName, accountName, resourceGroupName, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secrets, accountOptions, storageEndpointSuffix)
}

// GetTotalAccountQuota returns the total quota in GB of all file shares in the storage account and the number of file shares
Expand Down
4 changes: 1 addition & 3 deletions pkg/azurefile/azurefile_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type DriverOptions struct {
WaitForAzCopyTimeoutMinutes int
KubeConfig string
Endpoint string
AzcopyUseSasToken bool
}

func (o *DriverOptions) AddFlags() *flag.FlagSet {
Expand Down Expand Up @@ -81,10 +80,9 @@ func (o *DriverOptions) AddFlags() *flag.FlagSet {
fs.IntVar(&o.VolStatsCacheExpireInMinutes, "vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache")
fs.BoolVar(&o.PrintVolumeStatsCallLogs, "print-volume-stats-call-logs", false, "Whether to print volume statfs call logs with log level 2")
fs.IntVar(&o.SasTokenExpirationMinutes, "sas-token-expiration-minutes", 1440, "sas token expiration minutes during volume cloning and snapshot restore")
fs.IntVar(&o.WaitForAzCopyTimeoutMinutes, "wait-for-azcopy-timeout-minutes", 18, "timeout in minutes for waiting for azcopy to finish")
fs.IntVar(&o.WaitForAzCopyTimeoutMinutes, "wait-for-azcopy-timeout-minutes", 5, "timeout in minutes for waiting for azcopy to finish")
fs.StringVar(&o.KubeConfig, "kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
fs.StringVar(&o.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
fs.BoolVar(&o.AzcopyUseSasToken, "azcopy-use-sas-token", true, "Whether SAS token should be used in azcopy based on volume clone and snapshot restore")

return fs
}
151 changes: 138 additions & 13 deletions pkg/azurefile/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"
"net/url"
"os"
"os/exec"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -53,13 +55,14 @@ const (
snapshotTimeFormat = "2006-01-02T15:04:05.0000000Z07:00"
snapshotsExpand = "snapshots"

azcopyAutoLoginType = "AZCOPY_AUTO_LOGIN_TYPE"
azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
azcopySPAClientSecret = "AZCOPY_SPA_CLIENT_SECRET"
azcopyTenantID = "AZCOPY_TENANT_ID"
azcopyMSIClientID = "AZCOPY_MSI_CLIENT_ID"
MSI = "MSI"
SPN = "SPN"
azcopyAutoLoginType = "AZCOPY_AUTO_LOGIN_TYPE"
azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
azcopySPAClientSecret = "AZCOPY_SPA_CLIENT_SECRET"
azcopyTenantID = "AZCOPY_TENANT_ID"
azcopyMSIClientID = "AZCOPY_MSI_CLIENT_ID"
MSI = "MSI"
SPN = "SPN"

authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
)

Expand Down Expand Up @@ -577,11 +580,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
return nil, status.Errorf(codes.Internal, "failed to create file share(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d), error: %v", validFileShareName, account, sku, subsID, resourceGroup, location, fileShareSize, err)
}
if req.GetVolumeContentSource() != nil {
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secret, secretName, secretNamespace, d.azcopyUseSasToken)
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secret, secretName, secretNamespace, false)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
if err := d.copyVolume(ctx, req, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secret, shareOptions, accountOptions, storageEndpointSuffix); err != nil {
if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secret, shareOptions, accountOptions, storageEndpointSuffix); err != nil {
return nil, err
}
// storeAccountKey is not needed here since copy volume is only using SAS token
Expand Down Expand Up @@ -732,11 +735,11 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
}

// copyVolume copies an Azure file share, dispatching on the type of the request's volume content source
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, shareOptions *fileclient.ShareOptions, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName, accountSASToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, shareOptions *fileclient.ShareOptions, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
return d.restoreSnapshot(ctx, req, accountName, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secrets, shareOptions, accountOptions, storageEndpointSuffix)
case *csi.VolumeContentSource_Volume:
return d.copyFileShare(ctx, req, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secrets, shareOptions, accountOptions, storageEndpointSuffix)
default:
Expand Down Expand Up @@ -986,6 +989,129 @@ func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (
return nil, status.Error(codes.Unimplemented, "")
}

// restoreSnapshot fills the destination file share named by shareOptions.Name
// with the contents of the share snapshot referenced by the request's volume
// content source.
//
// The source account, source share and resource group are parsed from the
// snapshot ID via GetFileShareInfo, and the snapshot identifier via
// getSnapshot; a parse failure from either is reported as codes.NotFound.
// NFS shares are rejected. The copy itself is delegated to
// copyFileShareByAzcopy.
//
// dstAccountSasToken is appended verbatim to the destination URL. When it is
// non-empty and the snapshot lives in a different storage account, a separate
// token is obtained for the source account through getAzcopyAuth; otherwise
// the destination token is reused for the source URL.
func (d *Driver) restoreSnapshot(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName, dstAccountSasToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, shareOptions *fileclient.ShareOptions, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
	if shareOptions.Protocol == storage.EnabledProtocolsNFS {
		return fmt.Errorf("protocol nfs is not supported for snapshot restore")
	}

	// Pull the snapshot ID out of the request; it stays empty when the request
	// carries no snapshot content source.
	snapshotID := ""
	if contentSource := req.GetVolumeContentSource(); contentSource != nil {
		if snapshotSource := contentSource.GetSnapshot(); snapshotSource != nil {
			snapshotID = snapshotSource.GetSnapshotId()
		}
	}

	resourceGroupName, srcAccountName, srcFileShareName, _, _, _, err := GetFileShareInfo(snapshotID) //nolint:dogsled
	if err != nil {
		return status.Error(codes.NotFound, err.Error())
	}
	snapshot, err := getSnapshot(snapshotID)
	if err != nil {
		return status.Error(codes.NotFound, err.Error())
	}

	dstFileShareName := shareOptions.Name
	if srcFileShareName == "" || dstFileShareName == "" {
		return fmt.Errorf("srcFileShareName(%s) or dstFileShareName(%s) is empty", srcFileShareName, dstFileShareName)
	}

	// Reuse the destination token for the source unless the snapshot lives in
	// another account and token-based auth is in use.
	srcAccountSasToken := dstAccountSasToken
	if dstAccountSasToken != "" && srcAccountName != dstAccountName {
		srcAccountOptions := &azure.AccountOptions{
			Name:                srcAccountName,
			ResourceGroup:       accountOptions.ResourceGroup,
			SubscriptionID:      accountOptions.SubscriptionID,
			GetLatestAccountKey: accountOptions.GetLatestAccountKey,
		}
		token, _, authErr := d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace, true)
		if authErr != nil {
			return authErr
		}
		srcAccountSasToken = token
	}

	// shareURL builds the file share URL for an account, with the (possibly
	// empty) SAS token appended as-is.
	shareURL := func(account, share, sasToken string) string {
		return fmt.Sprintf("https://%s.file.%s/%s%s", account, storageEndpointSuffix, share, sasToken)
	}
	srcPath := shareURL(srcAccountName, srcFileShareName, srcAccountSasToken)
	dstPath := shareURL(dstAccountName, dstFileShareName, dstAccountSasToken)

	// Name used only for log and error messages in copyFileShareByAzcopy.
	srcFileShareSnapshotName := fmt.Sprintf("%s(snapshot: %s)", srcFileShareName, snapshot)
	return d.copyFileShareByAzcopy(ctx, srcFileShareSnapshotName, dstFileShareName, srcPath, dstPath, snapshot, srcAccountName, dstAccountName, resourceGroupName, srcAccountSasToken, authAzcopyEnv, secretName, secretNamespace, secrets, accountOptions, storageEndpointSuffix)
}

// copyFileShareByAzcopy copies srcPath to dstPath by running `azcopy copy`
// through execAzcopyCopy.
//
// When snapshot is non-empty, a sharesnapshot query parameter is added to the
// source URL and azcopySnapshotRestoreOptions are used; otherwise the copy
// uses azcopyCloneVolumeOptions. srcFileShareName and resourceGroupName are
// used only in log and error messages.
//
// The state reported by d.azcopy.GetAzcopyJob for dstFileShareName decides
// what happens:
//   - AzcopyJobNotFound: a new copy is started and awaited for up to
//     d.waitForAzCopyTimeoutMinutes minutes.
//   - AzcopyJobRunning: an error carrying the current copy percentage is
//     returned.
//   - any other state (including AzcopyJobError and AzcopyJobCompleted): the
//     error returned by GetAzcopyJob is passed through unchanged.
//
// If the copy ran without a SAS token (accountSASToken is empty) and failed
// with an error containing AuthorizationPermissionMismatch, it is retried once
// with tokens obtained from getAzcopyAuth and no extra azcopy environment
// variables.
func (d *Driver) copyFileShareByAzcopy(ctx context.Context, srcFileShareName, dstFileShareName, srcPath, dstPath, snapshot, srcAccountName, dstAccountName, resourceGroupName, accountSASToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
	copyOptions := azcopyCloneVolumeOptions
	srcURL := srcPath
	snapshotQuerySuffix := ""
	if snapshot != "" {
		copyOptions = azcopySnapshotRestoreOptions
		// Suffix for the fallback path, where a SAS token is appended to srcPath first.
		snapshotQuerySuffix = fmt.Sprintf("&sharesnapshot=%s", snapshot)
		switch accountSASToken {
		case "":
			// No token was passed in, so this starts the query string.
			srcURL = fmt.Sprintf("%s?sharesnapshot=%s", srcPath, snapshot)
		default:
			// A token was passed in, so this is appended as an extra query parameter.
			srcURL = fmt.Sprintf("%s&sharesnapshot=%s", srcPath, snapshot)
		}
	}

	jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
	klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
	switch jobState {
	case volumehelper.AzcopyJobNotFound:
		// No job exists for this destination yet: start one below.
	case volumehelper.AzcopyJobRunning:
		return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
	default:
		// AzcopyJobError, AzcopyJobCompleted and any other state: hand back
		// whatever GetAzcopyJob reported.
		return err
	}

	klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)

	waitTimeout := time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute

	// newCopyFunc returns the function run under the timeout; it adds the
	// azcopy output to the error on failure.
	newCopyFunc := func(src, dst string, env []string) func() error {
		return func() error {
			out, execErr := d.execAzcopyCopy(src, dst, copyOptions, env)
			if execErr != nil {
				return fmt.Errorf("exec error: %v, output: %v", execErr, string(out))
			}
			return nil
		}
	}
	// onTimeout builds the timeout error; the job is queried only for its
	// progress figure, so the state and lookup error are deliberately ignored.
	onTimeout := func() error {
		_, currentPercent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
		return fmt.Errorf("timeout waiting for copy fileshare %s to %s complete, current copy percent: %s%%", srcFileShareName, dstFileShareName, currentPercent)
	}

	copyErr := volumehelper.WaitUntilTimeout(waitTimeout, newCopyFunc(srcURL, dstPath, authAzcopyEnv), onTimeout)
	if copyErr != nil && accountSASToken == "" && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
		klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage File Data Privileged Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)

		srcAccountOptions := &azure.AccountOptions{
			Name:                srcAccountName,
			ResourceGroup:       accountOptions.ResourceGroup,
			SubscriptionID:      accountOptions.SubscriptionID,
			GetLatestAccountKey: accountOptions.GetLatestAccountKey,
		}
		srcSasToken, _, authErr := d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace, true)
		if authErr != nil {
			return authErr
		}

		// The same token serves both sides when source and destination share an account.
		dstSasToken := srcSasToken
		if srcAccountName != dstAccountName {
			if dstSasToken, _, authErr = d.getAzcopyAuth(ctx, dstAccountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true); authErr != nil {
				return authErr
			}
		}

		retryCopy := newCopyFunc(srcPath+srcSasToken+snapshotQuerySuffix, dstPath+dstSasToken, []string{})
		copyErr = volumehelper.WaitUntilTimeout(waitTimeout, retryCopy, onTimeout)
	}

	if copyErr == nil {
		klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
		return nil
	}
	klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", resourceGroupName, srcAccountName, dstFileShareName, copyErr)
	return copyErr
}

// execAzcopyCopy runs `azcopy copy srcPath dstPath` followed by the given
// azcopyCopyOptions and returns the command's combined stdout and stderr
// together with the error from running it.
//
// When authAzcopyEnv is non-empty, its entries are appended to the current
// process environment for the child process; otherwise cmd.Env is left unset.
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
	args := append([]string{"copy", srcPath, dstPath}, azcopyCopyOptions...)
	azcopyCmd := exec.Command("azcopy", args...)
	if len(authAzcopyEnv) != 0 {
		azcopyCmd.Env = append(os.Environ(), authAzcopyEnv...)
	}
	return azcopyCmd.CombinedOutput()
}

// ControllerExpandVolume controller expand volume
func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
Expand Down Expand Up @@ -1227,11 +1353,10 @@ func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
// 1. secrets is not empty
// 2. driver is not using managed identity and service principal
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
// 4. parameter useSasToken is true
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string, useSasToken bool) (string, []string, error) {
var authAzcopyEnv []string
var err error
if !useSasToken && len(secrets) == 0 && len(secretName) == 0 {
if len(secrets) == 0 && len(secretName) == 0 {
// search in cache first
if cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
Expand Down
Loading

0 comments on commit 11f9a5c

Please sign in to comment.