diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index d3a51a7908be..074bb173b4e1 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -9,9 +9,9 @@ package backupccl import ( - "bytes" "context" "fmt" + "io" "time" "github.com/cockroachdb/cockroach/pkg/base" @@ -366,7 +366,7 @@ func runBackupProcessor( } files := make([]BackupManifest_File, 0) - for _, file := range res.Files { + for i, file := range res.Files { f := BackupManifest_File{ Span: file.Span, Path: file.Path, @@ -381,7 +381,17 @@ func runBackupProcessor( // ch for the writer goroutine to handle. Otherwise, go // ahead and record the file for progress reporting. if len(file.SST) > 0 { - returnedSSTs <- returnedSST{f: f, sst: file.SST, revStart: res.StartTime, completedSpans: completedSpans} + ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime} + // If multiple files were returned for this span, only one -- the + // last -- should count as completing the requested span. + if i == len(files)-1 { + ret.completedSpans = completedSpans + } + select { + case returnedSSTs <- ret: + case <-ctxDone: + return ctx.Err() + } } else { files = append(files, f) } @@ -434,13 +444,22 @@ func runBackupProcessor( if err != nil { return err } - defer defaultStore.Close() - defaultSink := &sstSink{conf: sinkConf, dest: defaultStore} localitySinks := make(map[string]*sstSink) + defaultSink := &sstSink{conf: sinkConf, dest: defaultStore} + defer func() { for i := range localitySinks { - localitySinks[i].dest.Close() + err := localitySinks[i].Close() + err = errors.CombineErrors(localitySinks[i].dest.Close(), err) + if err != nil { + log.Warningf(ctx, "failed to close backup sink(s): %+v", err) + } + } + err := defaultSink.Close() + err = errors.CombineErrors(defaultStore.Close(), err) + if err != nil { + log.Warningf(ctx, "failed to close backup sink(s): %+v", err) } }() @@ -485,45 +504,46 @@ type sstSinkConf struct { } type sstSink struct { - dest cloud.ExternalStorage - conf sstSinkConf - sstWriter *storage.SSTWriter - bufferedSST storage.MemFile - bufferedFiles []BackupManifest_File - bufferedRevStart hlc.Timestamp - completedSpans int32 + dest cloud.ExternalStorage + conf sstSinkConf + + sst storage.SSTWriter + ctx context.Context + cancel func() + out io.WriteCloser + outName string + + flushedFiles []BackupManifest_File + flushedRevStart hlc.Timestamp + completedSpans int32 +} + +func (s *sstSink) Close() error { + if s.cancel != nil { + s.cancel() + } + if s.out != nil { + return s.out.Close() + } + return nil } func (s *sstSink) flush(ctx context.Context) error { - if s.sstWriter == nil { + if s.out == nil { return nil } - if err := s.sstWriter.Finish(); err != nil { + if err := s.sst.Finish(); err != nil { return err } - - data := s.bufferedSST.Bytes() - if s.conf.enc != nil { - var err error - data, err = storageccl.EncryptFile(data, s.conf.enc.Key) - if err != nil { - return err - } - } - - name := storageccl.GenerateUniqueSSTName(s.conf.id) - if err := cloud.WriteFile(ctx, s.dest, name, bytes.NewReader(data)); err != nil { - log.VEventf(ctx, 1, "failed to put file: %+v", err) + if err := s.out.Close(); err != nil { return errors.Wrap(err, "writing SST") } - - for i := range s.bufferedFiles { - s.bufferedFiles[i].Path = name - } + s.outName = "" + s.out = nil progDetails := BackupManifest_Progress{ - RevStartTime: s.bufferedRevStart, - Files: s.bufferedFiles, + RevStartTime: s.flushedRevStart, + Files: s.flushedFiles, CompletedSpans: s.completedSpans, } var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress @@ -538,15 +558,34 @@ func (s *sstSink) flush(ctx context.Context) error { case s.conf.progCh <- prog: } - s.sstWriter = nil - s.bufferedSST.Reset() - s.bufferedFiles = nil - s.bufferedRevStart.Reset() + s.flushedFiles = nil + s.flushedRevStart.Reset() s.completedSpans = 0 return nil } +func (s *sstSink) open(ctx context.Context) error { + s.outName = storageccl.GenerateUniqueSSTName(s.conf.id) + if s.ctx == nil { + s.ctx, s.cancel = context.WithCancel(ctx) + } + w, err := s.dest.Writer(s.ctx, s.outName) + if err != nil { + return err + } + if s.conf.enc != nil { + var err error + w, err = storageccl.EncryptingWriter(w, s.conf.enc.Key) + if err != nil { + return err + } + } + s.out = w + s.sst = storage.MakeBackupSSTWriter(s.out) + return nil +} + func (s *sstSink) write(ctx context.Context, resp returnedSST) error { span := resp.f.Span @@ -554,22 +593,26 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error { // since it overlaps but SSTWriter demands writes in-order. // TODO(dt): consider buffering resp until _next_ `write` to allow minimal // reordering of writes to avoid extra flushes. - if len(s.bufferedFiles) > 0 && span.Key.Compare(s.bufferedFiles[len(s.bufferedFiles)-1].Span.EndKey) < 0 { + if len(s.flushedFiles) > 0 && span.Key.Compare(s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey) < 0 { if err := s.flush(ctx); err != nil { return err } } - // Initialize the writer if needed then copy the SST content to the writer. - if s.sstWriter == nil { - w := storage.MakeBackupSSTWriter(&s.bufferedSST) - s.sstWriter = &w + // Initialize the writer if needed. + if s.out == nil { + if err := s.open(ctx); err != nil { + return err + } } + + // Copy SST content. sst, err := storage.NewMemSSTIterator(resp.sst, false) if err != nil { return err } defer sst.Close() + sst.SeekGE(storage.MVCCKey{Key: keys.MinKey}) for { if valid, err := sst.Valid(); !valid || err != nil { @@ -580,11 +623,11 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error { } k := sst.UnsafeKey() if k.Timestamp.IsEmpty() { - if err := s.sstWriter.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil { + if err := s.sst.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil { return err } } else { - if err := s.sstWriter.PutMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil { + if err := s.sst.PutMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil { return err } } @@ -594,19 +637,22 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error { // If this span extended the last span added -- that is, picked up where it // ended and has the same time-bounds -- then we can simply extend that span // and add to its entry counts. Otherwise we need to record it separately. - if l := len(s.bufferedFiles) - 1; l > 0 && s.bufferedFiles[l].Span.EndKey.Equal(span.Key) && - s.bufferedFiles[l].EndTime.EqOrdering(resp.f.EndTime) && - s.bufferedFiles[l].StartTime.EqOrdering(resp.f.StartTime) { - s.bufferedFiles[l].Span.EndKey = span.EndKey - s.bufferedFiles[l].EntryCounts.add(resp.f.EntryCounts) + if l := len(s.flushedFiles) - 1; l > 0 && s.flushedFiles[l].Span.EndKey.Equal(span.Key) && + s.flushedFiles[l].EndTime.EqOrdering(resp.f.EndTime) && + s.flushedFiles[l].StartTime.EqOrdering(resp.f.StartTime) { + s.flushedFiles[l].Span.EndKey = span.EndKey + s.flushedFiles[l].EntryCounts.add(resp.f.EntryCounts) } else { - s.bufferedFiles = append(s.bufferedFiles, resp.f) + f := resp.f + f.Path = s.outName + s.flushedFiles = append(s.flushedFiles, f) } - s.bufferedRevStart.Forward(resp.revStart) + s.flushedRevStart.Forward(resp.revStart) s.completedSpans += resp.completedSpans // If our accumulated SST is now big enough, flush it. - if int64(s.bufferedSST.Len()) > s.conf.targetFileSize { + // TODO(dt): use the compressed size instead. + if s.sst.DataSize > s.conf.targetFileSize { if err := s.flush(ctx); err != nil { return err } diff --git a/pkg/ccl/storageccl/encryption.go b/pkg/ccl/storageccl/encryption.go index 4581515e4437..85374200bd0b 100644 --- a/pkg/ccl/storageccl/encryption.go +++ b/pkg/ccl/storageccl/encryption.go @@ -80,16 +80,26 @@ func AppearsEncrypted(text []byte) bool { type encWriter struct { gcm cipher.AEAD iv []byte - ciphertext io.Writer + ciphertext io.WriteCloser buf []byte bufPos int } +// NopCloser wraps an io.Writer to add a no-op Close(). +type NopCloser struct { + io.Writer +} + +// Close is a no-op. +func (NopCloser) Close() error { + return nil +} + // EncryptFile encrypts a file with the supplied key and a randomly chosen IV // which is prepended in a header on the returned ciphertext. func EncryptFile(plaintext, key []byte) ([]byte, error) { b := &bytes.Buffer{} - w, err := EncryptingWriter(b, key) + w, err := EncryptingWriter(NopCloser{b}, key) if err != nil { return nil, err } @@ -105,7 +115,7 @@ func EncryptFile(plaintext, key []byte) ([]byte, error) { // EncryptingWriter returns a writer that wraps an underlying sink writer but // which encrypts bytes written to it before flushing them to the wrapped sink. -func EncryptingWriter(ciphertext io.Writer, key []byte) (io.WriteCloser, error) { +func EncryptingWriter(ciphertext io.WriteCloser, key []byte) (io.WriteCloser, error) { gcm, err := aesgcm(key) if err != nil { return nil, err @@ -154,7 +164,8 @@ func (e *encWriter) Close() error { // chunk maintains the invariant that a chunked file always ends in a sealed // chunk of less than chunk size, thus making tuncation, even along a chunk // boundary, detectable. - return e.flush() + err := e.flush() + return errors.CombineErrors(err, e.ciphertext.Close()) } func (e *encWriter) flush() error {