changefeedccl: fix panics when min_checkpoint_frequency is 0
Previously, we introduced the cluster setting changefeed.aggregator.flush_jitter
to apply jitter to the aggregator flush frequency, spreading out aggregator
flush times to reduce load on the coordinator. However, that change did not
account for the case where min_checkpoint_frequency is set to zero, which could
cause panics due to rand.Int63n(0). This patch fixes the issue by skipping
jitter when the flush frequency or the jitter fraction is zero, and by
returning an assertion error instead of panicking on negative inputs.
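
For context, a minimal standalone Go sketch of the failure mode (an
illustration, not the changefeed code itself): math/rand's Int63n panics on
any argument <= 0, and with min_checkpoint_frequency set to zero the old
jitter computation passed it exactly 0.

// Minimal sketch of the failure mode; not the CockroachDB code.
// With min_checkpoint_frequency (d) at 0, the old jitter math handed 0 to
// rand.Int63n, which panics for any argument <= 0.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	const jitter = 0.1  // changefeed.aggregator.flush_jitter default
	var d time.Duration // stands in for min_checkpoint_frequency = 0

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // "invalid argument to Int63n"
		}
	}()
	_ = d + time.Duration(rand.Int63n(int64(jitter*float64(d)))) // panics here
}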

Fixes: cockroachdb#125312

Release note (bug fix): Fixed a bug in v24.1, v23.2, and v23.1 where using
changefeed.aggregator.flush_jitter with min_checkpoint_frequency set to zero
could cause panics.
wenyihu6 committed Jun 14, 2024
1 parent cdc755b · commit 7a59c66
Showing 4 changed files with 122 additions and 32 deletions.
docs/generated/settings/settings-for-tenants.txt: 2 changes (1 addition, 1 deletion)
@@ -9,7 +9,7 @@ bulkio.backup.deprecated_full_backup_with_subdir.enabled boolean false when true
bulkio.backup.file_size byte size 128 MiB target size for individual data files produced during BACKUP application
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail application
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads application
changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a fraction of min_checkpoint_frequency application
changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a fraction of min_checkpoint_frequency. This setting has no effect if min_checkpoint_frequency is set to 0. application
changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
changefeed.batch_reduction_retry.enabled (alias: changefeed.batch_reduction_retry_enabled) boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application
docs/generated/settings/settings.html: 2 changes (1 addition, 1 deletion)
@@ -14,7 +14,7 @@
<tr><td><div id="setting-bulkio-backup-read-timeout" class="anchored"><code>bulkio.backup.read_timeout</code></div></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-bulkio-backup-read-with-priority-after" class="anchored"><code>bulkio.backup.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-bulkio-stream-ingestion-minimum-flush-interval" class="anchored"><code>physical_replication.consumer.minimum_flush_interval<br />(alias: bulkio.stream_ingestion.minimum_flush_interval)</code></div></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-aggregator-flush-jitter" class="anchored"><code>changefeed.aggregator.flush_jitter</code></div></td><td>float</td><td><code>0.1</code></td><td>jitter aggregator flushes as a fraction of min_checkpoint_frequency</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-aggregator-flush-jitter" class="anchored"><code>changefeed.aggregator.flush_jitter</code></div></td><td>float</td><td><code>0.1</code></td><td>jitter aggregator flushes as a fraction of min_checkpoint_frequency. This setting has no effect if min_checkpoint_frequency is set to 0.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-backfill-concurrent-scan-requests" class="anchored"><code>changefeed.backfill.concurrent_scan_requests</code></div></td><td>integer</td><td><code>0</code></td><td>number of concurrent scan requests per node issued during a backfill</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-batch-reduction-retry-enabled" class="anchored"><code>changefeed.batch_reduction_retry.enabled<br />(alias: changefeed.batch_reduction_retry_enabled)</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
pkg/ccl/changefeedccl/changefeed_processors.go: 24 changes (15 additions, 9 deletions)
@@ -642,18 +642,22 @@ var aggregatorHeartbeatFrequency = settings.RegisterDurationSetting(
var aggregatorFlushJitter = settings.RegisterFloatSetting(
settings.ApplicationLevel,
"changefeed.aggregator.flush_jitter",
"jitter aggregator flushes as a fraction of min_checkpoint_frequency",
"jitter aggregator flushes as a fraction of min_checkpoint_frequency. This "+
"setting has no effect if min_checkpoint_frequency is set to 0.",
0.1, /* 10% */
settings.NonNegativeFloat,
settings.WithPublic,
)

func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) time.Time {
if j == 0 {
return s.Now().Add(d)
func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) (time.Time, error) {
if j < 0 || d < 0 {
return s.Now(), errors.AssertionFailedf("invalid jitter value: %f, duration: %s", j, d)
}
if j == 0 || d == 0 {
return s.Now().Add(d), nil
}
nextFlush := d + time.Duration(rand.Int63n(int64(j*float64(d))))
return s.Now().Add(nextFlush)
return s.Now().Add(nextFlush), nil
}

// Next is part of the RowSource interface.
@@ -799,7 +803,7 @@ func (ca *changeAggregator) flushBufferedEvents() error {
// noteResolvedSpan periodically flushes Frontier progress from the current
// changeAggregator node to the changeFrontier node to allow the changeFrontier
// to persist the overall changefeed's progress
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (returnErr error) {
if resolved.Timestamp.IsEmpty() {
// @0.0 resolved timestamps could come in from rangefeed checkpoint.
// When rangefeed starts running, it emits @0.0 resolved timestamp.
@@ -835,8 +839,11 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error

if checkpointFrontier {
defer func() {
ca.nextHighWaterFlush = nextFlushWithJitter(
ca.nextHighWaterFlush, err = nextFlushWithJitter(
timeutil.DefaultTimeSource{}, ca.flushFrequency, aggregatorFlushJitter.Get(sv))
if err != nil {
returnErr = errors.CombineErrors(returnErr, err)
}
}()
return ca.flushFrontier()
}
@@ -854,8 +861,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
}()
return ca.flushFrontier()
}

return nil
return returnErr
}

// flushFrontier flushes sink and emits resolved timestamp if needed.
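
To summarize the patched contract of nextFlushWithJitter, here is a hedged
standalone sketch (expectedWindow is an illustrative helper, not part of the
changefeed code): negative inputs now surface an assertion error instead of
panicking, a zero frequency or zero jitter fraction skips jitter entirely, and
otherwise the delay until the next flush falls in [d, d + j*d). TestFlushJitter
below exercises the same cases against the real function.

// Standalone sketch of the patched behavior; expectedWindow is illustrative.
package main

import (
	"errors"
	"fmt"
	"time"
)

// expectedWindow reports the bounds for the delay until the next flush under
// the patched rules (max is exclusive when jitter actually applies).
func expectedWindow(d time.Duration, j float64) (min, max time.Duration, err error) {
	switch {
	case d < 0 || j < 0:
		// The patch returns an assertion error here rather than panicking.
		return 0, 0, errors.New("invalid negative duration or jitter")
	case d == 0 || j == 0:
		// Jitter is skipped, so rand.Int63n is never called with 0.
		return d, d, nil
	default:
		return d, d + time.Duration(j*float64(d)), nil
	}
}

func main() {
	fmt.Println(expectedWindow(0, 0.1))                    // 0s 0s <nil>: the case that used to panic
	fmt.Println(expectedWindow(100*time.Millisecond, 0.1)) // 100ms 110ms <nil>
	fmt.Println(expectedWindow(-1, 0.1))                   // non-nil error
}
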
pkg/ccl/changefeedccl/changefeed_test.go: 126 changes (105 additions, 21 deletions)
@@ -7044,29 +7044,113 @@ func TestFlushJitter(t *testing.T) {
// min_flush_frequency period, and thus it would be hard to tell if the
// difference is due to jitter or not. Just verify nextFlushWithJitter function
// works as expected with controlled time source.
const flushFrequency time.Duration = 100 * time.Millisecond
ts := timeutil.NewManualTime(timeutil.Now())

ts := timeutil.NewManualTime(timeutil.Now())
const numIters = 100
t.Run("disable", func(t *testing.T) {
const disableJitter = 0.0
for i := 0; i < numIters; i++ {
next := nextFlushWithJitter(ts, flushFrequency, disableJitter)
require.Equal(t, ts.Now().Add(flushFrequency), next)
ts.AdvanceTo(next)
}
})
t.Run("enable", func(t *testing.T) {
const jitter = 0.1
for i := 0; i < numIters; i++ {
next := nextFlushWithJitter(ts, flushFrequency, jitter)
// next flush should be at least flushFrequency into the future.
require.LessOrEqual(t, ts.Now().Add(flushFrequency), next, t)
// and jitter should be at most 10% more than flushFrequency (10ms)
require.Less(t, next.Sub(ts.Now()), flushFrequency+flushFrequency/10)
ts.AdvanceTo(next)
}
})

for _, tc := range []struct {
flushFrequency time.Duration
jitter float64
expectedFlushDuration time.Duration
expectedErr bool
}{
// Negative jitter.
{
flushFrequency: -1,
jitter: -0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 0,
jitter: -0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 10 * time.Millisecond,
jitter: -0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 100 * time.Millisecond,
jitter: -0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
// Disable Jitter.
{
flushFrequency: -1,
jitter: 0,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 0,
jitter: 0,
expectedFlushDuration: 0,
expectedErr: false,
},
{
flushFrequency: 10 * time.Millisecond,
jitter: 0,
expectedFlushDuration: 10 * time.Millisecond,
expectedErr: false,
},
{
flushFrequency: 100 * time.Millisecond,
jitter: 0,
expectedFlushDuration: 100 * time.Millisecond,
expectedErr: false,
},
// Enable Jitter.
{
flushFrequency: -1,
jitter: 0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 0,
jitter: 0.1,
expectedFlushDuration: 0,
expectedErr: false,
},
{
flushFrequency: 10 * time.Millisecond,
jitter: 0.1,
expectedFlushDuration: 10 * time.Millisecond,
expectedErr: false,
},
{
flushFrequency: 100 * time.Millisecond,
jitter: 0.1,
expectedFlushDuration: 100 * time.Millisecond,
expectedErr: false,
},
} {
t.Run(fmt.Sprintf("flushfrequency=%sjitter=%f", tc.flushFrequency, tc.jitter), func(t *testing.T) {
for i := 0; i < numIters; i++ {
next, err := nextFlushWithJitter(ts, tc.flushFrequency, tc.jitter)
if tc.expectedErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
if tc.jitter > 0 {
minBound := tc.expectedFlushDuration
maxBound := tc.expectedFlushDuration + time.Duration(float64(tc.expectedFlushDuration)*tc.jitter)
actualDuration := next.Sub(ts.Now())
require.LessOrEqual(t, minBound, actualDuration)
require.LessOrEqual(t, actualDuration, maxBound)
} else {
require.Equal(t, tc.expectedFlushDuration, next.Sub(ts.Now()))
}
ts.AdvanceTo(next)
}
})
}
}

func TestChangefeedOrderingWithErrors(t *testing.T) {
