Skip to content

Commit

Permalink
add timeout util func
Browse files Browse the repository at this point in the history
  • Loading branch information
umagnus committed Mar 15, 2024
1 parent 54b2a2f commit f444c17
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 48 deletions.
82 changes: 42 additions & 40 deletions pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ const (
SnapshotID = "snapshot_id"

FSGroupChangeNone = "None"

waitForAzCopyInterval = 2 * time.Second
)

var (
Expand Down Expand Up @@ -1011,55 +1009,59 @@ func (d *Driver) copyFileShare(ctx context.Context, req *csi.CreateVolumeRequest
return fmt.Errorf("srcFileShareName(%s) or dstFileShareName(%s) is empty", srcFileShareName, dstFileShareName)
}

timeAfter := time.After(time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute)
timeTick := time.Tick(waitForAzCopyInterval)
srcPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, srcFileShareName, accountSASToken)
dstPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, dstFileShareName, accountSASToken)

jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
if jobState == fileutil.AzcopyJobError || jobState == fileutil.AzcopyJobCompleted {
switch jobState {
case fileutil.AzcopyJobError, fileutil.AzcopyJobCompleted:
return err
}
klog.V(2).Infof("begin to copy fileshare %s to %s", srcFileShareName, dstFileShareName)
for {
select {
case <-timeTick:
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
switch jobState {
case fileutil.AzcopyJobError, fileutil.AzcopyJobCompleted:
case fileutil.AzcopyJobRunning:
return fmt.Errorf("an existed azcopy job is running, copy percent: %s%%, please wait for it to complete", percent)
case fileutil.AzcopyJobNotFound:
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
copyErr := fileutil.WaitForExecCompletion(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() error {
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
if out, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
return nil
}, func() error {
_, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
return fmt.Errorf("timeout waiting for copy fileshare %s to %s succeed, copy percent: %s%%", srcFileShareName, dstFileShareName, percent)
})
if accountSASToken == "" && copyErr != nil && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage File Data SMB Share Elevated Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)
d.azcopySasTokenCache.Set(accountName, "")
var sasToken string
if sasToken, _, err = d.getAzcopyAuth(ctx, accountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true); err != nil {
return err
case fileutil.AzcopyJobNotFound:
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
}
copyErr = fileutil.WaitForExecCompletion(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() error {
cmd := exec.Command("azcopy", "copy", srcPath+sasToken, dstPath+sasToken)
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
if out, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
out, copyErr := cmd.CombinedOutput()
if accountSASToken == "" && strings.Contains(string(out), authorizationPermissionMismatch) && copyErr != nil {
klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage File Data SMB Share Elevated Contributor\" role to controller identity, fall back to use sas token, original output: %v", string(out))
d.azcopySasTokenCache.Set(accountName, "")
var sasToken string
if sasToken, _, err = d.getAzcopyAuth(ctx, accountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true); err != nil {
return err
}
cmd := exec.Command("azcopy", "copy", srcPath+sasToken, dstPath+sasToken)
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
out, copyErr = cmd.CombinedOutput()
}
if copyErr != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstFileShareName, copyErr, string(out))
} else {
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
}
return copyErr
}
case <-timeAfter:
return fmt.Errorf("timeout waiting for copy fileshare %s to %s succeed", srcFileShareName, dstFileShareName)
return nil
}, func() error {
_, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
return fmt.Errorf("timeout waiting for copy fileshare %s to %s succeed, copy percent: %s%%", srcFileShareName, dstFileShareName, percent)
})
}
if copyErr != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstFileShareName, copyErr)
} else {
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
}
return copyErr
}
return err
}

