Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Async TestKit] Convert Akka.Stream.TestKit to async - StreamTestKitSpec #5912

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 84 additions & 77 deletions src/core/Akka.Streams.TestKit.Tests/StreamTestKitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.TestKit;
using FluentAssertions;
Expand All @@ -27,27 +28,27 @@ public StreamTestKitSpec(ITestOutputHelper output = null) : base(output)
private Exception Ex() => new TestException("Boom!");

[Fact]
public void TestSink_Probe_ToStrict()
public async Task TestSink_Probe_ToStrictAsync()
{
Source.From(Enumerable.Range(1, 4))
.RunWith(this.SinkProbe<int>(), Materializer)
.ToStrict(TimeSpan.FromMilliseconds(300))
(await Source.From(Enumerable.Range(1, 4))
.RunWith(this.SinkProbe<int>(), Materializer)
.ToStrictAsync(TimeSpan.FromMilliseconds(300)))
.Should()
.Equal(1, 2, 3, 4);
}

[Fact(Skip = "Skipped for async_testkit conversion build")]
public void TestSink_Probe_ToStrict_with_failing_source()
[Fact]
public async Task TestSink_Probe_ToStrictAsync_with_failing_source()
{
var error = Record.Exception(() =>
var error = await Record.ExceptionAsync(async () =>
{
Source.From(Enumerable.Range(1, 3).Select(i =>
{
if (i == 3)
throw Ex();
return i;
})).RunWith(this.SinkProbe<int>(), Materializer)
.ToStrict(TimeSpan.FromMilliseconds(300));
await Source.From(Enumerable.Range(1, 3).Select(i =>
{
if (i == 3)
throw Ex();
return i;
})).RunWith(this.SinkProbe<int>(), Materializer)
.ToStrictAsync(TimeSpan.FromMilliseconds(300));
});

var aggregateException = error.InnerException;
Expand All @@ -56,165 +57,171 @@ public void TestSink_Probe_ToStrict_with_failing_source()
}

[Fact]
public void TestSink_Probe_ToStrict_when_subscription_was_already_obtained()
public async Task TestSink_Probe_ToStrictAsync_when_subscription_was_already_obtained()
{
var p = Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer);
p.ExpectSubscription();
p.ToStrict(TimeSpan.FromMilliseconds(300)).Should().Equal(1, 2, 3, 4);
await p.ExpectSubscriptionAsync();
(await p.ToStrictAsync(TimeSpan.FromMilliseconds(300))).Should().Equal(1, 2, 3, 4);
}

[Fact]
public void TestSink_Probe_ExpectNextOrError_with_right_element()
public async Task TestSink_Probe_ExpectNextOrErrorAsync_with_right_element()
{
Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
await Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
.Request(4)
.ExpectNextOrError(1, Ex());
.ExpectNextOrErrorAsync(1, Ex()).Task;
}

[Fact]
public void TestSink_Probe_ExpectNextOrError_with_right_exception()
public async Task TestSink_Probe_ExpectNextOrErrorAsync_with_right_exception()
{
Source.Failed<int>(Ex()).RunWith(this.SinkProbe<int>(), Materializer)
await Source.Failed<int>(Ex()).RunWith(this.SinkProbe<int>(), Materializer)
.Request(4)
.ExpectNextOrError(1, Ex());
.ExpectNextOrErrorAsync(1, Ex()).Task;
}

[Fact]
public void TestSink_Probe_ExpectNextOrError_fail_if_the_next_element_is_not_the_expected_one()
public async Task TestSink_Probe_ExpectNextOrErrorAsync_fail_if_the_next_element_is_not_the_expected_one()
{
Record.Exception(() =>
var error = await Record.ExceptionAsync(async () =>
{
Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
await Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
.Request(4)
.ExpectNextOrError(100, Ex());
}).Message.Should().Contain("OnNext(100)");
.ExpectNextOrErrorAsync(100, Ex()).Task;
});
error.Message.Should().Contain("OnNext(100)");
}

[Fact]
public void TestSink_Probe_ExpectError()
public async Task TestSink_Probe_ExpectErrorAsync()
{
Source.Failed<int>(Ex()).RunWith(this.SinkProbe<int>(), Materializer)
(await Source.Failed<int>(Ex()).RunWith(this.SinkProbe<int>(), Materializer)
.Request(1)
.ExpectError().Should().Be(Ex());
.ExpectErrorAsync()).Should().Be(Ex());
}

[Fact]
public void TestSink_Probe_ExpectError_fail_if_no_error_signalled()
public async Task TestSink_Probe_ExpectErrorAsync_fail_if_no_error_signalled()
{
Record.Exception(() =>
var error = await Record.ExceptionAsync(async () =>
{
Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
await Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
.Request(1)
.ExpectError();
}).Message.Should().Contain("OnNext");
.ExpectErrorAsync();
});
error.Message.Should().Contain("OnNext");
}

