Skip to content

Commit

Permalink
admission, kvserver: snapshot integration for disk bandwidth
Browse files Browse the repository at this point in the history
This patch integrates raft snapshot ingestion with the disk write
mechanism in admission control. The following internal machinery changes
were made to make that possible:
- `SnapshotQueue` was added as an implementation of the `requester`
  interface. Internally, it is a simple FIFO queue unlike the other work
  queue, since we can make the assumption that all snapshots are of the
  same priority and are processed as system tenant requests.
- A new `kvStoreTokenChildGranter` was created to grant tokens to
  snapshot requests.
- We now have a `StoreWorkType` that differentiates `regular`,
  `elastic`, and `snapshot` work for the store granters. This was
  necessary because snapshots do not incur the same write-amp as the
  other work types – they land into L6 of the LSM due to excises. We
  also only want these requests to be subject to pacing based on disk
  bandwidth.
- We now prioritize store writes in the following order: `regular`,
  `snapshot`, `elastic`.
- The `demuxHandle` of the `GrantCoordinator` now uses `StoreWorkType`.

The integration point for the `SnapshotQueue` is in `Receive()` where we
use a pacing mechanism to process incoming snapshots. Snapshots are
subject to `snapshotBurstSize` amounts of disk writes before asking for
further admission of the same size. The `multiSSTWriter` uses Pebble's
SST size estimates to maintain a running count of disk writes incurred
by the snapshot ingest. Once the SST is finalized we deduct/return
further tokens.

Closes cockroachdb#120708.

Release note (ops change): Admission Control now has an integration for
pacing snapshot ingest traffic based on disk bandwidth.
`kvadmission.store.snapshot_ingest_bandwidth_control.enabled` is used to
turn on this integration. Note that it requires provisioned bandwidth to
be set for the store (or cluster through the cluster setting) for it to
take effect.
  • Loading branch information
aadityasondhi authored and annrpom committed Oct 14, 2024
1 parent f28a817 commit 2ac6bcf
Show file tree
Hide file tree
Showing 22 changed files with 1,034 additions and 167 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
<tr><td>STORAGE</td><td>admission.wait_durations.kv.high-pri</td><td>Wait time durations for requests that waited</td><td>Wait time Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>admission.wait_durations.kv.locking-normal-pri</td><td>Wait time durations for requests that waited</td><td>Wait time Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>admission.wait_durations.kv.normal-pri</td><td>Wait time durations for requests that waited</td><td>Wait time Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>admission.wait_durations.snapshot_ingest</td><td>Wait time for snapshot ingest requests that waited</td><td>Wait time Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>admission.wait_durations.sql-kv-response</td><td>Wait time durations for requests that waited</td><td>Wait time Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>admission.wait_durations.sql-kv-response.locking-normal-pri</td><td>Wait time durations for requests that waited</td><td>Wait time Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>admission.wait_durations.sql-kv-response.normal-pri</td><td>Wait time durations for requests that waited</td><td>Wait time Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
<tr><td><div id="setting-kv-transaction-write-pipelining-enabled" class="anchored"><code>kv.transaction.write_pipelining.enabled<br />(alias: kv.transaction.write_pipelining_enabled)</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-max-batch-size" class="anchored"><code>kv.transaction.write_pipelining.max_batch_size<br />(alias: kv.transaction.write_pipelining_max_batch_size)</code></div></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines that maximum size batch that will be pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kvadmission-store-provisioned-bandwidth" class="anchored"><code>kvadmission.store.provisioned_bandwidth</code></div></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be overridden on a per-store basis using the --store flag. Note that setting the provisioned bandwidth to a positive value may enable disk bandwidth based admission control, since admission.disk_bandwidth_tokens.elastic.enabled defaults to true</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kvadmission-store-snapshot-ingest-bandwidth-control-enabled" class="anchored"><code>kvadmission.store.snapshot_ingest_bandwidth_control.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set to true, snapshot ingests will be subject to disk write control in AC</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-obs-tablemetadata-automatic-updates-enabled" class="anchored"><code>obs.tablemetadata.automatic_updates.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables automatic updates of the table metadata cache system.table_metadata</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-obs-tablemetadata-data-valid-duration" class="anchored"><code>obs.tablemetadata.data_valid_duration</code></div></td><td>duration</td><td><code>20m0s</code></td><td>the duration for which the data in system.table_metadata is considered valid</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-schedules-backup-gc-protection-enabled" class="anchored"><code>schedules.backup.gc_protection.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable chaining of GC protection across backups run as part of a schedule</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
47 changes: 32 additions & 15 deletions pkg/cmd/roachtest/tests/admission_control_snapshot_overload_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func registerSnapshotOverloadIO(r registry.Registry) {
// sstables should ingest into L6.
limitCompactionConcurrency: true,
limitDiskBandwidth: false,
readPercent: 75,
workloadBlockBytes: 12288,
}))

