release-23.2: changefeedccl: Add jitter to aggregator flushes #114729

Merged 1 commit on Nov 21, 2023
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -9,6 +9,7 @@ bulkio.backup.deprecated_full_backup_with_subdir.enabled boolean false when true
bulkio.backup.file_size byte size 128 MiB target size for individual data files produced during BACKUP application
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail application
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads application
changefeed.aggregator.flush_jitter float 0 jitter aggregator flushes as a fraction of min_checkpoint_frequency application
changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
changefeed.balance_range_distribution.enabled boolean false if enabled, the ranges are balanced equally among all nodes application
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -14,6 +14,7 @@
<tr><td><div id="setting-bulkio-backup-read-timeout" class="anchored"><code>bulkio.backup.read_timeout</code></div></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-bulkio-backup-read-with-priority-after" class="anchored"><code>bulkio.backup.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-bulkio-stream-ingestion-minimum-flush-interval" class="anchored"><code>physical_replication.consumer.minimum_flush_interval</code></div></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-aggregator-flush-jitter" class="anchored"><code>changefeed.aggregator.flush_jitter</code></div></td><td>float</td><td><code>0</code></td><td>jitter aggregator flushes as a fraction of min_checkpoint_frequency</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-backfill-concurrent-scan-requests" class="anchored"><code>changefeed.backfill.concurrent_scan_requests</code></div></td><td>integer</td><td><code>0</code></td><td>number of concurrent scan requests per node issued during a backfill</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-balance-range-distribution-enable" class="anchored"><code>changefeed.balance_range_distribution.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
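
The new changefeed.aggregator.flush_jitter setting defaults to 0, i.e. jitter disabled. As a usage sketch (the value 0.1 is only an illustrative choice, not taken from the PR), an operator could enable it like so, allowing each aggregator to delay its highwater flush by up to 10% of min_checkpoint_frequency:

-- Let aggregator flushes be delayed by a random amount of up to 10% of
-- min_checkpoint_frequency; 0 (the default) disables jitter.
SET CLUSTER SETTING changefeed.aggregator.flush_jitter = 0.1;
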
36 changes: 31 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -11,6 +11,7 @@ package changefeedccl
import (
"context"
"fmt"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
@@ -91,7 +92,7 @@ type changeAggregator struct {
// eventConsumer consumes the event.
eventConsumer eventConsumer

lastHighWaterFlush time.Time // last time high watermark was checkpointed.
nextHighWaterFlush time.Time // next time high watermark may be flushed.
flushFrequency time.Duration // how often high watermark can be checkpointed.
lastSpanFlush time.Time // last time expensive, span based checkpoint was written.

@@ -586,6 +587,23 @@ var aggregatorHeartbeatFrequency = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
)

var aggregatorFlushJitter = settings.RegisterFloatSetting(
settings.ApplicationLevel,
"changefeed.aggregator.flush_jitter",
"jitter aggregator flushes as a fraction of min_checkpoint_frequency",
0, /* disabled by default */
settings.NonNegativeFloat,
settings.WithPublic,
)

func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) time.Time {
if j == 0 {
return s.Now().Add(d)
}
nextFlush := d + time.Duration(rand.Int63n(int64(j*float64(d))))
return s.Now().Add(nextFlush)
}

// Next is part of the RowSource interface.
func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
shouldEmitHeartBeat := func() bool {
Expand Down Expand Up @@ -747,12 +765,20 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error

forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE

// NB: if we miss the flush window, and the flush frequency is fairly high (minutes),
// it might be a while before the frontier advances again (particularly if
// the number of ranges and closed timestamp settings are high).
// TODO(yevgeniy): Consider doing something similar to how job checkpointing
// works in the frontier where if we missed the window to checkpoint, we will attempt
// the checkpoint at the next opportune moment.
checkpointFrontier := advanced &&
(forceFlush || timeutil.Since(ca.lastHighWaterFlush) > ca.flushFrequency)
(forceFlush || timeutil.Now().After(ca.nextHighWaterFlush))

sv := &ca.flowCtx.Cfg.Settings.SV
if checkpointFrontier {
defer func() {
ca.lastHighWaterFlush = timeutil.Now()
ca.nextHighWaterFlush = nextFlushWithJitter(
timeutil.DefaultTimeSource{}, ca.flushFrequency, aggregatorFlushJitter.Get(sv))
}()
return ca.flushFrontier()
}
@@ -761,8 +787,8 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
// either in backfills or if the highwater mark is excessively lagging behind
checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
(resolved.Timestamp.Equal(ca.frontier.BackfillTS()) ||
ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, &ca.flowCtx.Cfg.Settings.SV)) &&
canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastSpanFlush)
ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, sv)) &&
canCheckpointSpans(sv, ca.lastSpanFlush)

if checkpointSpans {
defer func() {
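
Aside (not part of the diff): the helper added above always schedules the next flush between d and d*(1+j) from now, where d is the flush frequency and j the jitter fraction, so jitter only ever delays a flush and never makes flushes more frequent than min_checkpoint_frequency. The standalone sketch below mirrors that computation; nextInterval is a hypothetical name used only for illustration.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// nextInterval mirrors the jitter computation in nextFlushWithJitter:
// the next flush happens d plus a random amount in [0, j*d) from now,
// so the effective interval always lies in [d, d*(1+j)).
func nextInterval(d time.Duration, j float64) time.Duration {
	maxJitter := int64(j * float64(d))
	if maxJitter <= 0 {
		return d // jitter disabled (or too small to matter)
	}
	return d + time.Duration(rand.Int63n(maxJitter))
}

func main() {
	const d = 30 * time.Second // e.g. min_checkpoint_frequency
	const j = 0.1              // changefeed.aggregator.flush_jitter
	for i := 0; i < 5; i++ {
		fmt.Println(nextInterval(d, j)) // always >= 30s and < 33s
	}
}

The test added below checks exactly these bounds against a controlled time source.
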
35 changes: 35 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -6884,6 +6884,41 @@ func TestCheckpointFrequency(t *testing.T) {
require.False(t, js.progressUpdatesSkipped)
}

func TestFlushJitter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Test the logic around applying jitter to the flush logic.
// The more involved test that would try to capture flush times would likely
// be pretty flaky due to the fact that flush times do not happen at exactly
// min_flush_frequency period, and thus it would be hard to tell if the
// difference is due to jitter or not. Just verify nextFlushWithJitter function
// works as expected with controlled time source.
const flushFrequency time.Duration = 100 * time.Millisecond
ts := timeutil.NewManualTime(timeutil.Now())

const numIters = 100
t.Run("disable", func(t *testing.T) {
const disableJitter = 0.0
for i := 0; i < numIters; i++ {
next := nextFlushWithJitter(ts, flushFrequency, disableJitter)
require.Equal(t, ts.Now().Add(flushFrequency), next)
ts.AdvanceTo(next)
}
})
t.Run("enable", func(t *testing.T) {
const jitter = 0.1
for i := 0; i < numIters; i++ {
next := nextFlushWithJitter(ts, flushFrequency, jitter)
// next flush should be at least flushFrequency into the future.
require.LessOrEqual(t, ts.Now().Add(flushFrequency), next, t)
// and jitter should be at most 10% more than flushFrequency (10ms)
require.Less(t, next.Sub(ts.Now()), flushFrequency+flushFrequency/10)
ts.AdvanceTo(next)
}
})
}

func TestChangefeedOrderingWithErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)