Skip to content

Commit

Permalink
using nonblocking task completion source hack for Source.Queue (#3692)
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath authored and Aaronontheweb committed Dec 31, 2018
1 parent 64f41a2 commit e56b0e2
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions src/core/Akka.Streams/Implementation/Sources.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Akka.Streams.Supervision;
using Akka.Streams.Util;
using Akka.Util;
using Akka.Util.Internal;

namespace Akka.Streams.Implementation
{
Expand Down Expand Up @@ -123,7 +124,7 @@ public void OnPull()
if (_pendingOffer != null)
{
Push(_stage.Out, _pendingOffer.Element);
_pendingOffer.CompletionSource.SetResult(QueueOfferResult.Enqueued.Instance);
_pendingOffer.CompletionSource.NonBlockingTrySetResult(QueueOfferResult.Enqueued.Instance);
_pendingOffer = null;
if (_terminating)
{
Expand Down Expand Up @@ -153,7 +154,7 @@ public void OnDownstreamFinish()
{
if (_pendingOffer != null)
{
_pendingOffer.CompletionSource.SetResult(QueueOfferResult.QueueClosed.Instance);
_pendingOffer.CompletionSource.NonBlockingTrySetResult(QueueOfferResult.QueueClosed.Instance);
_pendingOffer = null;
}
_completion.SetResult(new object());
Expand All @@ -175,15 +176,15 @@ public override void PostStop()
if (offer != null)
{
var promise = offer.CompletionSource;
promise.SetException(new IllegalStateException("Stream is terminated. SourceQueue is detached."));
promise.NonBlockingTrySetException(new IllegalStateException("Stream is terminated. SourceQueue is detached."));
}
});
}

private void EnqueueAndSuccess(Offer<TOut> offer)
{
_buffer.Enqueue(offer.Element);
offer.CompletionSource.SetResult(QueueOfferResult.Enqueued.Instance);
offer.CompletionSource.NonBlockingTrySetResult(QueueOfferResult.Enqueued.Instance);
}

private void BufferElement(Offer<TOut> offer)
Expand All @@ -207,18 +208,18 @@ private void BufferElement(Offer<TOut> offer)
EnqueueAndSuccess(offer);
break;
case OverflowStrategy.DropNew:
offer.CompletionSource.SetResult(QueueOfferResult.Dropped.Instance);
offer.CompletionSource.NonBlockingTrySetResult(QueueOfferResult.Dropped.Instance);
break;
case OverflowStrategy.Fail:
var bufferOverflowException =
new BufferOverflowException($"Buffer overflow (max capacity was: {_stage._maxBuffer})!");
offer.CompletionSource.SetResult(new QueueOfferResult.Failure(bufferOverflowException));
offer.CompletionSource.NonBlockingTrySetResult(new QueueOfferResult.Failure(bufferOverflowException));
_completion.SetException(bufferOverflowException);
FailStage(bufferOverflowException);
break;
case OverflowStrategy.Backpressure:
if (_pendingOffer != null)
offer.CompletionSource.SetException(
offer.CompletionSource.NonBlockingTrySetException(
new IllegalStateException(
"You have to wait for previous offer to be resolved to send another request."));
else
Expand All @@ -245,7 +246,7 @@ private Action<IInput> Callback()
else if (IsAvailable(_stage.Out))
{
Push(_stage.Out, offer.Element);
offer.CompletionSource.SetResult(QueueOfferResult.Enqueued.Instance);
offer.CompletionSource.NonBlockingTrySetResult(QueueOfferResult.Enqueued.Instance);
}
else if (_pendingOffer == null)
_pendingOffer = offer;
Expand All @@ -255,23 +256,23 @@ private Action<IInput> Callback()
{
case OverflowStrategy.DropHead:
case OverflowStrategy.DropBuffer:
_pendingOffer.CompletionSource.SetResult(QueueOfferResult.Dropped.Instance);
_pendingOffer.CompletionSource.NonBlockingTrySetResult(QueueOfferResult.Dropped.Instance);
_pendingOffer = offer;
break;
case OverflowStrategy.DropTail:
case OverflowStrategy.DropNew:
offer.CompletionSource.SetResult(QueueOfferResult.Dropped.Instance);
offer.CompletionSource.NonBlockingTrySetResult(QueueOfferResult.Dropped.Instance);
break;
case OverflowStrategy.Backpressure:
offer.CompletionSource.SetException(
offer.CompletionSource.NonBlockingTrySetException(
new IllegalStateException(
"You have to wait for previous offer to be resolved to send another request"));
break;
case OverflowStrategy.Fail:
var bufferOverflowException =
new BufferOverflowException(
$"Buffer overflow (max capacity was: {_stage._maxBuffer})!");
offer.CompletionSource.SetResult(new QueueOfferResult.Failure(bufferOverflowException));
offer.CompletionSource.NonBlockingTrySetResult(new QueueOfferResult.Failure(bufferOverflowException));
_completion.SetException(bufferOverflowException);
FailStage(bufferOverflowException);
break;
Expand Down Expand Up @@ -331,7 +332,7 @@ public Materialized(Action<IInput> invokeLogic, TaskCompletionSource<object> com
/// <returns>TBD</returns>
public Task<IQueueOfferResult> OfferAsync(TOut element)
{
var promise = new TaskCompletionSource<IQueueOfferResult>();
var promise = TaskEx.NonBlockingTaskCompletionSource<IQueueOfferResult>(); // new TaskCompletionSource<IQueueOfferResult>();
_invokeLogic(new Offer<TOut>(element, promise));
return promise.Task;
}
Expand Down

0 comments on commit e56b0e2

Please sign in to comment.