diff --git a/pkg/ccl/streamingccl/streamingest/node_lag_detector.go b/pkg/ccl/streamingccl/streamingest/node_lag_detector.go
index acc31fbf95d3..0b92a978e730 100644
--- a/pkg/ccl/streamingccl/streamingest/node_lag_detector.go
+++ b/pkg/ccl/streamingccl/streamingest/node_lag_detector.go
@@ -9,10 +9,12 @@ package streamingest
 package streamingest
 
 import (
+	"context"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
 )
 
@@ -22,12 +24,13 @@ var ErrNodeLagging = errors.New("node frontier too far behind other nodes")
 // more than maxAllowable lag behind any other destination node. This function
 // assumes that all nodes have finished their initial scan (i.e. have a nonzero hwm).
 func checkLaggingNodes(
-	executionDetails []frontierExecutionDetails, maxAllowableLag time.Duration,
+	ctx context.Context, executionDetails []frontierExecutionDetails, maxAllowableLag time.Duration,
 ) error {
 	if maxAllowableLag == 0 {
 		return nil
 	}
 	laggingNode, minLagDifference := computeMinLagDifference(executionDetails)
+	log.VEventf(ctx, 2, "computed min lag diff: %d lagging node, difference %.2f", laggingNode, minLagDifference.Minutes())
 	if maxAllowableLag < minLagDifference {
 		return errors.Wrapf(ErrNodeLagging, "node %d is %.2f minutes behind the next node. Try replanning", laggingNode, minLagDifference.Minutes())
 	}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
index 9bba1ced4208..37edce01081f 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
@@ -535,26 +535,26 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error {
 }
 
 func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error {
-	defer func() {
-		sf.lastNodeLagCheck = timeutil.Now()
-	}()
+	ctx := sf.Ctx()
 	checkFreq := streamingccl.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV)
 	maxLag := streamingccl.InterNodeLag.Get(&sf.FlowCtx.Cfg.Settings.SV)
 	if checkFreq == 0 || maxLag == 0 || timeutil.Since(sf.lastNodeLagCheck) < checkFreq {
+		log.VEventf(ctx, 2, "skipping lag replanning check: maxLag %d; checkFreq %.2f; last node check %s; time since last check %.2f",
+			maxLag, checkFreq.Minutes(), sf.lastNodeLagCheck, timeutil.Since(sf.lastNodeLagCheck).Minutes())
 		return nil
 	}
-
-	ctx := sf.Ctx()
-
 	// Don't check for lagging nodes if the hwm has yet to advance.
 	if sf.replicatedTimeAtStart.Equal(sf.persistedReplicatedTime) {
-		log.VEventf(ctx, 2, "skipping lagging nodes check as hwm has yet to advance past %s", sf.replicatedTimeAtStart)
+		log.VEventf(ctx, 2, "skipping lag replanning check: hwm has yet to advance past %s", sf.replicatedTimeAtStart)
 		return nil
 	}
-
+	defer func() {
+		sf.lastNodeLagCheck = timeutil.Now()
+	}()
 	executionDetails := constructSpanFrontierExecutionDetailsWithFrontier(sf.spec.PartitionSpecs, sf.frontier)
-
+	log.VEvent(ctx, 2, "checking for lagging nodes")
 	return checkLaggingNodes(
+		ctx,
 		executionDetails,
 		maxLag,
 	)
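
One behavioral effect of the second hunk, as the diff reads: moving the deferred `sf.lastNodeLagCheck = timeutil.Now()` below the early returns means the last-check timestamp now advances only when a lag check actually runs. With the defer at the top of the function, every call, including the skipped ones, reset the timestamp, so a caller invoking the function more often than `checkFreq` would keep the frequency gate closed indefinitely. The following is a minimal, self-contained Go sketch of that timer-reset pattern under that assumption; the `checker` type and names are hypothetical and not the CockroachDB code.

package main

import (
	"fmt"
	"time"
)

type checker struct {
	lastCheck   time.Time
	updateEarly bool // true mimics the old shape: timestamp updated on every call
	ran         int
}

func (c *checker) maybeCheck(freq time.Duration) {
	if c.updateEarly {
		// Old shape: the deferred update fires even on the early return
		// below, so frequent calls keep pushing lastCheck forward and the
		// real check can be starved.
		defer func() { c.lastCheck = time.Now() }()
	}
	if time.Since(c.lastCheck) < freq {
		return
	}
	if !c.updateEarly {
		// New shape: record a check time only when the check actually runs.
		defer func() { c.lastCheck = time.Now() }()
	}
	c.ran++ // stands in for the real lag computation
}

func main() {
	for _, early := range []bool{true, false} {
		c := &checker{lastCheck: time.Now(), updateEarly: early}
		// Call far more often than the check frequency, as a frontier
		// processor might on every processed batch.
		for i := 0; i < 50; i++ {
			time.Sleep(5 * time.Millisecond)
			c.maybeCheck(100 * time.Millisecond)
		}
		fmt.Printf("updateEarly=%v: real check ran %d times\n", early, c.ran)
	}
}

With updateEarly=true the gated body never runs in this sketch; with updateEarly=false it runs roughly once per 100ms of the loop, which is the cadence the frequency gate is meant to enforce.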