Skip to content

Commit

Permalink
BatchingSqlJournal now preserves Sender in PersistCallback (akkadotne…
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed authored and Aaronontheweb committed Jul 21, 2019
1 parent a7c6805 commit 7ce0bd5
Showing 1 changed file with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public sealed class CircuitBreakerSettings
/// Maximum number of failures that can happen before the circuit opens.
/// </summary>
public int MaxFailures { get; }

/// <summary>
/// Maximum time available for operation to execute before
/// <see cref="CircuitBreaker"/> considers it a failure.
Expand Down Expand Up @@ -229,7 +229,7 @@ public abstract class BatchingSqlJournalSetup
/// </summary>
public string DefaultSerializer { get; }

/// <summary>
/// <summary>
/// Initializes a new instance of the <see cref="BatchingSqlJournalSetup" /> class.
/// </summary>
/// <param name="config">The configuration used to configure the journal.</param>
Expand Down Expand Up @@ -346,12 +346,12 @@ protected BatchingSqlJournalSetup(string connectionString, int maxConcurrentOper
/// </summary>
/// <typeparam name="TConnection">A concrete implementation of <see cref="DbConnection"/> for targeted database provider.</typeparam>
/// <typeparam name="TCommand">A concrete implementation of <see cref="DbCommand"/> for targeted database provider.</typeparam>
public abstract class BatchingSqlJournal<TConnection, TCommand> : WriteJournalBase
public abstract class BatchingSqlJournal<TConnection, TCommand> : WriteJournalBase
where TConnection : DbConnection
where TCommand : DbCommand
{
#region internal classes

private sealed class ChunkExecutionFailure : IDeadLetterSuppression
{
public Exception Cause { get; }
Expand Down Expand Up @@ -458,7 +458,7 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
/// SQL statement executed as result of <see cref="WriteMessages"/> request to journal.
/// </summary>
protected virtual string InsertEventSql { get; }

/// <summary>
/// SQL query executed as result of <see cref="GetCurrentPersistenceIds"/> request to journal.
/// It's a part of persistence query protocol.
Expand Down Expand Up @@ -509,7 +509,7 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
/// <see cref="PersistenceIdAdded"/> messages.
/// </summary>
protected bool HasAllIdsSubscribers => _allIdsSubscribers.Count != 0;

/// <summary>
/// Flag determining if incoming journal requests should be published in current actor system event stream.
/// Useful mostly for tests.
Expand Down Expand Up @@ -862,7 +862,7 @@ protected virtual void OnBufferOverflow(IJournalMessage request)
}
else if (request is ReplayTaggedMessages)
{
var r = (ReplayTaggedMessages) request;
var r = (ReplayTaggedMessages)request;
r.ReplyTo.Tell(new ReplayMessagesFailure(JournalBufferOverflowException.Instance), ActorRefs.NoSender);
}
}
Expand All @@ -887,11 +887,11 @@ private async Task<BatchComplete> ExecuteChunk(RequestChunk chunk, IActorContext
using (var connection = CreateConnection(Setup.ConnectionString))
{
await connection.OpenAsync();

using (var tx = connection.BeginTransaction(Setup.IsolationLevel))
using (var command = (TCommand)connection.CreateCommand())
{
command.CommandTimeout = (int) Setup.ConnectionTimeout.TotalMilliseconds;
command.CommandTimeout = (int)Setup.ConnectionTimeout.TotalMilliseconds;
command.Transaction = tx;
try
{
Expand Down Expand Up @@ -1034,10 +1034,10 @@ protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand c
var replaySettings = Setup.ReplayFilterSettings;
var replyTo = replaySettings.IsEnabled
? context.ActorOf(ReplayFilter.Props(
persistentActor: req.PersistentActor,
mode: replaySettings.Mode,
persistentActor: req.PersistentActor,
mode: replaySettings.Mode,
windowSize: replaySettings.WindowSize,
maxOldWriters: replaySettings.MaxOldWriters,
maxOldWriters: replaySettings.MaxOldWriters,
debugEnabled: replaySettings.IsDebug))
: req.PersistentActor;
var persistenceId = req.PersistenceId;
Expand Down Expand Up @@ -1086,15 +1086,15 @@ protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand c
private async Task HandleWriteMessages(WriteMessages req, TCommand command)
{
IJournalResponse summary = null;
var responses = new List<IJournalResponse>();
var responses = new List<Tuple<IJournalResponse, IActorRef>>();
var tags = new HashSet<string>();
var persistenceIds = new HashSet<string>();
var actorInstanceId = req.ActorInstanceId;

try
{
command.CommandText = InsertEventSql;

var tagBuilder = new StringBuilder(16); // magic number

foreach (var envelope in req.Messages)
Expand All @@ -1113,7 +1113,7 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)
var persistent = AdaptToJournal(unadapted);
if (persistent.Payload is Tagged)
{
var tagged = (Tagged) persistent.Payload;
var tagged = (Tagged)persistent.Payload;
if (tagged.Tags.Count != 0)
{
tagBuilder.Append(';');
Expand All @@ -1130,7 +1130,7 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)

await command.ExecuteNonQueryAsync();

var response = new WriteMessageSuccess(unadapted, actorInstanceId);
var response = Tuple.Create<IJournalResponse, IActorRef>(new WriteMessageSuccess(unadapted, actorInstanceId), unadapted.Sender);
responses.Add(response);
persistenceIds.Add(persistent.PersistenceId);

Expand All @@ -1140,23 +1140,23 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)
{
// database-related exceptions should result in failure
summary = new WriteMessagesFailed(cause);
var response = new WriteMessageFailure(unadapted, cause, actorInstanceId);
var response = Tuple.Create<IJournalResponse, IActorRef>(new WriteMessageFailure(unadapted, cause, actorInstanceId), unadapted.Sender);
responses.Add(response);
}
catch (Exception cause)
{
//TODO: this scope wraps atomic write. Atomic writes have all-or-nothing commits.
// so we should revert transaction here. But we need to check how this affect performance.

var response = new WriteMessageRejected(unadapted, cause, actorInstanceId);
var response = Tuple.Create<IJournalResponse, IActorRef>(new WriteMessageRejected(unadapted, cause, actorInstanceId), unadapted.Sender);
responses.Add(response);
}
}
}
else
{
//TODO: other cases?
var response = new LoopMessageSuccess(envelope.Payload, actorInstanceId);
var response = Tuple.Create<IJournalResponse, IActorRef>(new LoopMessageSuccess(envelope.Payload, actorInstanceId), envelope.Sender);
responses.Add(response);
}
}
Expand Down Expand Up @@ -1187,12 +1187,12 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)
var aref = req.PersistentActor;

aref.Tell(summary);
foreach (var response in responses)
foreach (var r in responses)
{
aref.Tell(response);
aref.Tell(r.Item1, r.Item2);
}
}

/// <summary>
/// Perform write of persistent event with specified <paramref name="tags"/>
/// into database using given <paramref name="command"/>.
Expand Down

0 comments on commit 7ce0bd5

Please sign in to comment.