// GetTotalAccountQuota returns the total quota in GB of all file shares in the storage account and the number of file shares
Expand Down
2 changes: 1 addition & 1 deletion pkg/azurefile/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
// logging the job status if it's volume cloning
if req.GetVolumeContentSource() != nil {
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
}
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
}
Expand Down
9 changes: 3 additions & 6 deletions pkg/azurefile/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,7 +1890,7 @@ func TestCopyVolume(t *testing.T) {
},
},
{
name: "azcopy job is first in progress and then be completed",
name: "azcopy job is in progress",
testFunc: func(t *testing.T) {
d := NewFakeDriver()
mp := map[string]string{}
Expand Down Expand Up @@ -1919,15 +1919,12 @@ func TestCopyVolume(t *testing.T) {

m := util.NewMockEXEC(ctrl)
listStr1 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false"
listStr2 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false"
o1 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr1, nil).Times(1)
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr1, nil).Times(1)
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil)
o2 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstFileshare -B 3"), gomock.Any()).Return(listStr2, nil)
gomock.InOrder(o1, o2)

d.azcopy.ExecCmd = m

var expectedErr error
expectedErr := fmt.Errorf("an existed azcopy job is running, copy percent: 50.0%%, please wait for it to complete")
err := d.copyVolume(ctx, req, "sastoken", []string{}, "", "", secret, &fileclient.ShareOptions{Name: "dstFileshare"}, nil, "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/azurefile/volume_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (
)

const (
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
volumeOperationAlreadyExistsWithAzcopyFmt = "An operation using azcopy with the given Volume ID %s already exists. Azcopy job status: %s, copy percent: %s%%, error: %v"
)

// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
Expand Down
28 changes: 28 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os/exec"
"strings"
"sync"
"time"

"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -205,3 +206,30 @@ func parseAzcopyJobShow(jobshow string) (AzcopyJobState, string, error) {
}
return AzcopyJobRunning, strings.ReplaceAll(segments[1], "\n", ""), nil
}

// ExecFunc returns a exec function's output and error
type ExecFunc func() (err error)

// TimeoutFunc returns output and error if an ExecFunc timeout
type TimeoutFunc func() (err error)

// WaitForExecCompletion waits for the exec function to complete or times out
func WaitForExecCompletion(timeout time.Duration, execFunc ExecFunc, timeoutFunc TimeoutFunc) error {
// Create a channel to receive the result of the azcopy exec function
done := make(chan bool)
var err error

// Start the azcopy exec function in a goroutine
go func() {
err = execFunc()
done <- true
}()

// Wait for the function to complete or time out
select {
case <-done:
return err
case <-time.After(timeout):
return timeoutFunc()
}
}
53 changes: 53 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

gomock "go.uber.org/mock/gomock"
)
Expand Down Expand Up @@ -260,3 +261,55 @@ func TestParseAzcopyJobShow(t *testing.T) {
}
}
}

func TestWaitForExecCompletion(t *testing.T) {
tests := []struct {
desc string
timeout time.Duration
execFunc ExecFunc
timeoutFunc TimeoutFunc
expectedErr error
}{
{
desc: "execFunc returns error",
timeout: 1 * time.Second,
execFunc: func() error {
return fmt.Errorf("execFunc error")
},
timeoutFunc: func() error {
return fmt.Errorf("timeout error")
},
expectedErr: fmt.Errorf("execFunc error"),
},
{
desc: "execFunc timeout",
timeout: 1 * time.Second,
execFunc: func() error {
time.Sleep(2 * time.Second)
return nil
},
timeoutFunc: func() error {
return fmt.Errorf("timeout error")
},
expectedErr: fmt.Errorf("timeout error"),
},
{
desc: "execFunc completed successfully",
timeout: 1 * time.Second,
execFunc: func() error {
return nil
},
timeoutFunc: func() error {
return fmt.Errorf("timeout error")
},
expectedErr: nil,
},
}

for _, test := range tests {
err := WaitForExecCompletion(test.timeout, test.execFunc, test.timeoutFunc)
if err != nil && (err.Error() != test.expectedErr.Error()) {
t.Errorf("unexpected error: %v, expected error: %v", err, test.expectedErr)
}
}
}

0 comments on commit f444c17

Please sign in to comment.