// This tests the behaviour of snpashot ingestion in bandwidth constrained
Expand All @@ -76,6 +78,8 @@ func registerSnapshotOverloadIO(r registry.Registry) {
volumeSize: 1000,
limitCompactionConcurrency: false,
limitDiskBandwidth: true,
readPercent: 20,
workloadBlockBytes: 1024,
}))

}
Expand All @@ -84,6 +88,8 @@ type admissionControlSnapshotOverloadIOOpts struct {
volumeSize int
limitCompactionConcurrency bool
limitDiskBandwidth bool
readPercent int
workloadBlockBytes int
}

func runAdmissionControlSnapshotOverloadIO(
Expand Down Expand Up @@ -138,18 +144,6 @@ func runAdmissionControlSnapshotOverloadIO(
}
}

if cfg.limitDiskBandwidth {
const bandwidthLimit = 128
dataDir := "/mnt/data1"
if err := setBandwidthLimit(ctx, t, c, c.CRDBNodes(), "wbps", bandwidthLimit<<20 /* 128MiB */, false, dataDir); err != nil {
t.Fatal(err)
}
if _, err := db.ExecContext(
ctx, fmt.Sprintf("SET CLUSTER SETTING kvadmission.store.provisioned_bandwidth = '%dMiB'", bandwidthLimit)); err != nil {
t.Fatalf("failed to set kvadmission.store.provisioned_bandwidth: %v", err)
}
}

// Setup the prometheus instance and client.
t.Status(fmt.Sprintf("setting up prometheus/grafana (<%s)", 2*time.Minute))
var statCollector clusterstats.StatCollector
Expand All @@ -176,15 +170,38 @@ func runAdmissionControlSnapshotOverloadIO(
"./cockroach workload init kv --drop --insert-count=40000000 "+
"--max-block-bytes=12288 --min-block-bytes=12288 {pgurl:1-3}")

// Now set disk bandwidth limits
if cfg.limitDiskBandwidth {
const bandwidthLimit = 128
dataDir := "/mnt/data1"
if err := setBandwidthLimit(ctx, t, c, c.CRDBNodes(), "wbps", bandwidthLimit<<20 /* 128MiB */, false, dataDir); err != nil {
t.Fatal(err)
}
if _, err := db.ExecContext(
ctx, fmt.Sprintf("SET CLUSTER SETTING kvadmission.store.provisioned_bandwidth = '%dMiB'", bandwidthLimit)); err != nil {
t.Fatalf("failed to set kvadmission.store.provisioned_bandwidth: %v", err)
}
if _, err := db.ExecContext(
ctx, "SET CLUSTER SETTING kvadmission.store.snapshot_ingest_bandwidth_control.enabled = 'true'"); err != nil {
t.Fatalf("failed to set kvadmission.store.snapshot_ingest_bandwidth_control.enabled: %v", err)
}
}

