Skip to content

Commit

Permalink
feat: 2110 memory_leak_bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
AqlaSolutions committed Apr 2, 2024
1 parent 8e5b42e commit bbe5322
Showing 1 changed file with 40 additions and 4 deletions.
44 changes: 40 additions & 4 deletions src/Proto.Remote/Endpoints/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -375,17 +375,19 @@ private void RejectRemoteDeliver(RemoteDeliver env)

private async Task RunAsync()
{
var waiter = new MultiTaskReuseWaiter<bool>(
() => _remoteDelivers.Reader.WaitToReadAsync(CancellationToken),
() => _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken));

while (!CancellationToken.IsCancellationRequested)
{
try
{
var messages = new List<RemoteDeliver>(RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize);

while (true)
{
var t1 = _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask();
var t2 = _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask();
await Task.WhenAny(t1, t2);
await waiter.WaitAnyAsync();

var i = 0;
while (true)
{
Expand Down Expand Up @@ -571,4 +573,38 @@ private MessageBatch CreateBatch(IReadOnlyCollection<RemoteDeliver> m)
// Logger.LogTrace("[{SystemAddress}] Sending {Count} envelopes for {Address}", System.Address, envelopes.Count, Address);
return batch;
}

/// <summary>
/// Preserves non completed Tasks between <see cref="WaitAnyAsync"/> calls.
/// This is necessary to prevent memory leak https://github.com/asynkron/protoactor-dotnet/issues/2110
/// </summary>
class MultiTaskReuseWaiter<T>
{
private readonly Func<ValueTask<T>>[] _taskFactories;
private readonly Task<T>?[] _tasks;

public MultiTaskReuseWaiter(params Func<ValueTask<T>>[] taskFactories)
{
_taskFactories = taskFactories;
_tasks = new Task<T>?[taskFactories.Length];
}

public async ValueTask<T> WaitAnyAsync()
{
for (var i = 0; i < _taskFactories.Length; i++)
{
if (_tasks[i]?.IsCompleted == false)
continue;

var vt = _taskFactories[i].Invoke();
if (vt.IsCompleted)
return await vt;

_tasks[i] = vt.AsTask();
}

return await await Task.WhenAny(_tasks!);
}

}
}

0 comments on commit bbe5322

Please sign in to comment.