Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements object integrity checksums #1016

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions auth/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package auth
import (
"context"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"strings"
Expand All @@ -39,6 +40,7 @@ type Grantee struct {
}

type GetBucketAclOutput struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ AccessControlPolicy"`
Owner *types.Owner
AccessControlList AccessControlList
}
Expand Down Expand Up @@ -262,8 +264,8 @@ func CheckIfAccountsExist(accs []string, iam IAMService) ([]string, error) {
result = append(result, acc)
continue
}
if err == ErrNotSupported {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
if errors.Is(err, s3err.GetAPIError(s3err.ErrAdminMethodNotSupported)) {
return nil, err
}
return nil, fmt.Errorf("check user account: %w", err)
}
Expand Down
14 changes: 6 additions & 8 deletions auth/iam_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,37 @@
package auth

import (
"errors"
"github.com/versity/versitygw/s3err"
)

// IAMServiceSingle manages the single tenant (root-only) IAM service
type IAMServiceSingle struct{}

var _ IAMService = &IAMServiceSingle{}

var ErrNotSupported = errors.New("method is not supported")

// CreateAccount not valid in single tenant mode
func (IAMServiceSingle) CreateAccount(account Account) error {
return ErrNotSupported
return s3err.GetAPIError(s3err.ErrAdminMethodNotSupported)
}

// GetUserAccount no accounts in single tenant mode
func (IAMServiceSingle) GetUserAccount(access string) (Account, error) {
return Account{}, ErrNoSuchUser
return Account{}, s3err.GetAPIError(s3err.ErrAdminMethodNotSupported)
}

// UpdateUserAccount no accounts in single tenant mode
func (IAMServiceSingle) UpdateUserAccount(access string, props MutableProps) error {
return ErrNotSupported
return s3err.GetAPIError(s3err.ErrAdminMethodNotSupported)
}

// DeleteUserAccount no accounts in single tenant mode
func (IAMServiceSingle) DeleteUserAccount(access string) error {
return ErrNotSupported
return s3err.GetAPIError(s3err.ErrAdminMethodNotSupported)
}

// ListUserAccounts no accounts in single tenant mode
func (IAMServiceSingle) ListUserAccounts() ([]Account, error) {
return []Account{}, nil
return []Account{}, s3err.GetAPIError(s3err.ErrAdminMethodNotSupported)
}

// Shutdown graceful termination of service
Expand Down
37 changes: 21 additions & 16 deletions backend/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ func (az *Azure) CreateBucket(ctx context.Context, input *s3.CreateBucketInput,
}

func (az *Azure) ListBuckets(ctx context.Context, input s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) {
fmt.Printf("%+v\n", input)
pager := az.client.NewListContainersPager(
&service.ListContainersOptions{
Include: service.ListContainersInclude{
Expand Down Expand Up @@ -906,9 +905,9 @@ func (az *Azure) CreateMultipartUpload(ctx context.Context, input *s3.CreateMult
}

// Each part is translated into an uncommitted block in a newly created blob in staging area
func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (etag string, err error) {
func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
return "", err
return nil, err
}

// TODO: request streamable version of StageBlock()
Expand All @@ -917,43 +916,45 @@ func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (eta
// the body in memory to create an io.ReadSeekCloser
rdr, err := getReadSeekCloser(input.Body)
if err != nil {
return "", err
return nil, err
}

client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return "", err
return nil, err
}

// block id serves as etag here
etag = blockIDInt32ToBase64(*input.PartNumber)
etag := blockIDInt32ToBase64(*input.PartNumber)
_, err = client.StageBlock(ctx, etag, rdr, nil)
if err != nil {
return "", parseMpError(err)
return nil, parseMpError(err)
}

return etag, nil
return &s3.UploadPartOutput{
ETag: &etag,
}, nil
}

func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInput) (s3response.CopyObjectResult, error) {
func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInput) (s3response.CopyPartResult, error) {
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return s3response.CopyObjectResult{}, nil
return s3response.CopyPartResult{}, nil
}

if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
return s3response.CopyObjectResult{}, err
return s3response.CopyPartResult{}, err
}

eTag := blockIDInt32ToBase64(*input.PartNumber)
//TODO: handle block copy by range
//TODO: the action returns not implemented on azurite, maybe in production this will work?
_, err = client.StageBlockFromURL(ctx, eTag, *input.CopySource, nil)
if err != nil {
return s3response.CopyObjectResult{}, parseMpError(err)
return s3response.CopyPartResult{}, parseMpError(err)
}

return s3response.CopyObjectResult{}, nil
return s3response.CopyPartResult{}, nil
}

// Lists all uncommitted parts from the blob
Expand Down Expand Up @@ -1459,7 +1460,10 @@ func (az *Azure) ChangeBucketOwner(ctx context.Context, bucket string, acl []byt
// The action actually returns the containers owned by the user, who initialized the gateway
// TODO: Not sure if there's a way to list all the containers and owners?
func (az *Azure) ListBucketsAndOwners(ctx context.Context) (buckets []s3response.Bucket, err error) {
pager := az.client.NewListContainersPager(nil)
opts := &service.ListContainersOptions{
Include: service.ListContainersInclude{Metadata: true},
}
pager := az.client.NewListContainersPager(opts)

for pager.More() {
resp, err := pager.NextPage(ctx)
Expand Down Expand Up @@ -1735,17 +1739,18 @@ func (az *Azure) deleteContainerMetaData(ctx context.Context, bucket, key string
}

func getAclFromMetadata(meta map[string]*string, key key) (*auth.ACL, error) {
var acl auth.ACL

data, ok := meta[string(key)]
if !ok {
return nil, s3err.GetAPIError(s3err.ErrInternalError)
return &acl, nil
}

value, err := decodeString(*data)
if err != nil {
return nil, err
}

var acl auth.ACL
if len(value) == 0 {
return &acl, nil
}
Expand Down
12 changes: 6 additions & 6 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type Backend interface {
AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput) error
ListMultipartUploads(context.Context, *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error)
ListParts(context.Context, *s3.ListPartsInput) (s3response.ListPartsResult, error)
UploadPart(context.Context, *s3.UploadPartInput) (etag string, err error)
UploadPartCopy(context.Context, *s3.UploadPartCopyInput) (s3response.CopyObjectResult, error)
UploadPart(context.Context, *s3.UploadPartInput) (*s3.UploadPartOutput, error)
UploadPartCopy(context.Context, *s3.UploadPartCopyInput) (s3response.CopyPartResult, error)

// standard object operations
PutObject(context.Context, *s3.PutObjectInput) (s3response.PutObjectOutput, error)
Expand Down Expand Up @@ -166,11 +166,11 @@ func (BackendUnsupported) ListMultipartUploads(context.Context, *s3.ListMultipar
func (BackendUnsupported) ListParts(context.Context, *s3.ListPartsInput) (s3response.ListPartsResult, error) {
return s3response.ListPartsResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) UploadPart(context.Context, *s3.UploadPartInput) (etag string, err error) {
return "", s3err.GetAPIError(s3err.ErrNotImplemented)
func (BackendUnsupported) UploadPart(context.Context, *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) UploadPartCopy(context.Context, *s3.UploadPartCopyInput) (s3response.CopyObjectResult, error) {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
func (BackendUnsupported) UploadPartCopy(context.Context, *s3.UploadPartCopyInput) (s3response.CopyPartResult, error) {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}

func (BackendUnsupported) PutObject(context.Context, *s3.PutObjectInput) (s3response.PutObjectOutput, error) {
Expand Down
Loading