Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Akka.Tests.Actor tests to async/await - CoordinatedShutdownSpec #5770

Merged
merged 4 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 40 additions & 36 deletions src/core/Akka.Tests/Actor/CoordinatedShutdownSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
using FluentAssertions;
using Xunit;
using static Akka.Actor.CoordinatedShutdown;
using Akka.Tests.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using static FluentAssertions.FluentActions;

namespace Akka.Tests.Actor
{
Expand Down Expand Up @@ -114,12 +118,12 @@ public void CoordinatedShutdown_must_sort_phases_in_topological_order()
[Fact]
public void CoordinatedShutdown_must_detect_cycles_in_phases_non_DAG()
{
Intercept<ArgumentException>(() =>
Assert.Throws<ArgumentException>(() =>
{
CoordinatedShutdown.TopologicalSort(new Dictionary<string, Phase>() { { "a", Phase("a") } });
});

Intercept<ArgumentException>(() =>
Assert.Throws<ArgumentException>(() =>
{
CoordinatedShutdown.TopologicalSort(new Dictionary<string, Phase>()
{
Expand All @@ -128,7 +132,7 @@ public void CoordinatedShutdown_must_detect_cycles_in_phases_non_DAG()
});
});

Intercept<ArgumentException>(() =>
Assert.Throws<ArgumentException>(() =>
{
CoordinatedShutdown.TopologicalSort(new Dictionary<string, Phase>()
{
Expand All @@ -138,7 +142,7 @@ public void CoordinatedShutdown_must_detect_cycles_in_phases_non_DAG()
});
});

Intercept<ArgumentException>(() =>
Assert.Throws<ArgumentException>(() =>
{
CoordinatedShutdown.TopologicalSort(new Dictionary<string, Phase>()
{
Expand Down Expand Up @@ -171,7 +175,7 @@ public void CoordinatedShutdown_must_predefined_phases_from_config()
}

[Fact]
public void CoordinatedShutdown_must_run_ordered_phases()
public async Task CoordinatedShutdown_must_run_ordered_phases()
{
var phases = new Dictionary<string, Phase>()
{
Expand All @@ -193,12 +197,12 @@ public void CoordinatedShutdown_must_run_ordered_phases()
return TaskEx.Completed;
});

co.AddTask("b", "b2", () =>
co.AddTask("b", "b2", async () =>
{
// to verify that c is not performed before b
Task.Delay(TimeSpan.FromMilliseconds(100)).Wait();
await Task.Delay(TimeSpan.FromMilliseconds(100));
TestActor.Tell("B");
return TaskEx.Completed;
return Done.Instance;
});

co.AddTask("c", "c1", () =>
Expand All @@ -207,12 +211,12 @@ public void CoordinatedShutdown_must_run_ordered_phases()
return TaskEx.Completed;
});

co.Run(CoordinatedShutdown.UnknownReason.Instance).Wait(RemainingOrDefault);
ReceiveN(4).Should().Equal(new object[] { "A", "B", "B", "C" });
await co.Run(CoordinatedShutdown.UnknownReason.Instance).AwaitWithTimeout(RemainingOrDefault);
(await ReceiveNAsync(4, default).ToListAsync()).Should().Equal(new object[] { "A", "B", "B", "C" });
}

[Fact]
public void CoordinatedShutdown_must_run_from_given_phase()
public async Task CoordinatedShutdown_must_run_from_given_phase()
{
var phases = new Dictionary<string, Phase>()
{
Expand Down Expand Up @@ -240,13 +244,13 @@ public void CoordinatedShutdown_must_run_from_given_phase()
return TaskEx.Completed;
});

co.Run(customReason, "b").Wait(RemainingOrDefault);
ReceiveN(2).Should().Equal(new object[] { "B", "C" });
await co.Run(customReason, "b").AwaitWithTimeout(RemainingOrDefault);
(await ReceiveNAsync(2, default).ToListAsync()).Should().Equal(new object[] { "B", "C" });
co.ShutdownReason.Should().BeEquivalentTo(customReason);
}

[Fact]
public void CoordinatedShutdown_must_only_run_once()
public async Task CoordinatedShutdown_must_only_run_once()
{
var phases = new Dictionary<string, Phase>()
{
Expand All @@ -261,17 +265,17 @@ public void CoordinatedShutdown_must_only_run_once()
});

co.ShutdownReason.Should().BeNull();
co.Run(customReason).Wait(RemainingOrDefault);
await co.Run(customReason).AwaitWithTimeout(RemainingOrDefault);
co.ShutdownReason.Should().BeEquivalentTo(customReason);
ExpectMsg("A");
co.Run(CoordinatedShutdown.UnknownReason.Instance).Wait(RemainingOrDefault);
await ExpectMsgAsync("A");
await co.Run(CoordinatedShutdown.UnknownReason.Instance).AwaitWithTimeout(RemainingOrDefault);
TestActor.Tell("done");
ExpectMsg("done"); // no additional A
await ExpectMsgAsync("done"); // no additional A
co.ShutdownReason.Should().BeEquivalentTo(customReason);
}

[Fact]
public void CoordinatedShutdown_must_continue_after_timeout_or_failure()
public async Task CoordinatedShutdown_must_continue_after_timeout_or_failure()
{
var phases = new Dictionary<string, Phase>()
{
Expand Down Expand Up @@ -306,15 +310,15 @@ public void CoordinatedShutdown_must_continue_after_timeout_or_failure()
return TaskEx.Completed;
});

co.Run(CoordinatedShutdown.UnknownReason.Instance).Wait(RemainingOrDefault);
ExpectMsg("A");
ExpectMsg("A");
ExpectMsg("B");
ExpectMsg("C");
await co.Run(CoordinatedShutdown.UnknownReason.Instance).AwaitWithTimeout(RemainingOrDefault);
await ExpectMsgAsync("A");
await ExpectMsgAsync("A");
await ExpectMsgAsync("B");
await ExpectMsgAsync("C");
}

[Fact]
public void CoordinatedShutdown_must_abort_if_recover_is_off()
public async Task CoordinatedShutdown_must_abort_if_recover_is_off()
{
var phases = new Dictionary<string, Phase>()
{
Expand All @@ -335,14 +339,14 @@ public void CoordinatedShutdown_must_abort_if_recover_is_off()
return TaskEx.Completed;
});

var result = co.Run(CoordinatedShutdown.UnknownReason.Instance);
ExpectMsg("B");
Intercept<TimeoutException>(() => result.Wait(RemainingOrDefault));
ExpectNoMsg(TimeSpan.FromMilliseconds(200)); // C not run
var task = co.Run(CoordinatedShutdown.UnknownReason.Instance);
await ExpectMsgAsync("B");
Arkatufus marked this conversation as resolved.
Show resolved Hide resolved
await Assert.ThrowsAsync<TimeoutException>(async() => await task.AwaitWithTimeout(RemainingOrDefault));
await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); // C not run
}

[Fact]
public void CoordinatedShutdown_must_be_possible_to_add_tasks_in_later_phase_from_earlier_phase()
public async Task CoordinatedShutdown_must_be_possible_to_add_tasks_in_later_phase_from_earlier_phase()
{
var phases = new Dictionary<string, Phase>()
{
Expand All @@ -362,9 +366,9 @@ public void CoordinatedShutdown_must_be_possible_to_add_tasks_in_later_phase_fro
return TaskEx.Completed;
});

co.Run(CoordinatedShutdown.UnknownReason.Instance).Wait(RemainingOrDefault);
ExpectMsg("A");
ExpectMsg("B");
await co.Run(CoordinatedShutdown.UnknownReason.Instance).AwaitWithTimeout(RemainingOrDefault);
await ExpectMsgAsync("A");
await ExpectMsgAsync("B");
}

[Fact]
Expand Down Expand Up @@ -392,10 +396,10 @@ public void CoordinatedShutdown_must_be_possible_to_parse_phases_from_config()
}

[Fact]
public void CoordinatedShutdown_must_terminate_ActorSystem()
public async Task CoordinatedShutdown_must_terminate_ActorSystem()
{
var shutdownSystem = CoordinatedShutdown.Get(Sys).Run(customReason);
shutdownSystem.Wait(TimeSpan.FromSeconds(10)).Should().BeTrue();
(await CoordinatedShutdown.Get(Sys).Run(customReason)
.AwaitWithTimeout(TimeSpan.FromSeconds(10))).Should().BeTrue();

Sys.WhenTerminated.IsCompleted.Should().BeTrue();
CoordinatedShutdown.Get(Sys).ShutdownReason.Should().BeEquivalentTo(customReason);
Expand Down
25 changes: 22 additions & 3 deletions src/core/Akka.Tests/Util/TaskHelpers.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Tests.Util
Expand All @@ -7,9 +9,26 @@ public static class TaskHelpers
{
public static async Task<bool> AwaitWithTimeout(this Task parentTask, TimeSpan timeout)
{
var delayed = Task.Delay(timeout);
await Task.WhenAny(delayed, parentTask);
return parentTask.IsCompleted;
using (var cts = new CancellationTokenSource())
{
try
{
var delayed = Task.Delay(timeout, cts.Token);
var returnedTask = await Task.WhenAny(delayed, parentTask);

if(returnedTask == parentTask && returnedTask.Exception != null)
{
var flattened = returnedTask.Exception.Flatten();
ExceptionDispatchInfo.Capture(flattened.InnerException).Throw();
}

return parentTask.IsCompleted;
}
finally
{
cts.Cancel();
}
}
}
}
}