diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 625e28900b19..0c6e1cd43509 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -12,6 +12,7 @@ import ( "context" "fmt" "io" + "sort" "time" "github.com/cockroachdb/cockroach/pkg/base" @@ -79,8 +80,18 @@ var ( 16<<20, settings.NonNegativeInt, ) + smallFileBuffer = settings.RegisterByteSizeSetting( + "bulkio.backup.merge_file_buffer_size", + "size limit used when buffering backup files before merging them", + 16<<20, + settings.NonNegativeInt, + ) ) +// maxSinkQueueFiles is how many replies we'll queue up before flushing to allow +// some re-ordering, unless we hit smallFileBuffer size first. +const maxSinkQueueFiles = 24 + const backupProcessorName = "backupDataProcessor" // TODO(pbardea): It would be nice if we could add some DistSQL processor tests @@ -444,6 +455,7 @@ func runBackupProcessor( enc: spec.Encryption, targetFileSize: targetFileSize, progCh: progCh, + settings: &flowCtx.Cfg.Settings.SV, } defaultStore, err := flowCtx.Cfg.ExternalStorage(ctx, defaultConf) @@ -486,7 +498,7 @@ func runBackupProcessor( sink = defaultSink } - if err := sink.write(ctx, res); err != nil { + if err := sink.push(ctx, res); err != nil { return err } } @@ -507,12 +519,16 @@ type sstSinkConf struct { targetFileSize int64 enc *roachpb.FileEncryptionOptions id base.SQLInstanceID + settings *settings.Values } type sstSink struct { dest cloud.ExternalStorage conf sstSinkConf + queue []returnedSST + queueSize int + sst storage.SSTWriter ctx context.Context cancel func() @@ -520,11 +536,24 @@ type sstSink struct { outName string flushedFiles []BackupManifest_File + flushedSize int64 flushedRevStart hlc.Timestamp completedSpans int32 + + stats struct { + files int + flushes int + oooFlushes int + sizeFlushes int + spanGrows int + } } func (s *sstSink) Close() error { + if log.V(1) && s.ctx != nil { + log.Infof(s.ctx, "backup sst sink recv'd %d files, wrote %d (%d due to size, %d due to re-ordering), %d recv files extended prior span", + s.stats.files, s.stats.flushes, s.stats.sizeFlushes, s.stats.oooFlushes, s.stats.spanGrows) + } if s.cancel != nil { s.cancel() } @@ -534,10 +563,54 @@ func (s *sstSink) Close() error { return nil } +// push pushes one returned backup file into the sink. Returned files can arrive +// out of order, but must be written to an underlying file in-order or else a +// new underlying file has to be opened. The queue allows buffering up files and +// sorting them before pushing them to the underlying file to try to avoid this. +// When the queue length or sum of the data sizes in it exceeds thresholds the +// queue is sorted and the first half is flushed. +func (s *sstSink) push(ctx context.Context, resp returnedSST) error { + s.queue = append(s.queue, resp) + s.queueSize += len(resp.sst) + + if len(s.queue) >= maxSinkQueueFiles || s.queueSize >= int(smallFileBuffer.Get(s.conf.settings)) { + sort.Slice(s.queue, func(i, j int) bool { return s.queue[i].f.Span.Key.Compare(s.queue[j].f.Span.Key) < 0 }) + + // Drain the first half. + drain := len(s.queue) / 2 + if drain < 1 { + drain = 1 + } + for i := range s.queue[:drain] { + if err := s.write(ctx, s.queue[i]); err != nil { + return err + } + s.queueSize -= len(s.queue[i].sst) + } + + // Shift down the remainder of the queue and slice off the tail. + copy(s.queue, s.queue[drain:]) + s.queue = s.queue[:len(s.queue)-drain] + } + return nil +} + func (s *sstSink) flush(ctx context.Context) error { + for i := range s.queue { + if err := s.write(ctx, s.queue[i]); err != nil { + return err + } + } + s.queue = nil + return s.flushFile(ctx) +} + +func (s *sstSink) flushFile(ctx context.Context) error { if s.out == nil { return nil } + s.stats.flushes++ + if err := s.sst.Finish(); err != nil { return err } @@ -565,6 +638,7 @@ func (s *sstSink) flush(ctx context.Context) error { } s.flushedFiles = nil + s.flushedSize = 0 s.flushedRevStart.Reset() s.completedSpans = 0 @@ -593,14 +667,15 @@ func (s *sstSink) open(ctx context.Context) error { } func (s *sstSink) write(ctx context.Context, resp returnedSST) error { + s.stats.files++ + span := resp.f.Span // If this span starts before the last buffered span ended, we need to flush // since it overlaps but SSTWriter demands writes in-order. - // TODO(dt): consider buffering resp until _next_ `write` to allow minimal - // reordering of writes to avoid extra flushes. if len(s.flushedFiles) > 0 && span.Key.Compare(s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey) < 0 { - if err := s.flush(ctx); err != nil { + s.stats.oooFlushes++ + if err := s.flushFile(ctx); err != nil { return err } } @@ -648,6 +723,7 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error { s.flushedFiles[l].StartTime.EqOrdering(resp.f.StartTime) { s.flushedFiles[l].Span.EndKey = span.EndKey s.flushedFiles[l].EntryCounts.add(resp.f.EntryCounts) + s.stats.spanGrows++ } else { f := resp.f f.Path = s.outName @@ -655,11 +731,12 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error { } s.flushedRevStart.Forward(resp.revStart) s.completedSpans += resp.completedSpans + s.flushedSize += int64(len(resp.sst)) // If our accumulated SST is now big enough, flush it. - // TODO(dt): use the compressed size instead. - if s.sst.DataSize > s.conf.targetFileSize { - if err := s.flush(ctx); err != nil { + if s.flushedSize > s.conf.targetFileSize { + s.stats.sizeFlushes++ + if err := s.flushFile(ctx); err != nil { return err } }