[FLINK-36455] Sinks retry synchronously #25547
Thanks for making this change. I left a few comments, but I have two main concerns.
- I wonder whether we should leave the failed committable tracker in place
- The PR misses changes to the committer interface, e.g. CommitRequest#signalFailedWithKnownReason
/** The number of committables that have not been successfully committed. */
private final int numberOfPendingCommittables;

@Deprecated
/** The number of committables that are not retried and have been failed. */
private final int numberOfFailedCommittables;
Why do we deprecate failed committables? I still see some value to provide the possibility to discard unrecoverable committables. The change also looks unrelated to the retry mechanism.
Good point. I probably should motivate that change in the commit message first. The short answer is: it doesn't work. Let's go to the long one.
So far, we increase this number on a known issue and don't emit the respective committable. That means that the global committer would need to wait for all committables except the failed ones and then commit. However, it always waited for all known committables and therefore ran in an infinite loop.
With this PR, the summary only covers successful commits. Unknown errors cause a restart loop and known errors cause the committables to be dropped from the statistics. So the global committer waits for all committables of the summary and works now.
The alternative would be to still update the statistics as you propose and ignore the failed ones in the global committer. However, I wonder what the value of that is. If you'd like to keep it just to have fewer disruptions, I can revert the change and fix it.
However, the custom topologies that I have seen so far, also run into the same issue as the global committer (and now I think that the compactor has the same issues). I think that emitting the stats on the ignored committables ultimately just increases complexity without giving downstream operators any good handle whatsoever.
WDYT?
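To illustrate the waiting problem described above, here is a minimal sketch. It is not Flink's actual code: `SummarySketch` and `GlobalCommitterSketch` are hypothetical stand-ins, and the only assumption is that a downstream operator commits once it has received as many committables as the summary announced.

```java
/** Hypothetical stand-in for a committable summary; not Flink's actual class. */
final class SummarySketch {
    /** Number of committables that downstream operators are told to wait for. */
    final int announced;

    private SummarySketch(int announced) {
        this.announced = announced;
    }

    /** Old behavior: known failures are counted in the summary but never emitted. */
    static SummarySketch countingFailed(int successful, int failedKnown) {
        return new SummarySketch(successful + failedKnown);
    }

    /** Behavior described above: the summary only covers successful commits. */
    static SummarySketch successfulOnly(int successful) {
        return new SummarySketch(successful);
    }
}

/** Hypothetical global committer that commits once all announced committables arrived. */
final class GlobalCommitterSketch {
    private final SummarySketch summary;
    private int received;

    GlobalCommitterSketch(SummarySketch summary) {
        this.summary = summary;
    }

    /** Called once per committable that is actually emitted downstream. */
    void onCommittable() {
        received++;
    }

    /** True once every announced committable has been received. */
    boolean readyToCommit() {
        return received >= summary.announced;
    }
}
```

With three successful commits and one known failure, a summary built with `countingFailed(3, 1)` announces four committables, so the sketch never becomes ready after receiving the three that are emitted. A summary built with `successfulOnly(3)` becomes ready as soon as the third one arrives.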
@@ -95,6 +94,7 @@
public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
        implements OneInputStreamOperator<CommittableMessage<CommT>, Void> {

    private static final int MAX_RETRIES = 10;
IMO we should make this a Flink config specific to the sink and allow users to opt-out with the failed committable mechanism.
Currently the number is duplicated in committer and global committer.
int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
Nit: I know this wasn't introduced by this PR, but why do we need to fetch the subtask id and number of tasks on every emit call?
What's the concern here? Performance? The JVM should inline the call so that it pretty much results in a field access.
Mostly it just looked strange.
Reverted the deprecation of numFailed and added a config option for the retries. PTAL @fapaul
LGTM now, can you also add a small note to the release notes that we introduce a new config
.intType()
.defaultValue(10)
.withDescription(
        "The number of retries on a committable (e.g., transaction) before Flink application fails and potentially restarts.");
Nit: Should we mention here that it applies to CommitRequest#signalFailedWithUnknownReason and not generic exceptions that are thrown while calling commit on the committer?
A good point. I fear that giving too much detail is confusing (which end user knows CommitRequest?).
I'd rephrase to
The number of retries a Flink application attempts for committable operations (such as transactions) on retriable errors, as specified by the sink connector, before Flink fails and potentially restarts.
Sounds much better 👍
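For readers following the thread, the retry semantics discussed here can be sketched as below. This is a hypothetical illustration, not the code of this PR: the `Outcome` enum, `commitWithRetries`, and the choice to count the first attempt separately from the configured retries are all assumptions made for the example.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of synchronous commit retries; not Flink's actual implementation. */
final class SyncRetrySketch {

    /** Hypothetical outcomes a sink connector could report for one commit attempt. */
    enum Outcome {
        COMMITTED,    // the committable was committed
        RETRY_LATER,  // retriable error, as specified by the connector
        FAILED_KNOWN  // known, unrecoverable error: the committable is dropped
    }

    /**
     * Attempts a commit and retries synchronously on retriable errors.
     *
     * @return true if committed, false if the committable was dropped after a known failure
     * @throws IllegalStateException if the commit is still retriable after maxRetries retries
     */
    static boolean commitWithRetries(Supplier<Outcome> attempt, int maxRetries) {
        // Assumption: one initial attempt plus up to maxRetries retries.
        for (int retry = 0; retry <= maxRetries; retry++) {
            // Exceptions thrown by the attempt itself are not caught here,
            // so they fail the caller immediately without any retry.
            switch (attempt.get()) {
                case COMMITTED:
                    return true;
                case FAILED_KNOWN:
                    return false;
                case RETRY_LATER:
                    break; // leaves the switch; the loop tries again
            }
        }
        throw new IllegalStateException(
                "Commit still failing after " + maxRetries + " retries");
    }
}
```

In this sketch the retry limit only applies to errors that the connector marks as retriable. A generic exception thrown during the attempt propagates on the first occurrence, which matches the distinction raised in the nit above.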
Collection<CommT> getSuccessfulCommittables();

int getNumFailed();
Please add doc strings to the method
Sinks so far retried asynchronously to increase commit throughput in case of temporary issues. However, the contract of notifyCheckpointCompleted states that checkpoints must be side-effect free, meaning all transactions have to be committed on return of the RPC call. This commit retries a fixed number of times and then fails in notifyCheckpointCompleted. Note that sync retries significantly simplify the committable handling. This commit starts a few simplifications; the next commit clears up more.
We can only set the gauge once.
Without the async parts of the committable summary, the number of pending committables will always be 0. Failed committables will also be 0: unexpected failures throw an error, and expected ones are silently ignored. The previous behavior, where these counts could be >0, actually led to infinite loops in the global committer.
What is the purpose of the change
Sinks so far retried asynchronously to increase commit throughput in case of temporary issues. However, the contract of notifyCheckpointCompleted states that checkpoints must be side-effect free, meaning all transactions have to be committed on return of the RPC call.
Brief change log
Verifying this change
api/connector/sink2
runtime/operators/sink/committables
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation