backupccl: merge 'small' files
When backing up, each range is asked to export its data to the BACKUP
storage destination. However, if a range contains very little data to
back up, which is often the case during an incremental backup when only
a handful of rows in that range were modified, the resulting file may
be very small. If a cluster has tens of thousands of ranges, having
each one write a separate, small file produces a backup made up of tens
of thousands of tiny files. Running such a backup every hour, or more
frequently, can quickly produce millions of files. This adds up in
storage costs, metadata and tracking overhead, etc.

This change adds a setting, bulkio.backup.merge_file_size: if the file
a range would have written to the backup storage destination is smaller
than this size, the range will _return_ the file instead of writing it.
The backup process then merges the returned file with other returned
files until it has a file of the desired target size.
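
To make the buffering idea concrete, the following is a minimal, hypothetical Go sketch of the processor-side accumulation. The names (returnedFile, sstBuffer, flush) and the threshold handling are illustrative and are not taken from this commit; the real code merges SST contents key-by-key rather than treating them as opaque payloads.

package main

import "fmt"

// returnedFile stands in for an export response file whose SST payload was
// returned inline rather than written out by the range itself.
type returnedFile struct {
	span string // pretty-printed key span, for illustration only
	sst  []byte // raw SST bytes returned with the export response
}

// sstBuffer batches small returned files until they add up to targetSize.
// flush is a placeholder for the real work: merging the buffered SSTs and
// uploading the merged result to the backup storage destination.
type sstBuffer struct {
	targetSize int
	buffered   int
	files      []returnedFile
	flush      func(files []returnedFile) error
}

// add buffers one returned file and flushes once the threshold is reached.
func (b *sstBuffer) add(f returnedFile) error {
	b.files = append(b.files, f)
	b.buffered += len(f.sst)
	if b.buffered < b.targetSize {
		return nil
	}
	files := b.files
	b.files, b.buffered = nil, 0
	return b.flush(files)
}

func main() {
	buf := &sstBuffer{
		targetSize: 16 << 20, // mirrors the 16 MiB default of merge_file_size
		flush: func(files []returnedFile) error {
			fmt.Printf("merging and writing %d buffered files\n", len(files))
			return nil
		},
	}
	// Tiny per-range results accumulate instead of becoming separate files.
	_ = buf.add(returnedFile{span: "/Table/53/1{-/100}", sst: make([]byte, 4<<20)})
	_ = buf.add(returnedFile{span: "/Table/53/1/10{0-200}", sst: make([]byte, 14<<20)})
}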

Release note (ops change): the new setting bulkio.backup.merge_file_size
allows BACKUP to buffer and merge smaller files, reducing the number of
small individual files created by BACKUP.
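
As a usage sketch only (not part of this change): the threshold is an ordinary cluster setting, so it can be adjusted or disabled from any SQL session with sufficient privileges. The connection string below is illustrative, and setting the value to '0' disables merging, as the tests in this commit do.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	// Illustrative connection string; point it at any node of the cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// '0' disables small-file merging; the default threshold is 16 MiB.
	if _, err := db.Exec(`SET CLUSTER SETTING bulkio.backup.merge_file_size = '0'`); err != nil {
		log.Fatal(err)
	}
}
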
dt committed Jun 24, 2021
1 parent b234b9f commit 2c31602
Showing 8 changed files with 588 additions and 537 deletions.
10 changes: 8 additions & 2 deletions pkg/ccl/backupccl/backup_processor.go
@@ -72,6 +72,13 @@ var (
"external storage instead of writing them directly from the storage layer",
false,
)
smallFileSize = settings.RegisterByteSizeSetting(
"bulkio.backup.merge_file_size",
"size under which backup files will be forwarded to another node to be merged with other smaller files "+
"(and implies files will be buffered in-memory until this size before being written to backup storage)",
16<<20,
settings.NonNegativeInt,
)
)

const backupProcessorName = "backupDataProcessor"
@@ -239,8 +246,6 @@ func runBackupProcessor(
case <-ctxDone:
return ctx.Err()
case span := <-todo:
// TODO(pbardea): It would be nice if we could avoid producing many small
// SSTs. See #44480.
header := roachpb.Header{Timestamp: span.end}
req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(span.span),
@@ -249,6 +254,7 @@
EnableTimeBoundIteratorOptimization: useTBI.Get(&clusterSettings.SV),
MVCCFilter: spec.MVCCFilter,
TargetFileSize: targetFileSize,
ReturnSstBelowSize: smallFileSize.Get(&clusterSettings.SV),
}
if writeSSTsInProcessor {
req.ReturnSST = true
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -1464,6 +1464,9 @@ func checkInProgressBackupRestore(
defer cleanup()

sqlDB.Exec(t, `CREATE DATABASE restoredb`)
// the small test-case will get entirely buffered/merged by small-file merging
// and not report any progress in the meantime unless it is disabled.
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_size = '0'`)

var totalExpectedBackupRequests int
// mergedRangeQuery calculates the number of spans we expect PartitionSpans to
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -854,6 +854,10 @@ func TestReintroduceOfflineSpans(t *testing.T) {
dbBackupLoc := "nodelocal://0/my_db_backup"
clusterBackupLoc := "nodelocal://0/my_cluster_backup"

// the small test-case will get entirely buffered/merged by small-file merging
// and not report any progress in the meantime unless it is disabled.
srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_size = '0'`)

// Take a backup that we'll use to create an OFFLINE descriptor.
srcDB.Exec(t, `CREATE INDEX new_idx ON data.bank (balance)`)
srcDB.Exec(t, `BACKUP DATABASE data TO $1 WITH revision_history`, dbBackupLoc)
3 changes: 3 additions & 0 deletions pkg/ccl/cliccl/debug_backup_test.go
@@ -414,6 +414,9 @@ func TestExportDataWithMultipleRanges(t *testing.T) {
defer srv.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(db)
// the small test-case will get entirely buffered/merged by small-file merging
// and mean there will only be a single file.
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_size = '0'`)
sqlDB.Exec(t, `CREATE DATABASE testDB`)
sqlDB.Exec(t, `USE testDB`)
sqlDB.Exec(t, `CREATE TABLE fooTable(id int PRIMARY KEY)`)
26 changes: 15 additions & 11 deletions pkg/ccl/storageccl/export.go
@@ -172,13 +172,6 @@ func evalExport(
break
}

if args.Encryption != nil {
data, err = EncryptFile(data, args.Encryption.Key)
if err != nil {
return result.Result{}, err
}
}

span := roachpb.Span{Key: start}
if resume != nil {
span.EndKey = resume
@@ -191,7 +184,21 @@
LocalityKV: localityKV,
}

if exportStore != nil {
returnSST := args.ReturnSST
if args.ReturnSstBelowSize > 0 && len(data) < int(args.ReturnSstBelowSize) {
returnSST = true
}

if returnSST {
exported.SST = data
} else {
if args.Encryption != nil {
data, err = EncryptFile(data, args.Encryption.Key)
if err != nil {
return result.Result{}, err
}
}

exported.Path = GenerateUniqueSSTName(base.SQLInstanceID(cArgs.EvalCtx.NodeID()))
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxUploadRetries, func() error {
// We blindly retry any error here because we expect the caller to have
@@ -205,9 +212,6 @@
return result.Result{}, err
}
}
if args.ReturnSST {
exported.SST = data
}
reply.Files = append(reply.Files, exported)
start = resume

9 changes: 0 additions & 9 deletions pkg/ccl/storageccl/export_test.go
@@ -12,10 +12,8 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math"
"math/rand"
"path/filepath"
"sort"
"testing"
"time"
@@ -84,13 +82,6 @@ func TestExportCmd(t *testing.T) {
}
defer sst.Close()

fileContents, err := ioutil.ReadFile(filepath.Join(dir, "foo", file.Path))
if err != nil {
t.Fatalf("%+v", err)
}
if !bytes.Equal(fileContents, file.SST) {
t.Fatal("Returned SST and exported SST don't match!")
}
sst.SeekGE(storage.MVCCKey{Key: keys.MinKey})
for {
if valid, err := sst.Valid(); !valid || err != nil {