Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams update 2.4.18 #2737

Merged
merged 13 commits into from
Jun 25, 2017
Merged
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
</ItemGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'net452' ">
<DefineConstants>$(DefineConstants);SERIALIZATION;CONFIGURATION;UNSAFE_THREADING</DefineConstants>
<DefineConstants>TRACE;DEBUG;SERIALIZATION;CONFIGURATION;UNSAFE_THREADING;NET452;NET452</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'netcoreapp1.1' ">
Expand Down
52 changes: 52 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowFlattenMergeSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,5 +289,57 @@ public void A_FlattenMerge_must_propagate_attributes_to_inner_stream()
}, Materializer);

}

[Fact]
public void A_FlattenMerge_must_bubble_up_substream_materialization_exception()
{
this.AssertAllStagesStopped(() => {
var matFail = new TestException("fail!");

var task = Source.Single("whatever")
.MergeMany(4, x => Source.FromGraph(new FailingInnerMat(matFail)))
.RunWith(Sink.Ignore<string>(), Materializer);

try
{
task.Wait(TimeSpan.FromSeconds(1));
}
catch (AggregateException) { }

task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.InnerException.ShouldBeEquivalentTo(matFail);

}, Materializer);
}

private sealed class FailingInnerMat : GraphStage<SourceShape<string>>
{
#region Logic
private sealed class FailingLogic : GraphStageLogic
{
public FailingLogic(Shape shape, TestException ex) : base(shape)
{
throw ex;
}
}
#endregion

public FailingInnerMat(TestException ex)
{
var outlet = new Outlet<string>("out");
Shape = new SourceShape<string>(outlet);
_ex = ex;
}

private readonly TestException _ex;

public override SourceShape<string> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
{
return new FailingLogic(Shape, _ex);
}
}
}
}
55 changes: 55 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Linq;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
using Akka.TestKit;
Expand Down Expand Up @@ -278,5 +279,59 @@ public void A_RecoverWith_must_throw_ArgumentException_if_number_of_retries_is_l
.ShouldThrow<ArgumentException>();
}, Materializer);
}

[Fact]
public void A_RecoverWith_must_fail_correctly_when_materialization_of_recover_source_fails()
{
this.AssertAllStagesStopped(() =>
{
var matFail = new TestException("fail!");

var task = Source.Failed<string>(new TestException("trigger"))
.RecoverWithRetries(ex => Source.FromGraph(new FailingInnerMat(matFail)), 1)
.RunWith(Sink.Ignore<string>(), Materializer);

try
{
task.Wait(TimeSpan.FromSeconds(1));
}
catch (AggregateException) { }

task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.InnerException.ShouldBeEquivalentTo(matFail);

}, Materializer);
}

private sealed class FailingInnerMat : GraphStage<SourceShape<string>>
{
#region Logic
private sealed class FailingLogic : GraphStageLogic
{
public FailingLogic(Shape shape, TestException ex) : base(shape)
{
throw ex;
}
}
#endregion

public FailingInnerMat(TestException ex)
{
var outlet = new Outlet<string>("out");
Shape = new SourceShape<string>(outlet);
_ex = ex;
}

private readonly TestException _ex;

public override SourceShape<string> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
{
return new FailingLogic(Shape, _ex);
}
}

}
}
56 changes: 56 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
Expand Down Expand Up @@ -238,5 +239,60 @@ public void A_LazySink_must_fail_task_when_zero_throws_expception()
taskProbe.Invoking(t => t.Wait(TimeSpan.FromMilliseconds(300))).ShouldThrow<TestException>();
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fails()
{
this.AssertAllStagesStopped(() =>
{
var matFail = new TestException("fail!");

var task = Source.Single("whatever")
.RunWith(Sink.LazySink<string, NotUsed>(
str => Task.FromResult(Sink.FromGraph(new FailingInnerMat(matFail))),
() => NotUsed.Instance), Materializer);

try
{
task.Wait(TimeSpan.FromSeconds(1));
}
catch (AggregateException) { }

task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.InnerException.ShouldBeEquivalentTo(matFail);

}, Materializer);
}

private sealed class FailingInnerMat : GraphStage<SinkShape<string>>
{
#region Logic
private sealed class FailingLogic : GraphStageLogic
{
public FailingLogic(Shape shape, TestException ex) : base(shape)
{
throw ex;
}
}
#endregion

public FailingInnerMat(TestException ex)
{
var inlet = new Inlet<string>("in");
Shape = new SinkShape<string>(inlet);
_ex = ex;
}

private readonly TestException _ex;

public override SinkShape<string> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
{
return new FailingLogic(Shape, _ex);
}
}

}
}
54 changes: 54 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using System.Linq;
using Akka.Streams.Dsl;
Expand Down Expand Up @@ -152,6 +153,59 @@ public void A_lazy_source_must_propagate_attributes_to_inner_stream()
}, Materializer);
}

