From 267fe947829acb6a44e6339319098bb3cf6fdf36 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Tue, 1 Aug 2023 18:32:44 +0800 Subject: [PATCH 1/4] done Signed-off-by: wjhuang2016 --- br/pkg/mock/storage/storage.go | 2 +- br/pkg/storage/azblob.go | 2 +- br/pkg/storage/compress.go | 4 +-- br/pkg/storage/gcs.go | 2 +- br/pkg/storage/hdfs.go | 2 +- br/pkg/storage/local.go | 2 +- br/pkg/storage/local_test.go | 2 +- br/pkg/storage/memstore.go | 2 +- br/pkg/storage/memstore_test.go | 2 +- br/pkg/storage/noop.go | 2 +- br/pkg/storage/s3.go | 58 ++++++++++++++++++++++++++++++--- br/pkg/storage/storage.go | 6 +++- br/pkg/storage/writer_test.go | 4 +-- br/pkg/task/stream_test.go | 2 +- dumpling/export/writer_util.go | 4 +-- 15 files changed, 75 insertions(+), 21 deletions(-) diff --git a/br/pkg/mock/storage/storage.go b/br/pkg/mock/storage/storage.go index 32e96c1dd3448..83db0fffe0b88 100644 --- a/br/pkg/mock/storage/storage.go +++ b/br/pkg/mock/storage/storage.go @@ -38,7 +38,7 @@ func (m *MockExternalStorage) EXPECT() *MockExternalStorageMockRecorder { } // Create mocks base method. -func (m *MockExternalStorage) Create(arg0 context.Context, arg1 string) (storage.ExternalFileWriter, error) { +func (m *MockExternalStorage) Create(arg0 context.Context, arg1 string, _ *storage.WriterOption) (storage.ExternalFileWriter, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Create", arg0, arg1) ret0, _ := ret[0].(storage.ExternalFileWriter) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index c3f9ecffb26de..48f19caaae8be 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -507,7 +507,7 @@ func (s *AzureBlobStorage) URI() string { const azblobChunkSize = 64 * 1024 * 1024 // Create implements the StorageWriter interface. -func (s *AzureBlobStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) { +func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { client := s.containerClient.NewBlockBlobClient(s.withPrefix(name)) uploader := &azblobUploader{ blobClient: client, diff --git a/br/pkg/storage/compress.go b/br/pkg/storage/compress.go index 5ab041cfe0bf2..544938cc4f7c2 100644 --- a/br/pkg/storage/compress.go +++ b/br/pkg/storage/compress.go @@ -24,7 +24,7 @@ func WithCompression(inner ExternalStorage, compressionType CompressType) Extern return &withCompression{ExternalStorage: inner, compressType: compressionType} } -func (w *withCompression) Create(ctx context.Context, name string) (ExternalFileWriter, error) { +func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { var ( writer ExternalFileWriter err error @@ -32,7 +32,7 @@ func (w *withCompression) Create(ctx context.Context, name string) (ExternalFile if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok { writer, err = s3Storage.CreateUploader(ctx, name) } else { - writer, err = w.ExternalStorage.Create(ctx, name) + writer, err = w.ExternalStorage.Create(ctx, name, nil) } if err != nil { return nil, errors.Trace(err) diff --git a/br/pkg/storage/gcs.go b/br/pkg/storage/gcs.go index db094cdc107da..f32d4344d9a83 100644 --- a/br/pkg/storage/gcs.go +++ b/br/pkg/storage/gcs.go @@ -246,7 +246,7 @@ func (s *GCSStorage) URI() string { } // Create implements ExternalStorage interface. -func (s *GCSStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) { +func (s *GCSStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { object := s.objectName(name) wc := s.bucket.Object(object).NewWriter(ctx) wc.StorageClass = s.gcs.StorageClass diff --git a/br/pkg/storage/hdfs.go b/br/pkg/storage/hdfs.go index 4a971f7a3cb0f..5877febbafab2 100644 --- a/br/pkg/storage/hdfs.go +++ b/br/pkg/storage/hdfs.go @@ -123,7 +123,7 @@ func (s *HDFSStorage) URI() string { } // Create opens a file writer by path. path is relative path to storage base path -func (*HDFSStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) { +func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) { return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup") } diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index d8ad2edc2e8aa..8079030aaa7f0 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -138,7 +138,7 @@ func (l *LocalStorage) Open(_ context.Context, path string) (ExternalFileReader, } // Create implements ExternalStorage interface. -func (l *LocalStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) { +func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { file, err := os.Create(filepath.Join(l.base, name)) if err != nil { return nil, errors.Trace(err) diff --git a/br/pkg/storage/local_test.go b/br/pkg/storage/local_test.go index 53ecd7cfb5b62..408c8acd0e2f8 100644 --- a/br/pkg/storage/local_test.go +++ b/br/pkg/storage/local_test.go @@ -26,7 +26,7 @@ func TestDeleteFile(t *testing.T) { require.NoError(t, err) require.Equal(t, false, ret) - _, err = store.Create(context.Background(), name) + _, err = store.Create(context.Background(), name, nil) require.NoError(t, err) ret, err = store.FileExists(context.Background(), name) diff --git a/br/pkg/storage/memstore.go b/br/pkg/storage/memstore.go index 96276ca600790..0ae108b89835e 100644 --- a/br/pkg/storage/memstore.go +++ b/br/pkg/storage/memstore.go @@ -219,7 +219,7 @@ func (*MemStorage) URI() string { // Create creates a file and returning a writer to write data into. // When the writer is closed, the data is stored in the file. // It implements the `ExternalStorage` interface -func (s *MemStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) { +func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { select { case <-ctx.Done(): return nil, ctx.Err() diff --git a/br/pkg/storage/memstore_test.go b/br/pkg/storage/memstore_test.go index 3ae9a08d168bc..1f32ed0a27d71 100644 --- a/br/pkg/storage/memstore_test.go +++ b/br/pkg/storage/memstore_test.go @@ -50,7 +50,7 @@ func TestMemStoreBasic(t *testing.T) { require.NotNil(t, err) // create a writer to write - w, err := store.Create(ctx, "/hello.txt") + w, err := store.Create(ctx, "/hello.txt", nil) require.Nil(t, err) _, err = w.Write(ctx, []byte("hello world 3")) require.Nil(t, err) diff --git a/br/pkg/storage/noop.go b/br/pkg/storage/noop.go index 8e58366efdcf5..9d8e39abde68c 100644 --- a/br/pkg/storage/noop.go +++ b/br/pkg/storage/noop.go @@ -43,7 +43,7 @@ func (*noopStorage) URI() string { } // Create implements ExternalStorage interface. -func (*noopStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) { +func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) { return &noopWriter{}, nil } diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index d6d29706825bc..7db563a540938 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -12,6 +12,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" alicred "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials" @@ -912,11 +913,60 @@ func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (ExternalF }, nil } -// Create creates multi upload request. -func (rs *S3Storage) Create(ctx context.Context, name string) (ExternalFileWriter, error) { - uploader, err := rs.CreateUploader(ctx, name) +type s3ObjectWriter struct { + wd *io.PipeWriter + wg *sync.WaitGroup + err error +} + +// Write implement the io.Writer interface. +func (s *s3ObjectWriter) Write(ctx context.Context, p []byte) (int, error) { + n, err := s.wd.Write(p) + return n, err +} + +// Close implement the io.Closer interface. +func (s *s3ObjectWriter) Close(ctx context.Context) error { + err := s.wd.Close() if err != nil { - return nil, err + return err + } + s.wg.Wait() + return s.err +} + +// Create creates multi upload request. +func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error) { + var uploader ExternalFileWriter + var err error + if option == nil || option.Concurrency <= 1 { + uploader, err = rs.CreateUploader(ctx, name) + if err != nil { + return nil, err + } + } else { + up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) { + u.Concurrency = option.Concurrency + u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * 8 * 1024 * 1024) + }) + rd, wd := io.Pipe() + upParams := &s3manager.UploadInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + name), + Body: rd, + } + s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}} + s3Writer.wg.Add(1) + go func() { + _, err := up.Upload(upParams) + if err != nil { + log.Error("upload to s3 failed", zap.Error(err)) + } + log.Info("upload to s3 success") + s3Writer.err = err + s3Writer.wg.Done() + }() + uploader = s3Writer } uploaderWriter := newBufferedWriter(uploader, hardcodedS3ChunkSize, NoCompression) return uploaderWriter, nil diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 377ef962eb239..1a22ed1671cba 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -80,6 +80,10 @@ type Writer interface { Close(ctx context.Context) error } +type WriterOption struct { + Concurrency int +} + // ExternalStorage represents a kind of file system storage. type ExternalStorage interface { // WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic @@ -104,7 +108,7 @@ type ExternalStorage interface { URI() string // Create opens a file writer by path. path is relative path to storage base path - Create(ctx context.Context, path string) (ExternalFileWriter, error) + Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error) // Rename file name from oldFileName to newFileName Rename(ctx context.Context, oldFileName, newFileName string) error } diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index 22fa87d34de47..4f41aeb97a4f1 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -29,7 +29,7 @@ func TestExternalFileWriter(t *testing.T) { storage, err := Create(ctx, backend, true) require.NoError(t, err) fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt" - writer, err := storage.Create(ctx, fileName) + writer, err := storage.Create(ctx, fileName, nil) require.NoError(t, err) for _, str := range test.content { p := []byte(str) @@ -105,7 +105,7 @@ func TestCompressReaderWriter(t *testing.T) { storage = WithCompression(storage, test.compressType) suffix := createSuffixString(test.compressType) fileName := strings.ReplaceAll(test.name, " ", "-") + suffix - writer, err := storage.Create(ctx, fileName) + writer, err := storage.Create(ctx, fileName, nil) require.NoError(t, err) for _, str := range test.content { p := []byte(str) diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go index 3ef57a71a07ef..7b6f5e20b5198 100644 --- a/br/pkg/task/stream_test.go +++ b/br/pkg/task/stream_test.go @@ -217,7 +217,7 @@ func fakeCheckpointFiles( filename := fmt.Sprintf("%v.ts", info.storeID) buff := make([]byte, 8) binary.LittleEndian.PutUint64(buff, info.global_checkpoint) - if _, err := s.Create(ctx, filename); err != nil { + if _, err := s.Create(ctx, filename, nil); err != nil { return errors.Trace(err) } if err := s.WriteFile(ctx, filename, buff); err != nil { diff --git a/dumpling/export/writer_util.go b/dumpling/export/writer_util.go index fe7672f4f0015..2a4dbbc1cf4cd 100644 --- a/dumpling/export/writer_util.go +++ b/dumpling/export/writer_util.go @@ -454,7 +454,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) { fileName += compressFileSuffix(compressType) fullPath := s.URI() + "/" + fileName - writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName) + writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil) if err != nil { tctx.L().Warn("fail to open file", zap.String("path", fullPath), @@ -487,7 +487,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage, initRoutine := func() error { // use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close, // which will cause a context canceled error when closing gcs's Writer - w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName) + w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil) if err != nil { pCtx.L().Warn("fail to open file", zap.String("path", fullPath), From 132cb87d1c10a698e52a1b960616965291db36b1 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Tue, 1 Aug 2023 18:39:03 +0800 Subject: [PATCH 2/4] refine Signed-off-by: wjhuang2016 --- br/pkg/storage/s3.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 7db563a540938..3eb1093813945 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -960,9 +960,8 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti go func() { _, err := up.Upload(upParams) if err != nil { - log.Error("upload to s3 failed", zap.Error(err)) + log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err)) } - log.Info("upload to s3 success") s3Writer.err = err s3Writer.wg.Done() }() From 7c03dc10fe10a59d9be1eff09a92be5e1d7a60ca Mon Sep 17 00:00:00 2001 From: wjHuang Date: Thu, 3 Aug 2023 16:08:50 +0800 Subject: [PATCH 3/4] Update br/pkg/storage/storage.go Co-authored-by: lance6716 --- br/pkg/storage/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 1a22ed1671cba..894019a2ede8a 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -107,7 +107,7 @@ type ExternalStorage interface { // URI returns the base path as a URI URI() string - // Create opens a file writer by path. path is relative path to storage base path + // Create opens a file writer by path. path is relative path to storage base path. Currently only s3 implemented WriterOption Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error) // Rename file name from oldFileName to newFileName Rename(ctx context.Context, oldFileName, newFileName string) error From 814a854045b9a7845fe5ab025b227c18409367ed Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Thu, 3 Aug 2023 16:18:08 +0800 Subject: [PATCH 4/4] refine Signed-off-by: wjhuang2016 --- br/pkg/storage/s3.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 3eb1093813945..898129149cdce 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -920,13 +920,12 @@ type s3ObjectWriter struct { } // Write implement the io.Writer interface. -func (s *s3ObjectWriter) Write(ctx context.Context, p []byte) (int, error) { - n, err := s.wd.Write(p) - return n, err +func (s *s3ObjectWriter) Write(_ context.Context, p []byte) (int, error) { + return s.wd.Write(p) } // Close implement the io.Closer interface. -func (s *s3ObjectWriter) Close(ctx context.Context) error { +func (s *s3ObjectWriter) Close(_ context.Context) error { err := s.wd.Close() if err != nil { return err @@ -958,9 +957,10 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}} s3Writer.wg.Add(1) go func() { - _, err := up.Upload(upParams) + _, err := up.UploadWithContext(ctx, upParams) + err1 := rd.Close() if err != nil { - log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err)) + log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1)) } s3Writer.err = err s3Writer.wg.Done()