68468: backup: always write to cloud storage in processor r=dt a=dt

Release note (sql change): the setting bulkio.backup.proxy_file_writes.enabled is no longer needed to enable proxied writes, which are now the default.

Also, while I'm here, decouple the target file size used by the processor (which streams its writes out to cloud storage) from the target file size of ExportRequest (which is now always in-memory, and twice over at that, on both the KV and SQL sides of the RPC).
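
To make the new division of labor concrete, here is an illustrative sketch (not code from this commit) of the two knobs that are now independent, using the default values visible in the diffs below:

// Illustrative only: the two now-independent size knobs, at the defaults
// set by this change.
const (
	exportTargetSize = 16 << 20  // kv.bulk_sst.target_size: each SST an ExportRequest builds and returns in memory
	backupFileSize   = 128 << 20 // bulkio.backup.file_size: each file the processor streams out to cloud storage
)

// At these defaults the processor concatenates roughly
// backupFileSize/exportTargetSize = 8 returned SSTs per uploaded file.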

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
craig[bot] and dt committed Aug 6, 2021
2 parents 1169740 + 2a54d10 commit 6fafc56
Showing 14 changed files with 231 additions and 315 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -29,7 +29,7 @@ kv.allocator.qps_rebalance_threshold float 0.25 minimum fraction away from the m
kv.allocator.range_rebalance_threshold float 0.05 minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull
kv.bulk_io_write.max_rate byte size 1.0 TiB the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops
kv.bulk_sst.max_allowed_overage byte size 64 MiB if positive, allowed size in excess of target size for SSTs from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory
-kv.bulk_sst.target_size byte size 64 MiB target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory
+kv.bulk_sst.target_size byte size 16 MiB target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory
kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to serve consistent historical reads based on closed timestamp information
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -31,7 +31,7 @@
<tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.bulk_io_write.max_rate</code></td><td>byte size</td><td><code>1.0 TiB</code></td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
<tr><td><code>kv.bulk_sst.max_allowed_overage</code></td><td>byte size</td><td><code>64 MiB</code></td><td>if positive, allowed size in excess of target size for SSTs from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td></tr>
-<tr><td><code>kv.bulk_sst.target_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td></tr>
+<tr><td><code>kv.bulk_sst.target_size</code></td><td>byte size</td><td><code>16 MiB</code></td><td>target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td></tr>
<tr><td><code>kv.closed_timestamp.follower_reads_enabled</code></td><td>boolean</td><td><code>true</code></td><td>allow (all) replicas to serve consistent historical reads based on closed timestamp information</td></tr>
<tr><td><code>kv.protectedts.reconciliation.interval</code></td><td>duration</td><td><code>5m0s</code></td><td>the frequency for reconciling jobs with protected timestamp records</td></tr>
<tr><td><code>kv.range_split.by_load_enabled</code></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td></tr>
285 changes: 124 additions & 161 deletions pkg/ccl/backupccl/backup.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup.proto
@@ -198,7 +198,7 @@ message BackupExportTraceResponseEvent {
string duration = 1;
int32 num_files = 2;
repeated RowCount file_summaries = 3 [(gogoproto.nullable) = false];
-bool has_returned_ssts = 4 [(gogoproto.customname) = "HasReturnedSSTs"];
+reserved 4;
string retryable_error = 5;
}

231 changes: 88 additions & 143 deletions pkg/ccl/backupccl/backup_processor.go
@@ -68,18 +68,10 @@ var (
time.Minute*5,
settings.NonNegativeDuration,
)
-alwaysWriteInProc = settings.RegisterBoolSetting(
-"bulkio.backup.proxy_file_writes.enabled",
-"return files to the backup coordination processes to write to "+
-"external storage instead of writing them directly from the storage layer",
-false,
-)
-smallFileSize = settings.RegisterByteSizeSetting(
-"bulkio.backup.merge_file_size",
-"size under which backup files will be forwarded to another node to be merged with other smaller files "+
-"(and implies files will be buffered in-memory until this size before being written to backup storage)",
-16<<20,
-settings.NonNegativeInt,
+targetFileSize = settings.RegisterByteSizeSetting(
+"bulkio.backup.file_size",
+"target file size",
+128<<20,
)
smallFileBuffer = settings.RegisterByteSizeSetting(
"bulkio.backup.merge_file_buffer_size",
@@ -197,7 +189,8 @@ func runBackupProcessor(
backupProcessorSpan := tracing.SpanFromContext(ctx)
clusterSettings := flowCtx.Cfg.Settings

-todo := make(chan spanAndTime, len(spec.Spans)+len(spec.IntroducedSpans))
+totalSpans := len(spec.Spans) + len(spec.IntroducedSpans)
+todo := make(chan spanAndTime, totalSpans)
var spanIdx int
for _, s := range spec.IntroducedSpans {
todo <- spanAndTime{spanIdx: spanIdx, span: s, start: hlc.Timestamp{},
@@ -210,27 +203,41 @@
spanIdx++
}

-targetFileSize := storageccl.ExportRequestTargetFileSize.Get(&clusterSettings.SV)

// For all backups, partitioned or not, the main BACKUP manifest is stored at
// details.URI.
-defaultConf, err := cloud.ExternalStorageConfFromURI(spec.DefaultURI, spec.User())
+destURI := spec.DefaultURI
+var destLocalityKV string

+if len(spec.URIsByLocalityKV) > 0 {
+var localitySinkURI string
+// When matching, more specific KVs in the node locality take precedence
+// over less specific ones so search back to front.
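// (For example, a node in locality region=us-east1,az=us-east1-b that is
// offered destinations keyed by both region=us-east1 and az=us-east1-b
// picks the az=us-east1-b URI, the most specific match.)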
+for i := len(flowCtx.EvalCtx.Locality.Tiers) - 1; i >= 0; i-- {
+tier := flowCtx.EvalCtx.Locality.Tiers[i].String()
+if dest, ok := spec.URIsByLocalityKV[tier]; ok {
+localitySinkURI = dest
+destLocalityKV = tier
+break
+}
+}
+if localitySinkURI != "" {
+log.Infof(ctx, "backing up %d spans to destination specified by locality %s", totalSpans, destLocalityKV)
+destURI = localitySinkURI
+} else {
+nodeLocalities := make([]string, 0, len(flowCtx.EvalCtx.Locality.Tiers))
+for _, i := range flowCtx.EvalCtx.Locality.Tiers {
+nodeLocalities = append(nodeLocalities, i.String())
+}
+backupLocalities := make([]string, 0, len(spec.URIsByLocalityKV))
+for i := range spec.URIsByLocalityKV {
+backupLocalities = append(backupLocalities, i)
+}
+log.Infof(ctx, "backing up %d spans to default locality because backup localities %s have no match in node's localities %s", totalSpans, backupLocalities, nodeLocalities)
+}
+}
+dest, err := cloud.ExternalStorageConfFromURI(destURI, spec.User())
if err != nil {
return err
}

-storageConfByLocalityKV := make(map[string]*roachpb.ExternalStorage)
-for kv, uri := range spec.URIsByLocalityKV {
-conf, err := cloud.ExternalStorageConfFromURI(uri, spec.User())
-if err != nil {
-return err
-}
-storageConfByLocalityKV[kv] = &conf
-}

-// If this is a tenant backup, we need to write the file from the SQL layer.
-writeSSTsInProcessor := !flowCtx.Cfg.Codec.ForSystemTenant() || alwaysWriteInProc.Get(&clusterSettings.SV)

returnedSSTs := make(chan returnedSST, 1)

grp := ctxgroup.WithContext(ctx)
@@ -262,18 +269,11 @@ func runBackupProcessor(
header := roachpb.Header{Timestamp: span.end}
req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(span.span),
-StorageByLocalityKV: storageConfByLocalityKV,
StartTime: span.start,
EnableTimeBoundIteratorOptimization: useTBI.Get(&clusterSettings.SV),
MVCCFilter: spec.MVCCFilter,
-TargetFileSize: targetFileSize,
-ReturnSstBelowSize: smallFileSize.Get(&clusterSettings.SV),
-}
-if writeSSTsInProcessor {
-req.ReturnSST = true
-} else {
-req.Storage = defaultConf
-req.Encryption = spec.Encryption
+TargetFileSize: storageccl.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
+ReturnSST: true,
}

// If we're doing re-attempts but are not yet in the priority regime,
@@ -312,17 +312,10 @@ func runBackupProcessor(
header.WaitPolicy = lock.WaitPolicy_Error
}

-// If we are asking for the SSTs to be returned, we set the DistSender
-// response target bytes field to a sentinel value.
-// The sentinel value of 1 forces the ExportRequest to paginate after
-// creating a single SST. The max size of this SST can be controlled
-// using the existing cluster settings, `kv.bulk_sst.target_size` and
-// `kv.bulk_sst.max_allowed_overage`.
-// This allows us to cap the size of the ExportRequest response (stored
-// in memory) to the sum of the above cluster settings.
-if req.ReturnSST {
-header.TargetBytes = 1
-}
+// We set the DistSender response target bytes field to a sentinel
+// value. The sentinel value of 1 forces the ExportRequest to paginate
+// after creating a single SST.
+header.TargetBytes = 1
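// The single SST in each paginated response is still bounded in memory
// by the cluster settings kv.bulk_sst.target_size and
// kv.bulk_sst.max_allowed_overage.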

log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
@@ -402,65 +395,37 @@ func runBackupProcessor(
Duration: duration.String(),
FileSummaries: make([]RowCount, 0),
}
-var numFiles int
-files := make([]BackupManifest_File, 0)

+if len(res.Files) > 1 {
+log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1")
+}

for i, file := range res.Files {
-numFiles++
f := BackupManifest_File{
Span: file.Span,
Path: file.Path,
EntryCounts: countRows(file.Exported, spec.PKIDs),
LocalityKV: file.LocalityKV,
}
exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, f.EntryCounts)
if span.start != spec.BackupStartTime {
f.StartTime = span.start
f.EndTime = span.end
}
-// If this file reply has an inline SST, push it to the
-// ch for the writer goroutine to handle. Otherwise, go
-// ahead and record the file for progress reporting.
-if len(file.SST) > 0 {
-exportResponseTraceEvent.HasReturnedSSTs = true
-ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime}
-// If multiple files were returned for this span, only one -- the
-// last -- should count as completing the requested span.
-if i == len(files)-1 {
-ret.completedSpans = completedSpans
-}
-select {
-case returnedSSTs <- ret:
-case <-ctxDone:
-return ctx.Err()
-}
-} else {
-files = append(files, f)
-}
}
-exportResponseTraceEvent.NumFiles = int32(numFiles)
-backupProcessorSpan.RecordStructured(exportResponseTraceEvent)

-// If we have replies for exported files (as oppposed to the
-// ones with inline SSTs we had to forward to the uploader
-// goroutine), we can report them as progress completed.
-if len(files) > 0 {
-progDetails := BackupManifest_Progress{
-RevStartTime: res.StartTime,
-Files: files,
-CompletedSpans: completedSpans,
+ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime}
+// If multiple files were returned for this span, only one -- the
+// last -- should count as completing the requested span.
+if i == len(res.Files)-1 {
+ret.completedSpans = completedSpans
}
-var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
-details, err := gogotypes.MarshalAny(&progDetails)
-if err != nil {
-return err
-}
-prog.ProgressDetails = *details
select {
-case <-ctx.Done():
+case returnedSSTs <- ret:
+case <-ctxDone:
return ctx.Err()
-case progCh <- prog:
}
}
+exportResponseTraceEvent.NumFiles = int32(len(res.Files))
+backupProcessorSpan.RecordStructured(exportResponseTraceEvent)

default:
// No work left to do, so we can exit. Note that another worker could
// still be running and may still push new work (a retry) on to todo but
@@ -477,75 +442,44 @@ func runBackupProcessor(
// contents to cloud storage.
grp.GoCtx(func(ctx context.Context) error {
sinkConf := sstSinkConf{
-id: flowCtx.NodeID.SQLInstanceID(),
-enc: spec.Encryption,
-targetFileSize: targetFileSize,
-progCh: progCh,
-settings: &flowCtx.Cfg.Settings.SV,
+id: flowCtx.NodeID.SQLInstanceID(),
+enc: spec.Encryption,
+progCh: progCh,
+settings: &flowCtx.Cfg.Settings.SV,
}

-defaultStore, err := flowCtx.Cfg.ExternalStorage(ctx, defaultConf)
+storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest)
if err != nil {
return err
}

-localitySinks := make(map[string]*sstSink)
-defaultSink := &sstSink{conf: sinkConf, dest: defaultStore}
+sink := &sstSink{conf: sinkConf, dest: storage}

defer func() {
-for i := range localitySinks {
-err := localitySinks[i].Close()
-err = errors.CombineErrors(localitySinks[i].dest.Close(), err)
-if err != nil {
-log.Warningf(ctx, "failed to close backup sink(s): %+v", err)
-}
-}
-err := defaultSink.Close()
-err = errors.CombineErrors(defaultStore.Close(), err)
+err := sink.Close()
+err = errors.CombineErrors(storage.Close(), err)
if err != nil {
log.Warningf(ctx, "failed to close backup sink(s): %+v", err)
}
}()

for res := range returnedSSTs {
-var sink *sstSink

-if existing, ok := localitySinks[res.f.LocalityKV]; ok {
-sink = existing
-} else if conf, ok := storageConfByLocalityKV[res.f.LocalityKV]; ok {
-es, err := flowCtx.Cfg.ExternalStorage(ctx, *conf)
-if err != nil {
-return err
-}
-// No defer Close here -- we defer a close of all of them above.
-sink = &sstSink{conf: sinkConf, dest: es}
-localitySinks[res.f.LocalityKV] = sink
-} else {
-sink = defaultSink
-}

+res.f.LocalityKV = destLocalityKV
if err := sink.push(ctx, res); err != nil {
return err
}
}

-for _, s := range localitySinks {
-if err := s.flush(ctx); err != nil {
-return err
-}
-}
-return defaultSink.flush(ctx)
+return sink.flush(ctx)
})

return grp.Wait()
}

type sstSinkConf struct {
-progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
-targetFileSize int64
-enc *roachpb.FileEncryptionOptions
-id base.SQLInstanceID
-settings *settings.Values
+progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
+enc *roachpb.FileEncryptionOptions
+id base.SQLInstanceID
+settings *settings.Values
}

type sstSink struct {
@@ -699,10 +633,16 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {

// If this span starts before the last buffered span ended, we need to flush
// since it overlaps but SSTWriter demands writes in-order.
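// (For instance, with a last buffered span ending at key d, an incoming
// SST covering [c, e) would arrive out of order and force a flush.)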
-if len(s.flushedFiles) > 0 && span.Key.Compare(s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey) < 0 {
-s.stats.oooFlushes++
-if err := s.flushFile(ctx); err != nil {
-return err
+if len(s.flushedFiles) > 0 {
+last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
+if span.Key.Compare(last) < 0 {
+log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s",
+s.outName, s.flushedSize, span, last,
+)
+s.stats.oooFlushes++
+if err := s.flushFile(ctx); err != nil {
+return err
+}
}
}

@@ -713,6 +653,8 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
}
}

log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName)

// Copy SST content.
sst, err := storage.NewMemSSTIterator(resp.sst, false)
if err != nil {
@@ -760,11 +702,14 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
s.flushedSize += int64(len(resp.sst))

// If our accumulated SST is now big enough, flush it.
-if s.flushedSize > s.conf.targetFileSize {
+if s.flushedSize > targetFileSize.Get(s.conf.settings) {
s.stats.sizeFlushes++
log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize)
if err := s.flushFile(ctx); err != nil {
return err
}
+} else {
+log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize)
}
return nil
}
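Stepping back, the processor-side flow after this commit reduces to: every ExportRequest returns its SST inline and paginates after a single file, one writer goroutine drains the returnedSSTs channel, and a single sstSink per processor appends into files that are cut at bulkio.backup.file_size. The following is a minimal, self-contained sketch of that pattern, with hypothetical names and the sink's streaming upload simplified to an in-memory buffer; it is not the actual CockroachDB code:

package sketch

import "context"

// returnedSST stands in for the processor's per-response payload: the SST
// bytes an ExportRequest returned inline, plus its metadata (elided here).
type returnedSST struct {
	sst []byte
}

// sink stands in for sstSink. The real sink streams bytes out to cloud
// storage as they arrive; buffering here just keeps the sketch short.
type sink struct {
	buf    []byte
	target int64 // bulkio.backup.file_size in the real code
}

// push appends one returned SST to the current file and cuts a new file
// once the accumulated size passes the target.
func (s *sink) push(ctx context.Context, r returnedSST) error {
	s.buf = append(s.buf, r.sst...)
	if int64(len(s.buf)) > s.target {
		return s.flush(ctx)
	}
	return nil
}

// flush finalizes the current file (the upload itself is elided).
func (s *sink) flush(ctx context.Context) error {
	s.buf = s.buf[:0]
	return nil
}

// runWriter mirrors the writer goroutine above: drain the channel, push
// every returned SST into the single sink, then flush the remainder.
func runWriter(ctx context.Context, returned <-chan returnedSST, s *sink) error {
	for r := range returned {
		if err := s.push(ctx, r); err != nil {
			return err
		}
	}
	return s.flush(ctx)
}

Compared to the pre-change code, there is no per-locality fan-out here: each processor matches its own locality to at most one destination up front, so a single sink suffices.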