From d3af167c831e019474fa43ee62d727822bed711d Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 1 Nov 2023 12:58:39 +0800 Subject: [PATCH 1/4] s3: fix s3 concurrent uploader will overwrite error Signed-off-by: lance6716 --- br/pkg/storage/s3.go | 4 ++-- br/pkg/storage/s3_test.go | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 14ff8fc717fb5..eebbeb4653c77 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -1044,9 +1044,9 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti s3Writer.wg.Add(1) go func() { _, err := up.UploadWithContext(ctx, upParams) - err1 := rd.Close() if err != nil { - log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1)) + log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err)) + _ = rd.CloseWithError(err) } s3Writer.err = err s3Writer.wg.Done() diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index d1f5d21bc617d..1f46dd41ec11c 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -484,6 +484,26 @@ func TestWriteNoError(t *testing.T) { require.NoError(t, err) } +// TestWriteNoError ensures the WriteFile API issues a PutObject request and wait +// until the object is available in the S3 bucket. +func TestMultiUploadErrorNotOverwritten(t *testing.T) { + s := createS3Suite(t) + ctx := aws.BackgroundContext() + + s.s3.EXPECT(). + CreateMultipartUploadWithContext(ctx, gomock.Any(), gomock.Any()). + Return(nil, errors.New("mock error")) + + w, err := s.storage.Create(ctx, "file", &WriterOption{Concurrency: 2}) + require.NoError(t, err) + // data should be larger than 5MB + data := make([]byte, 5*1024*1024+6716) + n, err := w.Write(ctx, data) + require.NoError(t, err) + require.Equal(t, 5*1024*1024+6716, n) + require.ErrorContains(t, w.Close(ctx), "mock error") +} + // TestReadNoError ensures the ReadFile API issues a GetObject request and correctly // read the entire body. func TestReadNoError(t *testing.T) { From 8f1b9ae15b68cf1ca6783e4d249f6e1e14b2fa00 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 1 Nov 2023 13:01:49 +0800 Subject: [PATCH 2/4] refine comment Signed-off-by: lance6716 --- br/pkg/storage/s3_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index 1f46dd41ec11c..829e2049bdbcd 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -484,8 +484,6 @@ func TestWriteNoError(t *testing.T) { require.NoError(t, err) } -// TestWriteNoError ensures the WriteFile API issues a PutObject request and wait -// until the object is available in the S3 bucket. func TestMultiUploadErrorNotOverwritten(t *testing.T) { s := createS3Suite(t) ctx := aws.BackgroundContext() @@ -496,7 +494,7 @@ func TestMultiUploadErrorNotOverwritten(t *testing.T) { w, err := s.storage.Create(ctx, "file", &WriterOption{Concurrency: 2}) require.NoError(t, err) - // data should be larger than 5MB + // data should be larger than 5MB to trigger CreateMultipartUploadWithContext path data := make([]byte, 5*1024*1024+6716) n, err := w.Write(ctx, data) require.NoError(t, err) From 728510adc302218dce472562ead2b46b05cd061b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 1 Nov 2023 13:03:27 +0800 Subject: [PATCH 3/4] fix for ks3 as well Signed-off-by: lance6716 --- br/pkg/storage/ks3.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/storage/ks3.go b/br/pkg/storage/ks3.go index 42c4786c306df..f97038ee95c95 100644 --- a/br/pkg/storage/ks3.go +++ b/br/pkg/storage/ks3.go @@ -673,9 +673,9 @@ func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOpt s3Writer.wg.Add(1) go func() { _, err := up.Upload(upParams) - err1 := rd.Close() if err != nil { - log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1)) + log.Warn("upload to ks3 failed", zap.String("filename", name), zap.Error(err)) + _ = rd.CloseWithError(err) } s3Writer.err = err s3Writer.wg.Done() From 9f1a21fd5c1bb5049f9e11a8988c549f2f85844a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 1 Nov 2023 14:39:07 +0800 Subject: [PATCH 4/4] add comment Signed-off-by: lance6716 --- br/pkg/storage/ks3.go | 1 + br/pkg/storage/s3.go | 1 + 2 files changed, 2 insertions(+) diff --git a/br/pkg/storage/ks3.go b/br/pkg/storage/ks3.go index f97038ee95c95..b917146b6f06a 100644 --- a/br/pkg/storage/ks3.go +++ b/br/pkg/storage/ks3.go @@ -673,6 +673,7 @@ func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOpt s3Writer.wg.Add(1) go func() { _, err := up.Upload(upParams) + // like a channel we only let sender close the pipe in happy path if err != nil { log.Warn("upload to ks3 failed", zap.String("filename", name), zap.Error(err)) _ = rd.CloseWithError(err) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index eebbeb4653c77..97f58ec5b89fd 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -1044,6 +1044,7 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti s3Writer.wg.Add(1) go func() { _, err := up.UploadWithContext(ctx, upParams) + // like a channel we only let sender close the pipe in happy path if err != nil { log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err)) _ = rd.CloseWithError(err)