-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
changefeedccl: fix panics when min_checkpoint_frequency is 0 #125317
Conversation
864728c
to
386b66a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @wenyihu6)
pkg/ccl/changefeedccl/changefeed_processors.go
line 644 at r1 (raw file):
func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) time.Time { if j < 0 || d < 0 { log.Fatalf(context.Background(), "invalid jitter value: %f, duration: %s", j, d)
Should this be at the level of a log.Fatalf
? Would a log.Warningf
be sufficient?
Previously, andyyang890 (Andy Yang) wrote…
I'm okay with either. Changed to Warningf. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @wenyihu6)
pkg/ccl/changefeedccl/changefeed_processors.go
line 644 at r1 (raw file):
Previously, wenyihu6 (Wenyi Hu) wrote…
I'm okay with either. Changed to Warningf.
On second thought, since this is for catching developer error, I think you probably want to return an error instead, i.e. a errors.AssertionFailedf
. The user isn't going to be able to do very much with a log message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the quick fix!
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890 and @wenyihu6)
pkg/ccl/changefeedccl/changefeed_processors.go
line 644 at r1 (raw file):
Previously, andyyang890 (Andy Yang) wrote…
On second thought, since this is for catching developer error, I think you probably want to return an error instead, i.e. a
errors.AssertionFailedf
. The user isn't going to be able to do very much with a log message.
This seems like it's going to be very noisy, since this is based on cluster settings/options that aren't going to change often. But also the way we currently use this function this should never happen.
I'm with Andy, this should surface as an unexpected error.
pkg/ccl/changefeedccl/changefeed_test.go
line 7056 at r2 (raw file):
100 * time.Millisecond, } { t.Run(fmt.Sprintf("Disabled_FlushFrequency=%s", flushFrequency), func(t *testing.T) {
optional super minor nit: most tests in this file use lower case test names
I'm not sure if this is a common way to deal with changing return errors in a defer statement. Is there a cleaner way to handle this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890 and @rharding6373)
pkg/ccl/changefeedccl/changefeed_test.go
line 7056 at r2 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
optional super minor nit: most tests in this file use lower case test names
Done.
10ee3e9
to
cf2b4b5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like it's really close!
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890 and @wenyihu6)
pkg/ccl/changefeedccl/changefeed_processors.go
line 644 at r1 (raw file):
Previously, wenyihu6 (Wenyi Hu) wrote…
Hm it is a bit hard to return as an error since the function call is wrapped in a defer function. Do you see an easier way to do this?
You could do it with a named return value for the error in the parent function and errors.CombineErrors
. Here's one such example in the code base:
cockroach/pkg/workload/rand/rand.go
Line 182 in 4392d13
defer func() { retErr = errors.CombineErrors(retErr, rows.Close()) }() |
So this would look something like:
func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) (time.Time, error) { ... }
...
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (err error) {
...
defer func() {
t, retErr := nextFlushWithJitter(...)
err = errors.CombineErrors(err, retErr) }()
...
Please try to add test coverage for this new code path (though it definitely seems like we should never hit it given current guardrails on the inputs).
pkg/ccl/changefeedccl/changefeed_processors.go
line 832 at r3 (raw file):
Previously, wenyihu6 (Wenyi Hu) wrote…
I'm not sure if this is a common way to deal with changing errors in a defer statement. Is there a cleaner way to handle this?
See other comment.
pkg/ccl/changefeedccl/changefeed_test.go
line 7056 at r2 (raw file):
Previously, wenyihu6 (Wenyi Hu) wrote…
Done.
Nice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890)
pkg/ccl/changefeedccl/changefeed_processors.go
line 644 at r1 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
You could do it with a named return value for the error in the parent function and
errors.CombineErrors
. Here's one such example in the code base:cockroach/pkg/workload/rand/rand.go
Line 182 in 4392d13
defer func() { retErr = errors.CombineErrors(retErr, rows.Close()) }() So this would look something like:
func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) (time.Time, error) { ... } ... func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (err error) { ... defer func() { t, retErr := nextFlushWithJitter(...) err = errors.CombineErrors(err, retErr) }() ...
Please try to add test coverage for this new code path (though it definitely seems like we should never hit it given current guardrails on the inputs).
Done. I added a few more test cases for error. Are there any other test cases you would want me to add? On a side note, is this how we typically handle developer errors? I have a slight preference for using log.Fatal since we should never reach this state, and error handling here makes the code less clean. But I also understand that this is not a serious issue and we may want to be more defensive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @andyyang890 and @wenyihu6)
pkg/ccl/changefeedccl/changefeed_processors.go
line 644 at r1 (raw file):
Previously, wenyihu6 (Wenyi Hu) wrote…
Done. I added a few more test cases for error. Are there any other test cases you would want me to add? On a side note, is this how we typically handle developer errors? I have a slight preference for using log.Fatal since we should never reach this state, and error handling here makes the code less clean. But I also understand that this is not a serious issue and we may want to be more defensive.
Ideally when the user provides a bad input, we provide immediate feedback that their operation failed and why. I think we already do that for the user-provided input that is ultimately used in all production calls to this code.
We should generally try to handle errors gracefully and not with panics. The exception is that if you like you can add a test-only runtime assertion here, following a pattern like those described here: #94986
pkg/ccl/changefeedccl/changefeed_test.go
line 7138 at r4 (raw file):
if tc.expectedErr { require.Error(t, err) }
else { require.NoError }
Previously, we introduced the cluster setting changefeed.aggregator.flush_jitter to apply some jitter to the aggregator flush frequency. This was done to reduce the load on the coordinator by spreading out aggregator flush times. However, that PR did not account for the case where min_checkpoint_frequency is set to zero, which could cause panics due to rand.Int63(0). This patch fixes the issue by adding a check for when the frequency is zero, ensuring that jitter is not applied in such cases. Fixes: cockroachdb#125312 Release note (bug fix): Fixed a bug in v24.1, v23.2, and v23.1 where using changefeed.aggregator.flush_jitter with min_checkpoint_frequency set to zero could cause panics.
Thanks for the quick review! bors r=rharding6373 |
Encountered an error creating backports. Some common things that can go wrong:
You might need to create your backport manually using the backport tool. error creating merge commit from f28b826 to blathers/backport-release-23.1-125317: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict [] you may need to manually resolve merge conflicts with the backport tool. Backport to branch 23.1.x failed. See errors above. error creating merge commit from f28b826 to blathers/backport-release-23.2-125317: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict [] you may need to manually resolve merge conflicts with the backport tool. Backport to branch 23.2.x failed. See errors above. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
Previously, we introduced the cluster setting changefeed.aggregator.flush_jitter
to apply some jitter to the aggregator flush frequency. This was done to reduce
the load on the coordinator by spreading out aggregator flush times. However,
that PR did not account for the case where min_checkpoint_frequency is set to
zero, which could cause panics due to rand.Int63(0). This patch fixes the issue
by adding a check for when the frequency is zero, ensuring that jitter is not
applied in such cases.
Fixes: #125312
Release note (bug fix): Fixed a bug in v24.1, v23.2, and v23.1 where using
changefeed.aggregator.flush_jitter with min_checkpoint_frequency set to zero
could cause panics.