kvcoord: Reintroduce catchup scan semaphore for regular rangefeed #113966

Merged · 1 commit · Nov 8, 2023
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -73,6 +73,7 @@ go_library(
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
69 changes: 63 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
@@ -72,6 +73,14 @@ var catchupStartupRate = settings.RegisterIntSetting(
settings.WithPublic,
)

var catchupScanConcurrency = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.rangefeed.catchup_scan_concurrency",
"number of catchup scans that a single rangefeed can execute concurrently; 0 implies unlimited",
8,
settings.NonNegativeInt,
)

var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"kv.rangefeed.range_stuck_threshold",
@@ -220,7 +229,7 @@ func (ds *DistSender) RangeFeedSpans(
cfg.rangeObserver(rr.ForEachPartialRangefeed)
}

rl := newCatchupScanRateLimiter(&ds.st.SV)
rl := newCatchupScanRateLimiter(&ds.st.SV, cfg.useMuxRangeFeed)

if enableMuxRangeFeed && cfg.useMuxRangeFeed {
return muxRangeFeed(ctx, cfg, spans, ds, rr, rl, eventCh)
@@ -694,11 +703,13 @@ func (a *activeRangeFeed) acquireCatchupScanQuota(
ctx context.Context, rl *catchupScanRateLimiter, metrics *DistSenderRangeFeedMetrics,
) error {
// Indicate catchup scan is starting.
if err := rl.Pace(ctx); err != nil {
alloc, err := rl.Pace(ctx)
if err != nil {
return err
}
metrics.RangefeedCatchupRanges.Inc(1)
a.catchupRes = func() {
alloc.Release()
metrics.RangefeedCatchupRanges.Dec(1)
}
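Note how the semaphore reservation is threaded through the existing cleanup hook: Pace hands back an allocation, and catchupRes releases it together with the metric decrement. A self-contained sketch of that release-exactly-once pairing (the gauge and semaphore below are hypothetical stand-ins, not the DistSender metrics):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var catchupRanges atomic.Int64 // stands in for the RangefeedCatchupRanges gauge
	sem := make(chan struct{}, 8)  // stands in for the catchup scan semaphore

	// Acquire: take a slot and bump the gauge, as acquireCatchupScanQuota does.
	sem <- struct{}{}
	catchupRanges.Add(1)

	// Bundle semaphore release and gauge decrement into one cleanup hook so
	// they always happen together; sync.Once guards against a double release
	// if both an error path and a success path invoke it.
	var once sync.Once
	catchupRes := func() {
		once.Do(func() {
			<-sem
			catchupRanges.Add(-1)
		})
	}
	defer catchupRes()

	fmt.Println("catch-up scans in flight:", catchupRanges.Load()) // 1
	catchupRes()
	fmt.Println("catch-up scans in flight:", catchupRanges.Load()) // 0
}
```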

@@ -987,18 +998,48 @@ type catchupScanRateLimiter struct {
pacer *quotapool.RateLimiter
sv *settings.Values
limit quotapool.Limit

// In addition to rate limiting catchup scans, a semaphore restricts
// catchup scan concurrency for regular rangefeeds (catchupSem is nil for mux
// rangefeed). This additional limit is necessary because a regular
// rangefeed may buffer up to 2MB of data (or 128KB if
// useDedicatedRangefeedConnectionClass is set to true) per rangefeed stream in
// the http2/gRPC buffers -- making OOMs likely if the consumer does not keep
// up. See
// https://github.com/cockroachdb/cockroach/issues/74219 for details.
// TODO(yevgeniy): Drop this once regular rangefeed gets deprecated.
catchupSemLimit int
catchupSem *limit.ConcurrentRequestLimiter
}

func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
func newCatchupScanRateLimiter(sv *settings.Values, useMuxRangeFeed bool) *catchupScanRateLimiter {
const slowAcquisitionThreshold = 5 * time.Second
lim := getCatchupRateLimit(sv)
return &catchupScanRateLimiter{

rl := &catchupScanRateLimiter{
sv: sv,
limit: lim,
pacer: quotapool.NewRateLimiter(
"distSenderCatchupLimit", lim, 0, /* smooth rate limit without burst */
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowCatchupScanAcquisition(slowAcquisitionThreshold))),
}

if !useMuxRangeFeed {
rl.catchupSemLimit = maxConcurrentCatchupScans(sv)
l := limit.MakeConcurrentRequestLimiter("distSenderCatchupLimit", rl.catchupSemLimit)
rl.catchupSem = &l
}

return rl
}

func maxConcurrentCatchupScans(sv *settings.Values) int {
l := catchupScanConcurrency.Get(sv)
if l == 0 {
return math.MaxInt
}
return int(l)
}
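Taken together, the limiter now layers two controls: a rate limiter paces how fast catch-up scans start (both paths), and a semaphore caps how many run concurrently (regular rangefeed only), with a zero setting mapped to an effectively unlimited semaphore. A standalone approximation, using golang.org/x/time/rate and a channel semaphore as assumed stand-ins for quotapool.RateLimiter and limit.ConcurrentRequestLimiter:

```go
package main

import (
	"context"
	"math"

	"golang.org/x/time/rate"
)

type catchupLimiter struct {
	pacer *rate.Limiter // paces scan startup (scans started per second)
	sem   chan struct{} // caps scans in flight; nil for mux rangefeed
}

func newCatchupLimiter(startsPerSec float64, concurrency int, useMux bool) *catchupLimiter {
	l := &catchupLimiter{pacer: rate.NewLimiter(rate.Limit(startsPerSec), 1)}
	if !useMux {
		if concurrency == 0 {
			concurrency = math.MaxInt32 // 0 means unlimited, as in maxConcurrentCatchupScans
		}
		l.sem = make(chan struct{}, concurrency)
	}
	return l
}

// pace blocks until a scan may start, then returns a release func that must
// be called once the catch-up scan finishes.
func (l *catchupLimiter) pace(ctx context.Context) (release func(), _ error) {
	if err := l.pacer.Wait(ctx); err != nil { // startup pacing, both paths
		return nil, err
	}
	if l.sem == nil { // mux rangefeed: no concurrency cap to hold
		return func() {}, nil
	}
	select { // regular rangefeed: also hold a concurrency slot
	case l.sem <- struct{}{}:
		return func() { <-l.sem }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	l := newCatchupLimiter(100, 8, false /* regular rangefeed */)
	release, err := l.pace(context.Background())
	if err != nil {
		return
	}
	defer release()
	// ... run the catch-up scan ...
}
```

One difference against this sketch: the real Pace (below) also re-reads both cluster settings on every call and pushes any changes into the pacer and semaphore, so operators can retune limits without restarting.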

func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
Expand All @@ -1009,15 +1050,31 @@ func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
}

// Pace paces the catchup scan startup.
func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
func (rl *catchupScanRateLimiter) Pace(ctx context.Context) (limit.Reservation, error) {
// Take opportunity to update limits if they have changed.
if lim := getCatchupRateLimit(rl.sv); lim != rl.limit {
rl.limit = lim
rl.pacer.UpdateLimit(lim, 0 /* smooth rate limit without burst */)
}
return rl.pacer.WaitN(ctx, 1)

if err := rl.pacer.WaitN(ctx, 1); err != nil {
return nil, err
}

// Regular rangefeed, in addition to pacing, also acquires catchup scan quota.
if rl.catchupSem != nil {
// Take opportunity to update limits if they have changed.
if lim := maxConcurrentCatchupScans(rl.sv); lim != rl.catchupSemLimit {
rl.catchupSem.SetLimit(lim)
}
return rl.catchupSem.Begin(ctx)
}

return catchupAlloc(releaseNothing), nil
}

func releaseNothing() {}

// logSlowCatchupScanAcquisition is a function returning a quotapool.SlowAcquisitionFunc.
// It differs from quotapool.LogSlowAcquisition in that only some slow acquisition
// events are logged, to reduce log spam.
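On the throttled logging mentioned above: rather than logging every slow acquisition, only a sampled subset is emitted. A minimal time-based sampling sketch (illustrative only; not CockroachDB's actual log utilities):

```go
package main

import (
	"log"
	"sync/atomic"
	"time"
)

// sampledLogger emits at most one log line per interval and silently drops
// the rest -- the same spam-reduction idea the comment above describes.
type sampledLogger struct {
	interval time.Duration
	last     atomic.Int64 // unix nanos of the last emitted line
}

func (s *sampledLogger) logf(format string, args ...any) {
	now := time.Now().UnixNano()
	last := s.last.Load()
	if now-last < int64(s.interval) {
		return // too soon after the previous line: drop it
	}
	if s.last.CompareAndSwap(last, now) { // one concurrent caller wins the slot
		log.Printf(format, args...)
	}
}

func main() {
	s := &sampledLogger{interval: time.Second}
	for i := 0; i < 1000; i++ {
		s.logf("slow catch-up scan quota acquisition (waited more than %s)", 5*time.Second)
	}
	// Only the first call in each one-second window actually logs.
}
```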
1 change: 0 additions & 1 deletion pkg/settings/registry.go
@@ -222,7 +222,6 @@ var retiredSettings = map[InternalKey]struct{}{
"jobs.trace.force_dump_mode": {},
"timeseries.storage.30m_resolution_ttl": {},
"server.cpu_profile.enabled": {},
"kv.rangefeed.catchup_scan_concurrency": {},
"changefeed.lagging_ranges_threshold": {},
"changefeed.lagging_ranges_polling_rate": {},
"trace.jaeger.agent": {},