Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Source.ActorRef not completing #5875

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ Wrap an actor extending ``ActorPublisher`` as a source.

### ActorRef

Materialize an ``IActorRef``, sending messages to it will emit them on the stream. The actor contain
Materialize an ``IActorRef``, sending messages to it will emit them on the stream. The actor contains
a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping
elements or failing the stream, the strategy is chosen by the user.
elements or failing the stream; the strategy is chosen by the user.

**emits** when there is demand and there are messages in the buffer or a message is sent to the actorref
**emits** when there is demand and there are messages in the buffer or a message is sent to the ``IActorRef``

**completes** when the actorref is sent ``Akka.Actor.Status.Success`` or ``PoisonPill``
**completes** when the ``IActorRef`` is sent ``Akka.Actor.Status.Success``

### PreMaterialize

Expand Down
3 changes: 1 addition & 2 deletions docs/articles/streams/integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ for this Source type, i.e. elements will be dropped if the buffer is filled by s
at a rate that is faster than the stream can consume. You should consider using ``Source.Queue``
if you want a backpressured actor interface.

The stream can be completed successfully by sending ``Akka.Actor.PoisonPill`` or
``Akka.Actor.Status.Success`` to the actor reference.
The stream can be completed successfully by sending `Akka.Actor.Status.Success` to the actor reference.

The stream can be completed with failure by sending ``Akka.Actor.Status.Failure`` to the
actor reference.
Expand Down
35 changes: 6 additions & 29 deletions src/core/Akka.Streams.Tests/Dsl/ActorRefSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ActorRefSourceSpec()
var settings = ActorMaterializerSettings.Create(Sys);
Materializer = ActorMaterializer.Create(Sys, settings);
}

[Fact]
public void A_ActorRefSource_must_emit_received_messages_to_the_stream()
{
Expand All @@ -45,7 +45,7 @@ public void A_ActorRefSource_must_emit_received_messages_to_the_stream()
actorRef.Tell(3);
s.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}

