Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert racy unit tests to async #5713

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
Expand Down Expand Up @@ -96,45 +97,46 @@ public void Join(ActorSystem from, ActorSystem to, IActorRef probe)
}

[Fact]
public void ClusterSingleton_that_is_leaving_must()
public async Task ClusterSingleton_that_is_leaving_must()
{
ClusterSingleton_that_is_leaving_must_join_cluster();
ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest();
await ClusterSingleton_that_is_leaving_must_join_cluster();
await ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest();
}

private void ClusterSingleton_that_is_leaving_must_join_cluster()
private async Task ClusterSingleton_that_is_leaving_must_join_cluster()
{
for (int i = 0; i < _systems.Length; i++)
for (var i = 0; i < _systems.Length; i++)
Join(_systems[i], _systems[0], _probes[i]);

// leader is most likely on system, lowest port
Join(Sys, _systems[0], TestActor);

_probes[0].ExpectMsg("started");
await _probes[0].ExpectMsgAsync("started");
}

private void ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest()
private async Task ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest()
{
var durations = new List<(TimeSpan stoppedDuration, TimeSpan startDuration)>();
var sw = new Stopwatch();
sw.Start();
for (var i = 0; i < _systems.Length; i++)
{
var leaveAddress = Cluster.Get(_systems[i]).SelfAddress;
CoordinatedShutdown.Get(_systems[i]).Run(CoordinatedShutdown.ClusterLeavingReason.Instance);
_probes[i].ExpectMsg("stopped", TimeSpan.FromSeconds(10));
await CoordinatedShutdown.Get(_systems[i]).Run(CoordinatedShutdown.ClusterLeavingReason.Instance);

await _probes[i].ExpectMsgAsync("stopped", TimeSpan.FromSeconds(10));
var stoppedDuration = sw.Elapsed;

if (i != _systems.Length - 1)
_probes[i + 1].ExpectMsg("started", TimeSpan.FromSeconds(30));
await _probes[i + 1].ExpectMsgAsync("started", TimeSpan.FromSeconds(30));
else
ExpectMsg("started", TimeSpan.FromSeconds(30));
await ExpectMsgAsync("started", TimeSpan.FromSeconds(30));

var startedDuration = sw.Elapsed;

Within(TimeSpan.FromSeconds(15), () =>
await WithinAsync(TimeSpan.FromSeconds(15), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_systems[i]).IsTerminated.Should().BeTrue();
Cluster.Get(Sys).State.Members.Select(m => m.Address).Should().NotContain(leaveAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ await Assert.ThrowsAsync<TestJournalRejectionException>(async () =>
[Fact]
public async Task delay_must_call_next_interceptor_after_specified_delay()
{
var duration = TimeSpan.FromMilliseconds(100);
var duration = TimeSpan.FromMilliseconds(200);
var epsilon = TimeSpan.FromMilliseconds(50);
var probe = new InterceptorProbe();
var delay = new JournalInterceptors.Delay(duration, probe);

var startedAt = DateTime.Now;
await delay.InterceptAsync(null);

probe.WasCalled.Should().BeTrue();
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration);
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration - epsilon);
}

[Fact]
Expand Down
33 changes: 23 additions & 10 deletions src/core/Akka.Remote.Tests/Transport/AkkaProtocolSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void ProtocolStateActor_must_in_outbound_mode_delay_readiness_until_hands
}

[Fact]
public void ProtocolStateActor_must_handle_explicit_disassociate_messages()
public async Task ProtocolStateActor_must_handle_explicit_disassociate_messages()
{
var collaborators = GetCollaborators();
collaborators.Transport.AssociateBehavior.PushConstant(collaborators.Handle);
Expand All @@ -248,25 +248,38 @@ public void ProtocolStateActor_must_handle_explicit_disassociate_messages()
statusPromise, collaborators.Transport,
new AkkaProtocolSettings(config), codec, collaborators.FailureDetector));

AwaitCondition(() => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);
await AwaitConditionAsync(() => LastActivityIsAssociate(collaborators.Registry, 42), DefaultTimeout);

reader.Tell(testAssociate(33), Self);

statusPromise.Task.Wait(TimeSpan.FromSeconds(3));
statusPromise.Task.Result.Match()
.With<AkkaProtocolHandle>(h =>
{
var cts = new CancellationTokenSource();
using (cts)
{
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(3));
var completeTask = await Task.WhenAny(timeoutTask, statusPromise.Task);
cts.Cancel();
if (completeTask == timeoutTask)
throw new TimeoutException();
}

var result = statusPromise.Task.Result;
switch (result)
{
case AkkaProtocolHandle h:
Assert.Equal(_remoteAkkaAddress, h.RemoteAddress);
Assert.Equal(_localAkkaAddress, h.LocalAddress);
})
.Default(msg => Assert.True(false, "Did not receive expected AkkaProtocolHandle from handshake"));
var wrappedHandle = statusPromise.Task.Result.AsInstanceOf<AkkaProtocolHandle>();
break;
default:
Assert.True(false, "Did not receive expected AkkaProtocolHandle from handshake");
break;
}

