Skip to content

Commit

Permalink
kv,rangefeed: integrate catchup scans with elastic cpu
Browse files Browse the repository at this point in the history
Part of cockroachdb#65957.

Changefeed backfills, given their scan-heavy nature, can be fairly
CPU-intensive. In cockroachdb#89656 we introduced a roachtest demonstrating the
latency impact backfills can have on a moderately CPU-saturated cluster.
Similar to what we saw for backups, this CPU heavy nature can elevate Go
scheduling latencies which in turn translates to foreground latency
impact. This commit integrates rangefeed catchup scan with the elastic
CPU limiter we introduced in cockroachdb#86638; this is one of two optional halves
of changefeed backfills. The second half is the initial scan -- scan
requests issued over some keyspan as of some timestamp. For that we
simply rely on the existing slots mechanism but now setting a lower
priority bit (BulkNormalPri) -- cockroachdb#88733. Experimentally we observed that
during initial scans the encoding routines in changefeedccl are the most
impactful CPU-wise, something cockroachdb#89589 can help with. We leave admission
integration of parallel worker goroutines to future work (cockroachdb#90089).

Unlike export requests, rangefeed catchup scans are non-preemptible. The
rangefeed RPC is a streaming one, and the catchup scan is done during
stream setup. So we don't have resumption tokens to propagate up to the
caller like we did for backups. We still want CPU-bound work going
through admission control to only use 100ms of CPU time, to exercise
better control over scheduling latencies. To that end, we introduce the
following component used within the rangefeed catchup iterator.

    // Pacer is used in tight loops (CPU-bound) for non-preemptible
    // elastic work. Callers are expected to invoke Pace() every loop
    // iteration and Close() once done. Internally this type integrates
    // with elastic CPU work queue, acquiring tokens for the CPU work
    // being done, and blocking if tokens are unavailable. This allows
    // for a form of cooperative scheduling with elastic CPU granters.
    type Pacer struct
    func (p *Pacer) Pace(ctx context.Context) error { ... }
    func (p *Pacer) Close() { ... }

Release note: None
  • Loading branch information
irfansharif committed Oct 28, 2022
1 parent bd10129 commit be500e0
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
settings.TenantWritable,
"changefeed.frontier_checkpoint_max_bytes",
"controls the maximum size of the checkpoint as a total size of key bytes",
1<<20,
1<<20, // 1 MiB
)

// ScanRequestLimit is the number of Scan requests that can run at once.
Expand All @@ -126,7 +126,7 @@ var ScanRequestSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.backfill.scan_request_size",
"the maximum number of bytes returned by each scan request",
16<<20,
16<<20, // 16 MiB
)

// SinkThrottleConfig describes throttling configuration for the sink.
Expand Down
1 change: 0 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ const (
// V22_2SupportAssumeRoleAuth is the version where assume role authorization is
// supported in cloud storage and KMS.
V22_2SupportAssumeRoleAuth

// V22_2FixUserfileRelatedDescriptorCorruption adds a migration which uses
// heuristics to identify invalid table descriptors for userfile-related
// descriptors.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
defer cancel()
stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(args, stream1).GoError()
err := tc.repls[0].RangeFeed(ctx, args, stream1).GoError()
if ctx.Err() != nil {
return // main goroutine stopping
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
// the breaker.
stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(args, stream2).GoError()
err := tc.repls[0].RangeFeed(ctx, args, stream2).GoError()
if ctx.Err() != nil {
return // main goroutine stopping
}
Expand Down
119 changes: 113 additions & 6 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,52 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
},
)

// elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are
// allotted for each unit of work during rangefeed catchup scans. The default
// is admission.MaxElasticCPUDuration, and the validator rejects values outside
// [admission.MinElasticCPUDuration, admission.MaxElasticCPUDuration].
var elasticCPUDurationPerRangefeedScanUnit = settings.RegisterDurationSetting(
	settings.SystemOnly,
	"kvadmission.elastic_cpu.duration_per_rangefeed_scan_unit",
	"controls how many CPU tokens are allotted for each unit of work during rangefeed catchup scans",
	admission.MaxElasticCPUDuration,
	func(d time.Duration) error {
		switch {
		case d < admission.MinElasticCPUDuration:
			// Below the lower bound.
			return fmt.Errorf("minimum CPU duration allowed is %s, got %s",
				admission.MinElasticCPUDuration, d)
		case d > admission.MaxElasticCPUDuration:
			// Above the upper bound.
			return fmt.Errorf("maximum CPU duration allowed is %s, got %s",
				admission.MaxElasticCPUDuration, d)
		default:
			return nil
		}
	},
)

