Fix BatchingSqlJournal emitting WriteMessageSuccess before transaction was complete. #4953
Conversation
…s before chunk execution
var array = new IJournalRequest[operationsCount];
for (int i = 0; i < operationsCount; i++)
// Need a lock here to ensure that buffer doesn't change during this operation
lock (_lock)
This is admittedly janky code that splits the queued requests into either a read-request chunk or a write-request chunk.
An explanation of why it is coded in this peculiar way is written in the method comment.
Let's not modify the Akka.Persistence API surface area to save the BatchingSqlJournal.
/// <summary>
/// Marker for batched write operations
/// </summary>
public interface IJournalWrite : IJournalRequest { }
Let's remove this and try to avoid making the BatchingJournal's problem all of Akka.Persistence's problem.
This reverts commit 002a4d8.
…saction, possible database deadlock.
LGTM overall but left some questions for @Arkatufus
{
    if (cause is DbException)
    {
        // database-related exceptions should result in failure
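For context, a minimal sketch (hypothetical names, not the PR's actual code) of the distinction the quoted code draws: store-level errors fail the whole batch, while anything else is treated as a per-message rejection.

using System;
using System.Data.Common;

public static class WriteExceptionClassifier
{
    public enum Outcome { Failure, Rejection }

    // DbException (raised by ADO.NET providers) signals that the database
    // itself failed, so the journal should report the whole batch as failed
    // rather than rejecting individual messages.
    public static Outcome Classify(Exception cause) =>
        cause is DbException ? Outcome.Failure : Outcome.Rejection;
}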
LGTM
{
    var id = _bufferIdCounter.GetAndIncrement();
    // Enqueue writes and delete operation requests into the write queue,
    // else if they are query operations, enqueue them into the read queue
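A rough sketch of the routing described in those comments; the queue fields and type checks here are illustrative assumptions, not the PR's actual implementation.

using System.Collections.Generic;
using Akka.Persistence;

sealed class JournalBuffers
{
    private readonly object _lock = new object();
    private long _bufferIdCounter;
    private readonly Queue<(long id, IJournalRequest request)> _writeBuffer =
        new Queue<(long, IJournalRequest)>();
    private readonly Queue<(long id, IJournalRequest request)> _readBuffer =
        new Queue<(long, IJournalRequest)>();

    public void Enqueue(IJournalRequest request)
    {
        // The lock guarantees ids are handed out in arrival order and that
        // the buffers don't change while a chunk is being assembled.
        lock (_lock)
        {
            var id = _bufferIdCounter++;
            // Writes and deletes mutate the store; everything else is a query.
            if (request is WriteMessages || request is DeleteMessagesTo)
                _writeBuffer.Enqueue((id, request));
            else
                _readBuffer.Enqueue((id, request));
        }
    }
}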
LGTM
for (int i = 0; i < operationsCount; i++)
var currentBuffer = _buffers
    .Where(q => q.Count > 0)
    .OrderBy(q => q.Peek().id).First();
Is the buffer already sorted this way?
This is just a way to pick the buffer whose first item has the lowest id. We're trying to make sure that requests are executed roughly in order when it is a choice between a read and a write.
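A simplified sketch of that selection, assuming the two-queue layout sketched earlier and a hypothetical maxBatchSize cap standing in for Setup.MaxBatchSize:

using System.Collections.Generic;
using System.Linq;
using Akka.Persistence;

static class ChunkSelector
{
    // Pick the non-empty buffer whose head has the lowest id (the "oldest"
    // outstanding request), then drain up to maxBatchSize items from it.
    public static IJournalRequest[] TakeChunk(
        IReadOnlyList<Queue<(long id, IJournalRequest request)>> buffers,
        int maxBatchSize)
    {
        var currentBuffer = buffers
            .Where(q => q.Count > 0)
            .OrderBy(q => q.Peek().id)
            .First();

        var operations = new List<IJournalRequest>();
        while (currentBuffer.Count > 0 && operations.Count < maxBatchSize)
            operations.Add(currentBuffer.Dequeue().request);

        return operations.ToArray();
    }
}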
@@ -1388,29 +1389,111 @@ protected void AddParameter(TCommand command, string paramName, DbType dbType, o
/// <param name="param">Parameter to customize</param>
protected virtual void PreAddParameterToCommand(TCommand command, DbParameter param) { }

/// <summary>
/// Select the buffer that has the smallest id on its first item, retrieve a maximum Setup.MaxBatchSize
Because this indicates the "oldest" outstanding operation?
Yes, it is "roughly first in, first out". The question is, how sensitive is journal request ordering between read and write operations: is it strict, or can operations happen asynchronously?
If they're tightly coupled, I'd have to change this logic so that only consecutive requests of the same operation type are allowed to be batched, i.e. all queries, all writes, and all deletes (see the sketch below).
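For illustration only, a sketch of that stricter alternative: force a chunk boundary whenever the request type changes, so each chunk holds only consecutive operations of one kind.

using System;
using System.Collections.Generic;
using Akka.Persistence;

static class ConsecutiveChunker
{
    public static List<IJournalRequest[]> Chunk(IEnumerable<IJournalRequest> requests)
    {
        var chunks = new List<IJournalRequest[]>();
        var current = new List<IJournalRequest>();
        Type lastType = null;

        foreach (var req in requests)
        {
            // A type change (e.g. WriteMessages -> ReplayMessages) closes
            // the current chunk before the new request is admitted.
            if (lastType != null && req.GetType() != lastType && current.Count > 0)
            {
                chunks.Add(current.ToArray());
                current.Clear();
            }
            lastType = req.GetType();
            current.Add(req);
        }

        if (current.Count > 0)
            chunks.Add(current.ToArray());
        return chunks;
    }
}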
This is actually a big question that needs to be addressed.
FWIW, I know that persistence-jdbc makes a point of blocking ReadHighestSequenceNrAsync for a given PersistenceId (example in ported code). I think this is done because the first step of recovery in AsyncWriteJournal is to read the max sequence number from which to replay. This way, if the PersistentActor somehow crashes before a PersistAsync write completes, the recovery sequence will still read up to the correct sequence number.
Edit for clarity: In short, ReadHighestSequenceNrAsync calls for a given PersistenceId need to be queued until any pending writes for that PersistenceId are completed; you shouldn't need to worry as much about ordering for other operations. Otherwise you get edge cases like the one I pointed out above around PersistAsync.
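A hedged sketch of that queueing rule; the gate type and its methods are invented for illustration and are not persistence-jdbc's or Akka.NET's actual code.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

sealed class PerPersistenceIdWriteGate
{
    private readonly ConcurrentDictionary<string, Task> _pendingWrites =
        new ConcurrentDictionary<string, Task>();

    // Record an in-flight write so later reads for the same id can await it.
    public void TrackWrite(string persistenceId, Task writeCompletion) =>
        _pendingWrites.AddOrUpdate(
            persistenceId,
            writeCompletion,
            (_, existing) => Task.WhenAll(existing, writeCompletion));

    // Defer the highest-sequence-nr read until pending writes for this
    // persistence id have completed, so recovery sees the right number.
    public async Task<long> ReadHighestSequenceNrAsync(
        string persistenceId, Func<Task<long>> readFromStore)
    {
        if (_pendingWrites.TryGetValue(persistenceId, out var pending))
            await pending;
        return await readFromStore();
    }
}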
So, to make things as safe as possible, I guess I should only batch consecutive similar commands, unless it is a ReadHighestSequenceNr, in which case I should stop. This is turning into a NotBatchedSqlJournal real fast...
Well, all query types should be safe to batch together as long as they're consecutive, right? The danger comes when they are interleaved with writes; for write operations, consecutive writes and consecutive deletes are fine.
After pondering it, I think we're probably safe, since BatchingSqlJournal does not derive from AsyncJournalBase and therefore does recovery differently.
At worst, I think we would see a scenario where, after recovery, the actor crashes because it didn't pick up the right sequence number. Even then, I can't imagine the impact would be much more than restarting the actor (or, at worst, the whole system).
That's still a better worst case than the current state of BatchingSqlJournal, where writes are confirmed while they may still fail. Under heavy load I would expect torn-write behavior (i.e. missing sequence numbers in the DB), essentially corrupting the journal state.
@to11mtm do you approve this set of changes then?
return new RequestChunk(chunkId, array);
return new RequestChunk(chunkId, operations.ToArray());
Why not just pass the List<> instead?
RequestChunk is a struct. I'm not sure if there is a specific reason why it uses an array and not a list; I assumed it was for optimization reasons, so I didn't change it.
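For reference, roughly what such a struct might look like; the field names here are assumptions rather than the actual Akka.Persistence.Sql.Common source. A fixed-size array keeps the payload sealed once the chunk is built, whereas a List<> would carry growth bookkeeping the chunk never needs.

using Akka.Persistence;

readonly struct RequestChunk
{
    public readonly int ChunkId;
    public readonly IJournalRequest[] Requests;

    public RequestChunk(int chunkId, IJournalRequest[] requests)
    {
        ChunkId = chunkId;
        Requests = requests;
    }
}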
cc @ismaelhamed
{
    cause = e;
    tx.Rollback();
Note: .Rollback() may throw. I'm not certain whether we're worried about losing the original exception in that case...
It would probably be a good idea to include it
Added a try...catch around the rollback; it now throws an AggregateException of the rollback exception and the original exception.
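A minimal sketch of that pattern, assuming an ADO.NET DbTransaction: if Rollback() itself throws, both exceptions are surfaced together instead of the original failure being lost.

using System;
using System.Data.Common;

static class TransactionRunner
{
    // Returns null on success, the original exception if rollback succeeds,
    // or an AggregateException if the rollback itself also throws.
    public static Exception Execute(DbTransaction tx, Action work)
    {
        try
        {
            work();
            tx.Commit();
            return null;
        }
        catch (Exception cause)
        {
            try
            {
                tx.Rollback();
            }
            catch (Exception rollbackError)
            {
                return new AggregateException(cause, rollbackError);
            }
            return cause;
        }
    }
}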
LGTM. This was trickier than I thought, nice job! Would a
I'm not concerned with @Arkatufus' coding at all ;) but, given that this has deviated from the original implementation and that the original author was not involved to pick his brain on some of the issues that came up, it may be more sensible to do so.
foreach (var req in message.Requests)
Log.Error(cause, "An error occurred during event batch processing (chunkId: {0})", message.ChunkId);
I'd keep the number of requests (i.e., (chunkId: {0}) of {1} requests).
Done
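Roughly how the amended log call might read, continuing the quoted snippet above (so Log and message come from the surrounding journal code, and Requests is assumed to be an array):

Log.Error(cause,
    "An error occurred during event batch processing (chunkId: {0}) of {1} requests",
    message.ChunkId, message.Requests.Length);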
Yep, we can do that @ismaelhamed - I can make those into beta releases
LGTM
Should also fix #4265, but would need further field testing.