Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

block adapter remove dependency in s3 for multipart struct #2696

Merged
merged 4 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ import (
"context"
"io"
"net/http"

"github.com/aws/aws-sdk-go/service/s3"
)

// MultipartPart describes a single part of a multipart upload: its part
// number and the ETag associated with that part.
type MultipartPart struct {
	ETag       string
	PartNumber int64
}

// MultipartUploadCompletion parts described as part of complete multipart upload. Each part holds the part number and ETag received while calling part upload.
// NOTE that S3 implementation and our S3 gateway accept and returns ETag value surrounded with double-quotes ("), while
// the adapter implementations supply the raw value of the etag (without double quotes) and let the gateway manage the s3
// protocol specifications.
type MultipartUploadCompletion struct{ Part []*s3.CompletedPart }
type MultipartUploadCompletion struct {
Part []MultipartPart
}

// IdentifierType is the type the ObjectPointer Identifier
type IdentifierType int32
Expand Down
7 changes: 3 additions & 4 deletions pkg/block/azure/multipart_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/logging"
Expand Down Expand Up @@ -60,15 +59,15 @@ func (m *MultipartBlockWriter) CommitBlockList(ctx context.Context, ids []string
return &azblob.BlockBlobCommitBlockListResponse{}, err
}

func completeMultipart(ctx context.Context, parts []*s3.CompletedPart, container azblob.ContainerURL, objName string, retryOptions azblob.RetryReaderOptions) (*block.CompleteMultiPartUploadResponse, error) {
func completeMultipart(ctx context.Context, parts []block.MultipartPart, container azblob.ContainerURL, objName string, retryOptions azblob.RetryReaderOptions) (*block.CompleteMultiPartUploadResponse, error) {
sort.Slice(parts, func(i, j int) bool {
return *parts[i].PartNumber < *parts[j].PartNumber
return parts[i].PartNumber < parts[j].PartNumber
})
// extract staging blockIDs
metaBlockIDs := make([]string, len(parts))
for i, part := range parts {
// add quotation marks (") if missing: ETags sent by Spark include quotation marks, while ETags sent by the AWS CLI do not
etag := strings.Trim(*part.ETag, "\"")
etag := strings.Trim(part.ETag, "\"")
etag = "\"" + etag + "\""
base64Etag := base64.StdEncoding.EncodeToString([]byte(etag))
metaBlockIDs[i] = base64Etag
Expand Down
12 changes: 2 additions & 10 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const (

var (
ErrNotImplemented = errors.New("not implemented")
ErrMissingPartNumber = errors.New("missing part number")
ErrMissingPartETag = errors.New("missing part ETag")
ErrMismatchPartETag = errors.New("mismatch part ETag")
ErrMismatchPartName = errors.New("mismatch part name")
ErrMaxMultipartObjects = errors.New("maximum multipart object reached")
Expand Down Expand Up @@ -472,17 +470,11 @@ func (a *Adapter) validateMultipartUploadParts(uploadID string, multipartList *b
return ErrPartListMismatch
}
for i, p := range multipartList.Part {
if p.PartNumber == nil {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMissingPartNumber)
}
if p.ETag == nil {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMissingPartETag)
}
objName := formatMultipartFilename(uploadID, *p.PartNumber)
objName := formatMultipartFilename(uploadID, p.PartNumber)
if objName != bucketParts[i].Name {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartName)
}
if *p.ETag != bucketParts[i].Etag {
if p.ETag != bucketParts[i].Etag {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartETag)
}
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"strconv"
"strings"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/block/adapter"
Expand Down Expand Up @@ -57,15 +56,15 @@ func NewAdapter(path string, opts ...func(a *Adapter)) (*Adapter, error) {
if !isDirectoryWritable(path) {
return nil, ErrPathNotWritable
}
adapter := &Adapter{
localAdapter := &Adapter{
path: path,
uploadIDTranslator: &block.NoOpTranslator{},
removeEmptyDir: true,
}
for _, opt := range opts {
opt(adapter)
opt(localAdapter)
}
return adapter, nil
return localAdapter, nil
}

func resolveNamespace(obj block.ObjectPointer) (block.QualifiedKey, error) {
Expand Down Expand Up @@ -399,10 +398,10 @@ func (l *Adapter) CompleteMultiPartUpload(_ context.Context, obj block.ObjectPoi
}, nil
}

func computeETag(parts []*s3.CompletedPart) string {
func computeETag(parts []block.MultipartPart) string {
var etagHex []string
for _, p := range parts {
e := strings.Trim(*p.ETag, `"`)
e := strings.Trim(p.ETag, `"`)
etagHex = append(etagHex, e)
}
s := strings.Join(etagHex, "")
Expand Down
10 changes: 3 additions & 7 deletions pkg/block/local/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/go-test/deep"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/block/local"
Expand Down Expand Up @@ -107,14 +105,12 @@ func TestLocalMultipartUpload(t *testing.T) {
pointer := makePointer(c.path)
resp, err := a.CreateMultiPartUpload(ctx, pointer, nil, block.CreateMultiPartUploadOpts{})
testutil.MustDo(t, "CreateMultiPartUpload", err)
parts := make([]*s3.CompletedPart, 0)
parts := make([]block.MultipartPart, len(c.partData))
for partNumber, content := range c.partData {
partResp, err := a.UploadPart(ctx, pointer, 0, strings.NewReader(content), resp.UploadID, int64(partNumber))
testutil.MustDo(t, "UploadPart", err)
parts = append(parts, &s3.CompletedPart{
ETag: aws.String(partResp.ETag),
PartNumber: aws.Int64(int64(partNumber)),
})
parts[partNumber].PartNumber = int64(partNumber) + 1
parts[partNumber].ETag = partResp.ETag
}
_, err = a.CompleteMultiPartUpload(ctx, pointer, resp.UploadID, &block.MultipartUploadCompletion{
Part: parts,
Expand Down
10 changes: 4 additions & 6 deletions pkg/block/local/etag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,21 @@ import (
"encoding/hex"
"testing"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/treeverse/lakefs/pkg/block"
)

const PartsNo = 30

func TestEtag(t *testing.T) {
var base [16]byte
b := base[:]
parts := make([]*s3.CompletedPart, PartsNo)
parts := make([]block.MultipartPart, PartsNo)
for i := 0; i < PartsNo; i++ {
for j := 0; j < len(b); j++ {
b[j] = byte(32 + i + j)
}
s := hex.EncodeToString(b)
p := new(s3.CompletedPart)
p.ETag = &s
parts[i] = p
parts[i].PartNumber = int64(i) + 1
parts[i].ETag = hex.EncodeToString(b)
}
etag := computeETag(parts)
if etag != "9cae1a3b7e97542c261cf2e1b50ba482" {
Expand Down
14 changes: 12 additions & 2 deletions pkg/block/s3/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,20 +563,30 @@ func (a *Adapter) AbortMultiPartUpload(ctx context.Context, obj block.ObjectPoin
return err
}

func convertFromBlockMultipartUploadCompletion(multipartList *block.MultipartUploadCompletion) *s3.CompletedMultipartUpload {
var parts []*s3.CompletedPart
nopcoder marked this conversation as resolved.
Show resolved Hide resolved
for _, p := range multipartList.Part {
parts = append(parts, &s3.CompletedPart{
ETag: aws.String(p.ETag),
PartNumber: aws.Int64(p.PartNumber),
})
}
return &s3.CompletedMultipartUpload{Parts: parts}
}

func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectPointer, uploadID string, multipartList *block.MultipartUploadCompletion) (*block.CompleteMultiPartUploadResponse, error) {
var err error
defer reportMetrics("CompleteMultiPartUpload", time.Now(), nil, &err)
qualifiedKey, err := resolveNamespace(obj)
if err != nil {
return nil, err
}
cmpu := &s3.CompletedMultipartUpload{Parts: multipartList.Part}
translatedUploadID := a.uploadIDTranslator.TranslateUploadID(uploadID)
input := &s3.CompleteMultipartUploadInput{
Bucket: aws.String(qualifiedKey.StorageNamespace),
Key: aws.String(qualifiedKey.Key),
UploadId: aws.String(translatedUploadID),
MultipartUpload: cmpu,
MultipartUpload: convertFromBlockMultipartUploadCompletion(multipartList),
}
lg := a.log(ctx).WithFields(logging.Fields{
"upload_id": uploadID,
Expand Down
6 changes: 2 additions & 4 deletions pkg/gateway/operations/postobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,8 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite
// normalizeMultipartUploadCompletion normalization incoming multipart upload completion list.
// we make sure that each part's ETag will be without the wrapping quotes
func normalizeMultipartUploadCompletion(list *block.MultipartUploadCompletion) {
for _, part := range list.Part {
if part.ETag != nil {
*part.ETag = strings.Trim(*part.ETag, `"`)
}
for i := range list.Part {
list.Part[i].ETag = strings.Trim(list.Part[i].ETag, `"`)
}
}

Expand Down