Skip to content

Commit

Permalink
Merge #39724
Browse files Browse the repository at this point in the history
39724: bulk: Compute MVCCStats of the SST being ingested on-the-fly r=adityamaru27 a=adityamaru27

This change is an optimization to the MVCCStats collection
in the bulk ingestion pipeline. Currently when ingesting an
SST via the SSTBatcher, we have one iteration to construct an
SST, and an additional one to compute the MVCCStats for the
span being ingested.
In scenarios such as IMPORT, where we have an enforced guarantee
(via the disallowShadowing flag) that the KVs being ingested
do not shadow existing data, MVCCStats collection becomes very
simple. This change adds logic to collect these stats on-the-fly
while the SST is being constructed, thereby saving us an additional
iteration which has been profiled as a bottleneck in IMPORT.

TODO:
There is a significant performance win to be achieved by
ensuring that the stats computed are not estimates as it prevents
recomputation on splits. Running AddSSTable with disallowShadowing=true gets
us close to this as we do not allow colliding keys to be ingested. However,
in the situation that two SSTs have KV(s) which "perfectly" shadow an
existing key (equal ts and value), we do not consider this a collision.
While the KV would just overwrite the existing data, the stats would be
re-added, causing a double count for such KVs. One solution is to
compute the stats for these "skipped" KVs on-the-fly while checking for the
collision condition and returning their stats. The final stats would then
be base_stats + sst_stats - skipped_stats, and this would be accurate.

Benchmark update: Over three runs of TPCC 1k on a 4 node, default roachprod cluster, the time dropped from ~32m to ~22m.

Release note: None

Co-authored-by: Aditya Maru <adityamaru@cockroachlabs.com>
  • Loading branch information
craig[bot] and adityamaru27 committed Aug 20, 2019
2 parents b2d2e31 + 2a13ce6 commit 278e0ba
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 60 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/bulk"
"github.com/cockroachdb/cockroach/pkg/storage/diskmap"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -220,7 +221,7 @@ func (sp *sstWriter) Run(ctx context.Context) {
// can never be any shadow keys, and this check could be turned off.
// Plumb a user option to toggle this if the overhead without
// colliding keys becomes more significant.
if err := bulk.AddSSTable(ctx, sp.db, sst.span.Key, sst.span.EndKey, sst.data, true /* disallowShadowing */); err != nil {
if err := bulk.AddSSTable(ctx, sp.db, sst.span.Key, sst.span.EndKey, sst.data, true /* disallowShadowing */, enginepb.MVCCStats{} /* ms */); err != nil {
return err
}

Expand Down
32 changes: 31 additions & 1 deletion pkg/storage/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/kr/pretty"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -72,16 +73,33 @@ func EvalAddSSTable(
}
}

// Get the MVCCStats for the SST being ingested.
var stats enginepb.MVCCStats
if args.MVCCStats != nil {
stats = *args.MVCCStats
} else {
}

