Skip to content

Commit

Permalink
Chnaged tests under Akka.Tests.Actor.Dispatch to async/await (#5752)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
eaba and Aaronontheweb authored Mar 25, 2022
1 parent 90b9a24 commit 5bc6086
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
4 changes: 2 additions & 2 deletions src/core/Akka.Tests/Actor/Dispatch/ActorModelSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ public void A_dispatcher_must_process_messages_one_at_a_time()
}

[Fact]
public void A_dispatcher_must_handle_queuing_from_multiple_threads()
public async Task A_dispatcher_must_handle_queuing_from_multiple_threads()
{
var dispatcher = InterceptedDispatcher();
var counter = new CountdownEvent(200);
Expand All @@ -517,7 +517,7 @@ public void A_dispatcher_must_handle_queuing_from_multiple_threads()
}
finally
{
var stats = a.Ask<InterceptorStats>(GetStats.Instance).Result;
var stats = await a.Ask<InterceptorStats>(GetStats.Instance);
_testOutputHelper.WriteLine("Observed stats: {0}", stats);

Sys.Stop(a);
Expand Down
23 changes: 13 additions & 10 deletions src/core/Akka.Tests/Actor/Dispatch/Bug2640Spec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,17 @@ public async Task ForkJoinExecutorShouldShutdownUponActorSystemTermination()
for (var i = 0; i < 100; i++)
actor.Tell(GetThread.Instance);

threads = ReceiveN(100).Cast<Thread>().GroupBy(x => x.ManagedThreadId)
var objs = await ReceiveNAsync(100, default).ToListAsync();
threads = objs.Cast<Thread>().GroupBy(x => x.ManagedThreadId)
.ToDictionary(x => x.Key, grouping => grouping.First());

await Sys.Terminate();
AwaitAssert(() =>
await AwaitAssertAsync(() =>
threads.Values.All(x => x.IsAlive == false).Should().BeTrue("All threads should be stopped"));
}

[Fact(DisplayName = "ForkJoinExecutor should terminate all threads upon all attached actors shutting down")]
public void ForkJoinExecutorShouldShutdownUponAllActorsTerminating()
public async Task ForkJoinExecutorShouldShutdownUponAllActorsTerminating()
{
var actor = Sys.ActorOf(Props.Create(() => new ThreadReporterActor())
.WithDispatcher("myapp.my-fork-join-dispatcher").WithRouter(new RoundRobinPool(4)));
Expand All @@ -99,29 +100,31 @@ public void ForkJoinExecutorShouldShutdownUponAllActorsTerminating()
for (var i = 0; i < 100; i++)
actor.Tell(GetThread.Instance);

threads = ReceiveN(100).Cast<Thread>().GroupBy(x => x.ManagedThreadId)
var objs = await ReceiveNAsync(100, default).ToListAsync();

threads = objs.Cast<Thread>().GroupBy(x => x.ManagedThreadId)
.ToDictionary(x => x.Key, grouping => grouping.First());

Sys.Stop(actor);
ExpectTerminated(actor);
AwaitAssert(() =>
await ExpectTerminatedAsync(actor);
await AwaitAssertAsync(() =>
threads.Values.All(x => x.IsAlive == false).Should().BeTrue("All threads should be stopped"));
}

[Fact(DisplayName = "PinnedDispatcher should terminate its thread upon actor shutdown")]
public void PinnedDispatcherShouldShutdownUponActorTermination()
public async Task PinnedDispatcherShouldShutdownUponActorTermination()
{
var actor = Sys.ActorOf(Props.Create(() => new ThreadReporterActor())
.WithDispatcher("myapp.my-pinned-dispatcher"));

Watch(actor);
actor.Tell(GetThread.Instance);
var thread = ExpectMsg<Thread>();
var thread = await ExpectMsgAsync<Thread>();
thread.IsAlive.Should().BeTrue();

Sys.Stop(actor);
ExpectTerminated(actor);
AwaitCondition(() => !thread.IsAlive); // wait for thread to terminate
await ExpectTerminatedAsync(actor);
await AwaitConditionAsync(() => !thread.IsAlive); // wait for thread to terminate
}
}
}
9 changes: 5 additions & 4 deletions src/core/Akka.Tests/Actor/Dispatch/Bug2751Spec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
using Xunit;
Expand Down Expand Up @@ -40,14 +41,14 @@ public StopActor(IActorRef testActor)
}

[Fact]
public void ShouldReceiveSysMsgBeforeUserMsg()
public async Task ShouldReceiveSysMsgBeforeUserMsg()
{
var stopper = Sys.ActorOf(Props.Create(() => new StopActor(TestActor)));
stopper.Tell("stop");
ExpectNoMsg(TimeSpan.FromMilliseconds(250));
await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(250));
Watch(stopper);
ExpectTerminated(stopper);
ExpectNoMsg(TimeSpan.FromMilliseconds(100));
await ExpectTerminatedAsync(stopper);
await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public class CurrentSynchronizationContextDispatcherSpecs : AkkaSpec
public CurrentSynchronizationContextDispatcherSpecs() : base(_config) { }

[Fact]
public void CurrentSynchronizationContextDispatcher_should_start_without_error_Fix2172()
public async Task CurrentSynchronizationContextDispatcher_should_start_without_error_Fix2172()
{
var uiActor = Sys.ActorOf(EchoActor.Props(this), "some-ui-actor");
uiActor.Tell("ping");
ExpectMsg("ping");
await ExpectMsgAsync("ping");
}
}
}
Expand Down

0 comments on commit 5bc6086

Please sign in to comment.