
storage: make ExportMVCCToSST take a writer
This refactors ExportMVCCToSST to take a Writer argument instead of returning []byte.

The caller passes a MemFile and then reads the []byte result out of it, just as ExportMVCCToSST did internally before.

Release note: none.
dt committed Apr 30, 2021
1 parent b21add3 commit 7bca569
Showing 9 changed files with 65 additions and 38 deletions.
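
Before the per-file hunks, here is a minimal caller-side sketch of the new contract, assuming the updated Reader signature shown in pkg/storage/engine.go below. exportToBytes is a hypothetical helper, not part of this commit; it recovers the old []byte-returning behavior by exporting into an in-memory MemFile:

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// exportToBytes exports the span into an in-memory MemFile and hands the
// buffered SST bytes back, mirroring what ExportMVCCToSst used to do itself.
func exportToBytes(
	e storage.Reader,
	startKey, endKey roachpb.Key,
	startTS, endTS hlc.Timestamp,
	exportAllRevisions bool,
	targetSize, maxSize uint64,
	useTBI bool,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
	destFile := &storage.MemFile{}
	summary, resume, err := e.ExportMVCCToSst(startKey, endKey, startTS, endTS,
		exportAllRevisions, targetSize, maxSize, useTBI, destFile)
	if err != nil {
		return nil, roachpb.BulkOpSummary{}, nil, err
	}
	return destFile.Data(), summary, resume, nil
}

This is the same pattern the export.go and test changes below follow; a caller that does not need the bytes back (for example the benchmark's noopWriter) can pass any other io.WriteCloser instead.
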
6 changes: 4 additions & 2 deletions pkg/ccl/storageccl/export.go
@@ -164,11 +164,13 @@ func evalExport(
 	useTBI := args.EnableTimeBoundIteratorOptimization && !args.StartTime.IsEmpty()
 	var curSizeOfExportedSSTs int64
 	for start := args.Key; start != nil; {
-		data, summary, resume, err := e.ExportMVCCToSst(start, args.EndKey, args.StartTime,
-			h.Timestamp, exportAllRevisions, targetSize, maxSize, useTBI)
+		destFile := &storage.MemFile{}
+		summary, resume, err := e.ExportMVCCToSst(start, args.EndKey, args.StartTime,
+			h.Timestamp, exportAllRevisions, targetSize, maxSize, useTBI, destFile)
 		if err != nil {
			return result.Result{}, err
 		}
+		data := destFile.Data()

 		// NB: This should only happen on the first page of results. If there were
 		// more data to be read that lead to pagination then we'd see it in this
10 changes: 6 additions & 4 deletions pkg/ccl/storageccl/export_test.go
@@ -576,9 +576,11 @@ func assertEqualKVs(
 	var summary roachpb.BulkOpSummary
 	maxSize := uint64(0)
 	prevStart := start
-	sst, summary, start, err = e.ExportMVCCToSst(start, endKey, startTime, endTime,
-		exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization)
+	sstFile := &storage.MemFile{}
+	summary, start, err = e.ExportMVCCToSst(start, endKey, startTime, endTime,
+		exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization, sstFile)
 	require.NoError(t, err)
+	sst = sstFile.Data()
 	loaded := loadSST(t, sst, startKey, endKey)
 	// Ensure that the pagination worked properly.
 	if start != nil {
@@ -615,8 +617,8 @@ func assertEqualKVs(
 	if dataSizeWhenExceeded == maxSize {
 		maxSize--
 	}
-	_, _, _, err = e.ExportMVCCToSst(prevStart, endKey, startTime, endTime,
-		exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization)
+	_, _, err = e.ExportMVCCToSst(prevStart, endKey, startTime, endTime,
+		exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization, &storage.MemFile{})
 	require.Regexp(t, fmt.Sprintf("export size \\(%d bytes\\) exceeds max size \\(%d bytes\\)",
 		dataSizeWhenExceeded, maxSize), err)
 }
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/spanset/batch.go
@@ -12,6 +12,7 @@ package spanset

 import (
 	"context"
+	"io"

 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
@@ -361,9 +362,10 @@ func (s spanSetReader) ExportMVCCToSst(
 	exportAllRevisions bool,
 	targetSize, maxSize uint64,
 	useTBI bool,
-) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+	dest io.WriteCloser,
+) (roachpb.BulkOpSummary, roachpb.Key, error) {
 	return s.r.ExportMVCCToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize,
-		maxSize, useTBI)
+		maxSize, useTBI, dest)
 }

 func (s spanSetReader) MVCCGet(key storage.MVCCKey) ([]byte, error) {
9 changes: 7 additions & 2 deletions pkg/storage/bench_test.go
@@ -1352,11 +1352,16 @@ func runExportToSst(
 	for i := 0; i < b.N; i++ {
 		startTS := hlc.Timestamp{WallTime: int64(numRevisions / 2)}
 		endTS := hlc.Timestamp{WallTime: int64(numRevisions + 2)}
-		_, _, _, err := engine.ExportMVCCToSst(keys.LocalMax, roachpb.KeyMax, startTS, endTS,
-			exportAllRevisions, 0 /* targetSize */, 0 /* maxSize */, useTBI)
+		_, _, err := engine.ExportMVCCToSst(keys.LocalMax, roachpb.KeyMax, startTS, endTS,
+			exportAllRevisions, 0 /* targetSize */, 0 /* maxSize */, useTBI, noopWriter{})
 		if err != nil {
 			b.Fatal(err)
 		}
 	}
 	b.StopTimer()
 }
+
+type noopWriter struct{}
+
+func (noopWriter) Close() error { return nil }
+func (noopWriter) Write(p []byte) (int, error) { return len(p), nil }
4 changes: 3 additions & 1 deletion pkg/storage/engine.go
@@ -14,6 +14,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -364,7 +365,8 @@ type Reader interface {
 	ExportMVCCToSst(
 		startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp,
 		exportAllRevisions bool, targetSize uint64, maxSize uint64, useTBI bool,
-	) (sst []byte, _ roachpb.BulkOpSummary, resumeKey roachpb.Key, _ error)
+		dest io.WriteCloser,
+	) (_ roachpb.BulkOpSummary, resumeKey roachpb.Key, _ error)
 	// Get returns the value for the given key, nil otherwise. Semantically, it
 	// behaves as if an iterator with MVCCKeyAndIntentsIterKind was used.
 	//
7 changes: 4 additions & 3 deletions pkg/storage/mvcc_incremental_iterator_test.go
@@ -77,10 +77,11 @@ func assertExportedKVs(
 	useTBI bool,
 ) {
 	const big = 1 << 30
-	data, _, _, err := e.ExportMVCCToSst(startKey, endKey, startTime, endTime, revisions, big, big,
-		useTBI)
+	sstFile := &MemFile{}
+	_, _, err := e.ExportMVCCToSst(startKey, endKey, startTime, endTime, revisions, big, big,
+		useTBI, sstFile)
 	require.NoError(t, err)
-
+	data := sstFile.Data()
 	if data == nil {
 		require.Nil(t, expected)
 		return
49 changes: 26 additions & 23 deletions pkg/storage/pebble.go
@@ -653,13 +653,14 @@ func (p *Pebble) ExportMVCCToSst(
 	exportAllRevisions bool,
 	targetSize, maxSize uint64,
 	useTBI bool,
-) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+	dest io.WriteCloser,
+) (roachpb.BulkOpSummary, roachpb.Key, error) {
 	r := wrapReader(p)
 	// Doing defer r.Free() does not inline.
-	b, summary, k, err := pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize,
-		maxSize, useTBI)
+	summary, k, err := pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize,
+		maxSize, useTBI, dest)
 	r.Free()
-	return b, summary, k, err
+	return summary, k, err
 }

 // MVCCGet implements the Engine interface.
@@ -1322,13 +1323,14 @@ func (p *pebbleReadOnly) ExportMVCCToSst(
 	exportAllRevisions bool,
 	targetSize, maxSize uint64,
 	useTBI bool,
-) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+	dest io.WriteCloser,
+) (roachpb.BulkOpSummary, roachpb.Key, error) {
 	r := wrapReader(p)
 	// Doing defer r.Free() does not inline.
-	b, summary, k, err := pebbleExportToSst(
-		r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI)
+	summary, k, err := pebbleExportToSst(
+		r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI, dest)
 	r.Free()
-	return b, summary, k, err
+	return summary, k, err
 }

 func (p *pebbleReadOnly) MVCCGet(key MVCCKey) ([]byte, error) {
@@ -1584,13 +1586,14 @@ func (p *pebbleSnapshot) ExportMVCCToSst(
 	exportAllRevisions bool,
 	targetSize, maxSize uint64,
 	useTBI bool,
-) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+	dest io.WriteCloser,
+) (roachpb.BulkOpSummary, roachpb.Key, error) {
 	r := wrapReader(p)
 	// Doing defer r.Free() does not inline.
-	b, summary, k, err := pebbleExportToSst(
-		r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI)
+	summary, k, err := pebbleExportToSst(
+		r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI, dest)
 	r.Free()
-	return b, summary, k, err
+	return summary, k, err
 }

 // Get implements the Reader interface.
@@ -1695,9 +1698,9 @@ func pebbleExportToSst(
 	exportAllRevisions bool,
 	targetSize, maxSize uint64,
 	useTBI bool,
-) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
-	sstFile := &MemFile{}
-	sstWriter := MakeBackupSSTWriter(sstFile)
+	dest io.WriteCloser,
+) (roachpb.BulkOpSummary, roachpb.Key, error) {
+	sstWriter := MakeBackupSSTWriter(noopSync{dest})
 	defer sstWriter.Close()

 	var rows RowCounter
@@ -1718,7 +1721,7 @@ func pebbleExportToSst(
 	if err != nil {
 		// The error may be a WriteIntentError. In which case, returning it will
 		// cause this command to be retried.
-		return nil, roachpb.BulkOpSummary{}, nil, err
+		return roachpb.BulkOpSummary{}, nil, err
 	}
 	if !ok {
 		break
@@ -1738,7 +1741,7 @@ func pebbleExportToSst(
 	skipTombstones := !exportAllRevisions && startTS.IsEmpty()
 	if len(unsafeValue) > 0 || !skipTombstones {
 		if err := rows.Count(unsafeKey.Key); err != nil {
-			return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "decoding %s", unsafeKey)
+			return roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "decoding %s", unsafeKey)
 		}
 		curSize := rows.BulkOpSummary.DataSize
 		reachedTargetSize := curSize > 0 && uint64(curSize) >= targetSize
@@ -1751,16 +1754,16 @@ func pebbleExportToSst(
 		// This should never be an intent since the incremental iterator returns
 		// an error when encountering intents.
 		if err := sstWriter.PutUnversioned(unsafeKey.Key, unsafeValue); err != nil {
-			return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "adding key %s", unsafeKey)
+			return roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "adding key %s", unsafeKey)
 		}
 	} else {
 		if err := sstWriter.PutMVCC(unsafeKey, unsafeValue); err != nil {
-			return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "adding key %s", unsafeKey)
+			return roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "adding key %s", unsafeKey)
 		}
 	}
 	newSize := curSize + int64(len(unsafeKey.Key)+len(unsafeValue))
 	if maxSize > 0 && newSize > int64(maxSize) {
-		return nil, roachpb.BulkOpSummary{}, nil,
+		return roachpb.BulkOpSummary{}, nil,
 			errors.Errorf("export size (%d bytes) exceeds max size (%d bytes)", newSize, maxSize)
 	}
 	rows.BulkOpSummary.DataSize = newSize
@@ -1776,12 +1779,12 @@ func pebbleExportToSst(
 	if rows.BulkOpSummary.DataSize == 0 {
 		// If no records were added to the sstable, skip completing it and return a
 		// nil slice – the export code will discard it anyway (based on 0 DataSize).
-		return nil, roachpb.BulkOpSummary{}, nil, nil
+		return roachpb.BulkOpSummary{}, nil, nil
 	}

 	if err := sstWriter.Finish(); err != nil {
-		return nil, roachpb.BulkOpSummary{}, nil, err
+		return roachpb.BulkOpSummary{}, nil, err
 	}

-	return sstFile.Data(), rows.BulkOpSummary, resumeKey, nil
+	return rows.BulkOpSummary, resumeKey, nil
 }
4 changes: 3 additions & 1 deletion pkg/storage/pebble_batch.go
@@ -12,6 +12,7 @@ package storage

 import (
 	"context"
+	"io"
 	"sync"

 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -133,7 +134,8 @@ func (p *pebbleBatch) ExportMVCCToSst(
 	exportAllRevisions bool,
 	targetSize, maxSize uint64,
 	useTBI bool,
-) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+	dest io.WriteCloser,
+) (roachpb.BulkOpSummary, roachpb.Key, error) {
 	panic("unimplemented")
 }

8 changes: 8 additions & 0 deletions pkg/storage/sst_writer.go
@@ -38,6 +38,14 @@ type writeCloseSyncer interface {
 	Sync() error
 }

+type noopSync struct {
+	io.WriteCloser
+}
+
+func (noopSync) Sync() error {
+	return nil
+}
+
 // MakeBackupSSTWriter creates a new SSTWriter tailored for backup SSTs. These
 // SSTs have bloom filters disabled and format set to LevelDB.
 func MakeBackupSSTWriter(f writeCloseSyncer) SSTWriter {
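
The new noopSync type exists because MakeBackupSSTWriter wants a writeCloseSyncer, while the exporter's new dest parameter is only an io.WriteCloser; wrapping dest in noopSync supplies the missing Sync as a no-op, which is exactly what pebbleExportToSst now does. A rough sketch of that wiring, assuming package storage and sorted input keys; writeBackupSST is a hypothetical illustration, not part of this commit:

func writeBackupSST(dest io.WriteCloser, kvs []MVCCKeyValue) error {
	// Adapt the caller's plain io.WriteCloser to the writeCloseSyncer
	// interface that MakeBackupSSTWriter expects; Sync becomes a no-op.
	w := MakeBackupSSTWriter(noopSync{dest})
	defer w.Close()
	for _, kv := range kvs {
		if err := w.PutMVCC(kv.Key, kv.Value); err != nil {
			return err
		}
	}
	return w.Finish()
}
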
