Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: fix panics when min_checkpoint_frequency is 0 #125317

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ bulkio.backup.deprecated_full_backup_with_subdir.enabled boolean false when true
bulkio.backup.file_size byte size 128 MiB target size for individual data files produced during BACKUP application
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail application
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads application
changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a fraction of min_checkpoint_frequency application
changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a fraction of min_checkpoint_frequency. This setting has no effect if min_checkpoint_frequency is set to 0. application
changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
changefeed.batch_reduction_retry.enabled boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<tr><td><div id="setting-bulkio-backup-read-timeout" class="anchored"><code>bulkio.backup.read_timeout</code></div></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-bulkio-backup-read-with-priority-after" class="anchored"><code>bulkio.backup.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-bulkio-stream-ingestion-minimum-flush-interval" class="anchored"><code>physical_replication.consumer.minimum_flush_interval</code></div></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-aggregator-flush-jitter" class="anchored"><code>changefeed.aggregator.flush_jitter</code></div></td><td>float</td><td><code>0.1</code></td><td>jitter aggregator flushes as a fraction of min_checkpoint_frequency</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-aggregator-flush-jitter" class="anchored"><code>changefeed.aggregator.flush_jitter</code></div></td><td>float</td><td><code>0.1</code></td><td>jitter aggregator flushes as a fraction of min_checkpoint_frequency. This setting has no effect if min_checkpoint_frequency is set to 0.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-backfill-concurrent-scan-requests" class="anchored"><code>changefeed.backfill.concurrent_scan_requests</code></div></td><td>integer</td><td><code>0</code></td><td>number of concurrent scan requests per node issued during a backfill</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-batch-reduction-retry-enabled" class="anchored"><code>changefeed.batch_reduction_retry.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
24 changes: 15 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,18 +632,22 @@ var aggregatorHeartbeatFrequency = settings.RegisterDurationSetting(
var aggregatorFlushJitter = settings.RegisterFloatSetting(
settings.ApplicationLevel,
"changefeed.aggregator.flush_jitter",
"jitter aggregator flushes as a fraction of min_checkpoint_frequency",
"jitter aggregator flushes as a fraction of min_checkpoint_frequency. This "+
"setting has no effect if min_checkpoint_frequency is set to 0.",
0.1, /* 10% */
settings.NonNegativeFloat,
settings.WithPublic,
)

func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) time.Time {
if j == 0 {
return s.Now().Add(d)
func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) (time.Time, error) {
if j < 0 || d < 0 {
return s.Now(), errors.AssertionFailedf("invalid jitter value: %f, duration: %s", j, d)
}
if j == 0 || d == 0 {
return s.Now().Add(d), nil
}
nextFlush := d + time.Duration(rand.Int63n(int64(j*float64(d))))
return s.Now().Add(nextFlush)
return s.Now().Add(nextFlush), nil
}

// Next is part of the RowSource interface.
Expand Down Expand Up @@ -789,7 +793,7 @@ func (ca *changeAggregator) flushBufferedEvents() error {
// noteResolvedSpan periodically flushes Frontier progress from the current
// changeAggregator node to the changeFrontier node to allow the changeFrontier
// to persist the overall changefeed's progress
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (returnErr error) {
if resolved.Timestamp.IsEmpty() {
// @0.0 resolved timestamps could come in from rangefeed checkpoint.
// When rangefeed starts running, it emits @0.0 resolved timestamp.
Expand Down Expand Up @@ -825,8 +829,11 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error

if checkpointFrontier {
defer func() {
ca.nextHighWaterFlush = nextFlushWithJitter(
ca.nextHighWaterFlush, err = nextFlushWithJitter(
timeutil.DefaultTimeSource{}, ca.flushFrequency, aggregatorFlushJitter.Get(sv))
if err != nil {
returnErr = errors.CombineErrors(returnErr, err)
}
}()
return ca.flushFrontier()
}
Expand All @@ -844,8 +851,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
}()
return ca.flushFrontier()
}

return nil
return returnErr
}

// flushFrontier flushes sink and emits resolved timestamp if needed.
Expand Down
126 changes: 105 additions & 21 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7044,29 +7044,113 @@ func TestFlushJitter(t *testing.T) {
// min_flush_frequency period, and thus it would be hard to tell if the
// difference is due to jitter or not. Just verify nextFlushWithJitter function
// works as expected with controlled time source.
const flushFrequency time.Duration = 100 * time.Millisecond
ts := timeutil.NewManualTime(timeutil.Now())

ts := timeutil.NewManualTime(timeutil.Now())
const numIters = 100
t.Run("disable", func(t *testing.T) {
const disableJitter = 0.0
for i := 0; i < numIters; i++ {
next := nextFlushWithJitter(ts, flushFrequency, disableJitter)
require.Equal(t, ts.Now().Add(flushFrequency), next)
ts.AdvanceTo(next)
}
})
t.Run("enable", func(t *testing.T) {
const jitter = 0.1
for i := 0; i < numIters; i++ {
next := nextFlushWithJitter(ts, flushFrequency, jitter)
// next flush should be at least flushFrequency into the future.
require.LessOrEqual(t, ts.Now().Add(flushFrequency), next, t)
// and jitter should be at most 10% more than flushFrequency (10ms)
require.Less(t, next.Sub(ts.Now()), flushFrequency+flushFrequency/10)
ts.AdvanceTo(next)
}
})

for _, tc := range []struct {
flushFrequency time.Duration
jitter float64
expectedFlushDuration time.Duration
expectedErr bool
}{
// Negative jitter.
{
flushFrequency: -1,
jitter: -0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 0,
jitter: -0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 10 * time.Millisecond,
jitter: -0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 100 * time.Millisecond,
jitter: -0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
// Disable Jitter.
{
flushFrequency: -1,
jitter: 0,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 0,
jitter: 0,
expectedFlushDuration: 0,
expectedErr: false,
},
{
flushFrequency: 10 * time.Millisecond,
jitter: 0,
expectedFlushDuration: 10 * time.Millisecond,
expectedErr: false,
},
{
flushFrequency: 100 * time.Millisecond,
jitter: 0,
expectedFlushDuration: 100 * time.Millisecond,
expectedErr: false,
},
// Enable Jitter.
{
flushFrequency: -1,
jitter: 0.1,
expectedFlushDuration: 0,
expectedErr: true,
},
{
flushFrequency: 0,
jitter: 0.1,
expectedFlushDuration: 0,
expectedErr: false,
},
{
flushFrequency: 10 * time.Millisecond,
jitter: 0.1,
expectedFlushDuration: 10 * time.Millisecond,
expectedErr: false,
},
{
flushFrequency: 100 * time.Millisecond,
jitter: 0.1,
expectedFlushDuration: 100 * time.Millisecond,
expectedErr: false,
},
} {
t.Run(fmt.Sprintf("flushfrequency=%sjitter=%f", tc.flushFrequency, tc.jitter), func(t *testing.T) {
for i := 0; i < numIters; i++ {
next, err := nextFlushWithJitter(ts, tc.flushFrequency, tc.jitter)
if tc.expectedErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
if tc.jitter > 0 {
minBound := tc.expectedFlushDuration
maxBound := tc.expectedFlushDuration + time.Duration(float64(tc.expectedFlushDuration)*tc.jitter)
actualDuration := next.Sub(ts.Now())
require.LessOrEqual(t, minBound, actualDuration)
require.LessOrEqual(t, actualDuration, maxBound)
} else {
require.Equal(t, tc.expectedFlushDuration, next.Sub(ts.Now()))
}
ts.AdvanceTo(next)
}
})
}
}

func TestChangefeedOrderingWithErrors(t *testing.T) {
Expand Down
Loading