Skip to content

Commit

Permalink
UnfoldResourceSource closing twice on failure (#4969)
Browse files Browse the repository at this point in the history
* Added test cases where close would be called twice

* Bugfix UnfoldResource closed resource twice on failure
  • Loading branch information
ismaelhamed authored Apr 21, 2021
1 parent c4c6443 commit 676f566
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,48 @@ public void A_UnfoldResourceSource_must_fail_when_close_throws_exception()
}, Materializer);
}

[Fact]
public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails()
{
var closedCounter = new AtomicCounter(0);
var testException = new TestException("failing read");

var probe = Source.UnfoldResource<int, int>(
() => 23, // the best resource there is
_ => throw testException,
_ => closedCounter.IncrementAndGet()
).RunWith(this.SinkProbe<int>(), Materializer);

probe.Request(1);
probe.ExpectError().Should().Be(testException);
closedCounter.Current.Should().Be(1);
}

[Fact]
public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails_and_then_close_fails()
{
var closedCounter = new AtomicCounter(0);
var testException = new TestException("boom");

var probe = Source.UnfoldResource<int, int>(
() => 23, // the best resource there is
_ => throw new TestException("failing read"),
_ =>
{
closedCounter.IncrementAndGet();
if (closedCounter.Current == 1) throw testException;
}
).RunWith(this.SinkProbe<int>(), Materializer);

EventFilter.Exception<TestException>().Expect(1, () =>
{
probe.Request(1);
probe.ExpectError().Should().Be(testException);
});

closedCounter.Current.Should().Be(1);
}

protected override void AfterAll()
{
base.AfterAll();
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Streams/Implementation/Sources.cs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ public override void OnPull()
switch (directive)
{
case Directive.Stop:
_open = false;
_stage._close(_blockingStream);
FailStage(ex);
stop = true;
Expand All @@ -467,6 +468,7 @@ public override void PreStart()

private void RestartState()
{
_open = false;
_stage._close(_blockingStream);
_blockingStream = _stage._create();
_open = true;
Expand Down

0 comments on commit 676f566

Please sign in to comment.