Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using nonblocking task completion source hack for Source.Queue #3692

Merged
merged 1 commit into from
Dec 31, 2018

Conversation

Horusiath
Copy link
Contributor

@Horusiath Horusiath commented Dec 31, 2018

When we Source.Queue<> method, it can be materialized into queue with OfferAsync method. Problem is that, awaiting for it may cause a deadlock - it's a classical problem with awaiting on the TaskCompletionSource<>.Task and using SetResult on that task in the same execution context. We already met it before.

This issue is easy to fix in .NET Standard 2.0 by using special option when creating TaskCompletionSource<>. For .NET Standard < 1.6 we already provided a hack around it (which uses Task.Run to escape from execution context): it's suboptimal, but it's used only for older framework versions.

This PR uses our fixer method set to overcome this deadlock issue.

Example, I've used to recreate this issue, was a Akka.Streams TCP server:

// Server
var connectionHandler =
    Framing.SimpleFramingProtocol(128)
        .Reversed()
        .Join(Flow.Identity<ByteString>()
            .Select(bytes =>
            {
                var text = bytes.ToString(Encoding.UTF8);
                return ByteString.FromString("ECHO" + text);
            }));

var binding =
    await actorSystem.TcpStream()
        .Bind("localhost", 7000)
        .To(Sink.ForEach<Akka.Streams.Dsl.Tcp.IncomingConnection>(connection =>
        {
            connection.Flow.Join(connectionHandler).Run(materializer);
        }))
        .Run(materializer);

Console.WriteLine($"Server listening on {binding.LocalAddress}. Press Enter to stop");
Console.ReadLine();

// close the server
await binding.Unbind();

And client:

// Client
var input = Source.Queue<string>(100, OverflowStrategy.Backpressure)
    .Select(str =>
    {
        return ByteString.FromString(str);
    });

var output = Flow.Identity<ByteString>()
    .Select(bs => bs.ToString(Encoding.UTF8))
    .To(Sink.ForEach<string>(Console.WriteLine));

var connectionDef = actorSystem.TcpStream()
    .OutgoingConnection(new IPEndPoint(IPAddress.IPv6Loopback, 7000))
    .JoinMaterialized(Framing.SimpleFramingProtocol(128).Reversed(), Keep.Left);

var client = input
    .Log("stdin")
    .ToMaterialized(connectionDef.ToMaterialized(output, Keep.Left), Keep.Both);

var (queue, connectionTask) = client.Run(materializer);

var connection = await connectionTask;

Console.WriteLine($"Connected to: {connection.RemoteAddress}. Start typing:");

while (true)
{
    var data = Console.ReadLine();
    await queue.OfferAsync(data).ConfigureAwait(false); // we'll deadlock here
}

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb added this to the 1.3.12 milestone Dec 31, 2018
@Aaronontheweb Aaronontheweb merged commit e56b0e2 into akkadotnet:dev Dec 31, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants