Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support concurrent write for S3 writer #45723

Merged
merged 4 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/mock/storage/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@
const azblobChunkSize = 64 * 1024 * 1024

// Create implements the StorageWriter interface.
func (s *AzureBlobStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {

Check warning on line 510 in br/pkg/storage/azblob.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/azblob.go#L510

Added line #L510 was not covered by tests
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
uploader := &azblobUploader{
blobClient: client,
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/storage/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ func WithCompression(inner ExternalStorage, compressionType CompressType) Extern
return &withCompression{ExternalStorage: inner, compressType: compressionType}
}

func (w *withCompression) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
var (
writer ExternalFileWriter
err error
)
if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok {
writer, err = s3Storage.CreateUploader(ctx, name)
} else {
writer, err = w.ExternalStorage.Create(ctx, name)
writer, err = w.ExternalStorage.Create(ctx, name, nil)
}
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@
}

// Create implements ExternalStorage interface.
func (s *GCSStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
func (s *GCSStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {

Check warning on line 249 in br/pkg/storage/gcs.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/gcs.go#L249

Added line #L249 was not covered by tests
object := s.objectName(name)
wc := s.bucket.Object(object).NewWriter(ctx)
wc.StorageClass = s.gcs.StorageClass
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
}

// Create opens a file writer by path. path is relative path to storage base path
func (*HDFSStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {

Check warning on line 126 in br/pkg/storage/hdfs.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/hdfs.go#L126

Added line #L126 was not covered by tests
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (l *LocalStorage) Open(_ context.Context, path string) (ExternalFileReader,
}

// Create implements ExternalStorage interface.
func (l *LocalStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
file, err := os.Create(filepath.Join(l.base, name))
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestDeleteFile(t *testing.T) {
require.NoError(t, err)
require.Equal(t, false, ret)

_, err = store.Create(context.Background(), name)
_, err = store.Create(context.Background(), name, nil)
require.NoError(t, err)

ret, err = store.FileExists(context.Background(), name)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (*MemStorage) URI() string {
// Create creates a file and returning a writer to write data into.
// When the writer is closed, the data is stored in the file.
// It implements the `ExternalStorage` interface
func (s *MemStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestMemStoreBasic(t *testing.T) {
require.NotNil(t, err)

// create a writer to write
w, err := store.Create(ctx, "/hello.txt")
w, err := store.Create(ctx, "/hello.txt", nil)
require.Nil(t, err)
_, err = w.Write(ctx, []byte("hello world 3"))
require.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
}

// Create implements ExternalStorage interface.
func (*noopStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {

Check warning on line 46 in br/pkg/storage/noop.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/noop.go#L46

Added line #L46 was not covered by tests
return &noopWriter{}, nil
}

Expand Down
57 changes: 53 additions & 4 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"regexp"
"strconv"
"strings"
"sync"
"time"

alicred "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
Expand Down Expand Up @@ -912,11 +913,59 @@
}, nil
}

// Create creates multi upload request.
func (rs *S3Storage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
uploader, err := rs.CreateUploader(ctx, name)
// s3ObjectWriter is the writer S3Storage.Create builds when option.Concurrency > 1.
// Bytes written to it are pushed into an io.Pipe whose read end is the Body of an
// s3manager upload running in a separate goroutine.
type s3ObjectWriter struct {
// wd is the write end of the pipe that feeds the upload.
wd *io.PipeWriter
// wg is incremented once before the upload goroutine starts and marked done when
// Upload returns; Close waits on it so the upload has finished before it returns.
wg *sync.WaitGroup
// err holds the error returned by Upload. It is set by the upload goroutine and
// read by Close after wg.Wait().
err error
}

// Write implements the ExternalFileWriter interface by forwarding p to the pipe that feeds the S3 upload.
func (s *s3ObjectWriter) Write(ctx context.Context, p []byte) (int, error) {
n, err := s.wd.Write(p)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return n, err

Check warning on line 925 in br/pkg/storage/s3.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/s3.go#L923-L925

Added lines #L923 - L925 were not covered by tests
}

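// Close implements the ExternalFileWriter interface. It closes the pipe, waits for the upload goroutine to finish, and returns the upload error, if any.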
func (s *s3ObjectWriter) Close(ctx context.Context) error {
err := s.wd.Close()

Check warning on line 930 in br/pkg/storage/s3.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/s3.go#L929-L930

Added lines #L929 - L930 were not covered by tests
if err != nil {
return nil, err
return err
}
s.wg.Wait()
return s.err

Check warning on line 935 in br/pkg/storage/s3.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/s3.go#L932-L935

Added lines #L932 - L935 were not covered by tests
}

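// Create creates a writer for the named object. When option.Concurrency > 1 the data is streamed through a concurrent s3manager uploader; otherwise a multi upload request is created.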
func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error) {
var uploader ExternalFileWriter
var err error
if option == nil || option.Concurrency <= 1 {
uploader, err = rs.CreateUploader(ctx, name)
if err != nil {
return nil, err
}
} else {
up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) {
u.Concurrency = option.Concurrency
u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * 8 * 1024 * 1024)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we need to expose the 8MB as a configuration option in the future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so; we can hardcode it for now.

})
rd, wd := io.Pipe()
upParams := &s3manager.UploadInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
Body: rd,
}
s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}}
s3Writer.wg.Add(1)
go func() {
_, err := up.Upload(upParams)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err))
}
s3Writer.err = err
s3Writer.wg.Done()

Check warning on line 966 in br/pkg/storage/s3.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/s3.go#L939-L966

Added lines #L939 - L966 were not covered by tests
}()
uploader = s3Writer

Check warning on line 968 in br/pkg/storage/s3.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/s3.go#L968

Added line #L968 was not covered by tests
}
uploaderWriter := newBufferedWriter(uploader, hardcodedS3ChunkSize, NoCompression)
return uploaderWriter, nil
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type Writer interface {
Close(ctx context.Context) error
}

// WriterOption carries optional settings for ExternalStorage.Create.
// Callers may pass nil when they have nothing to configure.
type WriterOption struct {
// Concurrency is used by S3Storage.Create: a value greater than 1 selects the
// concurrent s3manager uploader and is passed to it as its Concurrency; a nil
// option or a value <= 1 falls back to CreateUploader. The other backends shown
// in this change ignore the option.
Concurrency int
}

// ExternalStorage represents a kind of file system storage.
type ExternalStorage interface {
// WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic
Expand All @@ -104,7 +108,7 @@ type ExternalStorage interface {
URI() string

// Create opens a file writer by path. path is relative path to storage base path
wjhuang2016 marked this conversation as resolved.
Show resolved Hide resolved
Create(ctx context.Context, path string) (ExternalFileWriter, error)
Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error)
// Rename file name from oldFileName to newFileName
Rename(ctx context.Context, oldFileName, newFileName string) error
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/storage/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestExternalFileWriter(t *testing.T) {
storage, err := Create(ctx, backend, true)
require.NoError(t, err)
fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt"
writer, err := storage.Create(ctx, fileName)
writer, err := storage.Create(ctx, fileName, nil)
require.NoError(t, err)
for _, str := range test.content {
p := []byte(str)
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestCompressReaderWriter(t *testing.T) {
storage = WithCompression(storage, test.compressType)
suffix := createSuffixString(test.compressType)
fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
writer, err := storage.Create(ctx, fileName)
writer, err := storage.Create(ctx, fileName, nil)
require.NoError(t, err)
for _, str := range test.content {
p := []byte(str)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func fakeCheckpointFiles(
filename := fmt.Sprintf("%v.ts", info.storeID)
buff := make([]byte, 8)
binary.LittleEndian.PutUint64(buff, info.global_checkpoint)
if _, err := s.Create(ctx, filename); err != nil {
if _, err := s.Create(ctx, filename, nil); err != nil {
return errors.Trace(err)
}
if err := s.WriteFile(ctx, filename, buff); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dumpling/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
fileName += compressFileSuffix(compressType)
fullPath := s.URI() + "/" + fileName
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName)
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil)
if err != nil {
tctx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down Expand Up @@ -487,7 +487,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
initRoutine := func() error {
// use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
// which will cause a context canceled error when closing gcs's Writer
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName)
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil)
if err != nil {
pCtx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down
Loading