// Controller provides admission control for the KV layer.
type Controller interface {
// AdmitKVWork must be called before performing KV work.
// BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be
// populated for admission to work correctly. If err is non-nil, the
// returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be
// called after the KV work is done executing.
AdmitKVWork(
ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
) (Handle, error)
AdmitKVWork(context.Context, roachpb.TenantID, *roachpb.BatchRequest) (Handle, error)
// AdmittedKVWorkDone is called after the admitted KV work is done
// executing.
AdmittedKVWorkDone(Handle, *StoreWriteBytes)
// AdmitRangefeedRequest must be called before serving rangefeed requests.
// It returns a Pacer that's used within rangefeed catchup scans (typically
// CPU-intensive and affects scheduling latencies negatively).
AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *Pacer
// SetTenantWeightProvider is used to set the provider that will be
// periodically polled for weights. The stopper should be used to terminate
// the periodic polling.
SetTenantWeightProvider(provider TenantWeightProvider, stopper *stop.Stopper)
SetTenantWeightProvider(TenantWeightProvider, *stop.Stopper)
// SnapshotIngested informs admission control about a range snapshot
// ingestion.
SnapshotIngested(storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats)
SnapshotIngested(roachpb.StoreID, pebble.IngestOperationStats)
// FollowerStoreWriteBytes informs admission control about writes
// replicated to a raft follower, that have not been subject to admission
// control.
FollowerStoreWriteBytes(storeID roachpb.StoreID, followerWriteBytes FollowerStoreWriteBytes)
FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes)
}

// TenantWeightProvider can be periodically asked to provide the tenant
Expand Down Expand Up @@ -266,6 +288,27 @@ func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteByt
}
}

// AdmitRangefeedRequest implements the Controller interface. It returns a
// Pacer wired to the controller's elastic CPU work queue, carrying the
// tenant, priority and creation time taken from the request's admission
// header, and sized by the current value of the
// kvadmission.elastic_cpu.duration_per_rangefeed_scan_unit setting.
func (n *controllerImpl) AdmitRangefeedRequest(
	tenantID roachpb.TenantID, request *roachpb.RangeFeedRequest,
) *Pacer {
	// TODO(irfansharif): We need to version gate/be defensive when integrating
	// rangefeeds since admission headers will not be fully set on older version
	// nodes. See EnableRangefeedElasticCPUControl in cockroach_versions.go.
	// Consider a cluster setting too.

	// The setting is read once, at admission time; the resulting Pacer keeps
	// using this value for its lifetime.
	unit := elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV)

	info := admission.WorkInfo{
		TenantID:        tenantID,
		Priority:        admissionpb.WorkPriority(request.AdmissionHeader.Priority),
		CreateTime:      request.AdmissionHeader.CreateTime,
		BypassAdmission: false,
	}

	return &Pacer{
		unit: unit,
		wi:   info,
		wq:   n.elasticCPUWorkQueue,
	}
}

// SetTenantWeightProvider implements the Controller interface.
func (n *controllerImpl) SetTenantWeightProvider(
provider TenantWeightProvider, stopper *stop.Stopper,
Expand Down Expand Up @@ -382,3 +425,67 @@ func (wb *StoreWriteBytes) Release() {
}
storeWriteBytesPool.Put(wb)
}

// Pacer is used in tight loops (CPU-bound) for non-preemptible elastic work.
// Callers are expected to invoke Pace() every loop iteration and Close() once
// done. Internally this type integrates with elastic CPU work queue, acquiring
// tokens for the CPU work being done, and blocking if tokens are unavailable.
// This allows for a form of cooperative scheduling with elastic CPU granters.
//
// A nil *Pacer is valid: Pace() and Close() are no-ops on it.
type Pacer struct {
// unit is the CPU duration requested from the work queue on each admission.
unit time.Duration
// wi is the work info (tenant, priority, create time) passed to the work
// queue on each admission.
wi admission.WorkInfo
// wq is the elastic CPU work queue that work is admitted through and
// reported back to.
wq *admission.ElasticCPUWorkQueue

// cur is the handle for the currently admitted unit of work, or nil if
// nothing is currently admitted.
cur *admission.ElasticCPUWorkHandle
}

