Merge #66876

66876: backupccl: buffer/sort returned files before flushing to remote r=dt a=dt

BACKUP has multiple workers sending export requests concurrently, so the
returned files may arrive out of order. Previously this would always
force the writer that flushes returned files out to the remote file to
close the file it had been writing and open a new one, as any one file
must be in-order. This adds a small queue to the sink, into which it
buffers the files it is given to write to remote storage. Once the queue
reaches a count or size threshold, it is sorted and partially drained to
the remote file. This should increase the odds that files are added to
the remote file in-order and thus do not require closing and opening
additional remote files.

With the default worker count (3) on a tpcc5k cluster running an
incremental backup, this was observed to reduce the number of files a
node wrote from ~50-70 to <10.

Release note: none.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
craig[bot] and dt committed Jun 28, 2021
2 parents ff9ea89 + ee6fe3a commit 9785a7b
Showing 1 changed file with 84 additions and 7 deletions.
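
To illustrate the approach described in the commit message, here is a minimal, self-contained sketch of the buffer/sort/drain-half idea. The item, sink, push, and emit names are simplified stand-ins rather than the actual backupccl types; the real implementation is sstSink.push in the diff below.

package main

import (
	"fmt"
	"sort"
)

// item is an illustrative stand-in for a returned backup file: a sort key
// plus a payload size. The real code uses returnedSST and span keys.
type item struct {
	key  string
	size int
}

type sink struct {
	queue     []item
	queueSize int
}

// Thresholds mirroring maxSinkQueueFiles and the merge_file_buffer_size
// default in the diff; the values here are illustrative.
const (
	maxQueueLen   = 24
	maxQueueBytes = 16 << 20
)

// push buffers one item; once the queue is long or large enough, it is
// sorted by key and the first half is drained in order via emit, which
// stands in for writing to the currently open remote file.
func (s *sink) push(it item, emit func(item)) {
	s.queue = append(s.queue, it)
	s.queueSize += it.size

	if len(s.queue) >= maxQueueLen || s.queueSize >= maxQueueBytes {
		sort.Slice(s.queue, func(i, j int) bool { return s.queue[i].key < s.queue[j].key })

		drain := len(s.queue) / 2
		if drain < 1 {
			drain = 1
		}
		for _, q := range s.queue[:drain] {
			emit(q)
			s.queueSize -= q.size
		}
		copy(s.queue, s.queue[drain:])
		s.queue = s.queue[:len(s.queue)-drain]
	}
}

func main() {
	s := &sink{}
	// Files arrive out of order; most are still emitted in key order.
	for _, k := range []string{"c", "a", "d", "b"} {
		s.push(item{key: k, size: 8 << 20}, func(it item) { fmt.Println("write", it.key) })
	}
}

With these illustrative sizes the emit order is a, c, b: not perfectly sorted, but closer to in-order than the arrival order, which is all the real sink needs to reduce how often it must close and re-open remote files. A final flush that drains whatever remains in the queue (as sstSink.flush does in the diff) is omitted here for brevity.
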
91 changes: 84 additions & 7 deletions pkg/ccl/backupccl/backup_processor.go
@@ -12,6 +12,7 @@ import (
"context"
"fmt"
"io"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -79,8 +80,18 @@
16<<20,
settings.NonNegativeInt,
)
smallFileBuffer = settings.RegisterByteSizeSetting(
"bulkio.backup.merge_file_buffer_size",
"size limit used when buffering backup files before merging them",
16<<20,
settings.NonNegativeInt,
)
)

// maxSinkQueueFiles is how many replies we'll queue up before flushing to allow
// some re-ordering, unless we hit smallFileBuffer size first.
const maxSinkQueueFiles = 24

const backupProcessorName = "backupDataProcessor"

// TODO(pbardea): It would be nice if we could add some DistSQL processor tests
@@ -444,6 +455,7 @@ func runBackupProcessor(
enc: spec.Encryption,
targetFileSize: targetFileSize,
progCh: progCh,
settings: &flowCtx.Cfg.Settings.SV,
}

defaultStore, err := flowCtx.Cfg.ExternalStorage(ctx, defaultConf)
@@ -486,7 +498,7 @@
sink = defaultSink
}

if err := sink.write(ctx, res); err != nil {
if err := sink.push(ctx, res); err != nil {
return err
}
}
@@ -507,24 +519,41 @@ type sstSinkConf struct {
targetFileSize int64
enc *roachpb.FileEncryptionOptions
id base.SQLInstanceID
settings *settings.Values
}

type sstSink struct {
dest cloud.ExternalStorage
conf sstSinkConf

queue []returnedSST
queueSize int

sst storage.SSTWriter
ctx context.Context
cancel func()
out io.WriteCloser
outName string

flushedFiles []BackupManifest_File
flushedSize int64
flushedRevStart hlc.Timestamp
completedSpans int32

stats struct {
files int
flushes int
oooFlushes int
sizeFlushes int
spanGrows int
}
}

func (s *sstSink) Close() error {
if log.V(1) && s.ctx != nil {
log.Infof(s.ctx, "backup sst sink recv'd %d files, wrote %d (%d due to size, %d due to re-ordering), %d recv files extended prior span",
s.stats.files, s.stats.flushes, s.stats.sizeFlushes, s.stats.oooFlushes, s.stats.spanGrows)
}
if s.cancel != nil {
s.cancel()
}
@@ -534,10 +563,54 @@ func (s *sstSink) Close() error {
return nil
}

// push pushes one returned backup file into the sink. Returned files can arrive
// out of order, but must be written to an underlying file in-order or else a
// new underlying file has to be opened. The queue allows buffering up files and
// sorting them before pushing them to the underlying file to try to avoid this.
// When the queue length or the sum of the data sizes in it exceeds a
// threshold, the queue is sorted and the first half is flushed.
func (s *sstSink) push(ctx context.Context, resp returnedSST) error {
s.queue = append(s.queue, resp)
s.queueSize += len(resp.sst)

if len(s.queue) >= maxSinkQueueFiles || s.queueSize >= int(smallFileBuffer.Get(s.conf.settings)) {
sort.Slice(s.queue, func(i, j int) bool { return s.queue[i].f.Span.Key.Compare(s.queue[j].f.Span.Key) < 0 })

// Drain the first half.
drain := len(s.queue) / 2
if drain < 1 {
drain = 1
}
for i := range s.queue[:drain] {
if err := s.write(ctx, s.queue[i]); err != nil {
return err
}
s.queueSize -= len(s.queue[i].sst)
}

// Shift down the remainder of the queue and slice off the tail.
copy(s.queue, s.queue[drain:])
s.queue = s.queue[:len(s.queue)-drain]
}
return nil
}

func (s *sstSink) flush(ctx context.Context) error {
for i := range s.queue {
if err := s.write(ctx, s.queue[i]); err != nil {
return err
}
}
s.queue = nil
return s.flushFile(ctx)
}

func (s *sstSink) flushFile(ctx context.Context) error {
if s.out == nil {
return nil
}
s.stats.flushes++

if err := s.sst.Finish(); err != nil {
return err
}
@@ -565,6 +638,7 @@ func (s *sstSink) flush(ctx context.Context) error {
}

s.flushedFiles = nil
s.flushedSize = 0
s.flushedRevStart.Reset()
s.completedSpans = 0

@@ -593,14 +667,15 @@ func (s *sstSink) open(ctx context.Context) error {
}

func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
s.stats.files++

span := resp.f.Span

// If this span starts before the last buffered span ended, we need to flush
// since it overlaps but SSTWriter demands writes in-order.
// TODO(dt): consider buffering resp until _next_ `write` to allow minimal
// reordering of writes to avoid extra flushes.
if len(s.flushedFiles) > 0 && span.Key.Compare(s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey) < 0 {
if err := s.flush(ctx); err != nil {
s.stats.oooFlushes++
if err := s.flushFile(ctx); err != nil {
return err
}
}
@@ -648,18 +723,20 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
s.flushedFiles[l].StartTime.EqOrdering(resp.f.StartTime) {
s.flushedFiles[l].Span.EndKey = span.EndKey
s.flushedFiles[l].EntryCounts.add(resp.f.EntryCounts)
s.stats.spanGrows++
} else {
f := resp.f
f.Path = s.outName
s.flushedFiles = append(s.flushedFiles, f)
}
s.flushedRevStart.Forward(resp.revStart)
s.completedSpans += resp.completedSpans
s.flushedSize += int64(len(resp.sst))

// If our accumulated SST is now big enough, flush it.
// TODO(dt): use the compressed size instead.
if s.sst.DataSize > s.conf.targetFileSize {
if err := s.flush(ctx); err != nil {
if s.flushedSize > s.conf.targetFileSize {
s.stats.sizeFlushes++
if err := s.flushFile(ctx); err != nil {
return err
}
}