Skip to content

Commit

Permalink
use Async api whenver possible
Browse files Browse the repository at this point in the history
  • Loading branch information
valdisz committed Aug 13, 2019
1 parent 8e0f81e commit e32dfff
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 120 deletions.
24 changes: 18 additions & 6 deletions src/core/Akka.Persistence.TestKit.Tests/TestJournalSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,36 @@ public void must_return_ack_after_new_write_interceptor_is_set()
}

[Fact]
public void works_as_memory_journal_by_default()
public async Task works_as_memory_journal_by_default()
{
var actor = Sys.ActorOf<PersistActor>();

// should pass
Journal.OnWrite.Pass();
await Journal.OnWrite.Pass();
actor.Tell("write", TestActor);

ExpectMsg("ack", TimeSpan.FromSeconds(3));
}

[Fact]
public void when_fail_on_write_is_set_all_writes_to_journal_will_fail()
public async Task when_fail_on_write_is_set_all_writes_to_journal_will_fail()
{
var actor = Sys.ActorOf<PersistActor>();
Watch(actor);

Journal.OnWrite.Fail();
await Journal.OnWrite.Fail();
actor.Tell("write", TestActor);

ExpectTerminated(actor, TimeSpan.FromSeconds(3));
}

[Fact]
public void when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected()
public async Task when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected()
{
var actor = Sys.ActorOf<PersistActor>();
Watch(actor);

Journal.OnWrite.Reject();
await Journal.OnWrite.Reject();
actor.Tell("write", TestActor);

ExpectMsg("rejected", TimeSpan.FromSeconds(3));
Expand All @@ -66,7 +66,19 @@ public async Task during_recovery_by_setting_fail_will_cause_recovery_failure()

ExpectTerminated(actor, TimeSpan.FromSeconds(3));
});
}

[Fact]
public async Task new_api()
{
await WithJournalWrite(write => write.Fail(), () =>
{
var actor = Sys.ActorOf<PersistActor>();
Watch(actor);

actor.Tell("write", TestActor);
ExpectTerminated(actor, TimeSpan.FromSeconds(3));
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,12 @@
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.6' ">1.6.1</NetStandardImplicitPackageVersion>
</PropertyGroup>

<ItemGroup>
<None Remove="test-journal.conf" />
</ItemGroup>

<ItemGroup>
<EmbeddedResource Include="test-journal.conf" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\contrib\testkits\Akka.TestKit.Xunit\Akka.TestKit.Xunit.csproj" />
<ProjectReference Include="..\..\contrib\testkits\Akka.TestKit.Xunit2\Akka.TestKit.Xunit2.csproj" />
<ProjectReference Include="..\Akka.Persistence\Akka.Persistence.csproj" />
<ProjectReference Include="..\Akka.TestKit\Akka.TestKit.csproj" />
</ItemGroup>
Expand Down
15 changes: 8 additions & 7 deletions src/core/Akka.Persistence.TestKit/IJournalBehaviorSetter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
namespace Akka.Persistence.TestKit
{
using System;
using System.Threading.Tasks;
using Actor;

public interface IJournalBehaviorSetter
{
void SetInterceptor(IJournalInterceptor interceptor);
Task SetInterceptorAsync(IJournalInterceptor interceptor);
}

internal class JournalWriteBehaviorSetter : IJournalBehaviorSetter
Expand All @@ -24,11 +25,11 @@ internal JournalWriteBehaviorSetter(IActorRef journal)

private readonly IActorRef _journal;

public void SetInterceptor(IJournalInterceptor interceptor)
=> _journal.Ask(
public Task SetInterceptorAsync(IJournalInterceptor interceptor)
=> _journal.Ask<TestJournal.Ack>(
new TestJournal.UseWriteInterceptor(interceptor),
TimeSpan.FromSeconds(3)
).Wait();
);
}

internal class JournalRecoveryBehaviorSetter : IJournalBehaviorSetter
Expand All @@ -40,10 +41,10 @@ internal JournalRecoveryBehaviorSetter(IActorRef journal)

private readonly IActorRef _journal;

public void SetInterceptor(IJournalInterceptor interceptor)
=> _journal.Ask(
public Task SetInterceptorAsync(IJournalInterceptor interceptor)
=> _journal.Ask<TestJournal.Ack>(
new TestJournal.UseRecoveryInterceptor(interceptor),
TimeSpan.FromSeconds(3)
).Wait();
);
}
}
58 changes: 29 additions & 29 deletions src/core/Akka.Persistence.TestKit/JournalRecoveryBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,85 +19,85 @@ internal JournalRecoveryBehavior(IJournalBehaviorSetter setter)

protected readonly IJournalBehaviorSetter Setter;

protected void SetInterceptor(IJournalInterceptor interceptor) => Setter.SetInterceptor(interceptor);
public Task SetInterceptorAsync(IJournalInterceptor interceptor) => Setter.SetInterceptorAsync(interceptor);

public void Pass() => SetInterceptor(JournalInterceptors.Noop.Instance);
public Task Pass() => SetInterceptorAsync(JournalInterceptors.Noop.Instance);

public void PassWithDelay(TimeSpan delay)
public Task PassWithDelay(TimeSpan delay)
{
if (delay <= TimeSpan.Zero)
{
throw new ArgumentException("Delay must be greater than zero", nameof(delay));
}

SetInterceptor(new JournalInterceptors.Delay(delay, JournalInterceptors.Noop.Instance));
return SetInterceptorAsync(new JournalInterceptors.Delay(delay, JournalInterceptors.Noop.Instance));
}

public void Fail() => SetInterceptor(JournalInterceptors.Failure.Instance);
public Task Fail() => SetInterceptorAsync(JournalInterceptors.Failure.Instance);

public void FailOnType<TMessage>() => FailOnType(typeof(TMessage));
public Task FailOnType<TMessage>() => FailOnType(typeof(TMessage));

public void FailOnType(Type messageType)
public Task FailOnType(Type messageType)
{
if (messageType is null)
{
throw new ArgumentNullException(nameof(messageType));
}

SetInterceptor(new JournalInterceptors.OnType(messageType, JournalInterceptors.Failure.Instance));
return SetInterceptorAsync(new JournalInterceptors.OnType(messageType, JournalInterceptors.Failure.Instance));
}

public void FailIf(Func<IPersistentRepresentation, bool> predicate)
public Task FailIf(Func<IPersistentRepresentation, bool> predicate)
{
if (predicate is null)
{
throw new ArgumentNullException(nameof(predicate));
}

SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance));
return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance));
}