[Fact]
public void TestSink_Probe_ExpectComplete_should_fail_if_error_signalled()
public void TestSink_Probe_ExpectCompleteAsync_should_fail_if_error_signalled()
{
Record.Exception(() =>
var error = Record.Exception(() =>
{
Source.Failed<int>(Ex()).RunWith(this.SinkProbe<int>(), Materializer)
.Request(1)
.ExpectComplete();
}).Message.Should().Contain("OnError");
});
error.Message.Should().Contain("OnError");
}

[Fact]
public void TestSink_Probe_ExpectComplete_should_fail_if_next_element_signalled()
public async Task TestSink_Probe_ExpectCompleteAsync_should_fail_if_next_element_signalled()
{
Record.Exception(() =>
var error = await Record.ExceptionAsync(async () =>
{
Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
await Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
.Request(1)
.ExpectComplete();
}).Message.Should().Contain("OnNext");
.ExpectCompleteAsync().Task;
});
error.Message.Should().Contain("OnNext");
}

[Fact]
public void TestSink_Probe_ExpectNextOrComplete_with_right_element()
public async Task TestSink_Probe_ExpectNextOrCompleteAsync_with_right_element()
{
Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
await Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
.Request(4)
.ExpectNextOrComplete(1);
.ExpectNextOrCompleteAsync(1).Task;
}

[Fact]
public void TestSink_Probe_ExpectNextOrComplete_with_completion()
public async Task TestSink_Probe_ExpectNextOrCompleteAsync_with_completion()
{
Source.Single(1).RunWith(this.SinkProbe<int>(), Materializer)
await Source.Single(1).RunWith(this.SinkProbe<int>(), Materializer)
.Request(4)
.ExpectNextOrComplete(1)
.ExpectNextOrComplete(1337);
.ExpectNextOrCompleteAsync(1)
.ExpectNextOrCompleteAsync(1337).Task;
}

[Fact]
public void TestSink_Probe_ExpectNextPredicate_should_pass_with_right_element()
public async Task TestSink_Probe_ExpectNextAsync_should_pass_with_right_element()
{
Source.Single(1)
(await Source.Single(1)
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(1)
.ExpectNext<int>(i => i == 1)
.ExpectNextAsync<int>(i => i == 1))
.ShouldBe(1);
}

[Fact]
public void TestSink_Probe_ExpectNextPredicate_should_fail_with_wrong_element()
public async Task TestSink_Probe_ExpectNextPredicateAsync_should_fail_with_wrong_element()
{
Record.Exception(() =>
var error = await Record.ExceptionAsync(async () =>
{
Source.Single(1)
await Source.Single(1)
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(1)
.ExpectNext<int>(i => i == 2);
}).Message.ShouldStartWith("Got a message of the expected type");
.ExpectNextAsync<int>(i => i == 2);
});
error.Message.ShouldStartWith("Got a message of the expected type");
}

[Fact]
public void TestSink_Probe_MatchNext_should_pass_with_right_element()
public async Task TestSink_Probe_MatchNextAsync_should_pass_with_right_element()
{
Source.Single(1)
await Source.Single(1)
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(1)
.MatchNext<int>(i => i == 1);
.MatchNextAsync<int>(i => i == 1).Task;
}

[Fact]
public void TestSink_Probe_MatchNext_should_allow_to_chain_test_methods()
public async Task TestSink_Probe_MatchNextAsync_should_allow_to_chain_test_methods()
{
Source.From(Enumerable.Range(1, 2))
await Source.From(Enumerable.Range(1, 2))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(2)
.MatchNext<int>(i => i == 1)
.ExpectNext(2);
.MatchNextAsync<int>(i => i == 1)
.ExpectNextAsync(2).Task;
}

[Fact]
public void TestSink_Probe_MatchNext_should_fail_with_wrong_element()
public async Task TestSink_Probe_MatchNextAsync_should_fail_with_wrong_element()
{
Record.Exception(() =>
var error = await Record.ExceptionAsync(async () =>
{
Source.Single(1)
await Source.Single(1)
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(1)
.MatchNext<int>(i => i == 2);
}).Message.ShouldStartWith("Got a message of the expected type");
.MatchNextAsync<int>(i => i == 2).Task;
});
error.Message.ShouldStartWith("Got a message of the expected type");
}

[Fact]
public void TestSink_Probe_ExpectNextN_given_a_number_of_elements()
public async Task TestSink_Probe_ExpectNextNAsync_given_a_number_of_elements()
{
Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
(await Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
.Request(4)
.ExpectNextN(4).Should().Equal(1, 2, 3, 4);
.ExpectNextNAsync(4).ToListAsync()).Should().Equal(1, 2, 3, 4);
}

[Fact]
public void TestSink_Probe_ExpectNextN_given_specific_elements()
public async Task TestSink_Probe_ExpectNextNAsync_given_specific_elements()
{
Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
await Source.From(Enumerable.Range(1, 4)).RunWith(this.SinkProbe<int>(), Materializer)
.Request(4)
.ExpectNextN(new[] {1, 2, 3, 4});
.ExpectNextNAsync(new[] {1, 2, 3, 4}).Task;
}
}
}