Skip to content

Commit

Permalink
Azure: Add support for async copy (#5118)
Browse files Browse the repository at this point in the history
* Azure: Add support for async copy

* CR Fixes

* CR Fixes 2

* Fix Azure auth_method deprecation

* Cache UDC

* CR fixes

* Lint error

* Remove auth_method from documentation

* Update pkg/block/errors.go

Co-authored-by: Barak Amar <barak.amar@treeverse.io>

* Fixes 2

* Fixes 3

* Fixes 4

* Fixes 5

* Fixes 6

---------

Co-authored-by: Barak Amar <barak.amar@treeverse.io>
  • Loading branch information
N-o-Z and nopcoder authored Feb 1, 2023
1 parent 791dd7c commit 6f9b32a
Show file tree
Hide file tree
Showing 27 changed files with 375 additions and 75 deletions.
2 changes: 0 additions & 2 deletions docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ This reference uses `.` to denote the nesting of values.
* `blockstore.gs.pre_signed_expiry` `(time duration : "15m")` - Expiry of pre-signed URL.
* `blockstore.azure.storage_account` `(string : )` - If specified, will be used as the Azure storage account
* `blockstore.azure.storage_access_key` `(string : )` - If specified, will be used as the Azure storage access key
* `blockstore.azure.auth_method` `(one of ["msi", "access-key"]: "access-key" )` - Authentication method to use (msi is used for Azure AD authentication).
* `blockstore.azure.pre_signed_expiry` `(time duration : "15m")` - Expiry of pre-signed URL.
* `blockstore.s3.region` `(string : "us-east-1")` - Default region for lakeFS to use when interacting with S3.
* `blockstore.s3.profile` `(string : )` - If specified, will be used as a [named credentials profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html)
Expand Down Expand Up @@ -330,7 +329,6 @@ auth:
blockstore:
type: azure
azure:
auth_method: access-key
storage_account: exampleStorageAcount
storage_access_key: ExampleAcessKeyMD7nkPOWgV7d4BUjzLw==

Expand Down
4 changes: 0 additions & 4 deletions esti/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

// Test Admin Policies: AuthFullAccess, ExportSetConfiguration, FSFullAccess, RepoManagementFullAccess
func TestAdminPolicies(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
adminClient := client
Expand Down Expand Up @@ -69,7 +68,6 @@ func TestAdminPolicies(t *testing.T) {

// Test Super User Policies: AuthManageOwnCredentials, FSFullAccess, RepoManagementReadAll
func TestSuperUserPolicies(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
gid := "SuperUsers"

Expand Down Expand Up @@ -120,7 +118,6 @@ func TestSuperUserPolicies(t *testing.T) {

// Test Developer Policies: AuthManageOwnCredentials, FSFullAccess, RepoManagementReadAll
func TestDeveloperPolicies(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
gid := "Developers"

Expand Down Expand Up @@ -166,7 +163,6 @@ func TestDeveloperPolicies(t *testing.T) {

// Test Viewer Policies: AuthManageOwnCredentials, FSReadAll
func TestViewerPolicies(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
gid := "Viewers"

Expand Down
3 changes: 0 additions & 3 deletions esti/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
)

func TestCommitSingle(t *testing.T) {
SkipTestIfAskedTo(t)
for _, direct := range testDirectDataAccess {
name := "indirect"
if direct {
Expand Down Expand Up @@ -67,7 +66,6 @@ func upload(ctx context.Context, uploads chan Upload, direct bool) error {
}

func TestCommitInMixedOrder(t *testing.T) {
SkipTestIfAskedTo(t)
const (
parallelism = 5
size = 100
Expand Down Expand Up @@ -144,7 +142,6 @@ func TestCommitInMixedOrder(t *testing.T) {

// Verify panic fix when committing with nil tombstone over KV
func TestCommitWithTombstone(t *testing.T) {
SkipTestIfAskedTo(t)
for _, direct := range testDirectDataAccess {
name := "indirect"
if direct {
Expand Down
181 changes: 181 additions & 0 deletions esti/copy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package esti

import (
"context"
"net/http"
"strings"
"sync"
"testing"
"time"

"github.com/go-test/deep"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/api"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/config"
)

const (
s3CopyDataPath = "s3://esti-system-testing-data/copy-test-data/"
gsCopyDataPath = "gs://esti-system-testing-data/copy-test-data/"
azureCopyDataPath = "https://esti.blob.core.windows.net/esti-system-testing-data/copy-test-data/"
azureAbortAccount = "esti4multipleaccounts"
ingestionBranch = "test-data"
largeObject = "squash.tar"
)

func TestCopyObject(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

t.Run("copy_large_size_file", func(t *testing.T) {
importPath := getImportPath(t)
importTestData(t, ctx, client, repo, importPath)
res, err := client.StatObjectWithResponse(ctx, repo, ingestionBranch, &api.StatObjectParams{
Path: largeObject,
})
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode())

objStat := res.JSON200
destPath := "foo"
srcBranch := ingestionBranch
copyResp, err := client.CopyObjectWithResponse(ctx, repo, "main", &api.CopyObjectParams{
DestPath: destPath,
}, api.CopyObjectJSONRequestBody{
SrcPath: largeObject,
SrcRef: &srcBranch,
})
require.NoError(t, err, "failed to copy")
require.NotNil(t, copyResp.JSON201)

// Verify creation path, date and physical address are different
copyStat := copyResp.JSON201
require.NotEqual(t, objStat.PhysicalAddress, copyStat.PhysicalAddress)
require.GreaterOrEqual(t, copyStat.Mtime, objStat.Mtime)
require.Equal(t, destPath, copyStat.Path)

// Verify all else is equal
objStat.Mtime = copyStat.Mtime
objStat.Path = copyStat.Path
objStat.PhysicalAddress = copyStat.PhysicalAddress
require.Nil(t, deep.Equal(objStat, copyStat))

// get back info
statResp, err := client.StatObjectWithResponse(ctx, repo, "main", &api.StatObjectParams{Path: destPath})
require.NoError(t, err)
require.Equal(t, http.StatusOK, statResp.StatusCode())
require.Nil(t, deep.Equal(statResp.JSON200, copyStat))
})

// Copying different accounts takes more time and allows us to abort the copy in the middle
t.Run("copy_large_size_file_abort", func(t *testing.T) {
requireBlockstoreType(t, block.BlockstoreTypeAzure)
importPath := strings.Replace(azureCopyDataPath, "esti", azureAbortAccount, 1)
importTestData(t, ctx, client, repo, importPath)
res, err := client.StatObjectWithResponse(ctx, repo, ingestionBranch, &api.StatObjectParams{
Path: largeObject,
})
require.NoError(t, err)
require.NotNil(t, res.JSON200)

destPath := "bar"
srcBranch := ingestionBranch
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
var (
wg sync.WaitGroup
copyResp *api.CopyObjectResponse
copyErr error
)
// Run copy object async and cancel context after 5 seconds
go func() {
wg.Add(1)
defer wg.Done()
copyResp, copyErr = client.CopyObjectWithResponse(cancelCtx, repo, "main", &api.CopyObjectParams{
DestPath: destPath,
}, api.CopyObjectJSONRequestBody{
SrcPath: largeObject,
SrcRef: &srcBranch,
})
}()

time.Sleep(5 * time.Second)
cancel()
wg.Wait()
require.ErrorIs(t, copyErr, context.Canceled)
require.Nil(t, copyResp)

// Verify object doesn't exist
statResp, err := client.StatObjectWithResponse(ctx, repo, "main", &api.StatObjectParams{Path: destPath})
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, statResp.StatusCode())
})
}

func getImportPath(t *testing.T) string {
t.Helper()
importPath := ""
blockstoreType := viper.GetString(config.BlockstoreTypeKey)
switch blockstoreType {
case block.BlockstoreTypeS3:
importPath = s3CopyDataPath
case block.BlockstoreTypeGS:
importPath = gsCopyDataPath
case block.BlockstoreTypeAzure:
importPath = azureCopyDataPath
default:
t.Skip("import isn't supported for non-production block adapters")
}
return importPath
}

func importTestData(t *testing.T, ctx context.Context, client api.ClientWithResponsesInterface, repoName, importPath string) {
var (
after = ""
token *string
ranges []api.RangeMetadata
)
for {
resp, err := client.IngestRangeWithResponse(ctx, repoName, api.IngestRangeJSONRequestBody{
After: after,
ContinuationToken: token,
FromSourceURI: importPath,
})
require.NoError(t, err, "failed to ingest range")
require.Equal(t, http.StatusCreated, resp.StatusCode())
require.NotNil(t, resp.JSON201)
ranges = append(ranges, *resp.JSON201.Range)
if !resp.JSON201.Pagination.HasMore {
break
}
after = resp.JSON201.Pagination.LastKey
token = resp.JSON201.Pagination.ContinuationToken
}

metarangeResp, err := client.CreateMetaRangeWithResponse(ctx, repoName, api.CreateMetaRangeJSONRequestBody{
Ranges: ranges,
})

require.NoError(t, err, "failed to create metarange")
require.NotNil(t, metarangeResp.JSON201)
require.NotNil(t, metarangeResp.JSON201.Id)

_, err = client.CreateBranchWithResponse(ctx, repoName, api.CreateBranchJSONRequestBody{
Name: ingestionBranch,
Source: "main",
})
require.NoError(t, err, "failed to create branch")

commitResp, err := client.CommitWithResponse(ctx, repoName, ingestionBranch, &api.CommitParams{
SourceMetarange: metarangeResp.JSON201.Id,
}, api.CommitJSONRequestBody{
Message: "created by import",
Metadata: &api.CommitCreation_Metadata{
AdditionalProperties: map[string]string{"created_by": "import"},
},
})
require.NoError(t, err, "failed to commit")
require.NotNil(t, commitResp.JSON201)
}
1 change: 0 additions & 1 deletion esti/delete_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
)

func TestDeleteObjects(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
const numOfObjects = 10
Expand Down
3 changes: 0 additions & 3 deletions esti/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func found(ctx context.Context, repo, ref, path string) (bool, error) {
}

func TestDeleteStaging(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
objPath := "1.txt"
Expand All @@ -45,7 +44,6 @@ func TestDeleteStaging(t *testing.T) {
}

func TestDeleteCommitted(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
objPath := "1.txt"
Expand All @@ -70,7 +68,6 @@ func TestDeleteCommitted(t *testing.T) {
}

func TestCommitDeleteCommitted(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
objPath := "1.txt"
Expand Down
2 changes: 0 additions & 2 deletions esti/hooks_failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ hooks:
const hooksTimeout = 2 * time.Second

func TestHooksTimeout(t *testing.T) {
SkipTestIfAskedTo(t)
hookFailToCommit(t, "timeout")
}

func TestHooksFail(t *testing.T) {
SkipTestIfAskedTo(t)
hookFailToCommit(t, "fail")
}

Expand Down
1 change: 0 additions & 1 deletion esti/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func appendRes(info webhookEventInfo) {
}

func TestHooksSuccess(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
parseAndUploadActions(t, ctx, repo, mainBranch)
Expand Down
1 change: 0 additions & 1 deletion esti/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
)

func TestIdentity(t *testing.T) {
SkipTestIfAskedTo(t)
for _, direct := range testDirectDataAccess {
name := "indirect"
if direct {
Expand Down
1 change: 0 additions & 1 deletion esti/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const (
)

func TestImport(t *testing.T) {
SkipTestIfAskedTo(t)
importPath := ""
blockstoreType := viper.GetString(config.BlockstoreTypeKey)
switch blockstoreType {
Expand Down
1 change: 0 additions & 1 deletion esti/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ func TestMain(m *testing.M) {
params.AdminAccessKeyID = *adminAccessKeyID
params.AdminSecretAccessKey = *adminSecretAccessKey
}
viper.SetDefault("post_migrate", false)

logger, client, svc, endpointURL = testutil.SetupTestingEnv(&params)
azureStorageAccount = viper.GetString("azure_storage_account")
Expand Down
1 change: 0 additions & 1 deletion esti/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
)

func TestMergeAndList(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
defer tearDownTest(repo)
const branch = "feature-1"
Expand Down
1 change: 0 additions & 1 deletion esti/multipart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const (
)

func TestMultipartUpload(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
defer tearDownTest(repo)
file := "multipart_file"
Expand Down
1 change: 0 additions & 1 deletion esti/ops/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ services:
- ESTI_ENDPOINT_URL=http://lakefs:8000
- ESTI_LAKECTL_DIR=/app
- ESTI_DATABASE_CONNECTION_STRING=postgres://lakefs:lakefs@postgres/postgres?sslmode=disable
- ESTI_POST_MIGRATE
- ESTI_GOTEST_FLAGS
- ESTI_FLAGS
- ESTI_FORCE_PATH_STYLE
Expand Down
1 change: 0 additions & 1 deletion esti/presign_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func matchPreSignedURLContent(t *testing.T, preSignedURL, content string) {
}

func TestPreSign(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

Expand Down
1 change: 0 additions & 1 deletion esti/rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

func TestResetAll(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
objPath := "1.txt"
Expand Down
Loading

0 comments on commit 6f9b32a

Please sign in to comment.