Skip to content

Commit 7df896a

Browse files
Youssef1313nohwndCopilot
authored
Avoid BlockingCollection in AsyncConsumerDataProcessor (#6887)
Co-authored-by: Jakub Jareš <me@jakubjares.com> Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
1 parent c9f13b4 commit 7df896a

File tree

7 files changed

+468
-16
lines changed

7 files changed

+468
-16
lines changed

src/Platform/Microsoft.Testing.Platform/Messages/ChannelConsumerDataProcessor.cs renamed to src/Platform/Microsoft.Testing.Platform/Messages/AsyncConsumerDataProcessor.net.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
namespace Microsoft.Testing.Platform.Messages;
1212

1313
[DebuggerDisplay("DataConsumer = {DataConsumer.Uid}")]
14-
internal sealed class AsyncConsumerDataProcessor : IDisposable
14+
internal sealed class AsyncConsumerDataProcessor : IAsyncConsumerDataProcessor
1515
{
1616
private readonly ITask _task;
1717
private readonly CancellationToken _cancellationToken;
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
#if !NETCOREAPP
5+
using Microsoft.Testing.Platform.Extensions;
6+
using Microsoft.Testing.Platform.Extensions.Messages;
7+
using Microsoft.Testing.Platform.Helpers;
8+
9+
namespace Microsoft.Testing.Platform.Messages;
10+
11+
/// <summary>
/// <see cref="IAsyncConsumerDataProcessor"/> implementation that queues published payloads on a
/// <see cref="SingleConsumerUnboundedChannel{T}"/> and forwards them, one at a time, to a single
/// <see cref="IDataConsumer"/> from a consume loop started through <see cref="ITask"/>.
/// </summary>
internal sealed class AsyncConsumerDataProcessor : IAsyncConsumerDataProcessor
{
    private readonly ITask _task;
    private readonly CancellationToken _cancellationToken;
    private readonly SingleConsumerUnboundedChannel<(IDataProducer DataProducer, IData Data)> _channel = new();

    // This is the "logical" state of the consume workflow. It is needed to avoid a possible race
    // between DrainDataAsync and the increment of _totalPayloadProcessed: a failure is recorded here
    // before the consume task itself transitions to the faulted state.
    // Continuations are forced to run asynchronously so that completing the source never runs
    // awaiting code inline on the consume loop.
    private readonly TaskCompletionSource<object> _consumerState = new(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly Task _consumeTask;
    private long _totalPayloadReceived;
    private long _totalPayloadProcessed;

    /// <summary>
    /// Initializes the processor and immediately starts the consume loop via <paramref name="task"/>.
    /// </summary>
    /// <param name="dataConsumer">The consumer that receives every payload published to this processor.</param>
    /// <param name="task">Task abstraction used to start the consume loop and to delay while draining.</param>
    /// <param name="cancellationToken">Token observed by publishing, consuming and draining.</param>
    public AsyncConsumerDataProcessor(IDataConsumer dataConsumer, ITask task, CancellationToken cancellationToken)
    {
        DataConsumer = dataConsumer;
        _task = task;
        _cancellationToken = cancellationToken;
        _consumeTask = task.Run(ConsumeAsync, cancellationToken);
    }

    /// <summary>
    /// Gets the consumer served by this processor.
    /// </summary>
    public IDataConsumer DataConsumer { get; }

    /// <summary>
    /// Enqueues a payload for the consumer. The payload is handed to the consumer later by the consume loop.
    /// </summary>
    /// <param name="dataProducer">The producer of <paramref name="data"/>.</param>
    /// <param name="data">The payload to enqueue.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="OperationCanceledException">The processor's cancellation token has been canceled.</exception>
    public Task PublishAsync(IDataProducer dataProducer, IData data)
    {
        _cancellationToken.ThrowIfCancellationRequested();

        // The received counter is incremented before the item is written, so the consume loop can never
        // count an item as processed before it has been counted as received.
        Interlocked.Increment(ref _totalPayloadReceived);
        _channel.Write((dataProducer, data));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Consume loop: reads items from the channel until it completes or cancellation is requested and
    /// forwards each one to <see cref="DataConsumer"/>, tracking the number of processed payloads.
    /// </summary>
    private async Task ConsumeAsync()
    {
        try
        {
            while (await _channel.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
            {
                while (_channel.TryRead(out (IDataProducer DataProducer, IData Data) item))
                {
                    try
                    {
                        // We don't forward the data if the consumer is also the producer of the data.
                        // We could optimize this and make a get with type/all but producers, but it
                        // could be over-engineering.
                        if (item.DataProducer.Uid == DataConsumer.Uid)
                        {
                            continue;
                        }

                        try
                        {
                            await DataConsumer.ConsumeAsync(item.DataProducer, item.Data, _cancellationToken).ConfigureAwait(false);
                        }

                        // We let the outer catch handle the graceful cancellation of the process.
                        catch (Exception ex) when (ex is not OperationCanceledException)
                        {
                            // A drain may be in progress while the finally block below increments
                            // _totalPayloadProcessed. Record the failure first, so that DrainDataAsync sees
                            // the faulted state and rethrows once its payload-count loop exits, even though
                            // the current task has not yet moved to the faulted state.
                            _consumerState.SetException(ex);

                            // We let the current task move to the faulted state; it is observed when
                            // CompleteAddingAsync awaits the consume task.
                            throw;
                        }
                    }
                    finally
                    {
                        // Skipped, failed and successfully consumed items all count as processed.
                        Interlocked.Increment(ref _totalPayloadProcessed);
                    }
                }
            }
        }
        catch (OperationCanceledException oc) when (oc.CancellationToken == _cancellationToken)
        {
            // Ignore, we're shutting down.
        }
        catch (Exception ex)
        {
            // For all other exceptions we signal the state, unless a failure was already recorded above.
            _consumerState.TrySetException(ex);

            // Let the exception bubble up.
            throw;
        }

        // We're exiting gracefully, signal the correct state.
        _consumerState.SetResult(new object());
    }

    /// <summary>
    /// Completes the channel so that no more items are accepted, then waits for the consume loop to finish.
    /// </summary>
    /// <returns>A task that completes when the consume loop has finished; it rethrows the loop's failure, if any.</returns>
    public async Task CompleteAddingAsync()
    {
        // Signal that no more items will be added to the channel.
        // It's possible that we call this method multiple times.
        _channel.Complete();

        // Wait for the consumer to complete.
        await _consumeTask.ConfigureAwait(false);
    }

    /// <summary>
    /// Waits until every payload received so far has been processed, or until cancellation is requested.
    /// </summary>
    /// <returns>The total number of payloads received by this processor.</returns>
    /// <remarks>Rethrows the consumer's exception if the consume loop recorded a failure.</remarks>
    public async Task<long> DrainDataAsync()
    {
        // We go volatile because we race with Interlocked.Increment in PublishAsync.
        long totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
        long totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
        const int minDelayTimeMs = 25;
        int currentDelayTimeMs = minDelayTimeMs;
        while (Interlocked.CompareExchange(ref _totalPayloadReceived, totalPayloadReceived, totalPayloadProcessed) != totalPayloadProcessed)
        {
            // When we cancel we throw inside ConsumeAsync and we won't drain any more data.
            if (_cancellationToken.IsCancellationRequested)
            {
                break;
            }

            // Back off linearly (25 ms steps) up to a 200 ms cap between checks.
            await _task.Delay(currentDelayTimeMs).ConfigureAwait(false);
            currentDelayTimeMs = Math.Min(currentDelayTimeMs + minDelayTimeMs, 200);

            if (_consumerState.Task.IsFaulted)
            {
                // Rethrow the exception.
                await _consumerState.Task.ConfigureAwait(false);
            }

            // Wait for the consumer to complete the currently enqueued items.
            totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
            totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
        }

        // It's possible that we failed after the item was counted as processed.
        if (_consumerState.Task.IsFaulted)
        {
            // Rethrow the exception.
            await _consumerState.Task.ConfigureAwait(false);
        }

        // Read atomically: PublishAsync may be incrementing the counter concurrently and a plain
        // 64-bit read is not guaranteed to be atomic on 32-bit platforms.
        return Volatile.Read(ref _totalPayloadReceived);
    }

    /// <summary>
    /// Completes the channel. Does not wait for the consume loop to finish.
    /// </summary>
    public void Dispose()
        => _channel.Complete();
}
156+
#endif

src/Platform/Microsoft.Testing.Platform/Messages/AsynchronousMessageBus.cs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ internal sealed class AsynchronousMessageBus : BaseMessageBus, IMessageBus, IDis
1818
private readonly IEnvironment _environment;
1919
private readonly ILogger<AsynchronousMessageBus> _logger;
2020
private readonly bool _isTraceLoggingEnabled;
21-
private readonly Dictionary<IDataConsumer, AsyncConsumerDataProcessor> _consumerProcessor = [];
22-
private readonly Dictionary<Type, List<AsyncConsumerDataProcessor>> _dataTypeConsumers = [];
21+
private readonly Dictionary<IDataConsumer, IAsyncConsumerDataProcessor> _consumerProcessor = [];
22+
private readonly Dictionary<Type, List<IAsyncConsumerDataProcessor>> _dataTypeConsumers = [];
2323
private readonly IDataConsumer[] _dataConsumers;
2424
private readonly ITestApplicationCancellationTokenSource _testApplicationCancellationTokenSource;
25+
#if !NETCOREAPP
26+
private readonly bool _forceBlockingCollection;
27+
#endif
2528
private bool _disabled;
2629

2730
public AsynchronousMessageBus(
@@ -37,6 +40,12 @@ public AsynchronousMessageBus(
3740
_environment = environment;
3841
_logger = loggerFactory.CreateLogger<AsynchronousMessageBus>();
3942
_isTraceLoggingEnabled = _logger.IsEnabled(LogLevel.Trace);
43+
#if !NETCOREAPP
44+
// Note: This env variable is only present temporarily.
45+
// Please, don't use it except for working around an issue that was reported to microsoft/testfx repo **and** a team member instructs you to do so.
46+
// This env variable is undocumented and we will remove it in an upcoming release.
47+
_forceBlockingCollection = _environment.GetEnvironmentVariable("MicrosoftTestingPlatform.MessageBus.UseBlockingCollection") == "1";
48+
#endif
4049
}
4150

4251
public override IDataConsumer[] DataConsumerServices
@@ -53,7 +62,7 @@ public override async Task InitAsync()
5362

5463
foreach (Type dataType in consumer.DataTypesConsumed)
5564
{
56-
if (!_dataTypeConsumers.TryGetValue(dataType, out List<AsyncConsumerDataProcessor>? asyncMultiProducerMultiConsumerDataProcessors))
65+
if (!_dataTypeConsumers.TryGetValue(dataType, out List<IAsyncConsumerDataProcessor>? asyncMultiProducerMultiConsumerDataProcessors))
5766
{
5867
asyncMultiProducerMultiConsumerDataProcessors = [];
5968
_dataTypeConsumers.Add(dataType, asyncMultiProducerMultiConsumerDataProcessors);
@@ -64,9 +73,15 @@ public override async Task InitAsync()
6473
throw new InvalidOperationException($"Consumer registered two time for data type '{dataType}', consumer '{consumer}'");
6574
}
6675

67-
if (!_consumerProcessor.TryGetValue(consumer, out AsyncConsumerDataProcessor? asyncMultiProducerMultiConsumerDataProcessor))
76+
if (!_consumerProcessor.TryGetValue(consumer, out IAsyncConsumerDataProcessor? asyncMultiProducerMultiConsumerDataProcessor))
6877
{
78+
#if !NETCOREAPP
79+
asyncMultiProducerMultiConsumerDataProcessor = _forceBlockingCollection
80+
? new BlockingCollectionConsumerDataProcessor(consumer, _task, _testApplicationCancellationTokenSource.CancellationToken)
81+
: new AsyncConsumerDataProcessor(consumer, _task, _testApplicationCancellationTokenSource.CancellationToken);
82+
#else
6983
asyncMultiProducerMultiConsumerDataProcessor = new AsyncConsumerDataProcessor(consumer, _task, _testApplicationCancellationTokenSource.CancellationToken);
84+
#endif
7085
_consumerProcessor.Add(consumer, asyncMultiProducerMultiConsumerDataProcessor);
7186
}
7287

@@ -103,7 +118,7 @@ public override async Task PublishAsync(IDataProducer dataProducer, IData data)
103118
throw new InvalidOperationException($"Unexpected data type '{dataType}' produced by '{dataProducer.Uid}'");
104119
}
105120

106-
if (!_dataTypeConsumers.TryGetValue(dataType, out List<AsyncConsumerDataProcessor>? values))
121+
if (!_dataTypeConsumers.TryGetValue(dataType, out List<IAsyncConsumerDataProcessor>? values))
107122
{
108123
return;
109124
}
@@ -127,7 +142,7 @@ private async Task LogDataAsync(IDataProducer dataProducer, IData data)
127142

128143
public override async Task DrainDataAsync()
129144
{
130-
Dictionary<AsyncConsumerDataProcessor, long> consumerToDrain = [];
145+
Dictionary<IAsyncConsumerDataProcessor, long> consumerToDrain = [];
131146
bool anotherRound = true;
132147
string? customAttempts = _environment.GetEnvironmentVariable(EnvironmentVariableConstants.TESTINGPLATFORM_MESSAGEBUS_DRAINDATA_ATTEMPTS);
133148
if (!int.TryParse(customAttempts, out int totalNumberOfDrainAttempt))
@@ -149,7 +164,7 @@ public override async Task DrainDataAsync()
149164
StringBuilder builder = new();
150165
builder.Append(CultureInfo.InvariantCulture, $"Publisher/Consumer loop detected during the drain after {stopwatch.Elapsed}.\n{builder}");
151166

152-
foreach ((AsyncConsumerDataProcessor key, long value) in consumerToDrain)
167+
foreach ((IAsyncConsumerDataProcessor key, long value) in consumerToDrain)
153168
{
154169
builder.AppendLine(CultureInfo.InvariantCulture, $"Consumer '{key.DataConsumer}' payload received {value}.");
155170
}
@@ -159,9 +174,9 @@ public override async Task DrainDataAsync()
159174

160175
totalNumberOfDrainAttempt--;
161176
anotherRound = false;
162-
foreach (List<AsyncConsumerDataProcessor> dataProcessors in _dataTypeConsumers.Values)
177+
foreach (List<IAsyncConsumerDataProcessor> dataProcessors in _dataTypeConsumers.Values)
163178
{
164-
foreach (AsyncConsumerDataProcessor asyncMultiProducerMultiConsumerDataProcessor in dataProcessors)
179+
foreach (IAsyncConsumerDataProcessor asyncMultiProducerMultiConsumerDataProcessor in dataProcessors)
165180
{
166181
consumerToDrain.TryAdd(asyncMultiProducerMultiConsumerDataProcessor, 0);
167182

@@ -185,9 +200,9 @@ public override async Task DisableAsync()
185200

186201
_disabled = true;
187202

188-
foreach (List<AsyncConsumerDataProcessor> dataProcessors in _dataTypeConsumers.Values)
203+
foreach (List<IAsyncConsumerDataProcessor> dataProcessors in _dataTypeConsumers.Values)
189204
{
190-
foreach (AsyncConsumerDataProcessor asyncMultiProducerMultiConsumerDataProcessor in dataProcessors)
205+
foreach (IAsyncConsumerDataProcessor asyncMultiProducerMultiConsumerDataProcessor in dataProcessors)
191206
{
192207
await asyncMultiProducerMultiConsumerDataProcessor.CompleteAddingAsync().ConfigureAwait(false);
193208
}
@@ -196,9 +211,9 @@ public override async Task DisableAsync()
196211

197212
public override void Dispose()
198213
{
199-
foreach (List<AsyncConsumerDataProcessor> dataProcessors in _dataTypeConsumers.Values)
214+
foreach (List<IAsyncConsumerDataProcessor> dataProcessors in _dataTypeConsumers.Values)
200215
{
201-
foreach (AsyncConsumerDataProcessor asyncMultiProducerMultiConsumerDataProcessor in dataProcessors)
216+
foreach (IAsyncConsumerDataProcessor asyncMultiProducerMultiConsumerDataProcessor in dataProcessors)
202217
{
203218
asyncMultiProducerMultiConsumerDataProcessor.Dispose();
204219
}

src/Platform/Microsoft.Testing.Platform/Messages/ConsumingEnumerableConsumerDataProcessor.cs renamed to src/Platform/Microsoft.Testing.Platform/Messages/BlockingCollectionConsumerDataProcessor.netstandard.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

44
#if !NETCOREAPP
5+
56
using Microsoft.Testing.Platform.Extensions;
67
using Microsoft.Testing.Platform.Extensions.Messages;
78
using Microsoft.Testing.Platform.Helpers;
89

910
namespace Microsoft.Testing.Platform.Messages;
1011

11-
internal sealed class AsyncConsumerDataProcessor : IDisposable
12+
internal sealed class BlockingCollectionConsumerDataProcessor : IAsyncConsumerDataProcessor
1213
{
1314
// The default underlying collection is a ConcurrentQueue<T> object, which provides first in, first out (FIFO) behavior.
1415
private readonly BlockingCollection<(IDataProducer DataProducer, IData Data)> _payloads = [];
@@ -24,7 +25,7 @@ internal sealed class AsyncConsumerDataProcessor : IDisposable
2425
private long _totalPayloadReceived;
2526
private long _totalPayloadProcessed;
2627

27-
public AsyncConsumerDataProcessor(IDataConsumer dataConsumer, ITask task, CancellationToken cancellationToken)
28+
public BlockingCollectionConsumerDataProcessor(IDataConsumer dataConsumer, ITask task, CancellationToken cancellationToken)
2829
{
2930
DataConsumer = dataConsumer;
3031
_task = task;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using Microsoft.Testing.Platform.Extensions;
5+
using Microsoft.Testing.Platform.Extensions.Messages;
6+
7+
namespace Microsoft.Testing.Platform.Messages;
8+
9+
/// <summary>
/// Contract for a processor that accepts published data on behalf of a single
/// <see cref="IDataConsumer"/> and exposes drain and completion operations.
/// </summary>
internal interface IAsyncConsumerDataProcessor : IDisposable
{
    /// <summary>
    /// Gets the data consumer associated with this processor.
    /// </summary>
    IDataConsumer DataConsumer { get; }

    /// <summary>
    /// Publishes a payload to the processor.
    /// </summary>
    /// <param name="dataProducer">The producer of <paramref name="data"/>.</param>
    /// <param name="data">The payload being published.</param>
    /// <returns>A task representing the publish operation.</returns>
    Task PublishAsync(IDataProducer dataProducer, IData data);

    /// <summary>
    /// Drains the data held by the processor.
    /// </summary>
    /// <returns>
    /// A task producing a payload count reported by the implementation
    /// (NOTE(review): presumably the total number of payloads received so far; confirm against implementations).
    /// </returns>
    Task<long> DrainDataAsync();

    /// <summary>
    /// Signals that no more data will be published to the processor.
    /// </summary>
    /// <returns>A task representing the completion operation.</returns>
    Task CompleteAddingAsync();
}

0 commit comments

Comments
 (0)