Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that BatchingSqlJournal will propagate sql connection opening failure #3754

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Akka.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpRenamePlacementToArrangementMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,18 @@ public abstract class BatchingSqlJournal<TConnection, TCommand> : WriteJournalBa
where TCommand : DbCommand
{
#region internal classes

private sealed class ChunkExecutionFailure : IDeadLetterSuppression
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

{
public Exception Cause { get; }
public IJournalRequest[] Requests { get; }

public ChunkExecutionFailure(Exception cause, IJournalRequest[] requests)
{
Cause = cause;
Requests = requests;
}
}

private sealed class BatchComplete
{
Expand Down Expand Up @@ -658,10 +670,36 @@ protected sealed override bool Receive(object message)
else if (message is Terminated) RemoveSubscriber(((Terminated)message).ActorRef);
else if (message is GetCurrentPersistenceIds) InitializePersistenceIds();
else if (message is CurrentPersistenceIds) SendCurrentPersistenceIds((CurrentPersistenceIds)message);
else if (message is ChunkExecutionFailure) FailChunkExecution((ChunkExecutionFailure)message);
else return false;
return true;
}

private void FailChunkExecution(ChunkExecutionFailure message)
{
var cause = message.Cause;
Log.Error(cause, "Failed to execute chunk for {0} requests", message.Requests.Length);

foreach (var req in message.Requests)
{
switch (req)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

{
case WriteMessages write:
write.PersistentActor.Tell(new WriteMessagesFailed(cause));
break;
case ReplayMessages replay:
replay.PersistentActor.Tell(new ReplayMessagesFailure(cause));
break;
case DeleteMessagesTo delete:
delete.PersistentActor.Tell(new DeleteMessagesFailure(cause, delete.ToSequenceNr));
break;
case ReplayTaggedMessages replayTagged:
replayTagged.ReplyTo.Tell(new ReplayMessagesFailure(cause));
break;
}
}
}

private void SendCurrentPersistenceIds(CurrentPersistenceIds message)
{
foreach (var persistenceId in message.AllPersistenceIds)
Expand Down Expand Up @@ -837,7 +875,8 @@ private void TryProcess()

var chunk = DequeueChunk(_remainingOperations);
var context = Context;
_circuitBreaker.WithCircuitBreaker(() => ExecuteChunk(chunk, context)).PipeTo(Self);
_circuitBreaker.WithCircuitBreaker(() => ExecuteChunk(chunk, context))
Copy link
Member

@ismaelhamed ismaelhamed Apr 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Horusiath Halfway there, but still when the circuit breaker throws an OpenCircuitException the restarts won't happen again. Wrapping this piece in a try...catch... and self issuing a new ChunkExecutionFailure(ex, chunk.Requests) seems to do the trick.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you saying that OpenCircuitException is not propagated to PipeTo?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PipeTo will work

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless @ismaelhamed means that the _circuitBreaker itself throws instead of injecting the exception into its Task, in which case his feedback is correct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the _circuitbreaker is throwing instead of propagating to PipeTo, hence why I had to wrap in a try...catch... that call to _circuitBreaker.WithCircuitBreaker() in order for it to work.

I´ll do some more tests tomorrow to confirm it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the _circuitbreaker is throwing instead of propagating to PipeTo, hence why I had to wrap in a try...catch... that call to _circuitBreaker.WithCircuitBreaker() in order for it to work.

Not to get side-tracked, but if that's the case then isn't that a bug with the way the CircuitBreaker is designed?

Copy link
Contributor Author

@Horusiath Horusiath Apr 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated CircuitBreaker implementation to raise exceptions within returned Tasks (simply by marking corresponding methods as async).

.PipeTo(Self, failure: ex => new ChunkExecutionFailure(ex, chunk.Requests));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//-----------------------------------------------------------------------
// <copyright file="SqliteJournalConnectionFailureSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.Sql.TestKit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sqlite.Tests.Batching
{
public class BatchingSqliteJournalConnectionFailureSpec : SqlJournalConnectionFailureSpec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

{
public BatchingSqliteJournalConnectionFailureSpec(ITestOutputHelper output)
: base(CreateSpecConfig(DefaultInvalidConnectionString), output)
{
}

private static Config CreateSpecConfig(string connectionString)
{
return ConfigurationFactory.ParseString(@"
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.sqlite""
sqlite {
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = """ + connectionString + @"""
}
}
}");
}
}
}
12 changes: 6 additions & 6 deletions src/core/Akka/Pattern/CircuitBreakerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public Open(CircuitBreaker breaker)
/// <param name="body">N/A</param>
/// <exception cref="OpenCircuitException">This exception is thrown automatically since the circuit is open.</exception>
/// <returns>N/A</returns>
public override Task<T> Invoke<T>(Func<Task<T>> body)
public override async Task<T> Invoke<T>(Func<Task<T>> body)
{
throw new OpenCircuitException();
}
Expand All @@ -48,7 +48,7 @@ public override Task<T> Invoke<T>(Func<Task<T>> body)
/// <param name="body">N/A</param>
/// <exception cref="OpenCircuitException">This exception is thrown automatically since the circuit is open.</exception>
/// <returns>N/A</returns>
public override Task Invoke(Func<Task> body)
public override async Task Invoke(Func<Task> body)
{
throw new OpenCircuitException();
}
Expand Down Expand Up @@ -106,13 +106,13 @@ public HalfOpen(CircuitBreaker breaker)
/// <param name="body">Implementation of the call that needs protected</param>
/// <exception cref="OpenCircuitException">TBD</exception>
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override Task<T> Invoke<T>(Func<Task<T>> body)
public override async Task<T> Invoke<T>(Func<Task<T>> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException();
}
return CallThrough(body);
return await CallThrough(body);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - we probably should have done this all along

}

/// <summary>
Expand All @@ -122,13 +122,13 @@ public override Task<T> Invoke<T>(Func<Task<T>> body)
/// <param name="body">Implementation of the call that needs protected</param>
/// <exception cref="OpenCircuitException">TBD</exception>
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override Task Invoke(Func<Task> body)
public override async Task Invoke(Func<Task> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException();
}
return CallThrough(body);
await CallThrough(body);
}

/// <summary>
Expand Down