diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 1e1b438b049..0b94ea4a422 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -890,6 +890,10 @@ namespace Akka.Streams public override bool Equals(object obj) { } public override int GetHashCode() { } } + public class StreamDetachedException : System.Exception + { + public static readonly Akka.Streams.StreamDetachedException Instance; + } public class StreamLimitReachedException : System.Exception { public StreamLimitReachedException(long max) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs b/src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs index 1d80932b3e3..e9aefadff79 100644 --- a/src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/QueueSinkSpec.cs @@ -153,7 +153,7 @@ public void QueueSink_should_timeout_future_when_stream_cannot_provide_data() }, _materializer); } - [Fact(Skip = "Racy, see https://github.com/akkadotnet/akka.net/pull/4424#issuecomment-632284459")] + [Fact] public void QueueSink_should_fail_pull_future_when_stream_is_completed() { this.AssertAllStagesStopped(() => @@ -170,10 +170,8 @@ public void QueueSink_should_fail_pull_future_when_stream_is_completed() var result = queue.PullAsync().Result; result.Should().Be(Option.None); - ((Task)queue.PullAsync()).ContinueWith(t => - { - t.Exception.InnerException.Should().BeOfType(); - }, TaskContinuationOptions.OnlyOnFaulted).Wait(); + var exception = Record.ExceptionAsync(async () => await queue.PullAsync()).Result; + exception.Should().BeOfType(); }, _materializer); } diff --git a/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs index a82315dbd32..9e1d124236b 100644 --- a/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs @@ -338,11 +338,12 @@ public void QueueSource_should_fail_offer_future_when_stream_is_completed() .Run(_materializer); var sub = s.ExpectSubscription(); - queue.WatchCompletionAsync().ContinueWith(t => "done").PipeTo(TestActor); + queue.WatchCompletionAsync().ContinueWith(t => Done.Instance).PipeTo(TestActor); sub.Cancel(); - ExpectMsg("done"); + ExpectMsg(Done.Instance); - queue.OfferAsync(1).ContinueWith(t => t.Exception.Should().BeOfType()); + var exception = Record.ExceptionAsync(async () => await queue.OfferAsync(1)).Result; + exception.Should().BeOfType(); }, _materializer); } diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs index 14356f0f39a..17de6266fb0 100644 --- a/src/core/Akka.Streams/Implementation/Sinks.cs +++ b/src/core/Akka.Streams/Implementation/Sinks.cs @@ -624,10 +624,10 @@ public override void OnUpstreamFailure(Exception e) public override void PostStop() { - if(!_completionSignalled) + if (!_completionSignalled) _promise.TrySetException(new AbruptStageTerminationException(this)); } - + public override void PreStart() => Pull(_stage.In); } @@ -805,12 +805,8 @@ public override void PreStart() Pull(_stage.In); } - public override void PostStop() - { - StopCallback( - promise => - promise.SetException(new IllegalStateException("Stream is terminated. QueueSink is detached"))); - } + public override void PostStop() => + StopCallback(promise => promise.SetException(StreamDetachedException.Instance)); private Action>> Callback() { @@ -1024,7 +1020,8 @@ private void InitInternalSource(Sink sink, TIn firstElement) { var sourceOut = new SubSource(this, firstElement); - try { + try + { var matVal = Source.FromGraph(sourceOut.Source) .RunWith(sink, Interpreter.SubFusingMaterializer); _completion.TrySetResult(matVal); @@ -1034,7 +1031,7 @@ private void InitInternalSource(Sink sink, TIn firstElement) _completion.TrySetException(ex); FailStage(ex); } - + } #region SubSource @@ -1176,7 +1173,7 @@ public void Dispose(bool unregister) } } } - + private sealed class ObservableLogic : GraphStageLogic, IObservable { private readonly ObservableSinkStage _stage; @@ -1216,12 +1213,12 @@ public void Remove(IObserver observer) ImmutableInterlocked.TryRemove(ref _observers, observer, out var _); } - public IDisposable Subscribe(IObserver observer) => + public IDisposable Subscribe(IObserver observer) => ImmutableInterlocked.GetOrAdd(ref _observers, observer, new ObserverDisposable(this, observer)); } #endregion - + public ObservableSinkStage() { diff --git a/src/core/Akka.Streams/Implementation/Sources.cs b/src/core/Akka.Streams/Implementation/Sources.cs index 53a884e7c6f..b4149d36b9c 100644 --- a/src/core/Akka.Streams/Implementation/Sources.cs +++ b/src/core/Akka.Streams/Implementation/Sources.cs @@ -168,7 +168,7 @@ public override void PreStart() public override void PostStop() { - var exception = new AbruptStageTerminationException(this); + var exception = StreamDetachedException.Instance; _completion.TrySetException(exception); StopCallback(input => { diff --git a/src/core/Akka.Streams/StreamTcpException.cs b/src/core/Akka.Streams/StreamTcpException.cs index 90abae6b157..6b6aefd3277 100644 --- a/src/core/Akka.Streams/StreamTcpException.cs +++ b/src/core/Akka.Streams/StreamTcpException.cs @@ -44,6 +44,22 @@ protected StreamTcpException(SerializationInfo info, StreamingContext context) : #endif } + /// + /// This exception signals that materialized value is already detached from stream. This usually happens + /// when stream is completed and an ActorSystem is shut down while materialized object is still available. + /// + public class StreamDetachedException : Exception + { + /// + /// Initializes a single instance of the class. + /// + public static readonly StreamDetachedException Instance = new StreamDetachedException(); + + private StreamDetachedException() : base("Stream is terminated. Materialized value is detached.") + { + } + } + /// /// TBD ///