From f28b82623c5ace44a352f13a64483cbd7f9f8242 Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Fri, 7 Jun 2024 12:19:38 -0400 Subject: [PATCH] changefeedccl: fix panics when min_checkpoint_frequency is 0 Previously, we introduced the cluster setting changefeed.aggregator.flush_jitter to apply some jitter to the aggregator flush frequency. This was done to reduce the load on the coordinator by spreading out aggregator flush times. However, that PR did not account for the case where min_checkpoint_frequency is set to zero, which could cause panics due to rand.Int63n(0). This patch fixes the issue by adding a check for when the frequency is zero, ensuring that jitter is not applied in such cases. Fixes: #125312 Release note (bug fix): Fixed a bug in v24.1, v23.2, and v23.1 where using changefeed.aggregator.flush_jitter with min_checkpoint_frequency set to zero could cause panics. --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- .../changefeedccl/changefeed_processors.go | 24 ++-- pkg/ccl/changefeedccl/changefeed_test.go | 126 +++++++++++++++--- 4 files changed, 122 insertions(+), 32 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 3ab61bfea0f9..6125b2d78098 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -9,7 +9,7 @@ bulkio.backup.deprecated_full_backup_with_subdir.enabled boolean false when true bulkio.backup.file_size byte size 128 MiB target size for individual data files produced during BACKUP application bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail application bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads application -changefeed.aggregator.flush_jitter float 0.1 jitter 
aggregator flushes as a fraction of min_checkpoint_frequency application +changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a fraction of min_checkpoint_frequency. This setting has no effect if min_checkpoint_frequency is set to 0. application changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application changefeed.batch_reduction_retry.enabled boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index f2e6487cf9b8..d64613c35ba2 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -14,7 +14,7 @@
bulkio.backup.read_timeout
duration5m0samount of time after which a read attempt is considered timed out, which causes the backup to failServerless/Dedicated/Self-Hosted
bulkio.backup.read_with_priority_after
duration1m0samount of time since the read-as-of time above which a BACKUP should use priority when retrying readsServerless/Dedicated/Self-Hosted
physical_replication.consumer.minimum_flush_interval
duration5sthe minimum timestamp between flushes; flushes may still occur if internal buffers fill upDedicated/Self-Hosted -
changefeed.aggregator.flush_jitter
float0.1jitter aggregator flushes as a fraction of min_checkpoint_frequencyServerless/Dedicated/Self-Hosted +
changefeed.aggregator.flush_jitter
float0.1jitter aggregator flushes as a fraction of min_checkpoint_frequency. This setting has no effect if min_checkpoint_frequency is set to 0.Serverless/Dedicated/Self-Hosted
changefeed.backfill.concurrent_scan_requests
integer0number of concurrent scan requests per node issued during a backfillServerless/Dedicated/Self-Hosted
changefeed.backfill.scan_request_size
integer524288the maximum number of bytes returned by each scan requestServerless/Dedicated/Self-Hosted
changefeed.batch_reduction_retry.enabled
booleanfalseif true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizesServerless/Dedicated/Self-Hosted diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 4b29e7dfe1f6..a06d4b7a8cba 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -632,18 +632,22 @@ var aggregatorHeartbeatFrequency = settings.RegisterDurationSetting( var aggregatorFlushJitter = settings.RegisterFloatSetting( settings.ApplicationLevel, "changefeed.aggregator.flush_jitter", - "jitter aggregator flushes as a fraction of min_checkpoint_frequency", + "jitter aggregator flushes as a fraction of min_checkpoint_frequency. This "+ + "setting has no effect if min_checkpoint_frequency is set to 0.", 0.1, /* 10% */ settings.NonNegativeFloat, settings.WithPublic, ) -func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) time.Time { - if j == 0 { - return s.Now().Add(d) +func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) (time.Time, error) { + if j < 0 || d < 0 { + return s.Now(), errors.AssertionFailedf("invalid jitter value: %f, duration: %s", j, d) + } + if j == 0 || d == 0 { + return s.Now().Add(d), nil } nextFlush := d + time.Duration(rand.Int63n(int64(j*float64(d)))) - return s.Now().Add(nextFlush) + return s.Now().Add(nextFlush), nil } // Next is part of the RowSource interface. 
@@ -789,7 +793,7 @@ func (ca *changeAggregator) flushBufferedEvents() error { // noteResolvedSpan periodically flushes Frontier progress from the current // changeAggregator node to the changeFrontier node to allow the changeFrontier // to persist the overall changefeed's progress -func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error { +func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (returnErr error) { if resolved.Timestamp.IsEmpty() { // @0.0 resolved timestamps could come in from rangefeed checkpoint. // When rangefeed starts running, it emits @0.0 resolved timestamp. @@ -825,8 +829,11 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error if checkpointFrontier { defer func() { - ca.nextHighWaterFlush = nextFlushWithJitter( + ca.nextHighWaterFlush, err = nextFlushWithJitter( timeutil.DefaultTimeSource{}, ca.flushFrequency, aggregatorFlushJitter.Get(sv)) + if err != nil { + returnErr = errors.CombineErrors(returnErr, err) + } }() return ca.flushFrontier() } @@ -844,8 +851,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error }() return ca.flushFrontier() } - - return nil + return returnErr } // flushFrontier flushes sink and emits resolved timestamp if needed. diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 7d126f34ab3a..d24cc74495ee 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -7044,29 +7044,113 @@ func TestFlushJitter(t *testing.T) { // min_flush_frequency period, and thus it would be hard to tell if the // difference is due to jitter or not. Just verify nextFlushWithJitter function // works as expected with controlled time source. 
- const flushFrequency time.Duration = 100 * time.Millisecond - ts := timeutil.NewManualTime(timeutil.Now()) + ts := timeutil.NewManualTime(timeutil.Now()) const numIters = 100 - t.Run("disable", func(t *testing.T) { - const disableJitter = 0.0 - for i := 0; i < numIters; i++ { - next := nextFlushWithJitter(ts, flushFrequency, disableJitter) - require.Equal(t, ts.Now().Add(flushFrequency), next) - ts.AdvanceTo(next) - } - }) - t.Run("enable", func(t *testing.T) { - const jitter = 0.1 - for i := 0; i < numIters; i++ { - next := nextFlushWithJitter(ts, flushFrequency, jitter) - // next flush should be at least flushFrequency into the future. - require.LessOrEqual(t, ts.Now().Add(flushFrequency), next, t) - // and jitter should be at most 10% more than flushFrequency (10ms) - require.Less(t, next.Sub(ts.Now()), flushFrequency+flushFrequency/10) - ts.AdvanceTo(next) - } - }) + + for _, tc := range []struct { + flushFrequency time.Duration + jitter float64 + expectedFlushDuration time.Duration + expectedErr bool + }{ + // Negative jitter. + { + flushFrequency: -1, + jitter: -0.1, + expectedFlushDuration: 0, + expectedErr: true, + }, + { + flushFrequency: 0, + jitter: -0.1, + expectedFlushDuration: 0, + expectedErr: true, + }, + { + flushFrequency: 10 * time.Millisecond, + jitter: -0.1, + expectedFlushDuration: 0, + expectedErr: true, + }, + { + flushFrequency: 100 * time.Millisecond, + jitter: -0.1, + expectedFlushDuration: 0, + expectedErr: true, + }, + // Disable Jitter. + { + flushFrequency: -1, + jitter: 0, + expectedFlushDuration: 0, + expectedErr: true, + }, + { + flushFrequency: 0, + jitter: 0, + expectedFlushDuration: 0, + expectedErr: false, + }, + { + flushFrequency: 10 * time.Millisecond, + jitter: 0, + expectedFlushDuration: 10 * time.Millisecond, + expectedErr: false, + }, + { + flushFrequency: 100 * time.Millisecond, + jitter: 0, + expectedFlushDuration: 100 * time.Millisecond, + expectedErr: false, + }, + // Enable Jitter. 
+ { + flushFrequency: -1, + jitter: 0.1, + expectedFlushDuration: 0, + expectedErr: true, + }, + { + flushFrequency: 0, + jitter: 0.1, + expectedFlushDuration: 0, + expectedErr: false, + }, + { + flushFrequency: 10 * time.Millisecond, + jitter: 0.1, + expectedFlushDuration: 10 * time.Millisecond, + expectedErr: false, + }, + { + flushFrequency: 100 * time.Millisecond, + jitter: 0.1, + expectedFlushDuration: 100 * time.Millisecond, + expectedErr: false, + }, + } { + t.Run(fmt.Sprintf("flushfrequency=%sjitter=%f", tc.flushFrequency, tc.jitter), func(t *testing.T) { + for i := 0; i < numIters; i++ { + next, err := nextFlushWithJitter(ts, tc.flushFrequency, tc.jitter) + if tc.expectedErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + if tc.jitter > 0 { + minBound := tc.expectedFlushDuration + maxBound := tc.expectedFlushDuration + time.Duration(float64(tc.expectedFlushDuration)*tc.jitter) + actualDuration := next.Sub(ts.Now()) + require.LessOrEqual(t, minBound, actualDuration) + require.LessOrEqual(t, actualDuration, maxBound) + } else { + require.Equal(t, tc.expectedFlushDuration, next.Sub(ts.Now())) + } + ts.AdvanceTo(next) + } + }) + } } func TestChangefeedOrderingWithErrors(t *testing.T) {