Skip to content

Commit

Permalink
s3: fix s3 concurrent uploader will overwrite error (#48163)
Browse files Browse the repository at this point in the history
close #48164
  • Loading branch information
lance6716 committed Nov 1, 2023
1 parent 8b0c3c0 commit b99d1c4
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
5 changes: 3 additions & 2 deletions br/pkg/storage/ks3.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,10 @@ func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOpt
s3Writer.wg.Add(1)
go func() {
_, err := up.Upload(upParams)
err1 := rd.Close()
// like a channel we only let sender close the pipe in happy path
if err != nil {
log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1))
log.Warn("upload to ks3 failed", zap.String("filename", name), zap.Error(err))
_ = rd.CloseWithError(err)
}
s3Writer.err = err
s3Writer.wg.Done()
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,9 +1044,10 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti
s3Writer.wg.Add(1)
go func() {
_, err := up.UploadWithContext(ctx, upParams)
err1 := rd.Close()
// like a channel we only let sender close the pipe in happy path
if err != nil {
log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1))
log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err))
_ = rd.CloseWithError(err)
}
s3Writer.err = err
s3Writer.wg.Done()
Expand Down
18 changes: 18 additions & 0 deletions br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,24 @@ func TestWriteNoError(t *testing.T) {
require.NoError(t, err)
}

func TestMultiUploadErrorNotOverwritten(t *testing.T) {
s := createS3Suite(t)
ctx := aws.BackgroundContext()

s.s3.EXPECT().
CreateMultipartUploadWithContext(ctx, gomock.Any(), gomock.Any()).
Return(nil, errors.New("mock error"))

w, err := s.storage.Create(ctx, "file", &WriterOption{Concurrency: 2})
require.NoError(t, err)
// data should be larger than 5MB to trigger CreateMultipartUploadWithContext path
data := make([]byte, 5*1024*1024+6716)
n, err := w.Write(ctx, data)
require.NoError(t, err)
require.Equal(t, 5*1024*1024+6716, n)
require.ErrorContains(t, w.Close(ctx), "mock error")
}

// TestReadNoError ensures the ReadFile API issues a GetObject request and correctly
// read the entire body.
func TestReadNoError(t *testing.T) {
Expand Down

0 comments on commit b99d1c4

Please sign in to comment.