Skip to content

Commit

Permalink
block adapter remove dependency in s3 ask for multipart struct
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder committed Nov 14, 2021
1 parent 839e78c commit 9e548cf
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 41 deletions.
12 changes: 9 additions & 3 deletions pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ import (
"context"
"io"
"net/http"

"github.com/aws/aws-sdk-go/service/s3"
)

// MultipartPart single multipart information
type MultipartPart struct {
ETag string
PartNumber int64
}

// MultipartUploadCompletion parts described as part of complete multipart upload. Each part holds the part number and ETag received while calling part upload.
// NOTE that S3 implementation and our S3 gateway accept and returns ETag value surrounded with double-quotes ("), while
// the adapter implementations supply the raw value of the etag (without double quotes) and let the gateway manage the s3
// protocol specifications.
type MultipartUploadCompletion struct{ Part []*s3.CompletedPart }
type MultipartUploadCompletion struct {
Part []MultipartPart
}

// IdentifierType is the type the ObjectPointer Identifier
type IdentifierType int32
Expand Down
7 changes: 3 additions & 4 deletions pkg/block/azure/multipart_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/logging"
Expand Down Expand Up @@ -60,15 +59,15 @@ func (m *MultipartBlockWriter) CommitBlockList(ctx context.Context, ids []string
return &azblob.BlockBlobCommitBlockListResponse{}, err
}

