Skip to content

Commit

Permalink
storage: support parallel write for gcs (#49545)
Browse files Browse the repository at this point in the history
close #48443
  • Loading branch information
lance6716 authored Dec 19, 2023
1 parent b850d26 commit b8fe33a
Show file tree
Hide file tree
Showing 11 changed files with 602 additions and 80 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//br/pkg/storage",
"//pkg/kv",
"//pkg/metrics",
"//pkg/util",
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/size",
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func (r *byteReader) readNBytes(n int) ([]byte, error) {
return bs[0], nil
}
// need to flatten bs
if n <= 0 {
return nil, errors.Errorf("illegal n (%d) when reading from external storage", n)
}
if n > int(size.GB) {
return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, size.GB)
}
Expand Down
11 changes: 7 additions & 4 deletions br/pkg/lightning/backend/external/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"io"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func readAllData(
Expand Down Expand Up @@ -65,12 +66,13 @@ func readAllData(
if err != nil {
return err
}
var eg errgroup.Group
eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
// TODO(lance6716): limit the concurrency of eg to 30 does not help
for i := range dataFiles {
i := i
eg.Go(func() error {
return readOneFile(
ctx,
err2 := readOneFile(
egCtx,
storage,
dataFiles[i],
startKey,
Expand All @@ -80,6 +82,7 @@ func readAllData(
bufPool,
output,
)
return errors.Annotatef(err2, "failed to read file %s", dataFiles[i])
})
}
return eg.Wait()
Expand Down
165 changes: 96 additions & 69 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (b *WriterBuilder) Build(
filenamePrefix: filenamePrefix,
keyAdapter: keyAdapter,
writerID: writerID,
kvStore: nil,
onClose: b.onClose,
closed: false,
multiFileStats: make([]MultipleFilesStat, 1),
Expand Down Expand Up @@ -293,8 +292,7 @@ type Writer struct {
filenamePrefix string
keyAdapter common.KeyAdapter

kvStore *KeyValueStore
rc *rangePropertiesCollector
rc *rangePropertiesCollector

memSizeLimit uint64

Expand Down Expand Up @@ -400,88 +398,53 @@ func (w *Writer) recordMinMax(newMin, newMax tidbkv.Key, size uint64) {
w.totalSize += size
}

const flushKVsRetryTimes = 3

func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
if len(w.kvLocations) == 0 {
return nil
}

logger := logutil.Logger(ctx)
dataFile, statFile, dataWriter, statWriter, err := w.createStorageWriter(ctx)
if err != nil {
return err
}

var (
savedBytes uint64
statSize int
sortDuration, writeDuration time.Duration
writeStartTime time.Time
logger := logutil.Logger(ctx).With(
zap.String("writer-id", w.writerID),
zap.Int("sequence-number", w.currentSeq),
)
savedBytes = w.batchSize
startTs := time.Now()

kvCnt := len(w.kvLocations)
defer func() {
w.currentSeq++
err1, err2 := dataWriter.Close(ctx), statWriter.Close(ctx)
if err != nil {
return
}
if err1 != nil {
logger.Error("close data writer failed", zap.Error(err1))
err = err1
return
}
if err2 != nil {
logger.Error("close stat writer failed", zap.Error(err2))
err = err2
return
}
writeDuration = time.Since(writeStartTime)
logger.Info("flush kv",
zap.Uint64("bytes", savedBytes),
zap.Int("kv-cnt", kvCnt),
zap.Int("stat-size", statSize),
zap.Duration("sort-time", sortDuration),
zap.Duration("write-time", writeDuration),
zap.String("sort-speed(kv/s)", getSpeed(uint64(kvCnt), sortDuration.Seconds(), false)),
zap.String("write-speed(bytes/s)", getSpeed(savedBytes, writeDuration.Seconds(), true)),
zap.String("writer-id", w.writerID),
)
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("write").Observe(writeDuration.Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / writeDuration.Seconds())
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort_and_write").Observe(time.Since(startTs).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort_and_write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
}()

sortStart := time.Now()
slices.SortFunc(w.kvLocations, func(i, j membuf.SliceLocation) int {
return bytes.Compare(w.getKeyByLoc(i), w.getKeyByLoc(j))
})
sortDuration = time.Since(sortStart)

writeStartTime = time.Now()
sortDuration := time.Since(sortStart)
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort").Observe(sortDuration.Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(savedBytes) / 1024.0 / 1024.0 / sortDuration.Seconds())
w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc)
if err != nil {
return err
}

for _, pair := range w.kvLocations {
err = w.kvStore.addEncodedData(w.kvBuffer.GetSlice(pair))
if err != nil {
return err
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(w.batchSize) / 1024.0 / 1024.0 / sortDuration.Seconds())

writeStartTime := time.Now()
var dataFile, statFile string
for i := 0; i < flushKVsRetryTimes; i++ {
dataFile, statFile, err = w.flushSortedKVs(ctx)
if err == nil {
break
}
logger.Warn("flush sorted kv failed",
zap.Error(err),
zap.Int("retry-count", i),
)
}

w.kvStore.Close()
encodedStat := w.rc.encode()
statSize = len(encodedStat)
_, err = statWriter.Write(ctx, encodedStat)
if err != nil {
return err
}
writeDuration := time.Since(writeStartTime)
kvCnt := len(w.kvLocations)
logger.Info("flush kv",
zap.Uint64("bytes", w.batchSize),
zap.Int("kv-cnt", kvCnt),
zap.Duration("sort-time", sortDuration),
zap.Duration("write-time", writeDuration),
zap.String("sort-speed(kv/s)", getSpeed(uint64(kvCnt), sortDuration.Seconds(), false)),
zap.String("writer-id", w.writerID),
)
totalDuration := time.Since(sortStart)
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort_and_write").Observe(totalDuration.Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort_and_write").Observe(float64(w.batchSize) / 1024.0 / 1024.0 / totalDuration.Seconds())

minKey, maxKey := w.getKeyByLoc(w.kvLocations[0]), w.getKeyByLoc(w.kvLocations[len(w.kvLocations)-1])
w.recordMinMax(minKey, maxKey, uint64(w.kvSize))
Expand All @@ -507,9 +470,73 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
w.kvBuffer.Reset()
w.rc.reset()
w.batchSize = 0
w.currentSeq++
return nil
}

// flushSortedKVs writes the already-sorted KV pairs referenced by
// w.kvLocations to external storage as one data file and one statistic
// file, returning the two file names. The caller (flushKVs) retries this
// function on failure, so every error path must release the storage
// writers — that is what the deferred cleanup below guarantees.
func (w *Writer) flushSortedKVs(ctx context.Context) (string, string, error) {
	logger := logutil.Logger(ctx).With(
		zap.String("writer-id", w.writerID),
		zap.Int("sequence-number", w.currentSeq),
	)
	writeStartTime := time.Now()
	dataFile, statFile, dataWriter, statWriter, err := w.createStorageWriter(ctx)
	if err != nil {
		return "", "", err
	}
	defer func() {
		// close the writers when meet error. If no error happens, writers will
		// be closed outside and assigned to nil.
		if dataWriter != nil {
			_ = dataWriter.Close(ctx)
		}
		if statWriter != nil {
			_ = statWriter.Close(ctx)
		}
	}()
	kvStore, err := NewKeyValueStore(ctx, dataWriter, w.rc)
	if err != nil {
		return "", "", err
	}

	// Stream every encoded KV slice into the data writer, in the sorted
	// order established by the caller.
	for _, pair := range w.kvLocations {
		err = kvStore.addEncodedData(w.kvBuffer.GetSlice(pair))
		if err != nil {
			return "", "", err
		}
	}

	kvStore.Close()
	// Persist the collected range properties as the statistic file.
	encodedStat := w.rc.encode()
	statSize := len(encodedStat)
	_, err = statWriter.Write(ctx, encodedStat)
	if err != nil {
		return "", "", err
	}
	// Close explicitly so that upload/flush errors surface here; setting the
	// variables to nil tells the deferred cleanup the writer is already
	// closed (the defer closes over these variables, not their values).
	err = dataWriter.Close(ctx)
	dataWriter = nil
	if err != nil {
		return "", "", err
	}
	err = statWriter.Close(ctx)
	statWriter = nil
	if err != nil {
		return "", "", err
	}

	writeDuration := time.Since(writeStartTime)
	logger.Info("flush sorted kv",
		zap.Uint64("bytes", w.batchSize),
		zap.Int("stat-size", statSize),
		zap.Duration("write-time", writeDuration),
		zap.String("write-speed(bytes/s)", getSpeed(w.batchSize, writeDuration.Seconds(), true)),
	)
	metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("write").Observe(writeDuration.Seconds())
	metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("write").Observe(float64(w.batchSize) / 1024.0 / 1024.0 / writeDuration.Seconds())

	return dataFile, statFile, nil
}

func (w *Writer) getKeyByLoc(loc membuf.SliceLocation) []byte {
block := w.kvBuffer.GetSlice(loc)
keyLen := binary.BigEndian.Uint64(block[:lengthBytes])
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"compress.go",
"flags.go",
"gcs.go",
"gcs_extra.go",
"hdfs.go",
"helper.go",
"ks3.go",
Expand Down Expand Up @@ -49,6 +50,7 @@ go_library(
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//bloberror",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//blockblob",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//container",
"@com_github_go_resty_resty_v2//:resty",
"@com_github_google_uuid//:uuid",
"@com_github_klauspost_compress//gzip",
"@com_github_klauspost_compress//snappy",
Expand Down
32 changes: 25 additions & 7 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (options *GCSBackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
type GCSStorage struct {
gcs *backuppb.GCS
bucket *storage.BucketHandle
cli *storage.Client
}

// GetBucketHandle gets the handle to the GCS API on the bucket.
Expand Down Expand Up @@ -272,12 +273,29 @@ func (s *GCSStorage) URI() string {
}

// Create implements ExternalStorage interface.
func (s *GCSStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
object := s.objectName(name)
wc := s.bucket.Object(object).NewWriter(ctx)
wc.StorageClass = s.gcs.StorageClass
wc.PredefinedACL = s.gcs.PredefinedAcl
return newFlushStorageWriter(wc, &emptyFlusher{}, wc), nil
// Create implements ExternalStorage interface.
//
// When wo requests Concurrency > 1 (and we are not pointed at a local mock
// endpoint under test), the returned writer uploads through NewGCSWriter,
// which supports parallel writes; otherwise it falls back to the plain
// single-stream GCS object writer.
func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) (ExternalFileWriter, error) {
	// NewGCSWriter requires real testing environment on Google Cloud.
	mockGCS := intest.InTest && strings.Contains(s.gcs.GetEndpoint(), "127.0.0.1")
	if wo == nil || wo.Concurrency <= 1 || mockGCS {
		object := s.objectName(name)
		wc := s.bucket.Object(object).NewWriter(ctx)
		wc.StorageClass = s.gcs.StorageClass
		wc.PredefinedACL = s.gcs.PredefinedAcl
		return newFlushStorageWriter(wc, &emptyFlusher{}, wc), nil
	}
	uri := s.objectName(name)
	// 5MB is the minimum part size for GCS.
	partSize := int64(gcsMinimumChunkSize)
	if wo.PartSize > partSize {
		partSize = wo.PartSize
	}
	w, err := NewGCSWriter(ctx, s.cli, uri, partSize, wo.Concurrency, s.gcs.Bucket)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Buffer writes up to partSize before handing them to the parallel
	// writer — presumably so it receives full-sized parts; confirm against
	// newBufferedWriter's contract.
	fw := newFlushStorageWriter(w, &emptyFlusher{}, w)
	bw := newBufferedWriter(fw, int(partSize), NoCompression)
	return bw, nil
}

// Rename file name from oldFileName to newFileName.
Expand Down Expand Up @@ -371,7 +389,7 @@ skipHandleCred:
// so we need find sst in slash directory
gcs.Prefix += "//"
}
return &GCSStorage{gcs: gcs, bucket: bucket}, nil
return &GCSStorage{gcs: gcs, bucket: bucket, cli: client}, nil
}

func hasSSTFiles(ctx context.Context, bucket *storage.BucketHandle, prefix string) bool {
Expand Down
Loading

0 comments on commit b8fe33a

Please sign in to comment.