-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Pull-based Ingestion] Add error handling strategy to pull-based ingestion #17427
base: main
Are you sure you want to change the base?
[Pull-based Ingestion] Add error handling strategy to pull-based ingestion #17427
Conversation
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
❌ Gradle check result for dc4722f: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
private Map<String, Object> params; | ||
|
||
public IngestionSource(String type, String pointerInitReset, Map<String, Object> params) { | ||
public IngestionSource(String type, String pointerInitReset, String errorStrategy, Map<String, Object> params) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
errorStrategy can be enum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do have an enum and convert to it later. But good point, will update it to the enum from the start.
@@ -188,6 +197,7 @@ protected void startPoll() { | |||
continue; | |||
} | |||
blockingQueue.put(result); | |||
lastSuccessfulPointer = result.getPointer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does lastSuccessfulPointer
need to be a class field? looks like it can be a var for this method startPoll
messageProcessor.process(result.getMessage(), result.getPointer()); | ||
} catch (Exception e) { | ||
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING); | ||
if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
once paused, how to resume? seems we need to provide a way for users manipulate the offset to skip bad messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only required for the 'block' strategy, the 'drop' strategy skips the bad messages by default. However, for 'block', it might not be possible to fix the bad message in place so the user has to take a call. But I was thinking once the offset PR is merged, we can let the user modify offset and resume ingestion in case they later decide to skip past it. Let me add a todo here.
Description
This PR is a follow up for pull-based-ingestion to add error handling support. We introduce the following two strategies:
This PR adds the drop/block support along with required interfaces. A follow up PR will add metric emission and record the errors.
Related Issues
Resolves #17085
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.