
backupccl: stream writes of returned SSTs to remote files #66802

Merged · 3 commits · Jun 24, 2021
152 changes: 99 additions & 53 deletions pkg/ccl/backupccl/backup_processor.go
@@ -9,9 +9,9 @@
 package backupccl

 import (
-	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -366,7 +366,7 @@ func runBackupProcessor(
 			}

 			files := make([]BackupManifest_File, 0)
-			for _, file := range res.Files {
+			for i, file := range res.Files {
 				f := BackupManifest_File{
 					Span: file.Span,
 					Path: file.Path,
@@ -381,7 +381,17 @@
 					// ch for the writer goroutine to handle. Otherwise, go
 					// ahead and record the file for progress reporting.
 					if len(file.SST) > 0 {
-						returnedSSTs <- returnedSST{f: f, sst: file.SST, revStart: res.StartTime, completedSpans: completedSpans}
+						ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime}
+						// If multiple files were returned for this span, only one -- the
+						// last -- should count as completing the requested span.
+						if i == len(res.Files)-1 {
+							ret.completedSpans = completedSpans
+						}
+						select {
+						case returnedSSTs <- ret:
+						case <-ctxDone:
+							return ctx.Err()
+						}
 					} else {
 						files = append(files, f)
 					}
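
The select above is the standard Go idiom for a channel send that must also observe cancellation: without it, a producer blocked on returnedSSTs could hang forever once the consumer goroutine exits. A minimal standalone sketch of the same pattern (sendOrAbort, results, and item are illustrative names, not part of this PR):

package main

import "context"

// sendOrAbort sends item on results unless ctx is cancelled first, so a
// blocked producer unwinds promptly instead of leaking.
func sendOrAbort(ctx context.Context, results chan<- int, item int) error {
	select {
	case results <- item:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

The PR captures ctx.Done() once as ctxDone and reuses it in the select, which is equivalent and avoids a repeated call per send.
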
@@ -434,13 +444,22 @@ func runBackupProcessor(
 		if err != nil {
 			return err
 		}
-		defer defaultStore.Close()
-		defaultSink := &sstSink{conf: sinkConf, dest: defaultStore}

 		localitySinks := make(map[string]*sstSink)
+		defaultSink := &sstSink{conf: sinkConf, dest: defaultStore}

 		defer func() {
 			for i := range localitySinks {
-				localitySinks[i].dest.Close()
+				err := localitySinks[i].Close()
+				err = errors.CombineErrors(localitySinks[i].dest.Close(), err)
+				if err != nil {
+					log.Warningf(ctx, "failed to close backup sink(s): %+v", err)
+				}
 			}
+			err := defaultSink.Close()
+			err = errors.CombineErrors(defaultStore.Close(), err)
+			if err != nil {
+				log.Warningf(ctx, "failed to close backup sink(s): %+v", err)
+			}
 		}()
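
The rewritten defer closes every sink and its underlying storage while still surfacing the first failure: errors.CombineErrors from github.com/cockroachdb/errors returns its first non-nil argument and records the other as a secondary error. A hedged sketch of the same cleanup idea over plain io.Closers (closeAll is a hypothetical helper, not in this PR):

package main

import (
	"io"

	"github.com/cockroachdb/errors"
)

// closeAll closes every closer even when an earlier Close fails, combining
// the errors so that none are silently dropped.
func closeAll(closers ...io.Closer) error {
	var err error
	for _, c := range closers {
		err = errors.CombineErrors(err, c.Close())
	}
	return err
}
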

@@ -485,45 +504,46 @@ type sstSinkConf struct {
 }

 type sstSink struct {
-	dest             cloud.ExternalStorage
-	conf             sstSinkConf
-	sstWriter        *storage.SSTWriter
-	bufferedSST      storage.MemFile
-	bufferedFiles    []BackupManifest_File
-	bufferedRevStart hlc.Timestamp
-	completedSpans   int32
+	dest cloud.ExternalStorage
+	conf sstSinkConf
+
+	sst     storage.SSTWriter
+	ctx     context.Context
+	cancel  func()
+	out     io.WriteCloser
+	outName string
+
+	flushedFiles    []BackupManifest_File
+	flushedRevStart hlc.Timestamp
+	completedSpans  int32
 }

+func (s *sstSink) Close() error {
+	if s.cancel != nil {
+		s.cancel()
+	}
+	if s.out != nil {
+		return s.out.Close()
+	}
+	return nil
+}
+
 func (s *sstSink) flush(ctx context.Context) error {
-	if s.sstWriter == nil {
+	if s.out == nil {
 		return nil
 	}
-	if err := s.sstWriter.Finish(); err != nil {
+	if err := s.sst.Finish(); err != nil {
 		return err
 	}

-	data := s.bufferedSST.Bytes()
-	if s.conf.enc != nil {
-		var err error
-		data, err = storageccl.EncryptFile(data, s.conf.enc.Key)
-		if err != nil {
-			return err
-		}
-	}
-
-	name := storageccl.GenerateUniqueSSTName(s.conf.id)
-	if err := cloud.WriteFile(ctx, s.dest, name, bytes.NewReader(data)); err != nil {
-		log.VEventf(ctx, 1, "failed to put file: %+v", err)
+	if err := s.out.Close(); err != nil {
+		return errors.Wrap(err, "writing SST")
 	}

-	for i := range s.bufferedFiles {
-		s.bufferedFiles[i].Path = name
-	}
+	s.outName = ""
+	s.out = nil

 	progDetails := BackupManifest_Progress{
-		RevStartTime:   s.bufferedRevStart,
-		Files:          s.bufferedFiles,
+		RevStartTime:   s.flushedRevStart,
+		Files:          s.flushedFiles,
 		CompletedSpans: s.completedSpans,
 	}
 	var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
@@ -538,38 +558,61 @@ func (s *sstSink) flush(ctx context.Context) error {
 	case s.conf.progCh <- prog:
 	}

-	s.sstWriter = nil
-	s.bufferedSST.Reset()
-	s.bufferedFiles = nil
-	s.bufferedRevStart.Reset()
+	s.flushedFiles = nil
+	s.flushedRevStart.Reset()
 	s.completedSpans = 0

 	return nil
 }

+func (s *sstSink) open(ctx context.Context) error {
+	s.outName = storageccl.GenerateUniqueSSTName(s.conf.id)
+	if s.ctx == nil {
+		s.ctx, s.cancel = context.WithCancel(ctx)
+	}
+	w, err := s.dest.Writer(s.ctx, s.outName)
+	if err != nil {
+		return err
+	}
+	if s.conf.enc != nil {
+		var err error
+		w, err = storageccl.EncryptingWriter(w, s.conf.enc.Key)
+		if err != nil {
+			return err
+		}
+	}
+	s.out = w
+	s.sst = storage.MakeBackupSSTWriter(s.out)
+	return nil
+}
+
 func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
 	span := resp.f.Span

 	// If this span starts before the last buffered span ended, we need to flush
 	// since it overlaps but SSTWriter demands writes in-order.
 	// TODO(dt): consider buffering resp until _next_ `write` to allow minimal
 	// reordering of writes to avoid extra flushes.
-	if len(s.bufferedFiles) > 0 && span.Key.Compare(s.bufferedFiles[len(s.bufferedFiles)-1].Span.EndKey) < 0 {
+	if len(s.flushedFiles) > 0 && span.Key.Compare(s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey) < 0 {
 		if err := s.flush(ctx); err != nil {
 			return err
 		}
 	}

-	// Initialize the writer if needed then copy the SST content to the writer.
-	if s.sstWriter == nil {
-		w := storage.MakeBackupSSTWriter(&s.bufferedSST)
-		s.sstWriter = &w
+	// Initialize the writer if needed.
+	if s.out == nil {
+		if err := s.open(ctx); err != nil {
+			return err
+		}
 	}

+	// Copy SST content.
 	sst, err := storage.NewMemSSTIterator(resp.sst, false)
 	if err != nil {
 		return err
 	}
 	defer sst.Close()

 	sst.SeekGE(storage.MVCCKey{Key: keys.MinKey})
 	for {
 		if valid, err := sst.Valid(); !valid || err != nil {
@@ -580,11 +623,11 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
 		}
 		k := sst.UnsafeKey()
 		if k.Timestamp.IsEmpty() {
-			if err := s.sstWriter.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil {
+			if err := s.sst.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil {
 				return err
 			}
 		} else {
-			if err := s.sstWriter.PutMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil {
+			if err := s.sst.PutMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil {
 				return err
 			}
 		}
@@ -594,19 +637,22 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
 	// If this span extended the last span added -- that is, picked up where it
 	// ended and has the same time-bounds -- then we can simply extend that span
 	// and add to its entry counts. Otherwise we need to record it separately.
-	if l := len(s.bufferedFiles) - 1; l > 0 && s.bufferedFiles[l].Span.EndKey.Equal(span.Key) &&
-		s.bufferedFiles[l].EndTime.EqOrdering(resp.f.EndTime) &&
-		s.bufferedFiles[l].StartTime.EqOrdering(resp.f.StartTime) {
-		s.bufferedFiles[l].Span.EndKey = span.EndKey
-		s.bufferedFiles[l].EntryCounts.add(resp.f.EntryCounts)
+	if l := len(s.flushedFiles) - 1; l > 0 && s.flushedFiles[l].Span.EndKey.Equal(span.Key) &&
+		s.flushedFiles[l].EndTime.EqOrdering(resp.f.EndTime) &&
+		s.flushedFiles[l].StartTime.EqOrdering(resp.f.StartTime) {
+		s.flushedFiles[l].Span.EndKey = span.EndKey
+		s.flushedFiles[l].EntryCounts.add(resp.f.EntryCounts)
 	} else {
-		s.bufferedFiles = append(s.bufferedFiles, resp.f)
+		f := resp.f
+		f.Path = s.outName
+		s.flushedFiles = append(s.flushedFiles, f)
 	}
-	s.bufferedRevStart.Forward(resp.revStart)
+	s.flushedRevStart.Forward(resp.revStart)
 	s.completedSpans += resp.completedSpans

 	// If our accumulated SST is now big enough, flush it.
-	if int64(s.bufferedSST.Len()) > s.conf.targetFileSize {
+	// TODO(dt): use the compressed size instead.
+	if s.sst.DataSize > s.conf.targetFileSize {
 		if err := s.flush(ctx); err != nil {
 			return err
 		}
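
Taken together, the sstSink changes replace buffer-then-upload (write the whole SST into a MemFile, optionally encrypt it, then PUT it with cloud.WriteFile) with open-a-remote-writer-and-stream. A condensed sketch of that lifecycle, assuming a simplified blob-store interface (BlobStore, Sink, and all method names here are illustrative, not the PR's exact API):

package main

import (
	"context"
	"io"
)

// BlobStore hands back a streaming writer for a named remote file, loosely
// mirroring cloud.ExternalStorage.Writer as used by sstSink.open above.
type BlobStore interface {
	Writer(ctx context.Context, name string) (io.WriteCloser, error)
}

// Sink streams chunks into one remote file at a time, rolling to a new file
// once a target size is reached -- the same open/write/flush shape as sstSink.
type Sink struct {
	dest   BlobStore
	out    io.WriteCloser
	name   string
	size   int64
	target int64
}

func (s *Sink) open(ctx context.Context, name string) error {
	w, err := s.dest.Writer(ctx, name) // the upload starts immediately
	if err != nil {
		return err
	}
	s.out, s.name = w, name
	return nil
}

// Write appends a chunk, opening a remote file on demand.
func (s *Sink) Write(ctx context.Context, name string, chunk []byte) error {
	if s.out == nil {
		if err := s.open(ctx, name); err != nil {
			return err
		}
	}
	if _, err := s.out.Write(chunk); err != nil {
		return err
	}
	s.size += int64(len(chunk))
	if s.size > s.target {
		return s.Flush()
	}
	return nil
}

// Flush finalizes the current remote file. Closing the streaming writer
// completes the upload, so no whole-file buffer is ever held in memory.
func (s *Sink) Flush() error {
	if s.out == nil {
		return nil
	}
	err := s.out.Close()
	s.out, s.name, s.size = nil, "", 0
	return err
}
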
19 changes: 15 additions & 4 deletions pkg/ccl/storageccl/encryption.go
@@ -80,16 +80,26 @@ func AppearsEncrypted(text []byte) bool {
 type encWriter struct {
 	gcm        cipher.AEAD
 	iv         []byte
-	ciphertext io.Writer
+	ciphertext io.WriteCloser
 	buf        []byte
 	bufPos     int
 }

+// NopCloser wraps an io.Writer to add a no-op Close().
+type NopCloser struct {
+	io.Writer
+}
+
+// Close is a no-op.
+func (NopCloser) Close() error {
+	return nil
+}
+
 // EncryptFile encrypts a file with the supplied key and a randomly chosen IV
 // which is prepended in a header on the returned ciphertext.
 func EncryptFile(plaintext, key []byte) ([]byte, error) {
 	b := &bytes.Buffer{}
-	w, err := EncryptingWriter(b, key)
+	w, err := EncryptingWriter(NopCloser{b}, key)
 	if err != nil {
 		return nil, err
 	}
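
NopCloser exists because EncryptingWriter now requires an io.WriteCloser, while callers like EncryptFile still target an in-memory *bytes.Buffer, which has no Close method (the standard library's io.NopCloser only adapts readers). A small usage sketch relying only on the shape shown in this diff:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// NopCloser mirrors the type added in this diff: it embeds an io.Writer,
// such as *bytes.Buffer, and adds a do-nothing Close.
type NopCloser struct {
	io.Writer
}

// Close is a no-op.
func (NopCloser) Close() error { return nil }

func main() {
	b := &bytes.Buffer{}
	var w io.WriteCloser = NopCloser{b} // now satisfies io.WriteCloser
	w.Write([]byte("ciphertext"))
	_ = w.Close() // no-op; the buffer's contents remain readable
	fmt.Println(b.Len())
}
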
@@ -105,7 +115,7 @@ func EncryptFile(plaintext, key []byte) ([]byte, error) {

 // EncryptingWriter returns a writer that wraps an underlying sink writer but
 // which encrypts bytes written to it before flushing them to the wrapped sink.
-func EncryptingWriter(ciphertext io.Writer, key []byte) (io.WriteCloser, error) {
+func EncryptingWriter(ciphertext io.WriteCloser, key []byte) (io.WriteCloser, error) {
 	gcm, err := aesgcm(key)
 	if err != nil {
 		return nil, err
@@ -154,7 +164,8 @@ func (e *encWriter) Close() error {
 	// chunk maintains the invariant that a chunked file always ends in a sealed
 	// chunk of less than chunk size, thus making truncation, even along a chunk
 	// boundary, detectable.
-	return e.flush()
+	err := e.flush()
+	return errors.CombineErrors(err, e.ciphertext.Close())
 }

 func (e *encWriter) flush() error {
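
With the signature change, EncryptingWriter composes directly with any streaming destination: sealed chunks are flushed as they fill, and Close both seals the final short chunk and closes the wrapped sink. A hedged usage sketch against a local file (the 32-byte key length is an assumption here; only EncryptingWriter itself is taken from this diff):

package main

import (
	"crypto/rand"
	"os"

	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
)

func main() {
	key := make([]byte, 32) // assumed AES-256 key size
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}

	f, err := os.Create("backup.sst.enc")
	if err != nil {
		panic(err)
	}

	// *os.File is an io.WriteCloser, so it can be passed directly; an
	// in-memory buffer would need the NopCloser wrapper from this diff.
	w, err := storageccl.EncryptingWriter(f, key)
	if err != nil {
		panic(err)
	}
	if _, err := w.Write([]byte("sst bytes...")); err != nil {
		panic(err)
	}
	// Close seals the trailing chunk and closes f via CombineErrors.
	if err := w.Close(); err != nil {
		panic(err)
	}
}
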