Skip to content

Commit

Permalink
Merge #114525
Browse files Browse the repository at this point in the history
114525: streamingccl: prevent node lag replanning starvation r=stevendanna a=msbutler

This patch prevents the lastNodeLagCheck time from being updated every time the frontier processor receives a checkpoint, which can happen every few seconds. Previously, this prevented the node lag replanning check from ever triggering, because the check only runs once this timestamp is more than 10 minutes old. Instead, the timestamp should only be updated when we actually compute the lag check.

Fixes #114341

Release note: none

Co-authored-by: Michael Butler <butler@cockroachlabs.com>
  • Loading branch information
craig[bot] and msbutler committed Nov 16, 2023
2 parents 64384f6 + b61cbb9 commit c748278
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streamingest/node_lag_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
package streamingest

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

Expand All @@ -22,12 +24,13 @@ var ErrNodeLagging = errors.New("node frontier too far behind other nodes")
// more than maxAllowable lag behind any other destination node. This function
// assumes that all nodes have finished their initial scan (i.e. have a nonzero hwm).
func checkLaggingNodes(
executionDetails []frontierExecutionDetails, maxAllowableLag time.Duration,
ctx context.Context, executionDetails []frontierExecutionDetails, maxAllowableLag time.Duration,
) error {
if maxAllowableLag == 0 {
return nil
}
laggingNode, minLagDifference := computeMinLagDifference(executionDetails)
log.VEventf(ctx, 2, "computed min lag diff: %d lagging node, difference %.2f", laggingNode, minLagDifference.Minutes())
if maxAllowableLag < minLagDifference {
return errors.Wrapf(ErrNodeLagging, "node %d is %.2f minutes behind the next node. Try replanning", laggingNode, minLagDifference.Minutes())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,26 +535,26 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error {
}

func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error {
defer func() {
sf.lastNodeLagCheck = timeutil.Now()
}()
ctx := sf.Ctx()
checkFreq := streamingccl.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV)
maxLag := streamingccl.InterNodeLag.Get(&sf.FlowCtx.Cfg.Settings.SV)
if checkFreq == 0 || maxLag == 0 || timeutil.Since(sf.lastNodeLagCheck) < checkFreq {
log.VEventf(ctx, 2, "skipping lag replanning check: maxLag %d; checkFreq %.2f; last node check %s; time since last check %.2f",
maxLag, checkFreq.Minutes(), sf.lastNodeLagCheck, timeutil.Since(sf.lastNodeLagCheck).Minutes())
return nil
}

ctx := sf.Ctx()

// Don't check for lagging nodes if the hwm has yet to advance.
if sf.replicatedTimeAtStart.Equal(sf.persistedReplicatedTime) {
log.VEventf(ctx, 2, "skipping lagging nodes check as hwm has yet to advance past %s", sf.replicatedTimeAtStart)
log.VEventf(ctx, 2, "skipping lag replanning check: hwm has yet to advance past %s", sf.replicatedTimeAtStart)
return nil
}

defer func() {
sf.lastNodeLagCheck = timeutil.Now()
}()
executionDetails := constructSpanFrontierExecutionDetailsWithFrontier(sf.spec.PartitionSpecs, sf.frontier)

log.VEvent(ctx, 2, "checking for lagging nodes")
return checkLaggingNodes(
ctx,
executionDetails,
maxLag,
)
Expand Down

0 comments on commit c748278

Please sign in to comment.