diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index 5f0bc39fdcdd..4cc2dba12e95 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -67,11 +67,21 @@ func (p *scanRequestScanner) Scan( maxConcurrentExports := maxConcurrentExportRequests(p.gossip, &p.settings.SV) exportLim := limit.MakeConcurrentRequestLimiter("changefeedExportRequestLimiter", maxConcurrentExports) + + lastScanLimitUserSetting := changefeedbase.ScanRequestLimit.Get(&p.settings.SV) + g := ctxgroup.WithContext(ctx) // atomicFinished is used only to enhance debugging messages. var atomicFinished int64 for _, span := range spans { span := span + + // If the user defined scan request limit has changed, recalculate it + if currentUserScanLimit := changefeedbase.ScanRequestLimit.Get(&p.settings.SV); currentUserScanLimit != lastScanLimitUserSetting { + lastScanLimitUserSetting = currentUserScanLimit + exportLim.SetLimit(maxConcurrentExportRequests(p.gossip, &p.settings.SV)) + } + limAlloc, err := exportLim.Begin(ctx) if err != nil { cancel()