// Pace is intended to be called on every iteration of a CPU-bound loop. If
// the currently held elastic CPU handle reports that it is over its allotted
// limit, the handle is returned to the work queue and a fresh unit of work is
// admitted (which may block until tokens are available). If no handle is held
// yet, one is acquired.
//
// Calling Pace on a nil *Pacer is a no-op and returns nil. A non-nil error is
// returned only when admission of a new unit of work fails (for example when
// ctx is canceled while waiting); in that case no handle is held on return.
func (p *Pacer) Pace(ctx context.Context) error {
	if p == nil {
		return nil
	}

	// Only consult the handle when one is actually held. On the first call
	// p.cur is nil, and we don't want to depend on OverLimit tolerating a nil
	// receiver.
	if p.cur != nil {
		if overLimit, _ := p.cur.OverLimit(); overLimit {
			// The current grant is used up; hand it back before asking for
			// another.
			p.wq.AdmittedWorkDone(p.cur)
			p.cur = nil
		}
	}

	if p.cur == nil {
		handle, err := p.wq.Admit(ctx, p.unit, p.wi)
		if err != nil {
			return err
		}
		p.cur = handle
	}
	return nil
}

// Close releases the handle currently held by the Pacer, if any, back to the
// elastic CPU work queue. It is a no-op on a nil *Pacer or when no handle is
// held, so it is safe to call more than once.
func (p *Pacer) Close() {
	if p == nil {
		return
	}
	if p.cur != nil {
		p.wq.AdmittedWorkDone(p.cur)
		p.cur = nil
	}
}

// pacerKey is the context key type under which a *Pacer is stored by
// ContextWithPacer and looked up by PacerFromContext.
type pacerKey struct{}

// ContextWithPacer returns a Context wrapping the supplied Pacer. When the
// Pacer is nil the input Context is returned unchanged.
func ContextWithPacer(ctx context.Context, h *Pacer) context.Context {
	if h != nil {
		return context.WithValue(ctx, pacerKey{}, h)
	}
	return ctx
}