// Stats are computed on-the-fly when shadowing of keys is disallowed. If we
// took the fast path and race is enabled, assert the stats were correctly
// computed.
verifyFastPath := args.DisallowShadowing && util.RaceEnabled
if args.MVCCStats == nil || verifyFastPath {
log.VEventf(ctx, 2, "computing MVCCStats for SSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

computed, err := engine.ComputeStatsGo(dataIter, mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
if err != nil {
return result.Result{}, errors.Wrap(err, "computing SSTable MVCC stats")
}

if verifyFastPath {
// Update the timestamp to that of the recently computed stats to get the
// diff passing.
stats.LastUpdateNanos = computed.LastUpdateNanos
if !stats.Equal(computed) {
log.Fatalf(ctx, "fast-path MVCCStats computation gave wrong result: diff(fast, computed) = %s",
pretty.Diff(stats, computed))
}
}
stats = computed
}

Expand Down Expand Up @@ -135,6 +153,18 @@ func EvalAddSSTable(
// Callers can trigger such a re-computation to fixup any discrepancies (and
// remove the ContainsEstimates flag) after they are done ingesting files by
// sending an explicit recompute.
//
// TODO(adityamaru): There is a significant performance win to be achieved by
// ensuring that the stats computed are not estimates as it prevents
// recomputation on splits. Running AddSSTable with disallowShadowing=true gets
// us close to this as we do not allow colliding keys to be ingested. However,
// in the situation that two SSTs have KV(s) which "perfectly" shadow an
// existing key (equal ts and value), we do not consider this a collision.
// While the KV would just overwrite the existing data, the stats would be
// added below, causing a double count for those KVs. One solution is to
// compute the stats for these "skipped" KVs on-the-fly while checking for the
// collision condition and returning their stats. The final stats would then
// be ms + stats - skipped_stats, and this would be accurate.
stats.ContainsEstimates = true
ms.Add(stats)

Expand Down
27 changes: 23 additions & 4 deletions pkg/storage/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,20 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
return sstBytes
}

// getStats returns the MVCCStats that engine.ComputeStatsGo reports for the
// in-memory SST held in data, between startKey and endKey. Zero is passed as
// the final (timestamp) argument. Any failure, whether building the iterator
// or computing the stats, aborts the test instead of silently handing back
// zero-valued stats.
getStats := func(startKey, endKey engine.MVCCKey, data []byte) enginepb.MVCCStats {
	dataIter, err := engine.NewMemSSTIterator(data, true)
	if err != nil {
		// Fail loudly, matching the ComputeStatsGo error handling below.
		t.Fatalf("%+v", err)
	}
	defer dataIter.Close()

	stats, err := engine.ComputeStatsGo(dataIter, startKey, endKey, 0)
	if err != nil {
		t.Fatalf("%+v", err)
	}
	return stats
}

// Test key collision when ingesting a key in the start of existing data, and
// SST. The colliding key is also equal to the header start key.
{
Expand All @@ -454,6 +468,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
})

sstBytes := getSSTBytes(sstKVs)
stats := getStats(engine.MVCCKey{Key: roachpb.Key("a")}, engine.MVCCKey{Key: roachpb.Key("b")}, sstBytes)
cArgs := batcheval.CommandArgs{
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
Expand All @@ -462,8 +477,9 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
Data: sstBytes,
DisallowShadowing: true,
MVCCStats: &stats,
},
Stats: &enginepb.MVCCStats{},
Stats: &stats,
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
Expand Down Expand Up @@ -518,7 +534,6 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
Data: sstBytes,
DisallowShadowing: true,
},
Stats: &enginepb.MVCCStats{},
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
Expand All @@ -536,6 +551,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
})

sstBytes := getSSTBytes(sstKVs)
stats := getStats(engine.MVCCKey{Key: roachpb.Key("c")}, engine.MVCCKey{Key: roachpb.Key("i")}, sstBytes)
cArgs := batcheval.CommandArgs{
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
Expand All @@ -544,8 +560,9 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("c"), EndKey: roachpb.Key("i")},
Data: sstBytes,
DisallowShadowing: true,
MVCCStats: &stats,
},
Stats: &enginepb.MVCCStats{},
Stats: &stats,
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
Expand Down Expand Up @@ -737,6 +754,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
})

sstBytes := getSSTBytes(sstKVs)
stats := getStats(engine.MVCCKey{Key: roachpb.Key("e")}, engine.MVCCKey{Key: roachpb.Key("zz")}, sstBytes)
cArgs := batcheval.CommandArgs{
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 7},
Expand All @@ -745,8 +763,9 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("e"), EndKey: roachpb.Key("zz")},
Data: sstBytes,
DisallowShadowing: true,
MVCCStats: &stats,
},
Stats: &enginepb.MVCCStats{},
Stats: &stats,
}

_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
Expand Down
64 changes: 54 additions & 10 deletions pkg/storage/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type SSTBatcher struct {

// allows ingestion of keys where the MVCC.Key would shadow an existing row.
disallowShadowing bool

// stores on-the-fly stats for the SST if disallowShadowing is true.
ms enginepb.MVCCStats
}

// MakeSSTBatcher makes a ready-to-use SSTBatcher.
Expand All @@ -85,6 +88,22 @@ func MakeSSTBatcher(ctx context.Context, db sender, flushBytes int64) (*SSTBatch
return b, err
}

