Merge pull request #114748 from miretskiy/backport23.1-114161
release-23.1: changefeedccl: Add jitter to aggregator flushes
miretskiy authored Nov 21, 2023
2 parents d11ae0f + 02cfc2b commit df99abe
Showing 4 changed files with 67 additions and 5 deletions.
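At a high level, the change gives each change aggregator a precomputed next-flush deadline instead of flushing as soon as a fixed flushFrequency has elapsed: when jitter is enabled, the deadline is pushed out by a random fraction of the flush frequency. A minimal, standalone sketch of that scheduling rule (function and variable names here are illustrative, not the committed code, which appears in the diff below):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// nextFlushDelay sketches the scheduling rule: the next highwater flush is due
// flushFrequency plus a random extra of up to jitter*flushFrequency from now.
// With jitter == 0 this degenerates to the old fixed-period behavior.
func nextFlushDelay(flushFrequency time.Duration, jitter float64) time.Duration {
	if jitter == 0 {
		return flushFrequency
	}
	return flushFrequency + time.Duration(rand.Int63n(int64(jitter*float64(flushFrequency))))
}

func main() {
	// With a 30s flush frequency and jitter of 0.1, every delay lands in [30s, 33s).
	for i := 0; i < 3; i++ {
		fmt.Println(nextFlushDelay(30*time.Second, 0.1))
	}
}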
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -10,6 +10,7 @@ bulkio.backup.file_size byte size 128 MiB target size for individual data files
 bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail
 bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
 bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
+changefeed.aggregator.flush_jitter float 0 jitter aggregator flushes as a fraction of min_checkpoint_frequency
 changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request
 changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
 changefeed.batch_reduction_retry_enabled boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -16,6 +16,7 @@
 <tr><td><div id="setting-bulkio-backup-read-timeout" class="anchored"><code>bulkio.backup.read_timeout</code></div></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td></tr>
 <tr><td><div id="setting-bulkio-backup-read-with-priority-after" class="anchored"><code>bulkio.backup.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td></tr>
 <tr><td><div id="setting-bulkio-stream-ingestion-minimum-flush-interval" class="anchored"><code>bulkio.stream_ingestion.minimum_flush_interval</code></div></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
+<tr><td><div id="setting-changefeed-aggregator-flush-jitter" class="anchored"><code>changefeed.aggregator.flush_jitter</code></div></td><td>float</td><td><code>0</code></td><td>jitter aggregator flushes as a fraction of min_checkpoint_frequency</td></tr>
 <tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td></tr>
 <tr><td><div id="setting-changefeed-balance-range-distribution-enable" class="anchored"><code>changefeed.balance_range_distribution.enable</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes</td></tr>
 <tr><td><div id="setting-changefeed-batch-reduction-retry-enabled" class="anchored"><code>changefeed.batch_reduction_retry_enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes</td></tr>
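Both generated settings docs pick up the same new public cluster setting, changefeed.aggregator.flush_jitter, which defaults to 0 (jitter disabled). As a hedged usage note rather than anything stated in this commit: an operator would presumably enable it with a statement like SET CLUSTER SETTING changefeed.aggregator.flush_jitter = 0.1; and, with a min_checkpoint_frequency of 30s, that would schedule each aggregator highwater flush somewhere between 30s (inclusive) and 33s (exclusive) after the previous one.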
35 changes: 30 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -11,6 +11,7 @@ package changefeedccl
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
@@ -79,7 +80,7 @@ type changeAggregator struct {
 	// eventConsumer consumes the event.
 	eventConsumer eventConsumer
 
-	lastHighWaterFlush time.Time // last time high watermark was checkpointed.
+	nextHighWaterFlush time.Time // next time high watermark may be flushed.
 	flushFrequency time.Duration // how often high watermark can be checkpointed.
 	lastSpanFlush time.Time // last time expensive, span based checkpoint was written.
 
@@ -531,6 +532,22 @@ func (ca *changeAggregator) close() {
 	ca.InternalClose()
 }
 
+var aggregatorFlushJitter = settings.RegisterFloatSetting(
+	settings.TenantWritable,
+	"changefeed.aggregator.flush_jitter",
+	"jitter aggregator flushes as a fraction of min_checkpoint_frequency",
+	0, /* disabled by default */
+	settings.NonNegativeFloat,
+).WithPublic()
+
+func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) time.Time {
+	if j == 0 {
+		return s.Now().Add(d)
+	}
+	nextFlush := d + time.Duration(rand.Int63n(int64(j*float64(d))))
+	return s.Now().Add(nextFlush)
+}
+
 // Next is part of the RowSource interface.
 func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 	for ca.State == execinfra.StateRunning {
@@ -630,12 +647,20 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
 
 	forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE
 
+	// NB: if we miss flush window, and the flush frequency is fairly high (minutes),
+	// it might be a while before frontier advances again (particularly if
+	// the number of ranges and closed timestamp settings are high).
+	// TODO(yevgeniy): Consider doing something similar to how job checkpointing
+	// works in the frontier where if we missed the window to checkpoint, we will attempt
+	// the checkpoint at the next opportune moment.
 	checkpointFrontier := advanced &&
-		(forceFlush || timeutil.Since(ca.lastHighWaterFlush) > ca.flushFrequency)
+		(forceFlush || timeutil.Now().After(ca.nextHighWaterFlush))
 
+	sv := &ca.flowCtx.Cfg.Settings.SV
 	if checkpointFrontier {
 		defer func() {
-			ca.lastHighWaterFlush = timeutil.Now()
+			ca.nextHighWaterFlush = nextFlushWithJitter(
+				timeutil.DefaultTimeSource{}, ca.flushFrequency, aggregatorFlushJitter.Get(sv))
 		}()
 		return ca.flushFrontier()
 	}
@@ -644,8 +669,8 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
 	// either in backfills or if the highwater mark is excessively lagging behind
 	checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
 		(resolved.Timestamp.Equal(ca.frontier.BackfillTS()) ||
-			ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, &ca.flowCtx.Cfg.Settings.SV)) &&
-		canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastSpanFlush)
+			ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, sv)) &&
+		canCheckpointSpans(sv, ca.lastSpanFlush)
 
 	if checkpointSpans {
 		defer func() {
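The commit message does not spell out the motivation, but the usual reason to jitter periodic work like this is to keep many aggregators from flushing their progress in lockstep. A toy, self-contained simulation of that effect, entirely separate from the code above (all names hypothetical):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// simulate runs n independent aggregators that each repeatedly schedule their
// next flush, and returns how many distinct 10ms buckets the resulting flush
// times fall into over the horizon. Fewer buckets means more synchronized flushes.
func simulate(n int, period time.Duration, jitter float64, horizon time.Duration) int {
	buckets := map[time.Duration]bool{}
	for i := 0; i < n; i++ {
		for t := time.Duration(0); t < horizon; {
			delay := period
			if jitter > 0 {
				delay += time.Duration(rand.Int63n(int64(jitter * float64(period))))
			}
			t += delay
			buckets[t.Round(10*time.Millisecond)] = true
		}
	}
	return len(buckets)
}

func main() {
	fmt.Println("flush buckets, no jitter: ", simulate(100, time.Second, 0, 10*time.Second))  // 10: everyone flushes together
	fmt.Println("flush buckets, 10% jitter:", simulate(100, time.Second, 0.1, 10*time.Second)) // many more: flushes spread out
}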
35 changes: 35 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -7180,6 +7180,41 @@ func TestCheckpointFrequency(t *testing.T) {
 	require.False(t, js.progressUpdatesSkipped)
 }
 
+func TestFlushJitter(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Test the logic around applying jitter to the flush logic.
+	// The more involved test that would try to capture flush times would likely
+	// be pretty flaky due to the fact that flush times do not happen at exactly
+	// min_flush_frequency period, and thus it would be hard to tell if the
+	// difference is due to jitter or not. Just verify nextFlushWithJitter function
+	// works as expected with controlled time source.
+	const flushFrequency time.Duration = 100 * time.Millisecond
+	ts := timeutil.NewManualTime(timeutil.Now())
+
+	const numIters = 100
+	t.Run("disable", func(t *testing.T) {
+		const disableJitter = 0.0
+		for i := 0; i < numIters; i++ {
+			next := nextFlushWithJitter(ts, flushFrequency, disableJitter)
+			require.Equal(t, ts.Now().Add(flushFrequency), next)
+			ts.AdvanceTo(next)
+		}
+	})
+	t.Run("enable", func(t *testing.T) {
+		const jitter = 0.1
+		for i := 0; i < numIters; i++ {
+			next := nextFlushWithJitter(ts, flushFrequency, jitter)
+			// next flush should be at least flushFrequency into the future.
+			require.LessOrEqual(t, ts.Now().Add(flushFrequency), next, t)
+			// and jitter should be at most 10% more than flushFrequency (10ms)
+			require.Less(t, next.Sub(ts.Now()), flushFrequency+flushFrequency/10)
+			ts.AdvanceTo(next)
+		}
+	})
+}
+
 func TestChangefeedOrderingWithErrors(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
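A worked reading of the two assertions in the "enable" subtest: rand.Int63n(n) returns a value in [0, n), so with d = 100ms and j = 0.1 the committed nextFlushWithJitter returns a time at least 100ms and strictly less than 110ms past the manual clock's current time, which is exactly the closed lower bound and open upper bound that the require calls check.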