t.Status(fmt.Sprintf("starting kv workload thread (<%s)", time.Minute))
m := c.NewMonitor(ctx, c.CRDBNodes())
m.Go(func(ctx context.Context) error {
c.Run(ctx, option.WithNodes(c.WorkloadNode()),
fmt.Sprintf("./cockroach workload run kv --tolerate-errors "+
"--splits=1000 --histograms=%s/stats.json --read-percent=75 "+
"--max-rate=600 --max-block-bytes=12288 --min-block-bytes=12288 "+
"--splits=1000 --histograms=%s/stats.json --read-percent=%d "+
"--max-rate=600 --max-block-bytes=%d --min-block-bytes=%d "+
"--concurrency=4000 --duration=%s {pgurl:1-2}",
t.PerfArtifactsDir(), (6*time.Hour).String()))
t.PerfArtifactsDir(),
cfg.readPercent,
cfg.workloadBlockBytes,
cfg.workloadBlockBytes,
(6*time.Hour).String(),
),
)
return nil
})

Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ type Controller interface {
_ context.Context, _ roachpb.TenantID, _ roachpb.StoreID, _ roachpb.RangeID, _ roachpb.ReplicaID,
leaderTerm uint64, _ raftpb.Entry)
replica_rac2.ACWorkQueue
// GetSnapshotQueue returns the SnapshotQueue which is used for ingesting raft
// snapshots.
GetSnapshotQueue(roachpb.StoreID) *admission.SnapshotQueue
}

// TenantWeightProvider can be periodically asked to provide the tenant
Expand Down Expand Up @@ -702,6 +705,14 @@ func (n *controllerImpl) Admit(ctx context.Context, entry replica_rac2.EntryForA
return true
}

func (n *controllerImpl) GetSnapshotQueue(storeID roachpb.StoreID) *admission.SnapshotQueue {
sq := n.storeGrantCoords.TryGetSnapshotQueueForStore(storeID)
if sq == nil {
return nil
}
return sq.(*admission.SnapshotQueue)
}

// FollowerStoreWriteBytes captures stats about writes done to a store by a
// replica that is not the leaseholder. These are used for admission control.
type FollowerStoreWriteBytes struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,10 @@ func (r *Replica) applySnapshot(
writeBytes = uint64(inSnap.SSTSize)
}
}
// The "ignored" here is to ignore the writes to create the AC linear models
// for LSM writes. Since these writes typically correspond to actual writes
// onto the disk, we account for them separately in
// kvBatchSnapshotStrategy.Receive().
if r.store.cfg.KVAdmissionController != nil {
r.store.cfg.KVAdmissionController.SnapshotIngestedOrWritten(
r.store.StoreID(), ingestStats, writeBytes)
Expand Down
40 changes: 40 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
Expand Down Expand Up @@ -154,6 +155,8 @@ type multiSSTWriter struct {
dataSize int64
// The total size of the SSTs.
sstSize int64
// Incremental count of number of bytes written to disk.
writeBytes int64
// if skipClearForMVCCSpan is true, the MVCC span is not ClearEngineRange()d in
// the same sstable. We rely on the caller to take care of clearing this span
// through a different process (eg. IngestAndExcise on pebble). Note that
Expand Down Expand Up @@ -335,6 +338,8 @@ func (msstw *multiSSTWriter) finalizeSST(ctx context.Context, nextKey *storage.E
metaEndKey, nextKey)
}
}
// Account for any additional bytes written other than the KV data.
msstw.writeBytes += int64(msstw.currSST.Meta.Size) - msstw.currSST.DataSize
msstw.dataSize += msstw.currSST.DataSize
msstw.sstSize += int64(msstw.currSST.Meta.Size)
msstw.currSpan++
Expand Down Expand Up @@ -419,9 +424,11 @@ func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, val
if err := msstw.rolloverSST(ctx, key, key); err != nil {
return err
}
prevWriteBytes := msstw.currSST.EstimatedSize()
if err := msstw.currSST.PutEngineKey(key, value); err != nil {
return errors.Wrap(err, "failed to put in sst")
}
msstw.writeBytes += int64(msstw.currSST.EstimatedSize() - prevWriteBytes)
return nil
}