[Fact]
public void A_ActorRefSource_must_buffer_when_needed()
{
Expand Down Expand Up @@ -118,21 +118,6 @@ public void A_ActorRefSource_must_not_fail_when_0_buffer_space_and_demand_is_sig
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_completes_the_stream_immediately_when_receiving_PoisonPill()
{
this.AssertAllStagesStopped(() =>
{
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(10, OverflowStrategy.Fail)
.To(Sink.FromSubscriber(s))
.Run(Materializer);
s.ExpectSubscription();
actorRef.Tell(PoisonPill.Instance);
s.ExpectComplete();
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_signal_buffered_elements_and_complete_the_stream_after_receiving_Status_Success()
{
Expand Down Expand Up @@ -178,23 +163,15 @@ public void A_ActorRefSource_must_not_buffer_elements_after_receiving_Status_Suc
}

[Fact]
public void A_ActorRefSource_must_after_receiving_Status_Success_allow_for_earlier_completion_with_PoisonPill()
public void A_ActorRefSource_must_complete_and_materialize_the_stream_after_receiving_Status_Success()
{
this.AssertAllStagesStopped(() =>
{
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(3, OverflowStrategy.DropBuffer)
.To(Sink.FromSubscriber(s))
var (actorRef, done) = Source.ActorRef<int>(3, OverflowStrategy.DropBuffer)
.ToMaterialized(Sink.Ignore<int>(), Keep.Both)
.Run(Materializer);
var sub = s.ExpectSubscription();
actorRef.Tell(1);
actorRef.Tell(2);
actorRef.Tell(3);
actorRef.Tell(new Status.Success("ok"));
sub.Request(2); // not all elements drained yet
s.ExpectNext(1, 2);
actorRef.Tell(PoisonPill.Instance);
s.ExpectComplete(); // element `3` not signaled
done.ContinueWith(_ => Done.Instance).Result.Should().Be(Done.Instance);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sink.Ignore is another opertator with the wrong signature: it should return Task<Done> not just Task

}, Materializer);
}

Expand Down
26 changes: 19 additions & 7 deletions src/core/Akka.Streams/Dsl/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -752,30 +752,42 @@ public static Source<T, IActorRef> ActorPublisher<T>(Props props)
/// Creates a <see cref="Source{TOut,TMat}"/> that is materialized as an <see cref="IActorRef"/>.
/// Messages sent to this actor will be emitted to the stream if there is demand from downstream,
/// otherwise they will be buffered until request for demand is received.
///
/// <para>
/// Depending on the defined <see cref="OverflowStrategy"/> it might drop elements if
/// there is no space available in the buffer.
///
/// </para>
/// <para>
/// The strategy <see cref="OverflowStrategy.Backpressure"/> is not supported, and an
/// IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
///
/// </para>
/// <para>
/// The buffer can be disabled by using <paramref name="bufferSize"/> of 0 and then received messages are dropped
/// if there is no demand from downstream. When <paramref name="bufferSize"/> is 0 the <paramref name="overflowStrategy"/> does
/// not matter. An async boundary is added after this Source; as such, it is never safe to assume the downstream will always generate demand.
///
/// </para>
/// <para>
/// The stream can be completed successfully by sending the actor reference a <see cref="Status.Success"/>
/// message (whose content will be ignored) in which case already buffered elements will be signaled before signaling completion,
/// or by sending <see cref="PoisonPill"/> in which case completion will be signaled immediately.
///
/// </para>
/// <para>
/// The stream can be completed with failure by sending a <see cref="Status.Failure"/> to the
/// actor reference. In case the Actor is still draining its internal buffer (after having received
/// a <see cref="Status.Success"/>) before signaling completion and it receives a <see cref="Status.Failure"/>,
/// the failure will be signaled downstream immediately (instead of the completion signal).
///
/// </para>
/// <para>
/// Note that terminating the actor without first completing it, either with a success or a
/// failure, will prevent the actor triggering downstream completion and the stream will continue
/// to run even though the source actor is dead. Therefore you should **not** attempt to
/// manually terminate the actor such as with a <see cref="PoisonPill"/>.
/// </para>
/// <para>
/// The actor will be stopped when the stream is completed, failed or canceled from downstream,
/// i.e. you can watch it to get notified when that happens.
/// </para>
/// See also <seealso cref="Queue{T}"/>
/// </summary>
/// <seealso cref="Queue{T}"/>
/// <typeparam name="T">TBD</typeparam>
/// <param name="bufferSize">The size of the buffer in element count</param>
/// <param name="overflowStrategy">Strategy that is used when incoming elements cannot fit inside the buffer</param>
Expand Down
18 changes: 10 additions & 8 deletions src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in
{
BufferSize = bufferSize;
OverflowStrategy = overflowStrategy;
Buffer = bufferSize != 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
Buffer = bufferSize != 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
}

/// <summary>
Expand All @@ -76,7 +76,7 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
protected override bool Receive(object message)
=> DefaultReceive(message) || RequestElement(message) || (message is T && ReceiveElement((T) message));
=> DefaultReceive(message) || RequestElement(message) || (message is T && ReceiveElement((T)message));

/// <summary>
/// TBD
Expand All @@ -90,12 +90,12 @@ protected bool DefaultReceive(object message)
else if (message is Status.Success)
{
if (BufferSize == 0 || Buffer.IsEmpty)
Context.Stop(Self); // will complete the stream successfully
OnCompleteThenStop(); // will complete the stream successfully
else
Context.Become(DrainBufferThenComplete);
}
else if (message is Status.Failure && IsActive)
OnErrorThenStop(((Status.Failure)message).Cause);
else if (message is Status.Failure failure && IsActive)
OnErrorThenStop(failure.Cause);
else
return false;
return true;
Expand Down Expand Up @@ -179,12 +179,14 @@ protected virtual bool ReceiveElement(T message)
private bool DrainBufferThenComplete(object message)
{
if (message is Cancel)
{
Context.Stop(Self);
else if (message is Status.Failure && IsActive)
}
else if (message is Status.Failure failure && IsActive)
{
// errors must be signaled as soon as possible,
// even if previously valid completion was requested via Status.Success
OnErrorThenStop(((Status.Failure)message).Cause);
OnErrorThenStop(failure.Cause);
}
else if (message is Request)
{
Expand All @@ -193,7 +195,7 @@ private bool DrainBufferThenComplete(object message)
OnNext(Buffer.Dequeue());

if (Buffer.IsEmpty)
Context.Stop(Self); // will complete the stream successfully
OnCompleteThenStop(); // will complete the stream successfully
}
else if (IsActive)
Log.Debug(
Expand Down