-
Notifications
You must be signed in to change notification settings - Fork 131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: #242 Explicit terminal and retry exceptions for cleaner logging and poison pills #291
Conversation
...src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
Outdated
Show resolved
Hide resolved
...src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
Outdated
Show resolved
Hide resolved
#268 ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, I'm happy to approve.
log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", e); | ||
String msg = "Exception caught in user function running stage, registering WC as failed, returning to mailbox"; | ||
if (e instanceof RetriableException) { | ||
log.debug("Explicit " + RetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.debug("Explicit " + RetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e); | |
log.debug("Explicit {} caught - {}", RetriableException.class.getSimpleName(), msg, e); |
ParallelConsumerOptions.class.getSimpleName(), | ||
context); | ||
|
||
closeDontDrainFirst(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it shutdown gracefully in this case? i.e. commit all that succeeded by this point, maybe should give some time for inflight processes as well (if it doesnt already) ? - to reduce possible duplicates on restart / rebalance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, it is graceful - closeDontDrainFirst
will commit everything that's done first - drain isn't graceful, it means process everything in buffers.
Inflight - good point, not 100% will need to double check.
} catch (Exception e) { | ||
log.error("Unknown internal error handling user function dispatch, terminating"); | ||
|
||
closeDontDrainFirst(); | ||
|
||
// throw again to make the future failed | ||
throw e; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is current behaviour for user function exceptions ? is it a breaking change in behaviour?
I think i am missing the flow here - wouldn't this just kill PC and not retry the message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
current = retry :)
wouldn't this just kill PC and not retry the message?
yeah there was a bug - check latest version?
Consider splitting the partial batch failure support into a seperate task. |
…ption # Conflicts: # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java # parallel-consumer-core/src/test/resources/logback-test.xml
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...
*/ | ||
|
||
/** | ||
* todo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
docs
/** | ||
* @see UserFunctionRunner | ||
*/ | ||
class UserFunctionRunnerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
finish test
@@ -27,8 +27,8 @@ | |||
</root> | |||
|
|||
<!-- primary --> | |||
<logger name="io.confluent.parallelconsumer" level="info"/> | |||
<!-- <logger name="io.confluent.parallelconsumer" level="debug"/>--> | |||
<logger name="io.confluent.parallelconsumer" level="debug"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reset
/** | ||
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed, | ||
* and that it should be retired at a later time. | ||
* <p> | ||
* The advantage of throwing this exception explicitly, is that PC will not log an ERROR. If any other type of exception | ||
* is thrown by the user's function, that will be logged as an error (but will still be retried later). | ||
* <p> | ||
* So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be | ||
* logged as an error. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update java doc
|
||
@AllArgsConstructor | ||
@Slf4j | ||
public class UserFunctionRunner<K, V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logic seems ok.
Not really sure about the new class and abstraction / concern separation - seems most of handling is calling back to PC - so the class seems like a utility / mixin type of class.
Not sure how it could be refactored better but now it seems adding more spaghetti-ness then making things cleaner - flow became PC-> UserFunctionRunner.runUserFunction -> PC with backward reference - that feels like a smell...
Would messaging / handling of execution result be part of the message based eventing / shared nothing refactor?
Alternatively could the success / failure notifications be moved back to PC and passed as callbacks?
Basically i think it would benefit from a bit of mapping of flows and rethinking of cleaner architecture / class split.
} catch (PCUserException e) { | ||
// throw again to make the future failed | ||
throw e; | ||
} catch (Exception e) { | ||
log.error("Unknown internal error handling user function dispatch, terminating"); | ||
|
||
pc.closeDontDrainFirst(); | ||
|
||
// throw again to make the future failed | ||
throw e; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not too keen on this - you are still killing PC on general Exception thrown from userFunction - effectively making default behaviour for non-specified exceptions a worst case one ( Terminal with Shutdown behaviour).
Did you mean to re-throw Exception as PCUserException on line 88?
Do you need a general Exception catch here at all - only other place that could throw is
Line 47 boolean workIsStale = pc.getWm().checkIfWorkIsStale(workContainerBatch);
could it handle exception inside that call instead - to leave here handling of userFunction call only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be optional behaviour - I thought I had implemented that in this PR, but might be somewhere else. Either way, it should do like KS does - and have this be configurable.
I'll also probably copy their default behaviour.
/** | ||
* Run the supplied function. | ||
*/ | ||
protected <R> List<Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it be better to change it to be a bit more readable?
ie. this would become something along the lines of
execute(){
try{
doDebugLogging(...);
skipOnStaleWork(...);
PollContextInternal<K, V> context = new PollContextInternal<>(workContainerBatch);
runWithUserExceptions(...)
}catch(...){
...
}
}
If exception from stale work check is specific or handled in that call itself - can even get rid of inner runWithUserExceptions method and pull exception handling up here. esp given its the main purpose of this class...
Closing - Stale. |
Hi @astubbs Is a released date is planned ? Thanks |
@nizarsalhaji94 hi sorry I am no longer involved with this project since February 2023. Try asking @eddyv |
Hi @eddyv Is a released date is planned ? Thanks |
@nizarsalhaji94 Just in general - i haven't seen your comment here until now - in the future - it would be more visible if you raise it as a question / issue with link to the PR if its relevant. |
Related:
Supersedes:
Related branches: