-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure that BatchingSqlJournal will propagate sql connection opening failure #3754
Ensure that BatchingSqlJournal will propagate sql connection opening failure #3754
Conversation
@@ -837,7 +875,8 @@ private void TryProcess() | |||
|
|||
var chunk = DequeueChunk(_remainingOperations); | |||
var context = Context; | |||
_circuitBreaker.WithCircuitBreaker(() => ExecuteChunk(chunk, context)).PipeTo(Self); | |||
_circuitBreaker.WithCircuitBreaker(() => ExecuteChunk(chunk, context)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Horusiath Halfway there, but still when the circuit breaker throws an OpenCircuitException
the restarts won't happen again. Wrapping this piece in a try...catch... and self issuing a new ChunkExecutionFailure(ex, chunk.Requests)
seems to do the trick.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you saying that OpenCircuitException
is not propagated to PipeTo
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PipeTo
will work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless @ismaelhamed means that the _circuitBreaker
itself throws instead of injecting the exception into its Task
, in which case his feedback is correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like the _circuitbreaker
is throwing instead of propagating to PipeTo
, hence why I had to wrap in a try...catch... that call to _circuitBreaker.WithCircuitBreaker()
in order for it to work.
I´ll do some more tests tomorrow to confirm it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like the _circuitbreaker is throwing instead of propagating to PipeTo, hence why I had to wrap in a try...catch... that call to _circuitBreaker.WithCircuitBreaker() in order for it to work.
Not to get side-tracked, but if that's the case then isn't that a bug with the way the CircuitBreaker
is designed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated CircuitBreaker
implementation to raise exceptions within returned Task
s (simply by marking corresponding methods as async).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work @Horusiath
{ | ||
if (!_lock.CompareAndSet(true, false)) | ||
{ | ||
throw new OpenCircuitException(); | ||
} | ||
return CallThrough(body); | ||
return await CallThrough(body); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - we probably should have done this all along
|
||
namespace Akka.Persistence.Sqlite.Tests.Batching | ||
{ | ||
public class BatchingSqliteJournalConnectionFailureSpec : SqlJournalConnectionFailureSpec |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
||
foreach (var req in message.Requests) | ||
{ | ||
switch (req) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -351,6 +351,18 @@ public abstract class BatchingSqlJournal<TConnection, TCommand> : WriteJournalBa | |||
where TCommand : DbCommand | |||
{ | |||
#region internal classes | |||
|
|||
private sealed class ChunkExecutionFailure : IDeadLetterSuppression |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@Horusiath thanks, working like a charm now. |
…failure (akkadotnet#3754) * ensure that BatchingSqlJournal will propagate sql connection opening failure * fixed CircuitBreaker behavior
Should fix #3753.
I think, that the issue lies in combination of ExecuteChunk which doesn't have guard around
connection.OpenAsync
(it was sending failures to subscribers only when the SQL statements themselves were failing) and ExecuteChunk method call (which send failure asStatus.Failure
toSelf
but was never handled).This PR solves this case by adding custom
ChunkExecutionFailure
message that is used to catch those cases and inform awaiting requesters about failure.