Skip to content

Commit

Permalink
fix: pass the file size to be uploaded to allow for S3 multipart uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
adrien-f committed Dec 10, 2018
1 parent d15ddc1 commit 876e122
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 21 deletions.
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
for _, m := range metas[5:] {
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf, int64(buf.Len())))
}

groups, err := sy.Groups()
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
fmt.Println("create", m.ULID)
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf, int64(buf.Len())))
}

// Do one initial synchronization with the bucket.
Expand Down
10 changes: 6 additions & 4 deletions pkg/compact/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,10 @@ func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxT
b, err := json.Marshal(meta1)
testutil.Ok(t, err)

testutil.Ok(t, bkt.Upload(context.Background(), id+"/meta.json", bytes.NewReader(b)))
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000001", strings.NewReader("@test-data@")))
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000002", strings.NewReader("@test-data@")))
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000003", strings.NewReader("@test-data@")))
bytesReader := bytes.NewReader(b)
testutil.Ok(t, bkt.Upload(context.Background(), id+"/meta.json", bytesReader, bytesReader.Size()))
stringReader := strings.NewReader("@test-data@")
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000001", stringReader, stringReader.Size()))
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000002", stringReader, stringReader.Size()))
testutil.Ok(t, bkt.Upload(context.Background(), id+"/chunks/000003", stringReader, stringReader.Size()))
}
2 changes: 1 addition & 1 deletion pkg/objstore/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, _ int64) error {
level.Debug(b.logger).Log("msg", "Uploading blob", "blob", name)
blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
}

// Upload writes the file specified in src to remote GCS location specified as target.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, _ int64) error {
w := b.bkt.Object(name).NewWriter(ctx)

if _, err := io.Copy(w, r); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
}

// Upload writes the file specified in src to into the memory.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader, _ int64) error {
body, err := ioutil.ReadAll(r)
if err != nil {
return err
Expand Down
15 changes: 11 additions & 4 deletions pkg/objstore/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Bucket interface {
BucketReader

// Upload the contents of the reader as an object into the bucket.
Upload(ctx context.Context, name string, r io.Reader) error
Upload(ctx context.Context, name string, r io.Reader, sizeBytes int64) error

// Delete removes the object with the given name.
Delete(ctx context.Context, name string) error
Expand Down Expand Up @@ -83,7 +83,14 @@ func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst str
}
defer runutil.CloseWithLogOnErr(logger, r, "close file %s", src)

if err := bkt.Upload(ctx, dst, r); err != nil {
fileSize := int64(-1)
fileInfo, err := r.Stat()
if err != nil {
return errors.Wrapf(err, "could not stat file %s", src)
}
fileSize = fileInfo.Size()

if err := bkt.Upload(ctx, dst, r, fileSize); err != nil {
return errors.Wrapf(err, "upload file %s as %s", src, dst)
}
return nil
Expand Down Expand Up @@ -278,11 +285,11 @@ func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
return ok, err
}

func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader, sizeBytes int64) error {
const op = "upload"
start := time.Now()

err := b.bkt.Upload(ctx, name, r)
err := b.bkt.Upload(ctx, name, r, sizeBytes)
if err != nil {
b.opsFailures.WithLabelValues(op).Inc()
} else {
Expand Down
15 changes: 10 additions & 5 deletions pkg/objstore/objtesting/acceptance_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func TestObjStore_AcceptanceTest_e2e(t *testing.T) {
testutil.Assert(t, !ok, "expected not exits")

// Upload first object.
testutil.Ok(t, bkt.Upload(context.Background(), "id1/obj_1.some", strings.NewReader("@test-data@")))
testDataReader := strings.NewReader("@test-data@")
testutil.Ok(t, bkt.Upload(context.Background(), "id1/obj_1.some", testDataReader, testDataReader.Size()))

// Double check we can immediately read it.
rc1, err := bkt.Get(context.Background(), "id1/obj_1.some")
Expand All @@ -52,10 +53,14 @@ func TestObjStore_AcceptanceTest_e2e(t *testing.T) {
testutil.Assert(t, ok, "expected exits")

// Upload other objects.
testutil.Ok(t, bkt.Upload(context.Background(), "id1/obj_2.some", strings.NewReader("@test-data2@")))
testutil.Ok(t, bkt.Upload(context.Background(), "id1/obj_3.some", strings.NewReader("@test-data3@")))
testutil.Ok(t, bkt.Upload(context.Background(), "id2/obj_4.some", strings.NewReader("@test-data4@")))
testutil.Ok(t, bkt.Upload(context.Background(), "obj_5.some", strings.NewReader("@test-data5@")))
testDataReader2 := strings.NewReader("@test-data2@")
testDataReader3 := strings.NewReader("@test-data3@")
testDataReader4 := strings.NewReader("@test-data4@")
testDataReader5 := strings.NewReader("@test-data5@")
testutil.Ok(t, bkt.Upload(context.Background(), "id1/obj_2.some", testDataReader2, testDataReader2.Size()))
testutil.Ok(t, bkt.Upload(context.Background(), "id1/obj_3.some", testDataReader3, testDataReader3.Size()))
testutil.Ok(t, bkt.Upload(context.Background(), "id2/obj_4.some", testDataReader4, testDataReader4.Size()))
testutil.Ok(t, bkt.Upload(context.Background(), "obj_5.some", testDataReader5, testDataReader5.Size()))

// Can we iter over items from top dir?
var seen []string
Expand Down
4 changes: 2 additions & 2 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
_, err := b.client.PutObjectWithContext(ctx, b.name, name, r, -1,
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, sizeBytes int64) error {
_, err := b.client.PutObjectWithContext(ctx, b.name, name, r, sizeBytes,
minio.PutObjectOptions{ServerSideEncryption: b.sse, UserMetadata: map[string]string{"X-Amz-Acl": "bucket-owner-full-control"}},
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (c *Container) IsObjNotFoundErr(err error) bool {
}

// Upload writes the contents of the reader as an object into the container.
func (c *Container) Upload(ctx context.Context, name string, r io.Reader) error {
func (c *Container) Upload(ctx context.Context, name string, r io.Reader, _ int64) error {
options := &objects.CreateOpts{Content: r}
res := objects.Create(c.client, c.name, name, options)
return res.Err
Expand Down

0 comments on commit 876e122

Please sign in to comment.