kvcoord: Replace rangefeed catchup semaphore with rate limiter
The catchup scan limit was added in #77725 to try to prevent a
single rangefeed from consuming all catchup scan slots on the KV
side. This client-side limit has been largely ineffective.

More recently, this semaphore was co-opted in #109346 to pace the
rate of goroutine creation on the client. That functionality is
important, but a semaphore is a poor fit for it.

Namely, we are interested in controlling the rate at which the
client starts new catchup scans (and thus the rate of goroutine
creation). This change replaces the semaphore with a rate
limiter. The rate limit is configured via two new settings:
`kv.rangefeed.max_catchup_scans`, which sets the maximum burst
size, i.e. the number of catchup scans that may be started at
once (default 180), and `kv.rangefeed.catchup_scan_duration_estimate`,
which sets the expected duration of a single catchup scan
(default 3s). Taken together, the defaults limit the rate at
which new catchup scans start to ~60/second.
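
The effective rate is just the burst divided by the estimated scan
duration. Below is a minimal sketch of that arithmetic using the
same quotapool rate limiter API the new code is built on; it is
illustrative only (names are made up), not the production code in
this change:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
)

func main() {
	burst := int64(180)         // kv.rangefeed.max_catchup_scans
	estimate := 3 * time.Second // kv.rangefeed.catchup_scan_duration_estimate

	rate := quotapool.Inf() // burst == 0 means no limit
	if burst > 0 {
		if window := estimate / time.Second; window > 0 {
			// 180 over a 3 second window: 60 catchup scans per second.
			rate = quotapool.Limit(burst) / quotapool.Limit(window)
		}
	}
	fmt.Printf("catchup scan start rate: %v/sec (burst %d)\n", rate, burst)

	rl := quotapool.NewRateLimiter("catchup-pacer", rate, burst)
	// Each new per-range catchup scan would acquire one unit before its
	// goroutine is spawned, pacing goroutine creation on the client.
	if alloc, err := rl.Acquire(context.Background(), 1); err == nil {
		alloc.Return()
	}
}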

Closes #110439
Epic: CRDB-26372

Release note: None
Yevgeniy Miretskiy committed Sep 19, 2023
1 parent d414347 commit 5c38983
Showing 8 changed files with 69 additions and 34 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -48,8 +48,10 @@ kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overr
kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport tenant-ro
kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration tenant-ro
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records tenant-ro
kv.rangefeed.catchup_scan_duration_estimate duration 3s an estimate of how long catchup scans take; setting controls the rate at which new catchup scans can start tenant-rw
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval tenant-ro
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled tenant-rw
kv.rangefeed.max_catchup_scans integer 180 maximum number of catchup scans that may run concurrently; 0 implies unlimited tenant-rw
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence) tenant-rw
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions tenant-rw
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions tenant-rw
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
@@ -69,8 +69,10 @@
<tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-range-split-load-cpu-threshold" class="anchored"><code>kv.range_split.load_cpu_threshold</code></div></td><td>duration</td><td><code>500ms</code></td><td>the CPU use per second over which, the range becomes a candidate for load based splitting</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-catchup-scan-duration-estimate" class="anchored"><code>kv.rangefeed.catchup_scan_duration_estimate</code></div></td><td>duration</td><td><code>3s</code></td><td>an estimate of how long catchup scans take; setting controls the rate at which new catchup scans can start</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-closed-timestamp-refresh-interval" class="anchored"><code>kv.rangefeed.closed_timestamp_refresh_interval</code></div></td><td>duration</td><td><code>3s</code></td><td>the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-max-catchup-scans" class="anchored"><code>kv.rangefeed.max_catchup_scans</code></div></td><td>integer</td><td><code>180</code></td><td>maximum number of catchup scans that may run concurrently; 0 implies unlimited</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don&#39;t emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-replica-circuit-breaker-slow-replication-threshold" class="anchored"><code>kv.replica_circuit_breaker.slow_replication_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-replica-stats-addsst-request-size-factor" class="anchored"><code>kv.replica_stats.addsst_request_size_factor</code></div></td><td>integer</td><td><code>50000</code></td><td>the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1</td><td>Dedicated/Self-Hosted</td></tr>
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/cdc_bench.go
@@ -177,7 +177,6 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {

// Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default
// configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per minute).
settings.ClusterSettings["kv.rangefeed.catchup_scan_concurrency"] = "16"
settings.ClusterSettings["kv.rangefeed.concurrent_catchup_iterators"] = "16"

// Give changefeed more memory and slow down rangefeed checkpoints.
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -73,7 +73,6 @@ go_library(
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
9 changes: 4 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -49,7 +48,7 @@ type rangefeedMuxer struct {
metrics *DistSenderRangeFeedMetrics
cfg rangeFeedConfig
registry *rangeFeedRegistry
catchupSem *limit.ConcurrentRequestLimiter
catchupSem *catchupScanRateLimiter
eventCh chan<- RangeFeedMessage

// Each call to start new range feed gets a unique ID which is echoed back
@@ -71,7 +70,7 @@ func muxRangeFeed(
spans []SpanTimePair,
ds *DistSender,
rr *rangeFeedRegistry,
catchupSem *limit.ConcurrentRequestLimiter,
catchupRateLimiter *catchupScanRateLimiter,
eventCh chan<- RangeFeedMessage,
) (retErr error) {
if log.V(1) {
@@ -88,7 +87,7 @@
ds: ds,
cfg: cfg,
metrics: &ds.metrics.DistSenderRangeFeedMetrics,
catchupSem: catchupSem,
catchupSem: catchupRateLimiter,
eventCh: eventCh,
}
if cfg.knobs.metrics != nil {
@@ -244,7 +243,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
streamID := atomic.AddInt64(&m.seqID, 1)

// Before starting single rangefeed, acquire catchup scan quota.
if err := s.acquireCatchupScanQuota(ctx, &m.ds.st.SV, m.catchupSem, m.metrics); err != nil {
if err := s.acquireCatchupScanQuota(ctx, m.catchupSem, m.metrics); err != nil {
return err
}

85 changes: 59 additions & 26 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -38,9 +38,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -64,12 +64,22 @@ var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting(
"kv.rangefeed.use_dedicated_connection_class.enabled", false),
)

var catchupScanConcurrency = settings.RegisterIntSetting(
var catchupScanDurationEstimate = settings.RegisterDurationSetting(
settings.TenantWritable,
"kv.rangefeed.catchup_scan_concurrency",
"number of catchup scans that a single rangefeed can execute concurrently; 0 implies unlimited",
8,
"kv.rangefeed.catchup_scan_duration_estimate",
"an estimate of how long catchup scans take; setting controls the rate at which new catchup scans can start",
3*time.Second,
settings.NonNegativeDuration,
settings.WithPublic,
)

var catchupScanBurst = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.rangefeed.max_catchup_scans",
"maximum number of catchup scans that may run concurrently; 0 implies unlimited",
180, // 180 ranges over a 3 second window: 60 ranges/sec; 12k ranges in 200 seconds.
settings.NonNegativeInt,
settings.WithPublic,
)

var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
@@ -80,14 +90,6 @@ var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
settings.WithPublic)

func maxConcurrentCatchupScans(sv *settings.Values) int {
l := catchupScanConcurrency.Get(sv)
if l == 0 {
return math.MaxInt
}
return int(l)
}

// ForEachRangeFn is used to execute `fn` over each range in a rangefeed.
type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error

@@ -226,12 +228,11 @@ func (ds *DistSender) RangeFeedSpans(
cfg.rangeObserver(rr.ForEachPartialRangefeed)
}

catchupSem := limit.MakeConcurrentRequestLimiter(
"distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV))
rl := newCatchupScanRateLimiter(&ds.st.SV)

if ds.st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2RangefeedUseOneStreamPerNode) &&
enableMuxRangeFeed && cfg.useMuxRangeFeed {
return muxRangeFeed(ctx, cfg, spans, ds, rr, &catchupSem, eventCh)
return muxRangeFeed(ctx, cfg, spans, ds, rr, rl, eventCh)
}

// Goroutine that processes subdivided ranges and creates a rangefeed for
@@ -256,7 +257,7 @@
}
// Prior to spawning goroutine to process this feed, acquire catchup scan quota.
// This quota acquisition paces the rate of new goroutine creation.
if err := active.acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics); err != nil {
if err := active.acquireCatchupScanQuota(ctx, rl, metrics); err != nil {
return err
}
if log.V(1) {
@@ -700,23 +701,19 @@ func (a catchupAlloc) Release() {
}

func (a *activeRangeFeed) acquireCatchupScanQuota(
ctx context.Context,
sv *settings.Values,
catchupSem *limit.ConcurrentRequestLimiter,
metrics *DistSenderRangeFeedMetrics,
ctx context.Context, rl *catchupScanRateLimiter, metrics *DistSenderRangeFeedMetrics,
) error {
// Indicate catchup scan is starting; Before potentially blocking on a semaphore, take
// opportunity to update semaphore limit.
catchupSem.SetLimit(maxConcurrentCatchupScans(sv))
res, err := catchupSem.Begin(ctx)
// Indicate catchup scan is starting.
alloc, err := rl.Acquire(ctx)
if err != nil {
return err
}
metrics.RangefeedCatchupRanges.Inc(1)
a.catchupRes = func() {
metrics.RangefeedCatchupRanges.Dec(1)
res.Release()
alloc.Return()
}

a.Lock()
defer a.Unlock()
a.InCatchup = true
@@ -989,3 +986,39 @@ func TestingWithMuxRangeFeedRequestSenderCapture(

// TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics

type catchupScanRateLimiter struct {
*quotapool.RateLimiter
sv *settings.Values
limit quotapool.Limit
burst int64
}

func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
rl := &catchupScanRateLimiter{sv: sv}
rl.limit, rl.burst = getCatchupScanLimits(rl.sv)
rl.RateLimiter = quotapool.NewRateLimiter("distSenderCatchupLimit", rl.limit, rl.burst)
return rl
}

func getCatchupScanLimits(sv *settings.Values) (quotapool.Limit, int64) {
lim := quotapool.Inf()
burst := catchupScanBurst.Get(sv)
if burst > 0 {
if window := catchupScanDurationEstimate.Get(sv) / time.Second; window > 0 {
lim = quotapool.Limit(burst) / quotapool.Limit(window)
}
}
return lim, burst
}

// Acquire acquires catchup scan quota.
func (rl *catchupScanRateLimiter) Acquire(ctx context.Context) (*quotapool.RateAlloc, error) {
// Take opportunity to update limits if they have changed.
if lim, burst := getCatchupScanLimits(rl.sv); lim != rl.limit || burst != rl.burst {
rl.limit, rl.burst = lim, burst
rl.RateLimiter.UpdateLimit(lim, burst)
}

return rl.RateLimiter.Acquire(ctx, 1)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -609,7 +609,7 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
// Initial setup: only single catchup scan allowed.
sqlDB.ExecMultiple(t,
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
`SET CLUSTER SETTING kv.rangefeed.catchup_scan_concurrency = 1`,
`SET CLUSTER SETTING kv.rangefeed.max_catchup_scans = 1`,
`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
`CREATE TABLE foo (key INT PRIMARY KEY)`,
`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
1 change: 1 addition & 0 deletions pkg/settings/registry.go
@@ -188,6 +188,7 @@ var retiredSettings = map[InternalKey]struct{}{
"jobs.trace.force_dump_mode": {},
"timeseries.storage.30m_resolution_ttl": {},
"server.cpu_profile.enabled": {},
"kv.rangefeed.catchup_scan_concurrency": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults