changefeedccl: Break gossip dependency
Break the dependency on the Gossip library, which was
used to determine the number of nodes in the cluster
in order to limit scanner concurrency.
Instead, rely on range descriptors to determine
how many nodes host the ranges that need to be scanned.

Fixes #47971

Release note: None
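
In outline, the new approach walks the range descriptors covering the spans to be scanned and counts the distinct nodes that hold replicas, then uses that count where the gossip-derived node count used to go. A minimal, self-contained sketch of that counting step (simplified stand-in types, not the actual kvcoord/roachpb API):

package main

import "fmt"

// rangeInfo is a simplified stand-in for a range descriptor: only the IDs of
// the nodes holding a replica of the range matter here (the real code reads
// them from it.Desc().InternalReplicas).
type rangeInfo struct {
	replicaNodes []int
}

// nodeCountHint counts the distinct nodes that host at least one of the
// ranges to be scanned. The commit does the same with a util.FastIntMap
// keyed by NodeID; a plain map is enough for the sketch.
func nodeCountHint(ranges []rangeInfo) int {
	seen := make(map[int]struct{})
	for _, r := range ranges {
		for _, n := range r.replicaNodes {
			seen[n] = struct{}{} // each node counted once, however many ranges it hosts
		}
	}
	return len(seen)
}

func main() {
	ranges := []rangeInfo{
		{replicaNodes: []int{1, 2, 3}},
		{replicaNodes: []int{2, 3, 4}},
	}
	fmt.Println(nodeCountHint(ranges)) // 4: nodes 1 through 4 host the two ranges
}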
Yevgeniy Miretskiy committed Apr 10, 2023
1 parent b36c119 commit ad7d868
Showing 3 changed files with 29 additions and 36 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
@@ -523,7 +523,8 @@ type distResolver struct {
func (r *distResolver) getRangesForSpans(
ctx context.Context, spans []roachpb.Span,
) ([]roachpb.Span, error) {
return kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
spans, _, err := kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
return spans, err
}

func rebalanceSpanPartitions(
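With AllRangeSpans now returning a node-count hint as its second value, callers that only need the covering spans, like getRangesForSpans above, discard it with a blank identifier. A toy illustration of the calling pattern (the stand-in function below is not the real kvfeed API):

package main

import "fmt"

// allRangeSpans mimics the new shape of kvfeed.AllRangeSpans: the covering
// spans, a node-count hint, and an error.
func allRangeSpans() ([]string, int, error) {
	return []string{"a-m", "m-z"}, 3, nil // placeholder span labels
}

func main() {
	// A caller that only needs the spans drops the hint.
	spans, _, err := allRangeSpans()
	if err != nil {
		panic(err)
	}
	// A caller sizing its scan concurrency keeps it.
	_, numNodesHint, _ := allRangeSpans()
	fmt.Println(spans, numNodesHint)
}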
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/sql/covering",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
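The new //pkg/util dependency pulls in util.FastIntMap, which scanner.go below uses as a cheap set keyed by node ID. A sketch of that counting step, assuming only the repo's roachpb descriptor type and the Set/Len calls that actually appear in the diff:

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util"
)

// distinctNodes sketches the counting added to AllRangeSpans below: a
// util.FastIntMap doubles as a small integer set keyed by node ID.
func distinctNodes(descs []roachpb.RangeDescriptor) int {
	var replicas util.FastIntMap
	for _, desc := range descs {
		for _, r := range desc.InternalReplicas {
			replicas.Set(int(r.NodeID), 0) // value unused; keys act as set members
		}
	}
	return replicas.Len() // number of distinct nodes seen
}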
61 changes: 26 additions & 35 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -70,7 +71,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg

sender := p.db.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
spans, err := getSpansToProcess(ctx, distSender, cfg.Spans)
spans, numNodesHint, err := getRangesToProcess(ctx, distSender, cfg.Spans)
if err != nil {
return err
}
@@ -81,7 +82,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
defer backfillClear()
}

maxConcurrentScans := maxConcurrentScanRequests(p.gossip, &p.settings.SV)
maxConcurrentScans := maxConcurrentScanRequests(numNodesHint, &p.settings.SV)
exportLim := limit.MakeConcurrentRequestLimiter("changefeedScanRequestLimiter", maxConcurrentScans)

lastScanLimitUserSetting := changefeedbase.ScanRequestLimit.Get(&p.settings.SV)
@@ -95,7 +96,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
// If the user defined scan request limit has changed, recalculate it
if currentUserScanLimit := changefeedbase.ScanRequestLimit.Get(&p.settings.SV); currentUserScanLimit != lastScanLimitUserSetting {
lastScanLimitUserSetting = currentUserScanLimit
exportLim.SetLimit(maxConcurrentScanRequests(p.gossip, &p.settings.SV))
exportLim.SetLimit(maxConcurrentScanRequests(numNodesHint, &p.settings.SV))
}

limAlloc, err := exportLim.Begin(ctx)
@@ -249,12 +250,14 @@ func (p *scanRequestScanner) exportSpan(
return nil
}

func getSpansToProcess(
// getRangesToProcess returns the list of ranges covering the input list of spans,
// along with the number of nodes that host replicas for those ranges.
func getRangesToProcess(
ctx context.Context, ds *kvcoord.DistSender, targetSpans []roachpb.Span,
) ([]roachpb.Span, error) {
ranges, err := AllRangeSpans(ctx, ds, targetSpans)
) ([]roachpb.Span, int, error) {
ranges, numNodes, err := AllRangeSpans(ctx, ds, targetSpans)
if err != nil {
return nil, err
return nil, 0, err
}

type spanMarker struct{}
@@ -289,7 +292,7 @@ func getSpansToProcess(
}
requests = append(requests, roachpb.Span{Key: chunk.Start, EndKey: chunk.End})
}
return requests, nil
return requests, numNodes, nil
}

// slurpScanResponse iterates the ScanResponse and inserts the contained kvs into
@@ -319,66 +322,54 @@ func slurpScanResponse(
return nil
}

// AllRangeSpans returns the list of all ranges that for the specified list of spans.
// AllRangeSpans returns the list of all ranges that cover the input spans, along with
// nodeCountHint indicating the number of nodes that host those ranges.
func AllRangeSpans(
ctx context.Context, ds *kvcoord.DistSender, spans []roachpb.Span,
) ([]roachpb.Span, error) {

) (_ []roachpb.Span, nodeCountHint int, _ error) {
ranges := make([]roachpb.Span, 0, len(spans))

it := kvcoord.MakeRangeIterator(ds)
var replicas util.FastIntMap

for i := range spans {
rSpan, err := keys.SpanAddr(spans[i])
if err != nil {
return nil, err
return nil, 0, err
}
for it.Seek(ctx, rSpan.Key, kvcoord.Ascending); ; it.Next(ctx) {
if !it.Valid() {
return nil, it.Error()
return nil, 0, it.Error()
}
ranges = append(ranges, roachpb.Span{
Key: it.Desc().StartKey.AsRawKey(), EndKey: it.Desc().EndKey.AsRawKey(),
})
for _, r := range it.Desc().InternalReplicas {
replicas.Set(int(r.NodeID), 0)
}
if !it.NeedAnother(rSpan) {
break
}
}
}

return ranges, nil
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.OptionalGossip) int {
g, err := gw.OptionalErr(47971)
if err != nil {
// can't count nodes in tenants
return 1
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeDescPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes
return ranges, replicas.Len(), nil
}

// maxConcurrentScanRequests returns the number of concurrent scan requests.
func maxConcurrentScanRequests(gw gossip.OptionalGossip, sv *settings.Values) int {
func maxConcurrentScanRequests(numNodesHint int, sv *settings.Values) int {
// If the user specified ScanRequestLimit -- use that value.
if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 {
return int(max)
}
if numNodesHint < 1 {
return 1
}

// TODO(yevgeniy): Currently, issuing multiple concurrent updates scaled to the size of
// the cluster only makes sense for the core changefeeds. This configuration should
// be specified explicitly when creating the scanner.
nodes := clusterNodeCount(gw)
// This is all hand-wavy: 3 per node used to be the default for a very long time.
// However, this could get out of hand if the cluster is large.
// So cap the max at an arbitrary value of 100.
max := 3 * nodes
max := 3 * numNodesHint
if max > 100 {
max = 100
}
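Putting the heuristic together: if the user-facing ScanRequestLimit setting is unset, the limit is three concurrent scan requests per node hosting the target ranges, floored at 1 and capped at 100. A standalone sketch of the arithmetic (mirroring the logic above, with the cluster-setting lookup replaced by a plain parameter):

package main

import "fmt"

// maxConcurrentScans mirrors maxConcurrentScanRequests' heuristic: an explicit
// user limit (changefeedbase.ScanRequestLimit in the real code) wins, otherwise
// 3 scans per node hosting the target ranges, floored at 1 and capped at 100.
func maxConcurrentScans(userLimit, numNodesHint int) int {
	if userLimit > 0 {
		return userLimit
	}
	if numNodesHint < 1 {
		return 1 // no hint available; stay conservative
	}
	max := 3 * numNodesHint
	if max > 100 {
		max = 100
	}
	return max
}

func main() {
	fmt.Println(maxConcurrentScans(0, 4))  // 12: 3 per node
	fmt.Println(maxConcurrentScans(0, 50)) // 100: capped
	fmt.Println(maxConcurrentScans(8, 50)) // 8: explicit user limit wins
}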
