From 4f8d5d5382cb573edd61dd6a670ff8873d871204 Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Thu, 22 Dec 2022 13:50:58 +0200 Subject: [PATCH 1/3] API upload object without using tmp file Multipart request read content directly to block adapter. The Go implementation use local temporary file to enable read from content. --- pkg/api/controller.go | 44 ++++++++++++++++++++++++++++---------- pkg/block/local/adapter.go | 8 +++---- pkg/block/s3/adapter.go | 36 +++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 15 deletions(-) diff --git a/pkg/api/controller.go b/pkg/api/controller.go index c626184399f..3a68373e9ab 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "mime" + "mime/multipart" "net/http" "net/mail" "net/url" @@ -2134,21 +2135,42 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi allowOverwrite = false } - // write the content - file, handler, err := r.FormFile("content") - if errors.Is(err, http.ErrMissingFile) { - writeError(w, r, http.StatusInternalServerError, fmt.Errorf("multipart uploads missing key 'content': %w", err)) - return - } + // read request body parse multi-part for "content" and upload the data + _, p, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { writeError(w, r, http.StatusInternalServerError, err) return } - defer func() { _ = file.Close() }() - contentType := handler.Header.Get("Content-Type") - address := c.PathProvider.NewPath() - blob, err := upload.WriteBlob(ctx, c.BlockAdapter, repo.StorageNamespace, address, file, handler.Size, block.PutOpts{StorageClass: params.StorageClass}) - if err != nil { + boundary := p["boundary"] + reader := multipart.NewReader(r.Body, boundary) + var ( + contentUploaded bool + contentType string + blob *upload.Blob + ) + for { + part, err := reader.NextPart() + if err == io.EOF { + break + } + contentType = part.Header.Get("Content-Type") + partName := part.FormName() + // part is an io.Reader, deal with it + if !contentUploaded && partName == "content" { + // upload the first "content" + address := c.PathProvider.NewPath() + blob, err = upload.WriteBlob(ctx, c.BlockAdapter, repo.StorageNamespace, address, part, -1, block.PutOpts{StorageClass: params.StorageClass}) + if err != nil { + _ = part.Close() + writeError(w, r, http.StatusInternalServerError, err) + return + } + contentUploaded = true + } + _ = part.Close() + } + if !contentUploaded { + err := fmt.Errorf("multipart upload missing key 'content': %w", http.ErrMissingFile) writeError(w, r, http.StatusInternalServerError, err) return } diff --git a/pkg/block/local/adapter.go b/pkg/block/local/adapter.go index 10341c404dd..1487bcfe46f 100644 --- a/pkg/block/local/adapter.go +++ b/pkg/block/local/adapter.go @@ -42,7 +42,7 @@ func WithRemoveEmptyDir(b bool) func(a *Adapter) { func NewAdapter(path string, opts ...func(a *Adapter)) (*Adapter, error) { // Clean() the path so that misconfiguration does not allow path traversal. path = filepath.Clean(path) - err := os.MkdirAll(path, 0700) //nolint: gomnd + err := os.MkdirAll(path, 0o700) //nolint: gomnd if err != nil { return nil, err } @@ -102,7 +102,7 @@ func (l *Adapter) maybeMkdir(path string, f func(p string) (*os.File, error)) (* return ret, err } d := filepath.Dir(filepath.Clean(path)) - if err = os.MkdirAll(d, 0750); err != nil { //nolint: gomnd + if err = os.MkdirAll(d, 0o750); err != nil { //nolint: gomnd return nil, err } return f(path) @@ -237,7 +237,7 @@ func (l *Adapter) Get(_ context.Context, obj block.ObjectPointer, _ int64) (read if err != nil { return nil, err } - f, err := os.OpenFile(filepath.Clean(p), os.O_RDONLY, 0600) //nolint: gomnd + f, err := os.OpenFile(filepath.Clean(p), os.O_RDONLY, 0o600) //nolint: gomnd if os.IsNotExist(err) { return nil, adapter.ErrDataNotFound } @@ -328,7 +328,7 @@ func (l *Adapter) CreateMultiPartUpload(_ context.Context, obj block.ObjectPoint return nil, err } fullDir := path.Dir(fullPath) - err = os.MkdirAll(fullDir, 0750) //nolint: gomnd + err = os.MkdirAll(fullDir, 0o750) //nolint: gomnd if err != nil { return nil, err } diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 521283922a4..c9ea9692a29 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/request" @@ -137,6 +139,13 @@ func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes in if err != nil { return err } + + // for unknown size we assume we like to stream content, will use s3manager to perform the request. + // we assume the caller may not have 1:1 request to s3 put object in this case as it may perform multipart upload + if sizeBytes == -1 { + return a.managerUpload(ctx, qualifiedKey, reader, opts) + } + putObject := s3.PutObjectInput{ Bucket: aws.String(qualifiedKey.StorageNamespace), Key: aws.String(qualifiedKey.Key), @@ -685,6 +694,33 @@ func (a *Adapter) extractS3Server(resp *http.Response) { a.respServer = server } +func (a *Adapter) managerUpload(ctx context.Context, qualifiedKey block.QualifiedKey, reader io.Reader, opts block.PutOpts) error { + client := a.clients.Get(ctx, qualifiedKey.StorageNamespace) + uploader := s3manager.NewUploaderWithClient(client) + + input := &s3manager.UploadInput{ + Bucket: aws.String(qualifiedKey.StorageNamespace), + Key: aws.String(qualifiedKey.Key), + Body: reader, + StorageClass: opts.StorageClass, + } + if a.ServerSideEncryption != "" { + input.ServerSideEncryption = aws.String(a.ServerSideEncryption) + } + if a.ServerSideEncryptionKmsKeyID != "" { + input.SSEKMSKeyId = aws.String(a.ServerSideEncryptionKmsKeyID) + } + + output, err := uploader.UploadWithContext(ctx, input) + if err != nil { + return err + } + if aws.StringValue(output.ETag) == "" { + return ErrMissingETag + } + return nil +} + func extractAmzServerSideHeader(header http.Header) http.Header { // return additional headers: x-amz-server-side-* h := make(http.Header) From 6b244be49bb0bac7194401ba0cd85f8cd8cffc66 Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Thu, 22 Dec 2022 15:03:41 +0200 Subject: [PATCH 2/3] Additional requet checks --- pkg/api/controller.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 3a68373e9ab..3e28963ddfe 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -2135,13 +2135,21 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi allowOverwrite = false } - // read request body parse multi-part for "content" and upload the data - _, p, err := mime.ParseMediaType(r.Header.Get("Content-Type")) + // read request body parse multipart for "content" and upload the data + mt, p, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { writeError(w, r, http.StatusInternalServerError, err) return } - boundary := p["boundary"] + if !strings.HasPrefix(mt, "multipart/") { + writeError(w, r, http.StatusInternalServerError, http.ErrNotMultipart) + return + } + boundary, ok := p["boundary"] + if !ok { + writeError(w, r, http.StatusInternalServerError, http.ErrMissingBoundary) + return + } reader := multipart.NewReader(r.Body, boundary) var ( contentUploaded bool From a76e0a20d9da92d97afd9041fc255c9b8eae07c7 Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Thu, 22 Dec 2022 16:26:35 +0200 Subject: [PATCH 3/3] apply code review changes --- pkg/api/controller.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 3e28963ddfe..4cc0209c4bc 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -2141,7 +2141,7 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi writeError(w, r, http.StatusInternalServerError, err) return } - if !strings.HasPrefix(mt, "multipart/") { + if mt != "multipart/form-data" { writeError(w, r, http.StatusInternalServerError, http.ErrNotMultipart) return } @@ -2150,22 +2150,26 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi writeError(w, r, http.StatusInternalServerError, http.ErrMissingBoundary) return } + reader := multipart.NewReader(r.Body, boundary) var ( contentUploaded bool contentType string blob *upload.Blob ) - for { + for !contentUploaded { part, err := reader.NextPart() if err == io.EOF { break } + if err != nil { + writeError(w, r, http.StatusInternalServerError, err) + return + } contentType = part.Header.Get("Content-Type") partName := part.FormName() - // part is an io.Reader, deal with it - if !contentUploaded && partName == "content" { - // upload the first "content" + if partName == "content" { + // upload the first "content" and exit the loop address := c.PathProvider.NewPath() blob, err = upload.WriteBlob(ctx, c.BlockAdapter, repo.StorageNamespace, address, part, -1, block.PutOpts{StorageClass: params.StorageClass}) if err != nil {