diff --git a/src/Proto.Remote/Endpoints/Endpoint.cs b/src/Proto.Remote/Endpoints/Endpoint.cs index cbe25f00ff..c932a07a5c 100644 --- a/src/Proto.Remote/Endpoints/Endpoint.cs +++ b/src/Proto.Remote/Endpoints/Endpoint.cs @@ -375,17 +375,19 @@ private void RejectRemoteDeliver(RemoteDeliver env) private async Task RunAsync() { + var waiter = new MultiTaskReuseWaiter( + () => _remoteDelivers.Reader.WaitToReadAsync(CancellationToken), + () => _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken)); + while (!CancellationToken.IsCancellationRequested) { try { var messages = new List(RemoteConfig.EndpointWriterOptions.EndpointWriterBatchSize); - while (true) { - var t1 = _remoteDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); - var t2 = _remotePriorityDelivers.Reader.WaitToReadAsync(CancellationToken).AsTask(); - await Task.WhenAny(t1, t2); + await waiter.WaitAnyAsync(); + var i = 0; while (true) { @@ -571,4 +573,38 @@ private MessageBatch CreateBatch(IReadOnlyCollection m) // Logger.LogTrace("[{SystemAddress}] Sending {Count} envelopes for {Address}", System.Address, envelopes.Count, Address); return batch; } + + /// + /// Preserves non completed Tasks between calls. + /// This is necessary to prevent memory leak https://github.com/asynkron/protoactor-dotnet/issues/2110 + /// + class MultiTaskReuseWaiter + { + private readonly Func>[] _taskFactories; + private readonly Task?[] _tasks; + + public MultiTaskReuseWaiter(params Func>[] taskFactories) + { + _taskFactories = taskFactories; + _tasks = new Task?[taskFactories.Length]; + } + + public async ValueTask WaitAnyAsync() + { + for (var i = 0; i < _taskFactories.Length; i++) + { + if (_tasks[i]?.IsCompleted == false) + continue; + + var vt = _taskFactories[i].Invoke(); + if (vt.IsCompleted) + return await vt; + + _tasks[i] = vt.AsTask(); + } + + return await await Task.WhenAny(_tasks!); + } + + } } \ No newline at end of file