Skip to content

Commit

Permalink
db: add IngestOperationStats for admission control
Browse files Browse the repository at this point in the history
To allow for async ingestion (being implemented) we don't reuse
TableIngestInfo, and allow an approximate value for the bytes
ingested into L0. These will be used in admission control to
more precisely account for how much load an operation is adding
to L0 (prototype is in
https://github.com/cockroachdb/cockroach/blob/ec4149e310f9b383f87cc7ff8776258d72927394/pkg/util/admission/work_queue.go#L911).

Fixes #1600
  • Loading branch information
sumeerbhola committed Apr 19, 2022
1 parent 194770c commit 4a0889f
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 9 deletions.
3 changes: 2 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
}
}
}
return d.ingest(paths, func(
_, err := d.ingest(paths, func(
tableNewIters,
IterOptions,
Compare,
Expand All @@ -898,6 +898,7 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
) (int, error) {
return level, nil
})
return err
}

func runLSMCmd(td *datadriven.TestData, d *DB) string {
Expand Down
47 changes: 40 additions & 7 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,38 @@ func (d *DB) Ingest(paths []string) error {
if d.opts.ReadOnly {
return ErrReadOnly
}
_, err := d.ingest(paths, ingestTargetLevel)
return err
}

// IngestOperationStats provides some information about where in the LSM the
// bytes were ingested.
type IngestOperationStats struct {
// Bytes is the total bytes in the ingested sstables.
Bytes uint64
// ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
// into L0.
// Currently, this value is completely accurate, but we are allowing this to
// be approximate once https://github.com/cockroachdb/pebble/issues/25 is
// implemented.
ApproxIngestedIntoL0Bytes uint64
}

// IngestWithStats does the same as Ingest, and additionally returns
// IngestOperationStats.
func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
if err := d.closed.Load(); err != nil {
panic(err)
}
if d.opts.ReadOnly {
return IngestOperationStats{}, ErrReadOnly
}
return d.ingest(paths, ingestTargetLevel)
}

func (d *DB) ingest(paths []string, targetLevelFunc ingestTargetLevelFunc) error {
func (d *DB) ingest(
paths []string, targetLevelFunc ingestTargetLevelFunc,
) (IngestOperationStats, error) {
// Allocate file numbers for all of the files being ingested and mark them as
// pending in order to prevent them from being deleted. Note that this causes
// the file number ordering to be out of alignment with sequence number
Expand All @@ -595,16 +623,16 @@ func (d *DB) ingest(paths []string, targetLevelFunc ingestTargetLevelFunc) error
// and elides empty sstables.
meta, paths, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs)
if err != nil {
return err
return IngestOperationStats{}, err
}
if len(meta) == 0 {
// All of the sstables to be ingested were empty. Nothing to do.
return nil
return IngestOperationStats{}, nil
}

// Verify the sstables do not overlap.
if err := ingestSortAndVerify(d.cmp, meta, paths); err != nil {
return err
return IngestOperationStats{}, err
}

// Hard link the sstables into the DB directory. Since the sstables aren't
Expand All @@ -613,14 +641,14 @@ func (d *DB) ingest(paths []string, targetLevelFunc ingestTargetLevelFunc) error
// fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLink(jobID, d.opts, d.dirname, paths, meta); err != nil {
return err
return IngestOperationStats{}, err
}
// Fsync the directory we added the tables to. We need to do this at some
// point before we update the MANIFEST (via logAndApply), otherwise a crash
// can have the tables referenced in the MANIFEST, but not present in the
// directory.
if err := d.dataDir.Sync(); err != nil {
return err
return IngestOperationStats{}, err
}

var mem *flushableEntry
Expand Down Expand Up @@ -695,6 +723,7 @@ func (d *DB) ingest(paths []string, targetLevelFunc ingestTargetLevelFunc) error
GlobalSeqNum: meta[0].SmallestSeqNum,
Err: err,
}
var stats IngestOperationStats
if ve != nil {
info.Tables = make([]struct {
TableInfo
Expand All @@ -704,11 +733,15 @@ func (d *DB) ingest(paths []string, targetLevelFunc ingestTargetLevelFunc) error
e := &ve.NewFiles[i]
info.Tables[i].Level = e.Level
info.Tables[i].TableInfo = e.Meta.TableInfo()
stats.Bytes += e.Meta.Size
if e.Level == 0 {
stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
}
}
}
d.opts.EventListener.TableIngested(info)

return err
return stats, err
}

type ingestTargetLevelFunc func(
Expand Down
38 changes: 37 additions & 1 deletion ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,14 +998,50 @@ func TestIngestFlushQueuedMemTable(t *testing.T) {
require.NoError(t, w.Set([]byte(k), nil))
}
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{"ext"}))
stats, err := d.IngestWithStats([]string{"ext"})
require.NoError(t, err)
require.Equal(t, stats.ApproxIngestedIntoL0Bytes, stats.Bytes)
require.Less(t, uint64(0), stats.Bytes)
}

ingest("a")

require.NoError(t, d.Close())
}

func TestIngestStats(t *testing.T) {
mem := vfs.NewMem()
d, err := Open("", &Options{
FS: mem,
})
require.NoError(t, err)

ingest := func(expectedLevel int, keys ...string) {
t.Helper()
f, err := mem.Create("ext")
require.NoError(t, err)

w := sstable.NewWriter(f, sstable.WriterOptions{})
for _, k := range keys {
require.NoError(t, w.Set([]byte(k), nil))
}
require.NoError(t, w.Close())
stats, err := d.IngestWithStats([]string{"ext"})
require.NoError(t, err)
if expectedLevel == 0 {
require.Equal(t, stats.ApproxIngestedIntoL0Bytes, stats.Bytes)
} else {
require.EqualValues(t, 0, stats.ApproxIngestedIntoL0Bytes)
}
require.Less(t, uint64(0), stats.Bytes)
}
ingest(6, "a")
ingest(0, "a")
ingest(6, "b", "g")
ingest(0, "c")
require.NoError(t, d.Close())
}

func TestIngestFlushQueuedLargeBatch(t *testing.T) {
// Verify that ingestion forces a flush of a queued large batch.

Expand Down

0 comments on commit 4a0889f

Please sign in to comment.