Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Jun 6, 2017
1 parent e44cc65 commit 792657f
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 1 deletion.
52 changes: 52 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowFlattenMergeSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,5 +289,57 @@ public void A_FlattenMerge_must_propagate_attributes_to_inner_stream()
}, Materializer);

}

[Fact]
public void A_FlattenMerge_must_bubble_up_substream_materialization_exception()
{
this.AssertAllStagesStopped(() => {
var matFail = new TestException("fail!");

var task = Source.Single("whatever")
.MergeMany(4, x => Source.FromGraph(new FailingInnerMat(matFail)))
.RunWith(Sink.Ignore<string>(), Materializer);

try
{
task.Wait(TimeSpan.FromSeconds(1));
}
catch (AggregateException) { }

task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.InnerException.ShouldBeEquivalentTo(matFail);

}, Materializer);
}

private sealed class FailingInnerMat : GraphStage<SourceShape<string>>
{
#region Logic
private sealed class FailingLogic : GraphStageLogic
{
public FailingLogic(Shape shape, TestException ex) : base(shape)
{
throw ex;
}
}
#endregion

public FailingInnerMat(TestException ex)
{
var outlet = new Outlet<string>("out");
Shape = new SourceShape<string>(outlet);
_ex = ex;
}

private readonly TestException _ex;

public override SourceShape<string> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
{
return new FailingLogic(Shape, _ex);
}
}
}
}
55 changes: 55 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowRecoverWithSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Linq;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
using Akka.TestKit;
Expand Down Expand Up @@ -278,5 +279,59 @@ public void A_RecoverWith_must_throw_ArgumentException_if_number_of_retries_is_l
.ShouldThrow<ArgumentException>();
}, Materializer);
}

[Fact]
public void A_RecoverWith_must_fail_correctly_when_materialization_of_recover_source_fails()
{
this.AssertAllStagesStopped(() =>
{
var matFail = new TestException("fail!");

var task = Source.Failed<string>(new TestException("trigger"))
.RecoverWithRetries(ex => Source.FromGraph(new FailingInnerMat(matFail)), 1)
.RunWith(Sink.Ignore<string>(), Materializer);

try
{
task.Wait(TimeSpan.FromSeconds(1));
}
catch (AggregateException) { }

task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.InnerException.ShouldBeEquivalentTo(matFail);

}, Materializer);
}

private sealed class FailingInnerMat : GraphStage<SourceShape<string>>
{
#region Logic
private sealed class FailingLogic : GraphStageLogic
{
public FailingLogic(Shape shape, TestException ex) : base(shape)
{
throw ex;
}
}
#endregion

public FailingInnerMat(TestException ex)
{
var outlet = new Outlet<string>("out");
Shape = new SourceShape<string>(outlet);
_ex = ex;
}

private readonly TestException _ex;

public override SourceShape<string> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
{
return new FailingLogic(Shape, _ex);
}
}

}
}
56 changes: 56 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
Expand Down Expand Up @@ -238,5 +239,60 @@ public void A_LazySink_must_fail_task_when_zero_throws_expception()
taskProbe.Invoking(t => t.Wait(TimeSpan.FromMilliseconds(300))).ShouldThrow<TestException>();
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fails()
{
this.AssertAllStagesStopped(() =>
{
var matFail = new TestException("fail!");

var task = Source.Single("whatever")
.RunWith(Sink.LazySink<string, NotUsed>(
str => Task.FromResult(Sink.FromGraph(new FailingInnerMat(matFail))),
() => NotUsed.Instance), Materializer);

try
{
task.Wait(TimeSpan.FromSeconds(1));
}
catch (AggregateException) { }

task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.InnerException.ShouldBeEquivalentTo(matFail);

}, Materializer);
}

private sealed class FailingInnerMat : GraphStage<SinkShape<string>>
{
#region Logic
private sealed class FailingLogic : GraphStageLogic
{
public FailingLogic(Shape shape, TestException ex) : base(shape)
{
throw ex;
}
}
#endregion

public FailingInnerMat(TestException ex)
{
var inlet = new Inlet<string>("in");
Shape = new SinkShape<string>(inlet);
_ex = ex;
}

private readonly TestException _ex;

public override SinkShape<string> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
{
return new FailingLogic(Shape, _ex);
}
}

}
}
54 changes: 54 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using System.Linq;
using Akka.Streams.Dsl;
Expand Down Expand Up @@ -152,6 +153,59 @@ public void A_lazy_source_must_propagate_attributes_to_inner_stream()
}, Materializer);
}

[Fact]
public void A_lazy_source_must_fail_correctly_when_materialization_of_inner_source_fails()
{
this.AssertAllStagesStopped(() =>
{
var matFail = new TestException("fail!");

var task = Source.Lazily(() => Source.FromGraph(new FailingInnerMat(matFail)))
.To(Sink.Ignore<string>())
.Run(Materializer);

try
{
task.Wait(TimeSpan.FromSeconds(1));
}
catch (AggregateException) {}

task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.InnerException.ShouldBeEquivalentTo(matFail);

}, Materializer);
}

private sealed class FailingInnerMat : GraphStage<SourceShape<string>>
{
#region Logic
private sealed class FailingLogic : GraphStageLogic
{
public FailingLogic(Shape shape, TestException ex) : base(shape)
{
throw ex;
}
}
#endregion

public FailingInnerMat(TestException ex)
{
var outlet = new Outlet<string>("out");
Shape = new SourceShape<string>(outlet);
_ex = ex;
}

private readonly TestException _ex;

public override SourceShape<string> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
{
return new FailingLogic(Shape, _ex);
}
}

private sealed class AttibutesSourceStage : GraphStage<SourceShape<Attributes>>
{
#region Logic
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Streams/Implementation/Sources.cs
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,9 @@ public override void OnPull()
}
catch (Exception e)
{
_completion.SetException(e);
subSink.Cancel();
FailStage(e);
_completion.TrySetException(e);
}
}

Expand Down

0 comments on commit 792657f

Please sign in to comment.