Skip to content

Commit 79bcffb

Browse files
authored
*: support concurrent write for S3 writer (#45723) (#49185)
ref #45719, close #48607
1 parent cbba9a9 commit 79bcffb

15 files changed

+75
-22
lines changed

br/pkg/mock/storage/storage.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

br/pkg/storage/azblob.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ func (s *AzureBlobStorage) URI() string {
415415
}
416416

417417
// Create implements the StorageWriter interface.
418-
func (s *AzureBlobStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
418+
func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
419419
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
420420
uploader := &azblobUploader{
421421
blobClient: client,

br/pkg/storage/compress.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ func WithCompression(inner ExternalStorage, compressionType CompressType) Extern
2424
return &withCompression{ExternalStorage: inner, compressType: compressionType}
2525
}
2626

27-
func (w *withCompression) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
27+
func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
2828
var (
2929
writer ExternalFileWriter
3030
err error
3131
)
3232
if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok {
3333
writer, err = s3Storage.CreateUploader(ctx, name)
3434
} else {
35-
writer, err = w.ExternalStorage.Create(ctx, name)
35+
writer, err = w.ExternalStorage.Create(ctx, name, nil)
3636
}
3737
if err != nil {
3838
return nil, errors.Trace(err)

br/pkg/storage/gcs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func (s *GCSStorage) URI() string {
246246
}
247247

248248
// Create implements ExternalStorage interface.
249-
func (s *GCSStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
249+
func (s *GCSStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
250250
object := s.objectName(name)
251251
wc := s.bucket.Object(object).NewWriter(ctx)
252252
wc.StorageClass = s.gcs.StorageClass

br/pkg/storage/hdfs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (s *HDFSStorage) URI() string {
123123
}
124124

125125
// Create opens a file writer by path. path is relative path to storage base path
126-
func (*HDFSStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
126+
func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {
127127
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
128128
}
129129

br/pkg/storage/local.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (l *LocalStorage) Open(_ context.Context, path string) (ExternalFileReader,
131131
}
132132

133133
// Create implements ExternalStorage interface.
134-
func (l *LocalStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
134+
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
135135
file, err := os.Create(filepath.Join(l.base, name))
136136
if err != nil {
137137
return nil, errors.Trace(err)

br/pkg/storage/local_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestDeleteFile(t *testing.T) {
2525
require.NoError(t, err)
2626
require.Equal(t, false, ret)
2727

28-
_, err = store.Create(context.Background(), name)
28+
_, err = store.Create(context.Background(), name, nil)
2929
require.NoError(t, err)
3030

3131
ret, err = store.FileExists(context.Background(), name)

br/pkg/storage/memstore.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func (*MemStorage) URI() string {
219219
// Create creates a file and returning a writer to write data into.
220220
// When the writer is closed, the data is stored in the file.
221221
// It implements the `ExternalStorage` interface
222-
func (s *MemStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
222+
func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
223223
select {
224224
case <-ctx.Done():
225225
return nil, ctx.Err()

br/pkg/storage/memstore_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestMemStoreBasic(t *testing.T) {
5050
require.NotNil(t, err)
5151

5252
// create a writer to write
53-
w, err := store.Create(ctx, "/hello.txt")
53+
w, err := store.Create(ctx, "/hello.txt", nil)
5454
require.Nil(t, err)
5555
_, err = w.Write(ctx, []byte("hello world 3"))
5656
require.Nil(t, err)

br/pkg/storage/noop.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (*noopStorage) URI() string {
4343
}
4444

4545
// Create implements ExternalStorage interface.
46-
func (*noopStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
46+
func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {
4747
return &noopWriter{}, nil
4848
}
4949

br/pkg/storage/s3.go

+53-4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"regexp"
1313
"strconv"
1414
"strings"
15+
"sync"
1516
"time"
1617

1718
alicred "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
@@ -912,11 +913,59 @@ func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (ExternalF
912913
}, nil
913914
}
914915

915-
// Create creates multi upload request.
916-
func (rs *S3Storage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
917-
uploader, err := rs.CreateUploader(ctx, name)
916+
type s3ObjectWriter struct {
917+
wd *io.PipeWriter
918+
wg *sync.WaitGroup
919+
err error
920+
}
921+
922+
// Write implement the io.Writer interface.
923+
func (s *s3ObjectWriter) Write(_ context.Context, p []byte) (int, error) {
924+
return s.wd.Write(p)
925+
}
926+
927+
// Close implement the io.Closer interface.
928+
func (s *s3ObjectWriter) Close(_ context.Context) error {
929+
err := s.wd.Close()
918930
if err != nil {
919-
return nil, err
931+
return err
932+
}
933+
s.wg.Wait()
934+
return s.err
935+
}
936+
937+
// Create creates multi upload request.
938+
func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error) {
939+
var uploader ExternalFileWriter
940+
var err error
941+
if option == nil || option.Concurrency <= 1 {
942+
uploader, err = rs.CreateUploader(ctx, name)
943+
if err != nil {
944+
return nil, err
945+
}
946+
} else {
947+
up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) {
948+
u.Concurrency = option.Concurrency
949+
u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * 8 * 1024 * 1024)
950+
})
951+
rd, wd := io.Pipe()
952+
upParams := &s3manager.UploadInput{
953+
Bucket: aws.String(rs.options.Bucket),
954+
Key: aws.String(rs.options.Prefix + name),
955+
Body: rd,
956+
}
957+
s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}}
958+
s3Writer.wg.Add(1)
959+
go func() {
960+
_, err := up.UploadWithContext(ctx, upParams)
961+
err1 := rd.Close()
962+
if err != nil {
963+
log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1))
964+
}
965+
s3Writer.err = err
966+
s3Writer.wg.Done()
967+
}()
968+
uploader = s3Writer
920969
}
921970
uploaderWriter := newBufferedWriter(uploader, hardcodedS3ChunkSize, NoCompression)
922971
return uploaderWriter, nil

br/pkg/storage/storage.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ type Writer interface {
7878
Close(ctx context.Context) error
7979
}
8080

81+
type WriterOption struct {
82+
Concurrency int
83+
}
84+
8185
// ExternalStorage represents a kind of file system storage.
8286
type ExternalStorage interface {
8387
// WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic
@@ -101,8 +105,8 @@ type ExternalStorage interface {
101105
// URI returns the base path as a URI
102106
URI() string
103107

104-
// Create opens a file writer by path. path is relative path to storage base path
105-
Create(ctx context.Context, path string) (ExternalFileWriter, error)
108+
// Create opens a file writer by path. path is relative path to storage base path. Currently only s3 implemented WriterOption
109+
Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error)
106110
// Rename file name from oldFileName to newFileName
107111
Rename(ctx context.Context, oldFileName, newFileName string) error
108112
}

br/pkg/storage/writer_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestExternalFileWriter(t *testing.T) {
2929
storage, err := Create(ctx, backend, true)
3030
require.NoError(t, err)
3131
fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt"
32-
writer, err := storage.Create(ctx, fileName)
32+
writer, err := storage.Create(ctx, fileName, nil)
3333
require.NoError(t, err)
3434
for _, str := range test.content {
3535
p := []byte(str)
@@ -105,7 +105,7 @@ func TestCompressReaderWriter(t *testing.T) {
105105
storage = WithCompression(storage, test.compressType)
106106
suffix := createSuffixString(test.compressType)
107107
fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
108-
writer, err := storage.Create(ctx, fileName)
108+
writer, err := storage.Create(ctx, fileName, nil)
109109
require.NoError(t, err)
110110
for _, str := range test.content {
111111
p := []byte(str)

br/pkg/task/stream_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func fakeCheckpointFiles(
155155
filename := fmt.Sprintf("%v.ts", info.storeID)
156156
buff := make([]byte, 8)
157157
binary.LittleEndian.PutUint64(buff, info.global_checkpoint)
158-
if _, err := s.Create(ctx, filename); err != nil {
158+
if _, err := s.Create(ctx, filename, nil); err != nil {
159159
return errors.Trace(err)
160160
}
161161
if err := s.WriteFile(ctx, filename, buff); err != nil {

dumpling/export/writer_util.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
454454
func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
455455
fileName += compressFileSuffix(compressType)
456456
fullPath := s.URI() + "/" + fileName
457-
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName)
457+
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil)
458458
if err != nil {
459459
tctx.L().Warn("fail to open file",
460460
zap.String("path", fullPath),
@@ -487,7 +487,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
487487
initRoutine := func() error {
488488
// use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
489489
// which will cause a context canceled error when closing gcs's Writer
490-
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName)
490+
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil)
491491
if err != nil {
492492
pCtx.L().Warn("fail to open file",
493493
zap.String("path", fullPath),

0 commit comments

Comments
 (0)