// PacerFromContext returns the Pacer contained in the Context, or nil if the
// Context carries none.
func PacerFromContext(ctx context.Context) *Pacer {
	// A failed assertion leaves h as the zero value, i.e. a nil *Pacer.
	h, _ := ctx.Value(pacerKey{}).(*Pacer)
	return h
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv/kvserver/kvadmission",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/storage",
Expand Down
19 changes: 17 additions & 2 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ package rangefeed

import (
"bytes"
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -62,6 +64,7 @@ type CatchUpIterator struct {
close func()
span roachpb.Span
startTime hlc.Timestamp // exclusive
pacer *kvadmission.Pacer
}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader over the
Expand All @@ -70,7 +73,11 @@ type CatchUpIterator struct {
// NB: startTime is exclusive, i.e. the first possible event will be emitted at
// Timestamp.Next().
func NewCatchUpIterator(
reader storage.Reader, span roachpb.Span, startTime hlc.Timestamp, closer func(),
reader storage.Reader,
span roachpb.Span,
startTime hlc.Timestamp,
closer func(),
pacer *kvadmission.Pacer,
) *CatchUpIterator {
return &CatchUpIterator{
simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader,
Expand All @@ -89,13 +96,15 @@ func NewCatchUpIterator(
close: closer,
span: span,
startTime: startTime,
pacer: pacer,
}
}

// Close closes the iterator and calls the instantiator-supplied close
// callback.
func (i *CatchUpIterator) Close() {
i.simpleCatchupIter.Close()
i.pacer.Close()
if i.close != nil {
i.close()
}
Expand All @@ -117,7 +126,9 @@ type outputEventFn func(e *roachpb.RangeFeedEvent) error
// For example, with MVCC range tombstones [a-f)@5 and [a-f)@3 overlapping point
// keys a@6, a@4, and b@2, the emitted order is [a-f)@3,[a-f)@5,a@4,a@6,b@2 because
// the start key "a" is ordered before all of the timestamped point keys.
func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error {
func (i *CatchUpIterator) CatchUpScan(
ctx context.Context, outputFn outputEventFn, withDiff bool,
) error {
var a bufalloc.ByteAllocator
// MVCCIterator will encounter historical values for each key in
// reverse-chronological order. To output in chronological order, store
Expand Down Expand Up @@ -319,6 +330,10 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
i.Next()
}
}

if err := i.pacer.Pace(ctx); err != nil {
return errors.Wrap(err, "automatic pacing: %v")
}
}

// Output events for the last key encountered.
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) (numE
EndKey: endKey,
}

ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
func() {
iter := rangefeed.NewCatchUpIterator(eng, span, opts.ts, nil)
iter := rangefeed.NewCatchUpIterator(eng, span, opts.ts, nil, nil)
defer iter.Close()
counter := 0
err := iter.CatchUpScan(func(*roachpb.RangeFeedEvent) error {
err := iter.CatchUpScan(ctx, func(*roachpb.RangeFeedEvent) error {
counter++
return nil
}, opts.withDiff)
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/rangefeed/catchup_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ func TestCatchupScan(t *testing.T) {
}
testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
span := roachpb.Span{Key: testKey1, EndKey: roachpb.KeyMax}
iter := NewCatchUpIterator(eng, span, ts1, nil)
iter := NewCatchUpIterator(eng, span, ts1, nil, nil)
defer iter.Close()
var events []roachpb.RangeFeedValue
// ts1 here is exclusive, so we do not want the versions at ts1.
require.NoError(t, iter.CatchUpScan(func(e *roachpb.RangeFeedEvent) error {
require.NoError(t, iter.CatchUpScan(ctx, func(e *roachpb.RangeFeedEvent) error {
events = append(events, *e.Val)
return nil
}, withDiff))
Expand Down Expand Up @@ -154,10 +154,10 @@ func TestCatchupScanInlineError(t *testing.T) {

// Run a catchup scan across the span and watch it error.
span := roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey}
iter := NewCatchUpIterator(eng, span, hlc.Timestamp{}, nil)
iter := NewCatchUpIterator(eng, span, hlc.Timestamp{}, nil, nil)
defer iter.Close()

err := iter.CatchUpScan(nil, false)
err := iter.CatchUpScan(ctx, nil, false)
require.Error(t, err)
require.Contains(t, err.Error(), "unexpected inline value")
}
Expand Down Expand Up @@ -194,11 +194,11 @@ func TestCatchupScanSeesOldIntent(t *testing.T) {

// Run a catchup scan across the span and watch it succeed.
span := roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey}
iter := NewCatchUpIterator(eng, span, tsCutoff, nil)
iter := NewCatchUpIterator(eng, span, tsCutoff, nil, nil)
defer iter.Close()

keys := map[string]struct{}{}
require.NoError(t, iter.CatchUpScan(func(e *roachpb.RangeFeedEvent) error {
require.NoError(t, iter.CatchUpScan(ctx, func(e *roachpb.RangeFeedEvent) error {
keys[string(e.Val.Key)] = struct{}{}
return nil
}, true /* withDiff */))
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
// have been emitted.
func (r *registration) outputLoop(ctx context.Context) error {
// If the registration has a catch-up scan, run it.
if err := r.maybeRunCatchUpScan(); err != nil {
if err := r.maybeRunCatchUpScan(ctx); err != nil {
err = errors.Wrap(err, "catch-up scan failed")
log.Errorf(ctx, "%v", err)
return err
Expand Down Expand Up @@ -372,7 +372,7 @@ func (r *registration) drainAllocations(ctx context.Context) {
//
// If the registration does not have a catchUpIteratorConstructor, this method
// is a no-op.
func (r *registration) maybeRunCatchUpScan() error {
func (r *registration) maybeRunCatchUpScan(ctx context.Context) error {
catchUpIter := r.detachCatchUpIter()
if catchUpIter == nil {
return nil
Expand All @@ -383,7 +383,7 @@ func (r *registration) maybeRunCatchUpScan() error {
r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

return catchUpIter.CatchUpScan(r.stream.Send, r.withDiff)
return catchUpIter.CatchUpScan(ctx, r.stream.Send, r.withDiff)
}

// ID implements interval.Interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func TestRegistrationCatchUpScan(t *testing.T) {
}, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */)

require.Zero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())
require.NoError(t, r.maybeRunCatchUpScan())
require.NoError(t, r.maybeRunCatchUpScan(context.Background()))
require.True(t, iter.closed)
require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())

Expand Down
Loading

0 comments on commit be500e0

Please sign in to comment.