var wrappedHandle = (AkkaProtocolHandle) result;
wrappedHandle.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

reader.Tell(testDisassociate(DisassociateInfo.Unknown), Self);

ExpectMsgPf<Disassociated>("expected Disassociated(DisassociateInfo.Unknown", o =>
await ExpectMsgOfAsync("expected Disassociated(DisassociateInfo.Unknown", o =>
{
var disassociated = o.AsInstanceOf<Disassociated>();

Expand Down
137 changes: 121 additions & 16 deletions src/core/Akka.Tests.Shared.Internals/AkkaSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,34 +115,139 @@ private static string GetCallerName()

public static Config AkkaSpecConfig { get { return _akkaSpecConfig; } }

protected T ExpectMsgPf<T>(TimeSpan? timeout, string hint, Func<object, T> function)
=> ExpectMsgPf(timeout, hint, this, function);
protected T ExpectMsgOf<T>(
TimeSpan? timeout,
string hint,
Func<object, T> function,
CancellationToken cancellationToken = default)
=> ExpectMsgOfAsync(timeout, hint, this, function, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task<T> ExpectMsgOfAsync<T>(
TimeSpan? timeout,
string hint,
Func<object, T> function,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(timeout, hint, this, function, cancellationToken)
.ConfigureAwait(false);

protected async Task<T> ExpectMsgOfAsync<T>(
TimeSpan? timeout,
string hint,
Func<object, Task<T>> function,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(timeout, hint, this, function, cancellationToken)
.ConfigureAwait(false);

protected T ExpectMsgPf<T>(TimeSpan? timeout, string hint, TestKitBase probe, Func<object, T> function)
protected T ExpectMsgOf<T>(
TimeSpan? timeout,
string hint,
TestKitBase probe,
Func<object, T> function,
CancellationToken cancellationToken = default)
=> ExpectMsgOfAsync(timeout, hint, probe, function, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task<T> ExpectMsgOfAsync<T>(
TimeSpan? timeout,
string hint,
TestKitBase probe,
Func<object, T> function,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(timeout, hint, probe, o => Task.FromResult(function(o)), cancellationToken)
.ConfigureAwait(false);

protected async Task<T> ExpectMsgOfAsync<T>(
TimeSpan? timeout,
string hint,
TestKitBase probe,
Func<object, Task<T>> function,
CancellationToken cancellationToken = default)
{
MessageEnvelope envelope;
var success = probe.TryReceiveOne(out envelope, timeout);
var (success, envelope) = await probe.TryReceiveOneAsync(timeout, cancellationToken)
.ConfigureAwait(false);

if(!success)
Assertions.Fail(string.Format("expected message of type {0} but timed out after {1}", typeof(T), GetTimeoutOrDefault(timeout)));
Assertions.Fail($"expected message of type {typeof(T)} but timed out after {GetTimeoutOrDefault(timeout)}");

var message = envelope.Message;
Assertions.AssertTrue(message != null, string.Format("expected {0} but got null message", hint));
Assertions.AssertTrue(message != null, $"expected {hint} but got null message");
//TODO: Check next line.
Assertions.AssertTrue(function.GetMethodInfo().GetParameters().Any(x => x.ParameterType.IsInstanceOfType(message)), string.Format("expected {0} but got {1} instead", hint, message));
return function.Invoke(message);
Assertions.AssertTrue(
function.GetMethodInfo().GetParameters().Any(x => x.ParameterType.IsInstanceOfType(message)),
$"expected {hint} but got {message} instead");

return await function(message).ConfigureAwait(false);
}

protected T ExpectMsgPf<T>(string hint, Func<object, T> pf)
=> ExpectMsgPf(hint, this, pf);

protected T ExpectMsgPf<T>(string hint, TestKitBase probe, Func<object, T> pf)
protected T ExpectMsgOf<T>(
string hint,
TestKitBase probe,
Func<object, T> pf,
CancellationToken cancellationToken = default)
=> ExpectMsgOfAsync(hint, probe, pf, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task<T> ExpectMsgOfAsync<T>(
string hint,
TestKitBase probe,
Func<object, T> pf,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(hint, probe, o => Task.FromResult(pf(o)), cancellationToken)
.ConfigureAwait(false);

protected async Task<T> ExpectMsgOfAsync<T>(
string hint,
TestKitBase probe,
Func<object, Task<T>> pf,
CancellationToken cancellationToken = default)
{
var t = probe.ExpectMsg<T>();
var t = await probe.ExpectMsgAsync<T>(cancellationToken: cancellationToken)
.ConfigureAwait(false);

//TODO: Check if this really is needed:
Assertions.AssertTrue(pf.GetMethodInfo().GetParameters().Any(x => x.ParameterType.IsInstanceOfType(t)), string.Format("expected {0} but got {1} instead", hint, t));
return pf.Invoke(t);
Assertions.AssertTrue(pf.GetMethodInfo().GetParameters().Any(x => x.ParameterType.IsInstanceOfType(t)),
$"expected {hint} but got {t} instead");
return await pf(t);
}

protected T ExpectMsgOf<T>(
string hint,
Func<object, T> pf,
CancellationToken cancellationToken = default)
=> ExpectMsgOfAsync(hint, this, pf, cancellationToken)
.ConfigureAwait(false).GetAwaiter().GetResult();

protected async Task<T> ExpectMsgOfAsync<T>(
string hint,
Func<object, T> pf,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(hint, this, pf, cancellationToken)
.ConfigureAwait(false);

protected async Task<T> ExpectMsgOfAsync<T>(
string hint,
Func<object, Task<T>> pf,
CancellationToken cancellationToken = default)
=> await ExpectMsgOfAsync(hint, this, pf, cancellationToken)
.ConfigureAwait(false);

[Obsolete("Method name typo, please use ExpectMsgOf instead")]
protected T ExpectMsgPf<T>(TimeSpan? timeout, string hint, Func<object, T> function)
=> ExpectMsgOf(timeout, hint, this, function);

[Obsolete("Method name typo, please use ExpectMsgOf instead")]
protected T ExpectMsgPf<T>(TimeSpan? timeout, string hint, TestKitBase probe, Func<object, T> function)
=> ExpectMsgOf(timeout, hint, probe, function);

[Obsolete("Method name typo, please use ExpectMsgOf instead")]
protected T ExpectMsgPf<T>(string hint, Func<object, T> pf)
=> ExpectMsgOf(hint, this, pf);

[Obsolete("Method name typo, please use ExpectMsgOf instead")]
protected T ExpectMsgPf<T>(string hint, TestKitBase probe, Func<object, T> pf)
=> ExpectMsgOf(hint, probe, pf);

/// <summary>
/// Intercept and return an exception that's expected to be thrown by the passed function value. The thrown
/// exception must be an instance of the type specified by the type parameter of this method. This method
Expand Down
21 changes: 15 additions & 6 deletions src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -463,15 +463,17 @@ public AsyncPipeToDelayActor()
{
ReceiveAsync<string>(async msg =>
{
Task.Run(() =>
{
Thread.Sleep(10);
return msg;
}).PipeTo(Sender, Self); //LogicalContext is lost?!?
Delayed(msg).PipeTo(Sender, Self);

Thread.Sleep(3000);
await Task.Delay(3000);
});
}

private async Task<string> Delayed(string msg)
{
await Task.Delay(10);
return msg;
}
}

public class AsyncReentrantActor : ReceiveActor
Expand All @@ -491,6 +493,13 @@ public AsyncReentrantActor()
Thread.Sleep(3000);
});
}

private async Task<string> Delayed(string msg)
{
// Sleep to make sure the task is not completed when ContinueWith is called
await Task.Delay(100);
return msg;
}
}

[Fact]
Expand Down