public void FailIf(Func<IPersistentRepresentation, Task<bool>> predicate)
public Task FailIf(Func<IPersistentRepresentation, Task<bool>> predicate)
{
if (predicate is null)
{
throw new ArgumentNullException(nameof(predicate));
}

SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance));
return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance));
}

public void FailUnless(Func<IPersistentRepresentation, bool> predicate)
public Task FailUnless(Func<IPersistentRepresentation, bool> predicate)
{
if (predicate is null)
{
throw new ArgumentNullException(nameof(predicate));
}

SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true));
return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true));
}

public void FailUnless(Func<IPersistentRepresentation, Task<bool>> predicate)
public Task FailUnless(Func<IPersistentRepresentation, Task<bool>> predicate)
{
if (predicate is null)
{
throw new ArgumentNullException(nameof(predicate));
}

SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true));
return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true));
}

public void FailWithDelay(TimeSpan delay)
public Task FailWithDelay(TimeSpan delay)
{
if (delay <= TimeSpan.Zero)
{
throw new ArgumentException("Delay must be greater than zero", nameof(delay));
}

SetInterceptor(new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance));
return SetInterceptorAsync(new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance));
}

public void FailIfWithDelay(TimeSpan delay, Func<IPersistentRepresentation, Task<bool>> predicate)
public Task FailIfWithDelay(TimeSpan delay, Func<IPersistentRepresentation, Task<bool>> predicate)
{
if (delay <= TimeSpan.Zero)
{
Expand All @@ -109,13 +109,13 @@ public void FailIfWithDelay(TimeSpan delay, Func<IPersistentRepresentation, Task
throw new ArgumentNullException(nameof(predicate));
}

SetInterceptor(new JournalInterceptors.OnCondition(
return SetInterceptorAsync(new JournalInterceptors.OnCondition(
predicate,
new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance)
));
}

public void FailIfWithDelay(TimeSpan delay, Func<IPersistentRepresentation, bool> predicate)
public Task FailIfWithDelay(TimeSpan delay, Func<IPersistentRepresentation, bool> predicate)
{
if (delay <= TimeSpan.Zero)
{
Expand All @@ -127,13 +127,13 @@ public void FailIfWithDelay(TimeSpan delay, Func<IPersistentRepresentation, bool
throw new ArgumentNullException(nameof(predicate));
}

SetInterceptor(new JournalInterceptors.OnCondition(
return SetInterceptorAsync(new JournalInterceptors.OnCondition(
predicate,
new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance)
));
}

public void FailUnlessWithDelay(TimeSpan delay, Func<IPersistentRepresentation, bool> predicate)
public Task FailUnlessWithDelay(TimeSpan delay, Func<IPersistentRepresentation, bool> predicate)
{
if (delay <= TimeSpan.Zero)
{
Expand All @@ -145,14 +145,14 @@ public void FailUnlessWithDelay(TimeSpan delay, Func<IPersistentRepresentation,
throw new ArgumentNullException(nameof(predicate));
}

SetInterceptor(new JournalInterceptors.OnCondition(
return SetInterceptorAsync(new JournalInterceptors.OnCondition(
predicate,
new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance),
negate: true
));
}

public void FailUnlessWithDelay(TimeSpan delay, Func<IPersistentRepresentation, Task<bool>> predicate)
public Task FailUnlessWithDelay(TimeSpan delay, Func<IPersistentRepresentation, Task<bool>> predicate)
{
if (delay <= TimeSpan.Zero)
{
Expand All @@ -164,16 +164,16 @@ public void FailUnlessWithDelay(TimeSpan delay, Func<IPersistentRepresentation,
throw new ArgumentNullException(nameof(predicate));
}

SetInterceptor(new JournalInterceptors.OnCondition(
return SetInterceptorAsync(new JournalInterceptors.OnCondition(
predicate,
new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance),
negate: true
));
}

public void FailOnTypeWithDelay<TMessage>(TimeSpan delay) => FailOnTypeWithDelay(delay, typeof(TMessage));
public Task FailOnTypeWithDelay<TMessage>(TimeSpan delay) => FailOnTypeWithDelay(delay, typeof(TMessage));

public void FailOnTypeWithDelay(TimeSpan delay, Type messageType)
public Task FailOnTypeWithDelay(TimeSpan delay, Type messageType)
{
if (delay <= TimeSpan.Zero)
{
Expand All @@ -185,7 +185,7 @@ public void FailOnTypeWithDelay(TimeSpan delay, Type messageType)
throw new ArgumentNullException(nameof(messageType));
}

SetInterceptor(new JournalInterceptors.OnType(
return SetInterceptorAsync(new JournalInterceptors.OnType(
messageType,
new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance)
));
Expand Down
Loading

0 comments on commit e32dfff

Please sign in to comment.