func completeMultipart(ctx context.Context, parts []*s3.CompletedPart, container azblob.ContainerURL, objName string, retryOptions azblob.RetryReaderOptions) (*block.CompleteMultiPartUploadResponse, error) {
func completeMultipart(ctx context.Context, parts []block.MultipartPart, container azblob.ContainerURL, objName string, retryOptions azblob.RetryReaderOptions) (*block.CompleteMultiPartUploadResponse, error) {
sort.Slice(parts, func(i, j int) bool {
return *parts[i].PartNumber < *parts[j].PartNumber
return parts[i].PartNumber < parts[j].PartNumber
})
// extract staging blockIDs
metaBlockIDs := make([]string, len(parts))
for i, part := range parts {
// add Quotations marks (") if missing, Etags sent by spark include Quotations marks, Etags sent aws cli don't include Quotations marks
etag := strings.Trim(*part.ETag, "\"")
etag := strings.Trim(part.ETag, "\"")
etag = "\"" + etag + "\""
base64Etag := base64.StdEncoding.EncodeToString([]byte(etag))
metaBlockIDs[i] = base64Etag
Expand Down
12 changes: 2 additions & 10 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const (

var (
ErrNotImplemented = errors.New("not implemented")
ErrMissingPartNumber = errors.New("missing part number")
ErrMissingPartETag = errors.New("missing part ETag")
ErrMismatchPartETag = errors.New("mismatch part ETag")
ErrMismatchPartName = errors.New("mismatch part name")
ErrMaxMultipartObjects = errors.New("maximum multipart object reached")
Expand Down Expand Up @@ -472,17 +470,11 @@ func (a *Adapter) validateMultipartUploadParts(uploadID string, multipartList *b
return ErrPartListMismatch
}
for i, p := range multipartList.Part {
if p.PartNumber == nil {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMissingPartNumber)
}
if p.ETag == nil {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMissingPartETag)
}
objName := formatMultipartFilename(uploadID, *p.PartNumber)
objName := formatMultipartFilename(uploadID, p.PartNumber)
if objName != bucketParts[i].Name {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartName)
}
if *p.ETag != bucketParts[i].Etag {
if p.ETag != bucketParts[i].Etag {
return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartETag)
}
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"strconv"
"strings"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/block/adapter"
Expand Down Expand Up @@ -57,15 +56,15 @@ func NewAdapter(path string, opts ...func(a *Adapter)) (*Adapter, error) {
if !isDirectoryWritable(path) {
return nil, ErrPathNotWritable
}
adapter := &Adapter{
localAdapter := &Adapter{
path: path,
uploadIDTranslator: &block.NoOpTranslator{},
removeEmptyDir: true,
}
for _, opt := range opts {
opt(adapter)
opt(localAdapter)
}
return adapter, nil
return localAdapter, nil
}

func resolveNamespace(obj block.ObjectPointer) (block.QualifiedKey, error) {
Expand Down Expand Up @@ -399,10 +398,10 @@ func (l *Adapter) CompleteMultiPartUpload(_ context.Context, obj block.ObjectPoi
}, nil
}

func computeETag(parts []*s3.CompletedPart) string {
func computeETag(parts []block.MultipartPart) string {
var etagHex []string
for _, p := range parts {
e := strings.Trim(*p.ETag, `"`)
e := strings.Trim(p.ETag, `"`)
etagHex = append(etagHex, e)
}
s := strings.Join(etagHex, "")
Expand Down
10 changes: 3 additions & 7 deletions pkg/block/local/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/go-test/deep"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/block/local"
Expand Down Expand Up @@ -107,14 +105,12 @@ func TestLocalMultipartUpload(t *testing.T) {
pointer := makePointer(c.path)
resp, err := a.CreateMultiPartUpload(ctx, pointer, nil, block.CreateMultiPartUploadOpts{})
testutil.MustDo(t, "CreateMultiPartUpload", err)
parts := make([]*s3.CompletedPart, 0)
parts := make([]block.MultipartPart, len(c.partData))
for partNumber, content := range c.partData {
partResp, err := a.UploadPart(ctx, pointer, 0, strings.NewReader(content), resp.UploadID, int64(partNumber))
testutil.MustDo(t, "UploadPart", err)
parts = append(parts, &s3.CompletedPart{
ETag: aws.String(partResp.ETag),
PartNumber: aws.Int64(int64(partNumber)),
})
parts[partNumber].PartNumber = int64(partNumber) + 1
parts[partNumber].ETag = partResp.ETag
}
_, err = a.CompleteMultiPartUpload(ctx, pointer, resp.UploadID, &block.MultipartUploadCompletion{
Part: parts,
Expand Down
10 changes: 4 additions & 6 deletions pkg/block/local/etag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,21 @@ import (
"encoding/hex"
"testing"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/treeverse/lakefs/pkg/block"
)

const PartsNo = 30

func TestEtag(t *testing.T) {
var base [16]byte
b := base[:]
parts := make([]*s3.CompletedPart, PartsNo)
parts := make([]block.MultipartPart, PartsNo)
for i := 0; i < PartsNo; i++ {
for j := 0; j < len(b); j++ {
b[j] = byte(32 + i + j)
}
s := hex.EncodeToString(b)
p := new(s3.CompletedPart)
p.ETag = &s
parts[i] = p
parts[i].PartNumber = int64(i) + 1
parts[i].ETag = hex.EncodeToString(b)
}
etag := computeETag(parts)
if etag != "9cae1a3b7e97542c261cf2e1b50ba482" {
Expand Down
14 changes: 12 additions & 2 deletions pkg/block/s3/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,20 +563,30 @@ func (a *Adapter) AbortMultiPartUpload(ctx context.Context, obj block.ObjectPoin
return err
}

func convertFromBlockMultipartUploadCompletion(multipartList *block.MultipartUploadCompletion) *s3.CompletedMultipartUpload {
var parts []*s3.CompletedPart
for _, p := range multipartList.Part {
parts = append(parts, &s3.CompletedPart{
ETag: aws.String(p.ETag),
PartNumber: aws.Int64(p.PartNumber),
})
}
return &s3.CompletedMultipartUpload{Parts: parts}
}

func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectPointer, uploadID string, multipartList *block.MultipartUploadCompletion) (*block.CompleteMultiPartUploadResponse, error) {
var err error
defer reportMetrics("CompleteMultiPartUpload", time.Now(), nil, &err)
qualifiedKey, err := resolveNamespace(obj)
if err != nil {
return nil, err
}
cmpu := &s3.CompletedMultipartUpload{Parts: multipartList.Part}
translatedUploadID := a.uploadIDTranslator.TranslateUploadID(uploadID)
input := &s3.CompleteMultipartUploadInput{
Bucket: aws.String(qualifiedKey.StorageNamespace),
Key: aws.String(qualifiedKey.Key),
UploadId: aws.String(translatedUploadID),
MultipartUpload: cmpu,
MultipartUpload: convertFromBlockMultipartUploadCompletion(multipartList),
}
lg := a.log(ctx).WithFields(logging.Fields{
"upload_id": uploadID,
Expand Down
4 changes: 1 addition & 3 deletions pkg/gateway/operations/postobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite
// we make sure that each part's ETag will be without the wrapping quotes
func normalizeMultipartUploadCompletion(list *block.MultipartUploadCompletion) {
for _, part := range list.Part {
if part.ETag != nil {
*part.ETag = strings.Trim(*part.ETag, `"`)
}
part.ETag = strings.Trim(part.ETag, `"`)
}
}

Expand Down

0 comments on commit 9e548cf

Please sign in to comment.