// updateMVCCStats folds a single key/value pair into the batcher's running
// MVCCStats (b.ms). The pair is always accounted for as one additional live
// key carrying one additional value; no check is made here for whether the
// key was seen before.
func (b *SSTBatcher) updateMVCCStats(key engine.MVCCKey, value []byte) {
	stats := &b.ms

	// Meta portion: the key is charged its length plus one byte, and no
	// bytes are attributed to a meta value.
	// NOTE(review): the extra byte presumably mirrors the engine's key
	// encoding overhead -- confirm against ComputeStatsGo.
	keyLen := int64(len(key.Key)) + 1
	stats.KeyCount++
	stats.LiveCount++
	stats.KeyBytes += keyLen
	stats.LiveBytes += keyLen

	// Versioned portion: the timestamp suffix is charged to the key bytes,
	// the raw value to the value bytes, and both to the live bytes.
	valLen := int64(len(value))
	stats.ValCount++
	stats.ValBytes += valLen
	stats.KeyBytes += engine.MVCCVersionTimestampSize
	stats.LiveBytes += valLen + engine.MVCCVersionTimestampSize
}

// AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed).
// This is only for callers that want to control the timestamp on individual
// keys -- like RESTORE where we want the restored data to look like the backup.
Expand Down Expand Up @@ -122,6 +141,16 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key engine.MVCCKey, value [
if err := b.rowCounter.Count(key.Key); err != nil {
return err
}

// If we do not allow shadowing of keys when ingesting an SST via
// AddSSTable, then we can update the MVCCStats on the fly because we are
// guaranteed to ingest unique keys. This saves us an extra iteration in
// AddSSTable which has been identified as a significant performance
// regression for IMPORT.
if b.disallowShadowing {
b.updateMVCCStats(key, value)
}

return b.sstWriter.Add(engine.MVCCKeyValue{Key: key, Value: value})
}

Expand All @@ -134,6 +163,7 @@ func (b *SSTBatcher) Reset() error {
b.batchEndValue = b.batchEndValue[:0]
b.flushKey = nil
b.flushKeyChecked = false
b.ms.Reset()

b.rowCounter.BulkOpSummary.Reset()
return nil
Expand Down Expand Up @@ -193,7 +223,13 @@ func (b *SSTBatcher) Flush(ctx context.Context) error {
if err != nil {
return errors.Wrapf(err, "finishing constructed sstable")
}
if err := AddSSTable(ctx, b.db, start, end, sstBytes, b.disallowShadowing); err != nil {

// If the stats have been computed on-the-fly, set the last updated time
// before ingesting the SST.
if (b.ms != enginepb.MVCCStats{}) {
b.ms.LastUpdateNanos = timeutil.Now().UnixNano()
}
if err := AddSSTable(ctx, b.db, start, end, sstBytes, b.disallowShadowing, b.ms); err != nil {
return err
}
b.totalRows.Add(b.rowCounter.BulkOpSummary)
Expand Down Expand Up @@ -228,22 +264,30 @@ type sstSpan struct {
// SST spans a split, in which case it is iterated and split into two SSTs, one
// for each side of the split in the error, and each are retried.
func AddSSTable(
ctx context.Context, db sender, start, end roachpb.Key, sstBytes []byte, disallowShadowing bool,
ctx context.Context,
db sender,
start, end roachpb.Key,
sstBytes []byte,
disallowShadowing bool,
ms enginepb.MVCCStats,
) error {
// Create an iterator that iterates over the top level SST for use in stats
// computation and to produce split ssts if needed for retries.
now := timeutil.Now().UnixNano()
iter, err := engine.NewMemSSTIterator(sstBytes, true)
if err != nil {
return err
}
defer iter.Close()

now := timeutil.Now().UnixNano()
stats, err := engine.ComputeStatsGo(
iter, engine.MVCCKey{Key: start}, engine.MVCCKey{Key: end}, now,
)
if err != nil {
return errors.Wrapf(err, "computing stats for SST [%s, %s)", start, end)
var stats enginepb.MVCCStats
if (ms == enginepb.MVCCStats{}) {
stats, err = engine.ComputeStatsGo(
iter, engine.MVCCKey{Key: start}, engine.MVCCKey{Key: end}, now,
)
if err != nil {
return errors.Wrapf(err, "computing stats for SST [%s, %s)", start, end)
}
} else {
stats = ms
}

work := []*sstSpan{{start: start, end: end, sstBytes: sstBytes, disallowShadowing: disallowShadowing, stats: stats}}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func TestAddBigSpanningSSTWithSplits(t *testing.T) {

t.Logf("Adding %dkb sst spanning %d splits from %v to %v", len(sst)/kb, len(splits), start, end)
if err := bulk.AddSSTable(
context.TODO(), mock, start, end, sst, false, /* disallowShadowing */
context.TODO(), mock, start, end, sst, false /* disallowShadowing */, enginepb.MVCCStats{},
); err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 278e0ba

Please sign in to comment.