[Fact]
public void A_lazy_source_must_fail_correctly_when_materialization_of_inner_source_fails()
{
this.AssertAllStagesStopped(() =>
{
var matFail = new TestException("fail!");

var task = Source.Lazily(() => Source.FromGraph(new FailingInnerMat(matFail)))
.To(Sink.Ignore<string>())
.Run(Materializer);

try
{
task.Wait(TimeSpan.FromSeconds(1));
}
catch (AggregateException) {}

task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.InnerException.ShouldBeEquivalentTo(matFail);

}, Materializer);
}

private sealed class FailingInnerMat : GraphStage<SourceShape<string>>
{
#region Logic
private sealed class FailingLogic : GraphStageLogic
{
public FailingLogic(Shape shape, TestException ex) : base(shape)
{
throw ex;
}
}
#endregion

public FailingInnerMat(TestException ex)
{
var outlet = new Outlet<string>("out");
Shape = new SourceShape<string>(outlet);
_ex = ex;
}

private readonly TestException _ex;

public override SourceShape<string> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
{
return new FailingLogic(Shape, _ex);
}
}

private sealed class AttibutesSourceStage : GraphStage<SourceShape<Attributes>>
{
#region Logic
Expand Down
62 changes: 62 additions & 0 deletions src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Akka.Streams.IO;
using Akka.Streams.TestKit.Tests;
using Akka.TestKit;
using Akka.Util;
using Akka.Util.Internal;
using FluentAssertions;
using Xunit;
Expand Down Expand Up @@ -148,6 +149,44 @@ public void SynchronousFileSink_should_allow_appending_to_file()
}, _materializer);
}

[Fact]
public void SynchronousFileSink_should_allow_writing_from_specific_position_to_the_file()
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.AssertAllStagesStopped(() => { is missing

TargetFile(f => {
var testLinesCommon = new List<string>
{
new string('a', 1000) + "\n",
new string('b', 1000) + "\n",
new string('c', 1000) + "\n",
new string('d', 1000) + "\n",
};

var commonByteString = ByteString.FromString(testLinesCommon.Join("")).Compact();
var startPosition = commonByteString.Count;

var testLinesPart2 = new List<string>()
{
new string('x', 1000) + "\n",
new string('x', 1000) + "\n",
};

Func<List<string>, long, Task<IOResult>> write = (lines, pos) => Source.From(lines)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use local function, is easier to read ;-)

.Select(ByteString.FromString)
.RunWith(FileIO.ToFile(f, fileMode:FileMode.OpenOrCreate, startPosition:pos), _materializer);

var completion1 = write(_testLines, 0);
completion1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a AwaitResult overload that does the same as the scala version, so you can write var result1 = completion.AwaitResult(); 3 seconds is the default timeout

var result1 = completion1.Result;

var completion2 = write(testLinesPart2, startPosition);
completion2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

var result2 = completion2.Result;

f.Length.ShouldBe(startPosition + result2.Count);
CheckFileContent(f, testLinesCommon.Join("") + testLinesPart2.Join(""));
});
}

[Fact]
public void SynchronousFileSink_should_use_dedicated_blocking_io_dispatcher_by_default()
{
Expand Down Expand Up @@ -209,6 +248,29 @@ public void SynchronousFileSink_should_allow_overriding_the_dispatcher_using_Att
}, _materializer);
}

// Needed help converting this test case
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be removed ? :)

[Fact]
public void SynchronousFileSink_should_write_single_line_to_a_file_from_lazy_sink()
{
this.AssertAllStagesStopped(() => {
TargetFile(f => {
var lazySink = Sink.LazySink(
(ByteString _) => Task.FromResult(FileIO.ToFile(f)),
() => Task.FromResult(IOResult.Success(0)))
.MapMaterializedValue(t => {
t.Wait(TimeSpan.FromSeconds(3));
return t.Result;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's more like t.ContinueWIth(r=>r.Result.Result) but not sure if it matters, if the test passes leave it as it is ;)

Copy link
Contributor Author

@Arkatufus Arkatufus Jun 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure either, the origin code is a flatMap to ExecutionContexts, so the closest I can think of is to await the Task<Task<IOResult>> TMat. It does work though, I ran all the tests.
Should I leave AKKAIO defined for Stream.Tests?

});

var completion = Source.From(new []{_testByteStrings.Head()})
.RunWith(lazySink, _materializer);

completion.Wait(TimeSpan.FromSeconds(3));
CheckFileContent(f, _testLines.Head());
});
}, _materializer);
}

private static void TargetFile(Action<FileInfo> block, bool create = true)
{
var targetFile = new FileInfo(Path.Combine(Path.GetTempPath(), "synchronous-file-sink.tmp"));
Expand Down
Loading