Skip to content

Commit

Permalink
Merge #66792 #66802
Browse files Browse the repository at this point in the history
66792: opt: fix panic due to concurrent map writes when copying metadata r=rytaft a=rytaft

This commit fixes a bug that caused a panic due to concurrent map writes
when copying table metadata. The fix is to make a deep copy of the map
before updating it.

Fixes #66717

Release note (bug fix): Fixed a panic that could occur in the optimizer
when executing a prepared plan with placeholders. This could happen when
one of the tables used by the query had computed columns or a partial
index.

66802: backupccl: stream writes of returned SSTs to remote files r=dt a=dt

This changes the backup processor to open remote files for writing and then write the content of returned SSTs as they are returned instead of writing to an in-memory SSTable and then flushing that to cloud storage later.

Release note: none.

Co-authored-by: Rebecca Taft <becca@cockroachlabs.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
  • Loading branch information
3 people committed Jun 24, 2021
3 parents bf7f350 + 990807e + b234b9f commit a1f969c
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 61 deletions.
152 changes: 99 additions & 53 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
package backupccl

import (
"bytes"
"context"
"fmt"
"io"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -366,7 +366,7 @@ func runBackupProcessor(
}

files := make([]BackupManifest_File, 0)
for _, file := range res.Files {
for i, file := range res.Files {
f := BackupManifest_File{
Span: file.Span,
Path: file.Path,
Expand All @@ -381,7 +381,17 @@ func runBackupProcessor(
// ch for the writer goroutine to handle. Otherwise, go
// ahead and record the file for progress reporting.
if len(file.SST) > 0 {
returnedSSTs <- returnedSST{f: f, sst: file.SST, revStart: res.StartTime, completedSpans: completedSpans}
ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime}
// If multiple files were returned for this span, only one -- the
// last -- should count as completing the requested span.
if i == len(files)-1 {
ret.completedSpans = completedSpans
}
select {
case returnedSSTs <- ret:
case <-ctxDone:
return ctx.Err()
}
} else {
files = append(files, f)
}
Expand Down Expand Up @@ -434,13 +444,22 @@ func runBackupProcessor(
if err != nil {
return err
}
defer defaultStore.Close()
defaultSink := &sstSink{conf: sinkConf, dest: defaultStore}

localitySinks := make(map[string]*sstSink)
defaultSink := &sstSink{conf: sinkConf, dest: defaultStore}

defer func() {
for i := range localitySinks {
localitySinks[i].dest.Close()
err := localitySinks[i].Close()
err = errors.CombineErrors(localitySinks[i].dest.Close(), err)
if err != nil {
log.Warningf(ctx, "failed to close backup sink(s): %+v", err)
}
}
err := defaultSink.Close()
err = errors.CombineErrors(defaultStore.Close(), err)
if err != nil {
log.Warningf(ctx, "failed to close backup sink(s): %+v", err)
}
}()

Expand Down Expand Up @@ -485,45 +504,46 @@ type sstSinkConf struct {
}

type sstSink struct {
dest cloud.ExternalStorage
conf sstSinkConf
sstWriter *storage.SSTWriter
bufferedSST storage.MemFile
bufferedFiles []BackupManifest_File
bufferedRevStart hlc.Timestamp
completedSpans int32
dest cloud.ExternalStorage
conf sstSinkConf

sst storage.SSTWriter
ctx context.Context
cancel func()
out io.WriteCloser
outName string

flushedFiles []BackupManifest_File
flushedRevStart hlc.Timestamp
completedSpans int32
}

func (s *sstSink) Close() error {
if s.cancel != nil {
s.cancel()
}
if s.out != nil {
return s.out.Close()
}
return nil
}

func (s *sstSink) flush(ctx context.Context) error {
if s.sstWriter == nil {
if s.out == nil {
return nil
}
if err := s.sstWriter.Finish(); err != nil {
if err := s.sst.Finish(); err != nil {
return err
}

data := s.bufferedSST.Bytes()
if s.conf.enc != nil {
var err error
data, err = storageccl.EncryptFile(data, s.conf.enc.Key)
if err != nil {
return err
}
}

name := storageccl.GenerateUniqueSSTName(s.conf.id)
if err := cloud.WriteFile(ctx, s.dest, name, bytes.NewReader(data)); err != nil {
log.VEventf(ctx, 1, "failed to put file: %+v", err)
if err := s.out.Close(); err != nil {
return errors.Wrap(err, "writing SST")
}

for i := range s.bufferedFiles {
s.bufferedFiles[i].Path = name
}
s.outName = ""
s.out = nil

progDetails := BackupManifest_Progress{
RevStartTime: s.bufferedRevStart,
Files: s.bufferedFiles,
RevStartTime: s.flushedRevStart,
Files: s.flushedFiles,
CompletedSpans: s.completedSpans,
}
var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
Expand All @@ -538,38 +558,61 @@ func (s *sstSink) flush(ctx context.Context) error {
case s.conf.progCh <- prog:
}

s.sstWriter = nil
s.bufferedSST.Reset()
s.bufferedFiles = nil
s.bufferedRevStart.Reset()
s.flushedFiles = nil
s.flushedRevStart.Reset()
s.completedSpans = 0

return nil
}

func (s *sstSink) open(ctx context.Context) error {
s.outName = storageccl.GenerateUniqueSSTName(s.conf.id)
if s.ctx == nil {
s.ctx, s.cancel = context.WithCancel(ctx)
}
w, err := s.dest.Writer(s.ctx, s.outName)
if err != nil {
return err
}
if s.conf.enc != nil {
var err error
w, err = storageccl.EncryptingWriter(w, s.conf.enc.Key)
if err != nil {
return err
}
}
s.out = w
s.sst = storage.MakeBackupSSTWriter(s.out)
return nil
}

func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
span := resp.f.Span

// If this span starts before the last buffered span ended, we need to flush
// since it overlaps but SSTWriter demands writes in-order.
// TODO(dt): consider buffering resp until _next_ `write` to allow minimal
// reordering of writes to avoid extra flushes.
if len(s.bufferedFiles) > 0 && span.Key.Compare(s.bufferedFiles[len(s.bufferedFiles)-1].Span.EndKey) < 0 {
if len(s.flushedFiles) > 0 && span.Key.Compare(s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey) < 0 {
if err := s.flush(ctx); err != nil {
return err
}
}

// Initialize the writer if needed then copy the SST content to the writer.
if s.sstWriter == nil {
w := storage.MakeBackupSSTWriter(&s.bufferedSST)
s.sstWriter = &w
// Initialize the writer if needed.
if s.out == nil {
if err := s.open(ctx); err != nil {
return err
}
}

// Copy SST content.
sst, err := storage.NewMemSSTIterator(resp.sst, false)
if err != nil {
return err
}
defer sst.Close()

sst.SeekGE(storage.MVCCKey{Key: keys.MinKey})
for {
if valid, err := sst.Valid(); !valid || err != nil {
Expand All @@ -580,11 +623,11 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
}
k := sst.UnsafeKey()
if k.Timestamp.IsEmpty() {
if err := s.sstWriter.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil {
if err := s.sst.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil {
return err
}
} else {
if err := s.sstWriter.PutMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil {
if err := s.sst.PutMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil {
return err
}
}
Expand All @@ -594,19 +637,22 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
// If this span extended the last span added -- that is, picked up where it
// ended and has the same time-bounds -- then we can simply extend that span
// and add to its entry counts. Otherwise we need to record it separately.
if l := len(s.bufferedFiles) - 1; l > 0 && s.bufferedFiles[l].Span.EndKey.Equal(span.Key) &&
s.bufferedFiles[l].EndTime.EqOrdering(resp.f.EndTime) &&
s.bufferedFiles[l].StartTime.EqOrdering(resp.f.StartTime) {
s.bufferedFiles[l].Span.EndKey = span.EndKey
s.bufferedFiles[l].EntryCounts.add(resp.f.EntryCounts)
if l := len(s.flushedFiles) - 1; l > 0 && s.flushedFiles[l].Span.EndKey.Equal(span.Key) &&
s.flushedFiles[l].EndTime.EqOrdering(resp.f.EndTime) &&
s.flushedFiles[l].StartTime.EqOrdering(resp.f.StartTime) {
s.flushedFiles[l].Span.EndKey = span.EndKey
s.flushedFiles[l].EntryCounts.add(resp.f.EntryCounts)
} else {
s.bufferedFiles = append(s.bufferedFiles, resp.f)
f := resp.f
f.Path = s.outName
s.flushedFiles = append(s.flushedFiles, f)
}
s.bufferedRevStart.Forward(resp.revStart)
s.flushedRevStart.Forward(resp.revStart)
s.completedSpans += resp.completedSpans

// If our accumulated SST is now big enough, flush it.
if int64(s.bufferedSST.Len()) > s.conf.targetFileSize {
// TODO(dt): use the compressed size instead.
if s.sst.DataSize > s.conf.targetFileSize {
if err := s.flush(ctx); err != nil {
return err
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/ccl/storageccl/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,26 @@ func AppearsEncrypted(text []byte) bool {
type encWriter struct {
gcm cipher.AEAD
iv []byte
ciphertext io.Writer
ciphertext io.WriteCloser
buf []byte
bufPos int
}

// NopCloser wraps an io.Writer to add a no-op Close().
type NopCloser struct {
io.Writer
}

// Close is a no-op.
func (NopCloser) Close() error {
return nil
}

// EncryptFile encrypts a file with the supplied key and a randomly chosen IV
// which is prepended in a header on the returned ciphertext.
func EncryptFile(plaintext, key []byte) ([]byte, error) {
b := &bytes.Buffer{}
w, err := EncryptingWriter(b, key)
w, err := EncryptingWriter(NopCloser{b}, key)
if err != nil {
return nil, err
}
Expand All @@ -105,7 +115,7 @@ func EncryptFile(plaintext, key []byte) ([]byte, error) {

// EncryptingWriter returns a writer that wraps an underlying sink writer but
// which encrypts bytes written to it before flushing them to the wrapped sink.
func EncryptingWriter(ciphertext io.Writer, key []byte) (io.WriteCloser, error) {
func EncryptingWriter(ciphertext io.WriteCloser, key []byte) (io.WriteCloser, error) {
gcm, err := aesgcm(key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -154,7 +164,8 @@ func (e *encWriter) Close() error {
// chunk maintains the invariant that a chunked file always ends in a sealed
chunk of less than chunk size, thus making truncation, even along a chunk
boundary, detectable.
return e.flush()
err := e.flush()
return errors.CombineErrors(err, e.ciphertext.Close())
}

func (e *encWriter) flush() error {
Expand Down
18 changes: 14 additions & 4 deletions pkg/sql/opt/table_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,21 @@ func (tm *TableMeta) copyScalars(copyScalar func(Expr) Expr) {
if tm.Constraints != nil {
tm.Constraints = copyScalar(tm.Constraints).(ScalarExpr)
}
for col, e := range tm.ComputedCols {
tm.ComputedCols[col] = copyScalar(e).(ScalarExpr)

if tm.ComputedCols != nil {
computedCols := make(map[ColumnID]ScalarExpr, len(tm.ComputedCols))
for col, e := range tm.ComputedCols {
computedCols[col] = copyScalar(e).(ScalarExpr)
}
tm.ComputedCols = computedCols
}
for idx, e := range tm.partialIndexPredicates {
tm.partialIndexPredicates[idx] = copyScalar(e).(ScalarExpr)

if tm.partialIndexPredicates != nil {
partialIndexPredicates := make(map[cat.IndexOrdinal]ScalarExpr, len(tm.partialIndexPredicates))
for idx, e := range tm.partialIndexPredicates {
partialIndexPredicates[idx] = copyScalar(e).(ScalarExpr)
}
tm.partialIndexPredicates = partialIndexPredicates
}
}

Expand Down

0 comments on commit a1f969c

Please sign in to comment.