Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use concurrent bag for shared futures #1697

Merged
merged 2 commits into from
Jul 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 8 additions & 23 deletions src/Proto.Actor/Future/SharedFuture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
Expand All @@ -16,8 +17,8 @@ namespace Proto.Future;
public sealed class SharedFutureProcess : Process, IDisposable
{
private readonly FutureHandle[] _slots;
private readonly ChannelWriter<FutureHandle> _completedFutures;
private readonly ChannelReader<FutureHandle> _availableFutures;
private readonly ConcurrentBag<FutureHandle> _futures = new();


private long _createdRequests;
private long _completedRequests;
Expand Down Expand Up @@ -54,15 +55,13 @@ internal SharedFutureProcess(ActorSystem system, int size) : base(system)

_slots = new FutureHandle[size];

Channel<FutureHandle> channel = Channel.CreateUnbounded<FutureHandle>();
_availableFutures = channel.Reader;
_completedFutures = channel.Writer;


for (var i = 0; i < _slots.Length; i++)
{
var requestSlot = new FutureHandle(this, ToRequestId(i));
_slots[i] = requestSlot;
_completedFutures.TryWrite(requestSlot);
_futures.Add(requestSlot);
}

_maxRequestId = (int.MaxValue - (int.MaxValue % size));
Expand All @@ -83,7 +82,8 @@ public int RequestsInFlight {

public IFuture? TryCreateHandle()
{
if (Stopping || !_availableFutures.TryRead(out var requestSlot)) return default;

if (Stopping || !_futures.TryTake(out var requestSlot)) return default;

var pid = requestSlot.Init();
Interlocked.Increment(ref _createdRequests);
Expand Down Expand Up @@ -129,7 +129,7 @@ private void Complete(uint requestId, FutureHandle slot)
{
if (slot.TryComplete((int) requestId))
{
_completedFutures.TryWrite(slot);
_futures.Add(slot);

Interlocked.Increment(ref _completedRequests);

Expand All @@ -151,21 +151,6 @@ public void Dispose()
}
}

public void Stop()
{
Stopping = true;
_completedFutures.TryComplete();

while (_availableFutures.TryRead(out _))
{
}

if (RequestsInFlight == 0)
{
Stop(Pid);
}
}

private void Cancel(uint requestId)
{
if (!TryGetRequestSlot(requestId, out var slot)) return;
Expand Down
171 changes: 93 additions & 78 deletions src/Proto.Remote/Endpoints/EndpointReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,38 @@ ServerCallContext context
throw new RpcException(Status.DefaultCancelled, "Suspended");
}

using (_endpointManager.CancellationToken.Register(async () => {
try
{
await responseStream.WriteAsync(new RemoteMessage
{
DisconnectRequest = new DisconnectRequest()
}
).ConfigureAwait(false);
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream", _system.Address);
}
}
))
async void Disconnect()
{
try
{
var disconnectMsg = new RemoteMessage
{
DisconnectRequest = new DisconnectRequest()
};
await responseStream.WriteAsync(disconnectMsg).ConfigureAwait(false);
}
catch (Exception x)
{
x.CheckFailFast();
Logger.LogWarning("[EndpointReader][{SystemAddress}] Failed to write disconnect message to the stream", _system.Address);
}
}

await using (_endpointManager.CancellationToken.Register(Disconnect))
{
IEndpoint endpoint;
string? address = null;
string systemId;



Logger.LogInformation("[EndpointReader][{SystemAddress}] Accepted connection request from {Remote} to {Local}",
_system.Address, context.Peer, context.Host
);

if (await requestStream.MoveNext().ConfigureAwait(false) &&
if (await requestStream.MoveNext(_endpointManager.CancellationToken).ConfigureAwait(false) &&
requestStream.Current.MessageTypeCase != RemoteMessage.MessageTypeOneofCase.ConnectRequest)
{
throw new RpcException(Status.DefaultCancelled, "Expected connection message");
}

var connectRequest = requestStream.Current.ConnectRequest;

Expand Down Expand Up @@ -104,54 +107,7 @@ await responseStream.WriteAsync(new RemoteMessage
).ConfigureAwait(false);
systemId = clientConnection.MemberId;
endpoint = _endpointManager.GetOrAddClientEndpoint(systemId);
_ = Task.Run(async () => {
try
{
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
while (!cancellationTokenSource.Token.IsCancellationRequested &&
endpoint.OutgoingStash.TryPop(out var message))
{
try
{
await responseStream.WriteAsync(message).ConfigureAwait(false);
}
catch (Exception)
{
_ = endpoint.OutgoingStash.Append(message);
throw;
}
}

while (endpoint.OutgoingStash.IsEmpty && !cancellationTokenSource.Token.IsCancellationRequested)
{
var message = await endpoint.Outgoing.Reader.ReadAsync(cancellationTokenSource.Token)
.ConfigureAwait(false);

try
{
// Logger.LogInformation($"Sending {message}");
await responseStream.WriteAsync(message).ConfigureAwait(false);
}
catch (Exception)
{
_ = endpoint.OutgoingStash.Append(message);
throw;
}
}
}
}
catch (OperationCanceledException)
{
Logger.LogDebug("[EndpointReader][{SystemAddress}] Writer closed for {SystemId}", _system.Address, systemId);
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogWarning(e, "[EndpointReader][{SystemAddress}] Writing error to {SystemId}", _system.Address, systemId);
}
}
);
_ = Task.Run(async () => { await RunClientWriter(responseStream, cancellationTokenSource, endpoint, systemId); });
}
break;
case ConnectRequest.ConnectionTypeOneofCase.ServerConnection: {
Expand Down Expand Up @@ -207,24 +163,83 @@ await responseStream.WriteAsync(new RemoteMessage
throw new ArgumentOutOfRangeException();
}

try
await RunReader(requestStream, address, cancellationTokenSource, systemId);
}
}

private async Task RunReader(IAsyncStreamReader<RemoteMessage> requestStream, string? address, CancellationTokenSource cancellationTokenSource, string systemId)
{
try
{
while (await requestStream.MoveNext(CancellationToken.None).ConfigureAwait(false))
{
while (await requestStream.MoveNext().ConfigureAwait(false))
var currentMessage = requestStream.Current;
if (_endpointManager.CancellationToken.IsCancellationRequested)
{
var currentMessage = requestStream.Current;
if (_endpointManager.CancellationToken.IsCancellationRequested)
continue;

_endpointManager.RemoteMessageHandler.HandleRemoteMessage(currentMessage, address!);
continue;
}

_endpointManager.RemoteMessageHandler.HandleRemoteMessage(currentMessage, address!);
}
finally
}
finally
{
cancellationTokenSource.Cancel();

if (address is null && systemId is not null)
{
cancellationTokenSource.Cancel();
if (address is null && systemId is not null)
_system.EventStream.Publish(new EndpointTerminatedEvent(false, null, systemId));
_system.EventStream.Publish(new EndpointTerminatedEvent(false, null, systemId));
}
}
}

private async Task RunClientWriter(IAsyncStreamWriter<RemoteMessage> responseStream, CancellationTokenSource cancellationTokenSource, IEndpoint endpoint, string systemId)
{
try
{
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
//consume stash
while (!cancellationTokenSource.Token.IsCancellationRequested && endpoint.OutgoingStash.TryPop(out var message))
{
try
{
await responseStream.WriteAsync(message).ConfigureAwait(false);
}
catch (Exception)
{
_ = endpoint.OutgoingStash.Append(message);
throw;
}
}

//
while (endpoint.OutgoingStash.IsEmpty && !cancellationTokenSource.Token.IsCancellationRequested)
{
var message = await endpoint.Outgoing.Reader.ReadAsync(cancellationTokenSource.Token).ConfigureAwait(false);

try
{
// Logger.LogInformation($"Sending {message}");
await responseStream.WriteAsync(message).ConfigureAwait(false);
}
catch (Exception)
{
_ = endpoint.OutgoingStash.Append(message);
throw;
}
}
}
}
catch (OperationCanceledException)
{
Logger.LogDebug("[EndpointReader][{SystemAddress}] Writer closed for {SystemId}", _system.Address, systemId);
}
catch (Exception e)
{
e.CheckFailFast();
Logger.LogWarning(e, "[EndpointReader][{SystemAddress}] Writing error to {SystemId}", _system.Address, systemId);
}
}

public override Task<ListProcessesResponse> ListProcesses(ListProcessesRequest request, ServerCallContext context)
Expand Down
4 changes: 3 additions & 1 deletion src/Proto.Remote/GrpcNet/GrpcNetRemote.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public Task StartAsync()
if (_config.ConfigureKestrel == null)
{
serverOptions.Listen(ipAddress, Config.Port,
listenOptions => listenOptions.Protocols = HttpProtocols.Http2
listenOptions => {
listenOptions.Protocols = HttpProtocols.Http2;
}
);
}
else
Expand Down