Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure: Add support for async copy #5118

Merged
merged 16 commits into from
Feb 1, 2023
2 changes: 0 additions & 2 deletions docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ This reference uses `.` to denote the nesting of values.
* `blockstore.gs.pre_signed_expiry` `(time duration : "15m")` - Expiry of pre-signed URL.
* `blockstore.azure.storage_account` `(string : )` - If specified, will be used as the Azure storage account
* `blockstore.azure.storage_access_key` `(string : )` - If specified, will be used as the Azure storage access key
* `blockstore.azure.auth_method` `(one of ["msi", "access-key"]: "access-key" )` - Authentication method to use (msi is used for Azure AD authentication).
* `blockstore.azure.pre_signed_expiry` `(time duration : "15m")` - Expiry of pre-signed URL.
* `blockstore.s3.region` `(string : "us-east-1")` - Default region for lakeFS to use when interacting with S3.
* `blockstore.s3.profile` `(string : )` - If specified, will be used as a [named credentials profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html)
Expand Down Expand Up @@ -330,7 +329,6 @@ auth:
blockstore:
type: azure
azure:
auth_method: access-key
storage_account: exampleStorageAcount
storage_access_key: ExampleAcessKeyMD7nkPOWgV7d4BUjzLw==

Expand Down
4 changes: 0 additions & 4 deletions esti/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

