Skip to content

Commit

Permalink
Port Akka.Tests.Event tests to async/await - EventStreamSpec (#…
Browse files Browse the repository at this point in the history
…5794)

* Port `Akka.Tests.Event` tests to `async/await` - `EventStreamSpec`

* Revert `ForEach` await

* Changed the last `XAssert` to `Assert`

Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
eaba and Arkatufus authored Mar 31, 2022
1 parent 3a22cd0 commit 69d72a2
Showing 1 changed file with 44 additions and 45 deletions.
89 changes: 44 additions & 45 deletions src/core/Akka.Tests/Event/EventStreamSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.Linq;
using Akka.Util.Internal;
using Xunit;
using System.Threading.Tasks;

namespace Akka.Tests.Event
{
Expand Down Expand Up @@ -61,25 +62,25 @@ private class CC { }
private class CCATBT : CC, ATT, BTT { }

[Fact]
public void Manage_subscriptions()
public async Task Manage_subscriptions()
{

var bus = new EventStream(true);
bus.StartUnsubscriber(Sys.AsInstanceOf<ActorSystemImpl>());
bus.Subscribe(TestActor, typeof(M));

bus.Publish(new M { Value = 42 });
ExpectMsg(new M { Value = 42 });
await ExpectMsgAsync(new M { Value = 42 });
bus.Unsubscribe(TestActor);
bus.Publish(new M { Value = 43 });
ExpectNoMsg(TimeSpan.FromSeconds(1));
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
}

[Fact]
public void Not_allow_null_as_subscriber()
{
var bus = new EventStream(true);
XAssert.Throws<ArgumentNullException>(() =>
Assert.Throws<ArgumentNullException>(() =>
{
bus.Subscribe(null, typeof(M));
});
Expand All @@ -89,18 +90,18 @@ public void Not_allow_null_as_subscriber()
public void Not_allow_null_as_unsubscriber()
{
var bus = new EventStream(true);
XAssert.Throws<ArgumentNullException>(() =>
Assert.Throws<ArgumentNullException>(() =>
{
bus.Unsubscribe(null, typeof(M));
});
XAssert.Throws<ArgumentNullException>(() =>
Assert.Throws<ArgumentNullException>(() =>
{
bus.Unsubscribe(null);
});
}

[Fact]
public void Be_able_to_log_unhandled_messages()
public async Task Be_able_to_log_unhandled_messages()
{
using (var system = ActorSystem.Create("EventStreamSpecUnhandled", GetDebugUnhandledMessagesConfig()))
{
Expand All @@ -110,7 +111,7 @@ public void Be_able_to_log_unhandled_messages()

system.EventStream.Publish(msg);

var debugMsg = ExpectMsg<Debug>();
var debugMsg = await ExpectMsgAsync<Debug>();

debugMsg.Message.ToString().StartsWith("Unhandled message from").ShouldBeTrue();
debugMsg.Message.ToString().EndsWith(": 42").ShouldBeTrue();
Expand All @@ -121,7 +122,7 @@ public void Be_able_to_log_unhandled_messages()
/// Reproduction spec for https://github.com/akkadotnet/akka.net/issues/3267
/// </summary>
[Fact]
public void Bugfix3267_able_to_log_unhandled_messages_with_nosender()
public async Task Bugfix3267_able_to_log_unhandled_messages_with_nosender()
{
using (var system = ActorSystem.Create("EventStreamSpecUnhandled", GetDebugUnhandledMessagesConfig()))
{
Expand All @@ -132,15 +133,15 @@ public void Bugfix3267_able_to_log_unhandled_messages_with_nosender()

system.EventStream.Publish(msg);

var debugMsg = ExpectMsg<Debug>();
var debugMsg = await ExpectMsgAsync<Debug>();

debugMsg.Message.ToString().StartsWith("Unhandled message from").ShouldBeTrue();
debugMsg.Message.ToString().EndsWith(": 42").ShouldBeTrue();
}
}

[Fact]
public void Manage_sub_channels_using_classes()
public async Task Manage_sub_channels_using_classes()
{
var a = new A();
var b1 = new B1();
Expand All @@ -150,24 +151,24 @@ public void Manage_sub_channels_using_classes()
bus.Subscribe(TestActor, typeof(B2));
bus.Publish(c);
bus.Publish(b2);
ExpectMsg(b2);
await ExpectMsgAsync(b2);
bus.Subscribe(TestActor, typeof(A));
bus.Publish(c);
ExpectMsg(c);
await ExpectMsgAsync(c);
bus.Publish(b1);
ExpectMsg(b1);
await ExpectMsgAsync(b1);

bus.Unsubscribe(TestActor, typeof(B1));
bus.Publish(c); //should not publish
bus.Publish(b2); //should publish
bus.Publish(a); //should publish
ExpectMsg(b2);
ExpectMsg(a);
ExpectNoMsg(TimeSpan.FromSeconds(1));
await ExpectMsgAsync(b2);
await ExpectMsgAsync(a);
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
}

[Fact(DisplayName = "manage sub-channels using classes and traits (update on subscribe)")]
public void Manage_sub_channels_using_classes_and_interfaces_update_on_subscribe()
public async Task Manage_sub_channels_using_classes_and_interfaces_update_on_subscribe()
{
var es = new EventStream(false);
var tm1 = new CC();
Expand All @@ -183,11 +184,11 @@ public void Manage_sub_channels_using_classes_and_interfaces_update_on_subscribe
es.Subscribe(a4.Ref, typeof(CCATBT)).ShouldBeTrue();
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg((object)tm2);
a2.ExpectMsg((object)tm2);
a3.ExpectMsg((object)tm1);
a3.ExpectMsg((object)tm2);
a4.ExpectMsg((object)tm2);
await a1.ExpectMsgAsync((object)tm2);
await a2.ExpectMsgAsync((object)tm2);
await a3.ExpectMsgAsync((object)tm1);
await a3.ExpectMsgAsync((object)tm2);
await a4.ExpectMsgAsync((object)tm2);
es.Unsubscribe(a1.Ref, typeof(AT)).ShouldBeTrue();
es.Unsubscribe(a2.Ref, typeof(BT)).ShouldBeTrue();
es.Unsubscribe(a3.Ref, typeof(CC)).ShouldBeTrue();
Expand All @@ -196,7 +197,7 @@ public void Manage_sub_channels_using_classes_and_interfaces_update_on_subscribe

//"manage sub-channels using classes and traits (update on unsubscribe)"
[Fact]
public void Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscribe()
public async Task Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscribe()
{
var es = new EventStream(false);
var tm1 = new CC();
Expand All @@ -213,18 +214,18 @@ public void Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscri
es.Unsubscribe(a3.Ref, typeof(CC));
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg((object)tm2);
a2.ExpectMsg((object)tm2);
a3.ExpectNoMsg(TimeSpan.FromSeconds(1));
a4.ExpectMsg((object)tm2);
await a1.ExpectMsgAsync((object)tm2);
await a2.ExpectMsgAsync((object)tm2);
await a3.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
await a4.ExpectMsgAsync((object)tm2);
es.Unsubscribe(a1.Ref, typeof(AT)).ShouldBeTrue();
es.Unsubscribe(a2.Ref, typeof(BT)).ShouldBeTrue();
es.Unsubscribe(a3.Ref, typeof(CC)).ShouldBeFalse();
es.Unsubscribe(a4.Ref, typeof(CCATBT)).ShouldBeTrue();
}

[Fact]
public void Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscribe_all()
public async Task Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscribe_all()
{
var es = new EventStream(false);
var tm1 = new CC();
Expand All @@ -241,10 +242,10 @@ public void Manage_sub_channels_using_classes_and_interfaces_update_on_unsubscri
es.Unsubscribe(a3.Ref).ShouldBeTrue();
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg((object)tm2);
a2.ExpectMsg((object)tm2);
a3.ExpectNoMsg(TimeSpan.FromSeconds(1));
a4.ExpectMsg((object)tm2);
await a1.ExpectMsgAsync((object)tm2);
await a2.ExpectMsgAsync((object)tm2);
await a3.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
await a4.ExpectMsgAsync((object)tm2);
es.Unsubscribe(a1.Ref, typeof(AT)).ShouldBeTrue();
es.Unsubscribe(a2.Ref, typeof(BT)).ShouldBeTrue();
es.Unsubscribe(a3.Ref, typeof(CC)).ShouldBeFalse();
Expand All @@ -262,12 +263,12 @@ public SetTarget(IActorRef @ref)
}

[Fact]
public void Manage_log_levels()
public async Task Manage_log_levels()
{
var bus = new EventStream(false);
bus.StartDefaultLoggers((ActorSystemImpl)Sys);
bus.Publish(new SetTarget(TestActor));
ExpectMsg("OK", TimeSpan.FromSeconds(5));
await ExpectMsgAsync("OK", TimeSpan.FromSeconds(5));

verifyLevel(bus, LogLevel.InfoLevel);
bus.SetLogLevel(LogLevel.WarningLevel);
Expand Down Expand Up @@ -304,28 +305,26 @@ private static string GetDebugUnhandledMessagesConfig()
".Replace("%logger%", typeof(MyLog).AssemblyQualifiedName);
}

public class MyLog : UntypedActor
public class MyLog : ReceiveActor
{
private IActorRef dst = Context.System.DeadLetters;

protected override void OnReceive(object message)
public MyLog()
{
PatternMatch.Match(message)
.With<InitializeLogger>(m =>
Receive<InitializeLogger>(m =>
{
var bus = m.LoggingBus;
bus.Subscribe(this.Self, typeof(SetTarget));
bus.Subscribe(this.Self, typeof(UnhandledMessage));
Sender.Tell(new LoggerInitialized());
})
.With<SetTarget>(m =>
});
Receive<SetTarget>(m =>
{
dst = m.Ref;
dst.Tell("OK");
})
.With<LogEvent>(m => dst.Tell(m))
.With<UnhandledMessage>(m => dst.Tell(m));
});
Receive<LogEvent>(m => dst.Tell(m));
Receive<UnhandledMessage>(m => dst.Tell(m));
}
}

Expand Down

0 comments on commit 69d72a2

Please sign in to comment.