Expand All @@ -435,6 +442,7 @@ func (msstw *multiSSTWriter) PutInternalPointKey(
if err := msstw.rolloverSST(ctx, decodedKey, decodedKey); err != nil {
return err
}
prevWriteBytes := msstw.currSST.EstimatedSize()
var err error
switch kind {
case pebble.InternalKeyKindSet, pebble.InternalKeyKindSetWithDelete:
Expand All @@ -447,6 +455,7 @@ func (msstw *multiSSTWriter) PutInternalPointKey(
if err != nil {
return errors.Wrap(err, "failed to put in sst")
}
msstw.writeBytes += int64(msstw.currSST.EstimatedSize() - prevWriteBytes)
return nil
}

Expand Down Expand Up @@ -479,9 +488,11 @@ func (msstw *multiSSTWriter) PutInternalRangeDelete(ctx context.Context, start,
if err := msstw.rolloverSST(ctx, decodedStart, decodedEnd); err != nil {
return err
}
prevWriteBytes := msstw.currSST.EstimatedSize()
if err := msstw.currSST.ClearRawEncodedRange(start, end); err != nil {
return errors.Wrap(err, "failed to put range delete in sst")
}
msstw.writeBytes += int64(msstw.currSST.EstimatedSize() - prevWriteBytes)
return nil
}

Expand All @@ -498,9 +509,11 @@ func (msstw *multiSSTWriter) PutInternalRangeKey(
if err := msstw.rolloverSST(ctx, decodedStart, decodedEnd); err != nil {
return err
}
prevWriteBytes := msstw.currSST.EstimatedSize()
if err := msstw.currSST.PutInternalRangeKey(start, end, key); err != nil {
return errors.Wrap(err, "failed to put range key in sst")
}
msstw.writeBytes += int64(msstw.currSST.EstimatedSize() - prevWriteBytes)
return nil
}

Expand All @@ -514,12 +527,15 @@ func (msstw *multiSSTWriter) PutRangeKey(
return err
}
if msstw.skipClearForMVCCSpan {
prevWriteBytes := msstw.currSST.EstimatedSize()
// Skip the fragmenter. See the comment in skipClearForMVCCSpan.
if err := msstw.currSST.PutEngineRangeKey(start, end, suffix, value); err != nil {
return errors.Wrap(err, "failed to put range key in sst")
}
msstw.writeBytes += int64(msstw.currSST.EstimatedSize() - prevWriteBytes)
return nil
}

startKey, endKey := storage.EngineKey{Key: start}.Encode(), storage.EngineKey{Key: end}.Encode()
startTrailer := pebble.MakeInternalKeyTrailer(0, pebble.InternalKeyKindRangeKeySet)
msstw.rangeKeyFrag.Add(rangekey.Span{
Expand Down Expand Up @@ -716,6 +732,16 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(

var sharedSSTs []pebble.SharedSSTMeta
var externalSSTs []pebble.ExternalFile
var prevWriteBytes int64

snapshotQ := s.cfg.KVAdmissionController.GetSnapshotQueue(s.StoreID())
// Using a nil pacer is effectively a noop if snapshot control is disabled.
var pacer *admission.SnapshotPacer = nil
if admission.DiskBandwidthForSnapshotIngest.Get(&s.cfg.Settings.SV) {
pacer = admission.NewSnapshotPacer(snapshotQ, s.StoreID())
}
// It is safe to call Close() on a nil pacer.
defer pacer.Close()

for {
timingTag.start("recv")
Expand Down Expand Up @@ -750,6 +776,14 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
for batchReader.Next() {
// TODO(lyang24): maybe avoid decoding engine key twice.
// msstw calls (i.e. PutInternalPointKey) can use the decoded engine key here as input.

writeBytes := msstw.writeBytes - prevWriteBytes
// Calling nil pacer is a noop.
if err := pacer.Pace(ctx, writeBytes, false /* final */); err != nil {
return noSnap, errors.Wrapf(err, "snapshot admission pacer")
}
prevWriteBytes = msstw.writeBytes

ek, err := batchReader.EngineKey()
if err != nil {
return noSnap, err
Expand Down Expand Up @@ -878,6 +912,12 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
if err != nil {
return noSnap, errors.Wrapf(err, "finishing sst for raft snapshot")
}
// Defensive call to account for any discrepancies. The SST sizes should
// have been updated upon closing.
additionalWrites := sstSize - msstw.writeBytes
if err := pacer.Pace(ctx, additionalWrites, true /* final */); err != nil {
return noSnap, errors.Wrapf(err, "snapshot admission pacer")
}
msstw.Close()
timingTag.stop("sst")
log.Eventf(ctx, "all data received from snapshot and all SSTs were finalized")
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/sst_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,13 @@ func (fw *SSTWriter) BufferedSize() int {
return 0
}

// EstimatedSize returns the underlying RawWriter's estimated size. Note that
// this size is an estimate as if the writer were to be closed at the time of
// calling.
func (fw *SSTWriter) EstimatedSize() uint64 {
return fw.fw.Raw().EstimatedSize()
}

// MemObject is an in-memory implementation of objstorage.Writable, intended
// use with SSTWriter.
type MemObject struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/admission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"pacer.go",
"scheduler_latency_listener.go",
"sequencer.go",
"snapshot_queue.go",
"sql_cpu_overload_indicator.go",
"store_token_estimation.go",
"testing_knobs.go",
Expand All @@ -36,6 +37,7 @@ go_library(
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/queue",
"//pkg/util/schedulerlatency",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand All @@ -60,6 +62,7 @@ go_test(
"replicated_write_admission_test.go",
"scheduler_latency_listener_test.go",
"sequencer_test.go",
"snapshot_queue_test.go",
"store_token_estimation_test.go",
"tokens_linear_model_test.go",
"work_queue_test.go",
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ type granterWithIOTokens interface {
) (tokensUsed int64, tokensUsedByElasticWork int64)
// getDiskTokensUsedAndReset returns the disk bandwidth tokens used since the
// last such call.
getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]diskTokens
getDiskTokensUsedAndReset() [admissionpb.NumStoreWorkTypes]diskTokens
// setLinearModels supplies the models to use when storeWriteDone or
// storeReplicatedWorkAdmittedLocked is called, to adjust token consumption.
// Note that these models are not used for token adjustment at admission
Expand Down
31 changes: 31 additions & 0 deletions pkg/util/admission/admissionpb/admissionpb.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,37 @@ const (
NumWorkClasses
)

// StoreWorkType represents the type of work,
type StoreWorkType int8

const (
// RegularStoreWorkType is for type of store-specific work that corresponds to
// RegularWorkClass.
RegularStoreWorkType StoreWorkType = iota
// SnapshotIngestStoreWorkType is for snapshot work type. It is classified as
// ElasticWorkClass, but is prioritized higher than other work of that class.
SnapshotIngestStoreWorkType = 1
// ElasticStoreWorkType is for store-specific work that corresponds to
// ElasticWorkClass, excluding SnapshotIngestStoreWorkType.
ElasticStoreWorkType = 2
// NumStoreWorkTypes is the number of store work types.
NumStoreWorkTypes = 3
)

// WorkClassFromStoreWorkType translates StoreWorkType to a WorkClass
func WorkClassFromStoreWorkType(workType StoreWorkType) WorkClass {
var class WorkClass
switch workType {
case RegularStoreWorkType:
class = RegularWorkClass
case ElasticStoreWorkType:
class = ElasticWorkClass
case SnapshotIngestStoreWorkType:
class = ElasticWorkClass
}
return class
}

// WorkClassFromPri translates a WorkPriority to its given WorkClass.
func WorkClassFromPri(pri WorkPriority) WorkClass {
class := RegularWorkClass
Expand Down
Loading

0 comments on commit 2ac6bcf

Please sign in to comment.