Skip to content

Commit

Permalink
objectstore/cos: support multi-part upload (thanos-io#5137) (thanos-i…
Browse files Browse the repository at this point in the history
…o#5139)

Signed-off-by: Nicholaswang <wzhever@gmail.com>
  • Loading branch information
hanjm authored and Nicholaswang committed Mar 6, 2022
1 parent 63107e7 commit 041b3b6
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4908](https://github.com/thanos-io/thanos/pull/4908) UI: Show 'minus' icon and add tooltip when store min / max time is not available.
- [#4883](https://github.com/thanos-io/thanos/pull/4883) Mixin: adhere to RFC 1123 compatible component naming.
- [#5114](https://github.com/thanos-io/thanos/pull/5114) Tools `thanos bucket inspect` fix time formatting.
- [#5139](https://github.com/thanos-io/thanos/pull/5139) COS: Support multi-part upload, fix upload issue when index size more than 5GB.

## [v0.24.0](https://github.com/thanos-io/thanos/tree/release-0.24) - 2021.12.22

Expand Down
81 changes: 79 additions & 2 deletions pkg/objstore/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"net/url"
Expand Down Expand Up @@ -194,10 +195,86 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt
}, nil
}

var (
_ cos.FixedLengthReader = (*fixedLengthReader)(nil)
)

type fixedLengthReader struct {
io.Reader
size int64
}

func newFixedLengthReader(r io.Reader, size int64) io.Reader {
return fixedLengthReader{
Reader: io.LimitReader(r, size),
size: size,
}
}

// Size implement cos.FixedLengthReader interface.
func (r fixedLengthReader) Size() int64 {
return r.size
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
return errors.Wrap(err, "upload cos object")
size, err := objstore.TryToGetSize(r)
if err != nil {
return errors.Wrapf(err, "getting size of %s", name)
}
// partSize 128MB.
const partSize = 1024 * 1024 * 128
partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
if partNums == 0 {
if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
return errors.Wrapf(err, "Put object: %s", name)
}
return nil
}
// 1. init.
result, _, err := b.client.Object.InitiateMultipartUpload(ctx, name, nil)
if err != nil {
return errors.Wrapf(err, "InitiateMultipartUpload %s", name)
}
uploadEveryPart := func(partSize int64, part int, uploadID string) (string, error) {
r := newFixedLengthReader(r, partSize)
resp, err := b.client.Object.UploadPart(ctx, name, uploadID, part, r, &cos.ObjectUploadPartOptions{
ContentLength: partSize,
})
if err != nil {
if _, err := b.client.Object.AbortMultipartUpload(ctx, name, uploadID); err != nil {
return "", err
}
return "", err
}
etag := resp.Header.Get("ETag")
return etag, nil
}
optcom := &cos.CompleteMultipartUploadOptions{}
// 2. upload parts.
for part := 1; part <= partNums; part++ {
etag, err := uploadEveryPart(partSize, part, result.UploadID)
if err != nil {
return errors.Wrapf(err, "uploadPart %d, %s", part, name)
}
optcom.Parts = append(optcom.Parts, cos.Object{
PartNumber: part, ETag: etag},
)
}
// 3. upload last part.
if lastSlice != 0 {
part := partNums + 1
etag, err := uploadEveryPart(lastSlice, part, result.UploadID)
if err != nil {
return errors.Wrapf(err, "uploadPart %d, %s", part, name)
}
optcom.Parts = append(optcom.Parts, cos.Object{
PartNumber: part, ETag: etag},
)
}
// 4. complete.
if _, _, err := b.client.Object.CompleteMultipartUpload(ctx, name, result.UploadID, optcom); err != nil {
return errors.Wrapf(err, "CompleteMultipartUpload %s", name)
}
return nil
}
Expand Down

0 comments on commit 041b3b6

Please sign in to comment.