From 676f566dd76067ca7fb97e4dd88aa833af18d666 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Wed, 21 Apr 2021 14:41:33 +0200 Subject: [PATCH] UnfoldResourceSource closing twice on failure (#4969) * Added test cases where close would be called twice * Bugfix UnfoldResource closed resource twice on failure --- .../Dsl/UnfoldResourceSourceSpec.cs | 42 +++++++++++++++++++ .../Akka.Streams/Implementation/Sources.cs | 2 + 2 files changed, 44 insertions(+) diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs index 317bb9ec6c8..7615a63a062 100644 --- a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs @@ -266,6 +266,48 @@ public void A_UnfoldResourceSource_must_fail_when_close_throws_exception() }, Materializer); } + [Fact] + public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails() + { + var closedCounter = new AtomicCounter(0); + var testException = new TestException("failing read"); + + var probe = Source.UnfoldResource( + () => 23, // the best resource there is + _ => throw testException, + _ => closedCounter.IncrementAndGet() + ).RunWith(this.SinkProbe(), Materializer); + + probe.Request(1); + probe.ExpectError().Should().Be(testException); + closedCounter.Current.Should().Be(1); + } + + [Fact] + public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails_and_then_close_fails() + { + var closedCounter = new AtomicCounter(0); + var testException = new TestException("boom"); + + var probe = Source.UnfoldResource( + () => 23, // the best resource there is + _ => throw new TestException("failing read"), + _ => + { + closedCounter.IncrementAndGet(); + if (closedCounter.Current == 1) throw testException; + } + ).RunWith(this.SinkProbe(), Materializer); + + EventFilter.Exception().Expect(1, () => + { + probe.Request(1); + probe.ExpectError().Should().Be(testException); + }); + + closedCounter.Current.Should().Be(1); + } + protected override void AfterAll() { base.AfterAll(); diff --git a/src/core/Akka.Streams/Implementation/Sources.cs b/src/core/Akka.Streams/Implementation/Sources.cs index 41a480bf812..f94b0037b62 100644 --- a/src/core/Akka.Streams/Implementation/Sources.cs +++ b/src/core/Akka.Streams/Implementation/Sources.cs @@ -441,6 +441,7 @@ public override void OnPull() switch (directive) { case Directive.Stop: + _open = false; _stage._close(_blockingStream); FailStage(ex); stop = true; @@ -467,6 +468,7 @@ public override void PreStart() private void RestartState() { + _open = false; _stage._close(_blockingStream); _blockingStream = _stage._create(); _open = true;