// Test Admin Policies: AuthFullAccess, ExportSetConfiguration, FSFullAccess, RepoManagementFullAccess
func TestAdminPolicies(t *testing.T) {
SkipTestIfAskedTo(t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relevant to all the places we used SkipTestIfAskedTo, check with @idanovo if we still this feature as part of our system test code.
This enabled us to skip specific tests by demand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still use it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to setupTest function and removed explicit call where setupTest is used

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • A bit confusing as not all tests require setupTest - it setup a repository.
  • Calling setupTest should also cleanup the repository for the cases we didn't defer cleanup code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setupTest doesn't perform cleanup - this is done by the teardownTest function, this is the case currently.
Moving the call under setupTest ensures it is not missed by anyone creating a new test and using this function.
setupTest does setup a repository but it is called "setupTest" and not "createRepository" I assume we can use it for other purposes of setting up a test - including skipping under certain circumstances

ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
adminClient := client
Expand Down Expand Up @@ -69,7 +68,6 @@ func TestAdminPolicies(t *testing.T) {

// Test Super User Policies: AuthManageOwnCredentials, FSFullAccess, RepoManagementReadAll
func TestSuperUserPolicies(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
gid := "SuperUsers"

Expand Down Expand Up @@ -120,7 +118,6 @@ func TestSuperUserPolicies(t *testing.T) {

// Test Developer Policies: AuthManageOwnCredentials, FSFullAccess, RepoManagementReadAll
func TestDeveloperPolicies(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
gid := "Developers"

Expand Down Expand Up @@ -166,7 +163,6 @@ func TestDeveloperPolicies(t *testing.T) {

// Test Viewer Policies: AuthManageOwnCredentials, FSReadAll
func TestViewerPolicies(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
gid := "Viewers"

Expand Down
3 changes: 0 additions & 3 deletions esti/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
)

func TestCommitSingle(t *testing.T) {
SkipTestIfAskedTo(t)
for _, direct := range testDirectDataAccess {
name := "indirect"
if direct {
Expand Down Expand Up @@ -67,7 +66,6 @@ func upload(ctx context.Context, uploads chan Upload, direct bool) error {
}

func TestCommitInMixedOrder(t *testing.T) {
SkipTestIfAskedTo(t)
const (
parallelism = 5
size = 100
Expand Down Expand Up @@ -144,7 +142,6 @@ func TestCommitInMixedOrder(t *testing.T) {

// Verify panic fix when committing with nil tombstone over KV
func TestCommitWithTombstone(t *testing.T) {
SkipTestIfAskedTo(t)
for _, direct := range testDirectDataAccess {
name := "indirect"
if direct {
Expand Down
194 changes: 194 additions & 0 deletions esti/copy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package esti

import (
"context"
"fmt"
"net/http"
"net/url"
"sync"
"testing"
"time"

"github.com/go-test/deep"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/api"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/block/azure"
"github.com/treeverse/lakefs/pkg/config"
)

const (
s3CopyDataPath = "s3://esti-system-testing-data/copy-test-data/"
gsCopyDataPath = "gs://esti-system-testing-data/copy-test-data/"
azureCopyDataPathTemplate = "https://%s.blob.core.windows.net/esti-system-testing-data/copy-test-data/"
azureAbortAccount = "esti4multipleaccounts"
ingestionBranch = "test-data"
largeObject = "squash.tar"
)

func TestCopyObject(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

t.Run("copy_large_size_file", func(t *testing.T) {
blockstoreType := viper.GetString(config.BlockstoreTypeKey)
var (
accountName string
err error
)
// Copying from same account occurs immediately even for large files (async)
if blockstoreType == block.BlockstoreTypeAzure { // Extract storage account
resp, err := client.GetRepositoryWithResponse(ctx, repo)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode())
u, err := url.Parse(resp.JSON200.StorageNamespace)
require.NoError(t, err)
accountName, err = azure.ExtractStorageAccount(u)
require.NoError(t, err)
}

importTestData(t, ctx, client, repo, accountName)
N-o-Z marked this conversation as resolved.
Show resolved Hide resolved
res, err := client.StatObjectWithResponse(ctx, repo, ingestionBranch, &api.StatObjectParams{
Path: largeObject,
})
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode())

objStat := res.JSON200
destPath := "foo"
srcBranch := ingestionBranch
copyResp, err := client.CopyObjectWithResponse(ctx, repo, "main", &api.CopyObjectParams{
DestPath: destPath,
}, api.CopyObjectJSONRequestBody{
SrcPath: largeObject,
SrcRef: &srcBranch,
})
require.NoError(t, err, "failed to copy")
require.NotNil(t, copyResp.JSON201)

// Verify creation path, date and physical address are different
copyStat := copyResp.JSON201
require.NotEqual(t, objStat.PhysicalAddress, copyStat.PhysicalAddress)
require.GreaterOrEqual(t, copyStat.Mtime, objStat.Mtime)
require.Equal(t, destPath, copyStat.Path)

// Verify all else is equal
objStat.Mtime = copyStat.Mtime
objStat.Path = copyStat.Path
objStat.PhysicalAddress = copyStat.PhysicalAddress
require.Nil(t, deep.Equal(objStat, copyStat))

// get back info
statResp, err := client.StatObjectWithResponse(ctx, repo, "main", &api.StatObjectParams{Path: destPath})
require.NoError(t, err)
require.Equal(t, http.StatusOK, statResp.StatusCode())
require.Nil(t, deep.Equal(statResp.JSON200, copyStat))
})

// Copying different accounts takes more time and allows us to abort the copy in the middle
t.Run("copy_large_size_file_abort", func(t *testing.T) {
requireBlockstoreType(t, block.BlockstoreTypeAzure)
importTestData(t, ctx, client, repo, azureAbortAccount)
var err error
res, err := client.StatObjectWithResponse(ctx, repo, ingestionBranch, &api.StatObjectParams{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err shadow on the second line. would move the err to the inner scope or name it diff.

Path: largeObject,
})
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode())
N-o-Z marked this conversation as resolved.
Show resolved Hide resolved

destPath := "bar"
srcBranch := ingestionBranch
cancelCtx, cancel := context.WithCancel(ctx)
N-o-Z marked this conversation as resolved.
Show resolved Hide resolved
var (
wg sync.WaitGroup
copyResp *api.CopyObjectResponse
)
// Run copy object async and cancel context after 5 seconds
go func() {
wg.Add(1)
copyResp, err = client.CopyObjectWithResponse(cancelCtx, repo, "main", &api.CopyObjectParams{
DestPath: destPath,
}, api.CopyObjectJSONRequestBody{
SrcPath: largeObject,
SrcRef: &srcBranch,
})
defer wg.Done()
N-o-Z marked this conversation as resolved.
Show resolved Hide resolved
}()

time.Sleep(5 * time.Second)
cancel()
wg.Wait()
require.ErrorIs(t, err, context.Canceled)
require.Nil(t, copyResp)

// Verify object doesn't exist

// get back info
N-o-Z marked this conversation as resolved.
Show resolved Hide resolved
statResp, err := client.StatObjectWithResponse(ctx, repo, "main", &api.StatObjectParams{Path: destPath})
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, statResp.StatusCode())
})
}

func importTestData(t *testing.T, ctx context.Context, client api.ClientWithResponsesInterface, repoName, azAccountName string) {
t.Helper()
importPath := ""
blockstoreType := viper.GetString(config.BlockstoreTypeKey)
switch blockstoreType {
case block.BlockstoreTypeS3:
importPath = s3CopyDataPath
case block.BlockstoreTypeGS:
importPath = gsCopyDataPath
case block.BlockstoreTypeAzure:
importPath = fmt.Sprintf(azureCopyDataPathTemplate, azAccountName)
N-o-Z marked this conversation as resolved.
Show resolved Hide resolved
default:
t.Skip("import isn't supported for non-production block adapters")
}
var (
after = ""
token *string
ranges []api.RangeMetadata
)
for {
resp, err := client.IngestRangeWithResponse(ctx, repoName, api.IngestRangeJSONRequestBody{
After: after,
ContinuationToken: token,
FromSourceURI: importPath,
})
require.NoError(t, err, "failed to ingest range")
require.Equal(t, http.StatusCreated, resp.StatusCode())
require.NotNil(t, resp.JSON201)
ranges = append(ranges, *resp.JSON201.Range)
if !resp.JSON201.Pagination.HasMore {
break
}
after = resp.JSON201.Pagination.LastKey
token = resp.JSON201.Pagination.ContinuationToken
}

metarangeResp, err := client.CreateMetaRangeWithResponse(ctx, repoName, api.CreateMetaRangeJSONRequestBody{
Ranges: ranges,
})

require.NoError(t, err, "failed to create metarange")
require.Equal(t, http.StatusCreated, metarangeResp.StatusCode())
N-o-Z marked this conversation as resolved.
Show resolved Hide resolved
require.NotNil(t, metarangeResp.JSON201.Id)

_, err = client.CreateBranchWithResponse(ctx, repoName, api.CreateBranchJSONRequestBody{
Name: ingestionBranch,
Source: "main",
})
require.NoError(t, err, "failed to create branch")

commitResp, err := client.CommitWithResponse(ctx, repoName, ingestionBranch, &api.CommitParams{
SourceMetarange: metarangeResp.JSON201.Id,
}, api.CommitJSONRequestBody{
Message: "created by import",
Metadata: &api.CommitCreation_Metadata{
AdditionalProperties: map[string]string{"created_by": "import"},
},
})
require.NoError(t, err, "failed to commit")
require.Equal(t, http.StatusCreated, commitResp.StatusCode())
N-o-Z marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 0 additions & 1 deletion esti/delete_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
)

func TestDeleteObjects(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
const numOfObjects = 10
Expand Down
3 changes: 0 additions & 3 deletions esti/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func found(ctx context.Context, repo, ref, path string) (bool, error) {
}

func TestDeleteStaging(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
objPath := "1.txt"
Expand All @@ -45,7 +44,6 @@ func TestDeleteStaging(t *testing.T) {
}

func TestDeleteCommitted(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
objPath := "1.txt"
Expand All @@ -70,7 +68,6 @@ func TestDeleteCommitted(t *testing.T) {
}

func TestCommitDeleteCommitted(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
objPath := "1.txt"
Expand Down
2 changes: 0 additions & 2 deletions esti/hooks_failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ hooks:
const hooksTimeout = 2 * time.Second

func TestHooksTimeout(t *testing.T) {
SkipTestIfAskedTo(t)
hookFailToCommit(t, "timeout")
}

func TestHooksFail(t *testing.T) {
SkipTestIfAskedTo(t)
hookFailToCommit(t, "fail")
}

Expand Down
1 change: 0 additions & 1 deletion esti/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func appendRes(info webhookEventInfo) {
}

func TestHooksSuccess(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
parseAndUploadActions(t, ctx, repo, mainBranch)
Expand Down
1 change: 0 additions & 1 deletion esti/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
)

func TestIdentity(t *testing.T) {
SkipTestIfAskedTo(t)
for _, direct := range testDirectDataAccess {
name := "indirect"
if direct {
Expand Down
1 change: 0 additions & 1 deletion esti/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const (
)

func TestImport(t *testing.T) {
SkipTestIfAskedTo(t)
importPath := ""
blockstoreType := viper.GetString(config.BlockstoreTypeKey)
switch blockstoreType {
Expand Down
1 change: 0 additions & 1 deletion esti/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ func TestMain(m *testing.M) {
params.AdminAccessKeyID = *adminAccessKeyID
params.AdminSecretAccessKey = *adminSecretAccessKey
}
viper.SetDefault("post_migrate", false)

logger, client, svc, endpointURL = testutil.SetupTestingEnv(&params)
azureStorageAccount = viper.GetString("azure_storage_account")
Expand Down
1 change: 0 additions & 1 deletion esti/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
)

func TestMergeAndList(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
defer tearDownTest(repo)
const branch = "feature-1"
Expand Down
1 change: 0 additions & 1 deletion esti/multipart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const (
)

func TestMultipartUpload(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, logger, repo := setupTest(t)
defer tearDownTest(repo)
file := "multipart_file"
Expand Down
1 change: 0 additions & 1 deletion esti/ops/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ services:
- ESTI_ENDPOINT_URL=http://lakefs:8000
- ESTI_LAKECTL_DIR=/app
- ESTI_DATABASE_CONNECTION_STRING=postgres://lakefs:lakefs@postgres/postgres?sslmode=disable
- ESTI_POST_MIGRATE
- ESTI_GOTEST_FLAGS
- ESTI_FLAGS
- ESTI_FORCE_PATH_STYLE
Expand Down
1 change: 0 additions & 1 deletion esti/presign_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func matchPreSignedURLContent(t *testing.T, preSignedURL, content string) {
}

func TestPreSign(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

Expand Down
1 change: 0 additions & 1 deletion esti/rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

func TestResetAll(t *testing.T) {
SkipTestIfAskedTo(t)
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
objPath := "1.txt"
Expand Down
Loading