diff --git a/src/libraries/System.Threading.Tasks.Parallel/ref/System.Threading.Tasks.Parallel.cs b/src/libraries/System.Threading.Tasks.Parallel/ref/System.Threading.Tasks.Parallel.cs index 41be9b17b088c..abe73d4db72e2 100644 --- a/src/libraries/System.Threading.Tasks.Parallel/ref/System.Threading.Tasks.Parallel.cs +++ b/src/libraries/System.Threading.Tasks.Parallel/ref/System.Threading.Tasks.Parallel.cs @@ -4,6 +4,8 @@ // Changes to this file must follow the https://aka.ms/api-review process. // ------------------------------------------------------------------------------ +using System.Numerics; + namespace System.Threading.Tasks { public static partial class Parallel @@ -16,6 +18,9 @@ public static partial class Parallel public static System.Threading.Tasks.ParallelLoopResult For(long fromInclusive, long toExclusive, System.Action body) { throw null; } public static System.Threading.Tasks.ParallelLoopResult For(long fromInclusive, long toExclusive, System.Threading.Tasks.ParallelOptions parallelOptions, System.Action body) { throw null; } public static System.Threading.Tasks.ParallelLoopResult For(long fromInclusive, long toExclusive, System.Threading.Tasks.ParallelOptions parallelOptions, System.Action body) { throw null; } + public static System.Threading.Tasks.Task ForAsync(T fromInclusive, T toExclusive, System.Func body) where T : notnull, System.Numerics.IBinaryInteger { throw null; } + public static System.Threading.Tasks.Task ForAsync(T fromInclusive, T toExclusive, System.Threading.CancellationToken cancellationToken, System.Func body) where T : notnull, System.Numerics.IBinaryInteger { throw null; } + public static System.Threading.Tasks.Task ForAsync(T fromInclusive, T toExclusive, System.Threading.Tasks.ParallelOptions parallelOptions, System.Func body) where T : notnull, System.Numerics.IBinaryInteger { throw null; } public static System.Threading.Tasks.ParallelLoopResult ForEach(System.Collections.Concurrent.OrderablePartitioner source, System.Action body) { throw null; } public static System.Threading.Tasks.ParallelLoopResult ForEach(System.Collections.Concurrent.OrderablePartitioner source, System.Threading.Tasks.ParallelOptions parallelOptions, System.Action body) { throw null; } public static System.Threading.Tasks.ParallelLoopResult ForEach(System.Collections.Concurrent.Partitioner source, System.Action body) { throw null; } diff --git a/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs b/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs index 7862243c1ae4f..b3c6d50cafe2e 100644 --- a/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs +++ b/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs @@ -3,16 +3,214 @@ using System.Collections.Generic; using System.Diagnostics; +using System.Numerics; +using System.Runtime.CompilerServices; namespace System.Threading.Tasks { public static partial class Parallel { - /// Executes a for each operation on an in which iterations may run in parallel. + /// Executes a for loop in which iterations may run in parallel. + /// The start index, inclusive. + /// The end index, exclusive. + /// An asynchronous delegate that is invoked once per element in the data source. + /// The argument is . + /// A task that represents the entire for each operation. + /// The operation will execute at most operations in parallel. + public static Task ForAsync(T fromInclusive, T toExclusive, Func body) + where T : notnull, IBinaryInteger + { + if (fromInclusive is null) throw new ArgumentNullException(nameof(fromInclusive)); + if (toExclusive is null) throw new ArgumentNullException(nameof(toExclusive)); + ArgumentNullException.ThrowIfNull(body); + + return ForAsync(fromInclusive, toExclusive, DefaultDegreeOfParallelism, TaskScheduler.Default, default, body); + } + + /// Executes a for loop in which iterations may run in parallel. + /// The start index, inclusive. + /// The end index, exclusive. + /// A cancellation token that may be used to cancel the for each operation. + /// An asynchronous delegate that is invoked once per element in the data source. + /// The argument is . + /// A task that represents the entire for each operation. + /// The operation will execute at most operations in parallel. + public static Task ForAsync(T fromInclusive, T toExclusive, CancellationToken cancellationToken, Func body) + where T : notnull, IBinaryInteger + { + if (fromInclusive is null) throw new ArgumentNullException(nameof(fromInclusive)); + if (toExclusive is null) throw new ArgumentNullException(nameof(toExclusive)); + ArgumentNullException.ThrowIfNull(body); + + return ForAsync(fromInclusive, toExclusive, DefaultDegreeOfParallelism, TaskScheduler.Default, cancellationToken, body); + } + + /// Executes a for loop in which iterations may run in parallel. + /// The start index, inclusive. + /// The end index, exclusive. + /// An object that configures the behavior of this operation. + /// An asynchronous delegate that is invoked once per element in the data source. + /// The argument is . + /// A task that represents the entire for each operation. + /// The operation will execute at most operations in parallel. + public static Task ForAsync(T fromInclusive, T toExclusive, ParallelOptions parallelOptions, Func body) + where T : notnull, IBinaryInteger + { + if (fromInclusive is null) throw new ArgumentNullException(nameof(fromInclusive)); + if (toExclusive is null) throw new ArgumentNullException(nameof(toExclusive)); + ArgumentNullException.ThrowIfNull(parallelOptions); + ArgumentNullException.ThrowIfNull(body); + + return ForAsync(fromInclusive, toExclusive, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body); + } + + /// Executes a for each operation on an in which iterations may run in parallel. + /// The type of the data in the source. + /// The start index, inclusive. + /// The end index, exclusive. + /// The degree of parallelism, or the number of operations to allow to run in parallel. + /// The task scheduler on which all code should execute. + /// A cancellation token that may be used to cancel the for each operation. + /// An asynchronous delegate that is invoked once per element in the data source. + /// The argument is . + /// A task that represents the entire for each operation. + private static Task ForAsync(T fromInclusive, T toExclusive, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func body) + where T : notnull, IBinaryInteger + { + Debug.Assert(fromInclusive != null); + Debug.Assert(toExclusive != null); + Debug.Assert(scheduler != null); + Debug.Assert(body != null); + + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + if (fromInclusive >= toExclusive) + { + return Task.CompletedTask; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static bool Interlockable() => + typeof(T) == typeof(int) || + typeof(T) == typeof(uint) || + typeof(T) == typeof(long) || + typeof(T) == typeof(ulong) || + typeof(T) == typeof(nint) || + typeof(T) == typeof(nuint); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static bool CompareExchange(ref T location, T value, T comparand) => + typeof(T) == typeof(int) ? Interlocked.CompareExchange(ref Unsafe.As(ref location), Unsafe.As(ref value), Unsafe.As(ref comparand)) == Unsafe.As(ref comparand) : + typeof(T) == typeof(uint) ? Interlocked.CompareExchange(ref Unsafe.As(ref location), Unsafe.As(ref value), Unsafe.As(ref comparand)) == Unsafe.As(ref comparand) : + typeof(T) == typeof(long) ? Interlocked.CompareExchange(ref Unsafe.As(ref location), Unsafe.As(ref value), Unsafe.As(ref comparand)) == Unsafe.As(ref comparand) : + typeof(T) == typeof(ulong) ? Interlocked.CompareExchange(ref Unsafe.As(ref location), Unsafe.As(ref value), Unsafe.As(ref comparand)) == Unsafe.As(ref comparand) : + typeof(T) == typeof(nint) ? Interlocked.CompareExchange(ref Unsafe.As(ref location), Unsafe.As(ref value), Unsafe.As(ref comparand)) == Unsafe.As(ref comparand) : + typeof(T) == typeof(nuint) ? Interlocked.CompareExchange(ref Unsafe.As(ref location), Unsafe.As(ref value), Unsafe.As(ref comparand)) == Unsafe.As(ref comparand) : + throw new UnreachableException(); + + // The worker body. Each worker will execute this same body. + Func taskBody = static async o => + { + var state = (ForEachState)o; + bool launchedNext = false; + +#pragma warning disable CA2007 // Explicitly don't use ConfigureAwait, as we want to perform all work on the specified scheduler that's now current + try + { + // Continue to loop while there are more elements to be processed. + while (!state.Cancellation.IsCancellationRequested) + { + // Get the next element from the enumerator. For some types, we can get the next element with just + // interlocked operations, avoiding the need to take a lock. For other types, we need to take a lock. + T element; + if (Interlockable()) + { + TryAgain: + element = state.NextAvailable; + if (element >= state.ToExclusive) + { + break; + } + + if (!CompareExchange(ref state.NextAvailable, element + T.One, element)) + { + goto TryAgain; + } + } + else + { + await state.AcquireLock(); + try + { + if (state.Cancellation.IsCancellationRequested || // check now that the lock has been acquired + state.NextAvailable >= state.ToExclusive) + { + break; + } + + element = state.NextAvailable; + state.NextAvailable++; + } + finally + { + state.ReleaseLock(); + } + } + + // If the remaining dop allows it and we've not yet queued the next worker, do so now. We wait + // until after we've grabbed an item from the enumerator to a) avoid unnecessary contention on the + // serialized resource, and b) avoid queueing another work if there aren't any more items. Each worker + // is responsible only for creating the next worker, which in turn means there can't be any contention + // on creating workers (though it's possible one worker could be executing while we're creating the next). + if (!launchedNext) + { + launchedNext = true; + state.QueueWorkerIfDopAvailable(); + } + + // Process the loop body. + await state.LoopBody(element, state.Cancellation.Token); + } + } + catch (Exception e) + { + // Record the failure and then don't let the exception propagate. The last worker to complete + // will propagate exceptions as is appropriate to the top-level task. + state.RecordException(e); + } + finally + { + // If we're the last worker to complete, complete the operation. + if (state.SignalWorkerCompletedIterating()) + { + state.Complete(); + } + } +#pragma warning restore CA2007 + }; + + try + { + // Construct a state object that encapsulates all state to be passed and shared between + // the workers, and queues the first worker. + var state = new ForEachState(fromInclusive, toExclusive, taskBody, !Interlockable(), dop, scheduler, cancellationToken, body); + state.QueueWorkerIfDopAvailable(); + return state.Task; + } + catch (Exception e) + { + return Task.FromException(e); + } + } + + /// Executes a for each operation on an in which iterations may run in parallel. /// The type of the data in the source. /// An enumerable data source. /// An asynchronous delegate that is invoked once per element in the data source. - /// The exception that is thrown when the argument or argument is null. + /// The argument or argument is . /// A task that represents the entire for each operation. /// The operation will execute at most operations in parallel. public static Task ForEachAsync(IEnumerable source, Func body) @@ -23,12 +221,12 @@ public static Task ForEachAsync(IEnumerable source, FuncExecutes a for each operation on an in which iterations may run in parallel. + /// Executes a for each operation on an in which iterations may run in parallel. /// The type of the data in the source. /// An enumerable data source. /// A cancellation token that may be used to cancel the for each operation. /// An asynchronous delegate that is invoked once per element in the data source. - /// The exception that is thrown when the argument or argument is null. + /// The argument or argument is . /// A task that represents the entire for each operation. /// The operation will execute at most operations in parallel. public static Task ForEachAsync(IEnumerable source, CancellationToken cancellationToken, Func body) @@ -39,12 +237,12 @@ public static Task ForEachAsync(IEnumerable source, Cancellati return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, cancellationToken, body); } - /// Executes a for each operation on an in which iterations may run in parallel. + /// Executes a for each operation on an in which iterations may run in parallel. /// The type of the data in the source. /// An enumerable data source. /// An object that configures the behavior of this operation. /// An asynchronous delegate that is invoked once per element in the data source. - /// The exception that is thrown when the argument or argument is null. + /// The argument or argument is . /// A task that represents the entire for each operation. public static Task ForEachAsync(IEnumerable source, ParallelOptions parallelOptions, Func body) { @@ -55,14 +253,14 @@ public static Task ForEachAsync(IEnumerable source, ParallelOp return ForEachAsync(source, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body); } - /// Executes a for each operation on an in which iterations may run in parallel. + /// Executes a for each operation on an in which iterations may run in parallel. /// The type of the data in the source. /// An enumerable data source. /// A integer indicating how many operations to allow to run in parallel. /// The task scheduler on which all code should execute. /// A cancellation token that may be used to cancel the for each operation. /// An asynchronous delegate that is invoked once per element in the data source. - /// The exception that is thrown when the argument or argument is null. + /// The argument or argument is . /// A task that represents the entire for each operation. private static Task ForEachAsync(IEnumerable source, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func body) { @@ -76,11 +274,6 @@ private static Task ForEachAsync(IEnumerable source, int dop, return Task.FromCanceled(cancellationToken); } - if (dop < 0) - { - dop = DefaultDegreeOfParallelism; - } - // The worker body. Each worker will execute this same body. Func taskBody = static async o => { @@ -168,11 +361,11 @@ private static Task ForEachAsync(IEnumerable source, int dop, } } - /// Executes a for each operation on an in which iterations may run in parallel. + /// Executes a for each operation on an in which iterations may run in parallel. /// The type of the data in the source. /// An asynchronous enumerable data source. /// An asynchronous delegate that is invoked once per element in the data source. - /// The exception that is thrown when the argument or argument is null. + /// The argument or argument is . /// A task that represents the entire for each operation. /// The operation will execute at most operations in parallel. public static Task ForEachAsync(IAsyncEnumerable source, Func body) @@ -183,12 +376,12 @@ public static Task ForEachAsync(IAsyncEnumerable source, Func< return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, default(CancellationToken), body); } - /// Executes a for each operation on an in which iterations may run in parallel. + /// Executes a for each operation on an in which iterations may run in parallel. /// The type of the data in the source. /// An asynchronous enumerable data source. /// A cancellation token that may be used to cancel the for each operation. /// An asynchronous delegate that is invoked once per element in the data source. - /// The exception that is thrown when the argument or argument is null. + /// The argument or argument is . /// A task that represents the entire for each operation. /// The operation will execute at most operations in parallel. public static Task ForEachAsync(IAsyncEnumerable source, CancellationToken cancellationToken, Func body) @@ -199,12 +392,12 @@ public static Task ForEachAsync(IAsyncEnumerable source, Cance return ForEachAsync(source, DefaultDegreeOfParallelism, TaskScheduler.Default, cancellationToken, body); } - /// Executes a for each operation on an in which iterations may run in parallel. + /// Executes a for each operation on an in which iterations may run in parallel. /// The type of the data in the source. /// An asynchronous enumerable data source. /// An object that configures the behavior of this operation. /// An asynchronous delegate that is invoked once per element in the data source. - /// The exception that is thrown when the argument or argument is null. + /// The argument or argument is . /// A task that represents the entire for each operation. public static Task ForEachAsync(IAsyncEnumerable source, ParallelOptions parallelOptions, Func body) { @@ -215,14 +408,14 @@ public static Task ForEachAsync(IAsyncEnumerable source, Paral return ForEachAsync(source, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body); } - /// Executes a for each operation on an in which iterations may run in parallel. + /// Executes a for each operation on an in which iterations may run in parallel. /// The type of the data in the source. /// An asynchronous enumerable data source. /// A integer indicating how many operations to allow to run in parallel. /// The task scheduler on which all code should execute. /// A cancellation token that may be used to cancel the for each operation. /// An asynchronous delegate that is invoked once per element in the data source. - /// The exception that is thrown when the argument or argument is null. + /// The argument or argument is . /// A task that represents the entire for each operation. private static Task ForEachAsync(IAsyncEnumerable source, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func body) { @@ -236,11 +429,6 @@ private static Task ForEachAsync(IAsyncEnumerable source, int return Task.FromCanceled(cancellationToken); } - if (dop < 0) - { - dop = DefaultDegreeOfParallelism; - } - // The worker body. Each worker will execute this same body. Func taskBody = static async o => { @@ -352,7 +540,7 @@ private abstract class ForEachAsyncState : TaskCompletionSource, IThrea /// The present at the time of the ForEachAsync invocation. This is only used if on the default scheduler. private readonly ExecutionContext? _executionContext; /// Semaphore used to provide exclusive access to the enumerator. - private readonly SemaphoreSlim _lock = new SemaphoreSlim(initialCount: 1, maxCount: 1); + private readonly SemaphoreSlim? _lock; /// The number of outstanding workers. When this hits 0, the operation has completed. private int _completionRefCount; @@ -367,10 +555,11 @@ private abstract class ForEachAsyncState : TaskCompletionSource, IThrea public readonly CancellationTokenSource Cancellation = new CancellationTokenSource(); /// Initializes the state object. - protected ForEachAsyncState(Func taskBody, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func body) + protected ForEachAsyncState(Func taskBody, bool needsLock, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func body) { _taskBody = taskBody; - _remainingDop = dop; + _lock = needsLock ? new SemaphoreSlim(initialCount: 1, maxCount: 1) : null; + _remainingDop = dop < 0 ? DefaultDegreeOfParallelism : dop; LoopBody = body; _scheduler = scheduler; if (scheduler == TaskScheduler.Default) @@ -417,7 +606,8 @@ public void QueueWorkerIfDopAvailable() public bool SignalWorkerCompletedIterating() => Interlocked.Decrement(ref _completionRefCount) == 0; /// Asynchronously acquires exclusive access to the enumerator. - public Task AcquireLock() => + public Task AcquireLock() + { // We explicitly don't pass this.Cancellation to WaitAsync. Doing so adds overhead, and it isn't actually // necessary. All of the operations that monitor the lock are part of the same ForEachAsync operation, and the Task // returned from ForEachAsync can't complete until all of the constituent operations have completed, including whoever @@ -426,10 +616,16 @@ public Task AcquireLock() => // the face of cancellation, in exchange for making it a bit slower / more overhead in the common case of cancellation // not being requested. We want to optimize for the latter. This also then avoids an exception throw / catch when // cancellation is requested. - _lock.WaitAsync(CancellationToken.None); + Debug.Assert(_lock is not null, "Should only be invoked when _lock is non-null"); + return _lock.WaitAsync(CancellationToken.None); + } /// Relinquishes exclusive access to the enumerator. - public void ReleaseLock() => _lock.Release(); + public void ReleaseLock() + { + Debug.Assert(_lock is not null, "Should only be invoked when _lock is non-null"); + _lock.Release(); + } /// Stores an exception and triggers cancellation in order to alert all workers to stop as soon as possible. /// The exception. @@ -513,7 +709,7 @@ public SyncForEachAsyncState( IEnumerable source, Func taskBody, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func body) : - base(taskBody, dop, scheduler, cancellationToken, body) + base(taskBody, needsLock: true, dop, scheduler, cancellationToken, body) { Enumerator = source.GetEnumerator() ?? throw new InvalidOperationException(SR.Parallel_ForEach_NullEnumerator); } @@ -535,7 +731,7 @@ public AsyncForEachAsyncState( IAsyncEnumerable source, Func taskBody, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func body) : - base(taskBody, dop, scheduler, cancellationToken, body) + base(taskBody, needsLock: true, dop, scheduler, cancellationToken, body) { Enumerator = source.GetAsyncEnumerator(Cancellation.Token) ?? throw new InvalidOperationException(SR.Parallel_ForEach_NullEnumerator); } @@ -546,5 +742,23 @@ public ValueTask DisposeAsync() return Enumerator.DisposeAsync(); } } + + /// Stores the state associated with an IAsyncEnumerable ForEachAsync operation, shared between all its workers. + /// Specifies the type of data being enumerated. + private sealed class ForEachState : ForEachAsyncState + { + public T NextAvailable; + public readonly T ToExclusive; + + public ForEachState( + T fromExclusive, T toExclusive, Func taskBody, + bool needsLock, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, + Func body) : + base(taskBody, needsLock, dop, scheduler, cancellationToken, body) + { + NextAvailable = fromExclusive; + ToExclusive = toExclusive; + } + } } } diff --git a/src/libraries/System.Threading.Tasks.Parallel/tests/ParallelForEachAsyncTests.cs b/src/libraries/System.Threading.Tasks.Parallel/tests/ParallelForEachAsyncTests.cs index b68b3d2c7b26b..5708643c1d28f 100644 --- a/src/libraries/System.Threading.Tasks.Parallel/tests/ParallelForEachAsyncTests.cs +++ b/src/libraries/System.Threading.Tasks.Parallel/tests/ParallelForEachAsyncTests.cs @@ -5,6 +5,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Numerics; using System.Runtime.CompilerServices; using Xunit; @@ -23,9 +24,14 @@ public void InvalidArguments_ThrowsException() AssertExtensions.Throws("source", () => { Parallel.ForEachAsync((IAsyncEnumerable)null, CancellationToken.None, (item, cancellationToken) => default); }); AssertExtensions.Throws("source", () => { Parallel.ForEachAsync((IAsyncEnumerable)null, new ParallelOptions(), (item, cancellationToken) => default); }); + AssertExtensions.Throws("parallelOptions", () => { Parallel.ForAsync(1, 10, null, (item, cancellationToken) => default); }); AssertExtensions.Throws("parallelOptions", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), null, (item, cancellationToken) => default); }); AssertExtensions.Throws("parallelOptions", () => { Parallel.ForEachAsync(EnumerableRangeAsync(1, 10), null, (item, cancellationToken) => default); }); + AssertExtensions.Throws("body", () => { Parallel.ForAsync(1, 10, null); }); + AssertExtensions.Throws("body", () => { Parallel.ForAsync(1, 10, CancellationToken.None, null); }); + AssertExtensions.Throws("body", () => { Parallel.ForAsync(1, 10, new ParallelOptions(), null); }); + AssertExtensions.Throws("body", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), null); }); AssertExtensions.Throws("body", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), CancellationToken.None, null); }); AssertExtensions.Throws("body", () => { Parallel.ForEachAsync(Enumerable.Range(1, 10), new ParallelOptions(), null); }); @@ -54,9 +60,11 @@ void AssertCanceled(Task t) return default; }; + AssertCanceled(Parallel.ForAsync(1, 10, cts.Token, body)); AssertCanceled(Parallel.ForEachAsync(MarkStart(box), cts.Token, body)); AssertCanceled(Parallel.ForEachAsync(MarkStartAsync(box), cts.Token, body)); + AssertCanceled(Parallel.ForAsync(1, 10, new ParallelOptions { CancellationToken = cts.Token }, body)); AssertCanceled(Parallel.ForEachAsync(MarkStart(box), new ParallelOptions { CancellationToken = cts.Token }, body)); AssertCanceled(Parallel.ForEachAsync(MarkStartAsync(box), new ParallelOptions { CancellationToken = cts.Token }, body)); @@ -79,6 +87,39 @@ static async IAsyncEnumerable MarkStartAsync(StrongBox box) } } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(-1)] + [InlineData(1)] + [InlineData(2)] + [InlineData(4)] + [InlineData(128)] + public async Task Dop_WorkersCreatedRespectingLimit_For(int dop) + { + bool exit = false; + + int activeWorkers = 0; + var block = new TaskCompletionSource(); + + Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { MaxDegreeOfParallelism = dop }, async (item, cancellationToken) => + { + Interlocked.Increment(ref activeWorkers); + await block.Task; + if (Volatile.Read(ref exit)) + { + throw new FormatException(); + } + }); + Assert.False(t.IsCompleted); + + await Task.Delay(20); // give the loop some time to run + + Volatile.Write(ref exit, true); + block.SetResult(); + await Assert.ThrowsAsync(() => t); + + Assert.InRange(activeWorkers, 0, dop == -1 ? Environment.ProcessorCount : dop); + } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] [InlineData(-1)] [InlineData(1)] @@ -117,6 +158,40 @@ static IEnumerable IterateUntilSet(StrongBox box) Assert.InRange(activeWorkers, 0, dop == -1 ? Environment.ProcessorCount : dop); } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(-1)] + [InlineData(1)] + [InlineData(2)] + [InlineData(4)] + [InlineData(128)] + public async Task Dop_WorkersCreatedRespectingLimitAndTaskScheduler_For(int dop) + { + bool exit = false; + int activeWorkers = 0; + var block = new TaskCompletionSource(); + + int MaxSchedulerLimit = Math.Min(2, Environment.ProcessorCount); + + Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { MaxDegreeOfParallelism = dop, TaskScheduler = new MaxConcurrencyLevelPassthroughTaskScheduler(MaxSchedulerLimit) }, async (item, cancellationToken) => + { + Interlocked.Increment(ref activeWorkers); + await block.Task; + if (Volatile.Read(ref exit)) + { + throw new FormatException(); + } + }); + Assert.False(t.IsCompleted); + + await Task.Delay(20); // give the loop some time to run + + Volatile.Write(ref exit, true); + block.SetResult(); + await Assert.ThrowsAsync(() => t); + + Assert.InRange(activeWorkers, 0, Math.Min(MaxSchedulerLimit, dop == -1 ? Environment.ProcessorCount : dop)); + } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] [InlineData(-1)] [InlineData(1)] @@ -157,6 +232,33 @@ static IEnumerable IterateUntilSet(StrongBox box) Assert.InRange(activeWorkers, 0, Math.Min(MaxSchedulerLimit, dop == -1 ? Environment.ProcessorCount : dop)); } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public async Task Dop_NegativeTaskSchedulerLimitTreatedAsDefault_For() + { + bool exit = false; + int activeWorkers = 0; + var block = new TaskCompletionSource(); + + Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { TaskScheduler = new MaxConcurrencyLevelPassthroughTaskScheduler(-42) }, async (item, cancellationToken) => + { + Interlocked.Increment(ref activeWorkers); + await block.Task; + if (Volatile.Read(ref exit)) + { + throw new FormatException(); + } + }); + Assert.False(t.IsCompleted); + + await Task.Delay(20); // give the loop some time to run + + Volatile.Write(ref exit, true); + block.SetResult(); + await Assert.ThrowsAsync(() => t); + + Assert.InRange(activeWorkers, 0, Environment.ProcessorCount); + } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] public async Task Dop_NegativeTaskSchedulerLimitTreatedAsDefault_Sync() { @@ -224,6 +326,19 @@ static async IAsyncEnumerable IterateUntilSet(StrongBox box) Assert.InRange(activeWorkers, 0, Environment.ProcessorCount); } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public async Task RunsAsynchronously_For() + { + var cts = new CancellationTokenSource(); + + Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, cts.Token, (item, cancellationToken) => default); + Assert.False(t.IsCompleted); + + cts.Cancel(); + + await Assert.ThrowsAnyAsync(() => t); + } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] public async Task RunsAsynchronously_EvenForEntirelySynchronousWork_Sync() { @@ -301,6 +416,20 @@ static async IAsyncEnumerable IterateUntilSetAsync(StrongBox box) Assert.InRange(activeWorkers, 0, dop == -1 ? Environment.ProcessorCount : dop); } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public void EmptyRange_For() + { + int counter = 0; + Task t = Parallel.ForAsync(10, 10, (item, cancellationToken) => + { + Interlocked.Increment(ref counter); + return default; + }); + Assert.True(t.IsCompletedSuccessfully); + + Assert.Equal(0, counter); + } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] public async Task EmptySource_Sync() { @@ -327,6 +456,51 @@ await Parallel.ForEachAsync(EnumerableRangeAsync(0, 0), (item, cancellationToken Assert.Equal(0, counter); } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(false)] + [InlineData(true)] + public async Task AllItemsEnumeratedOnce_For(bool yield) + { + await Test(yield); + await Test(yield); + await Test(yield); + await Test(yield); + await Test(yield); + await Test(yield); + await Test(yield); + await Test(yield); + await Test(yield); + await Test(yield); + await Test(yield); + + async Task Test(bool yield) where T : IBinaryInteger + { + const int Start = 10, Count = 10_000; + + var set = new HashSet(); + + await Parallel.ForAsync(T.CreateTruncating(Start), T.CreateTruncating(Start + Count), async (item, cancellationToken) => + { + lock (set) + { + Assert.True(set.Add(item)); + } + + if (yield) + { + await Task.Yield(); + } + }); + + Assert.False(set.Contains(T.CreateTruncating(Start - 1))); + for (int i = Start; i < Start + Count; i++) + { + Assert.True(set.Contains(T.CreateTruncating(i))); + } + Assert.False(set.Contains(T.CreateTruncating(Start + Count + 1))); + } + } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] [InlineData(false)] [InlineData(true)] @@ -349,10 +523,12 @@ await Parallel.ForEachAsync(Enumerable.Range(Start, Count), async (item, cancell } }); + Assert.False(set.Contains(Start - 1)); for (int i = Start; i < Start + Count; i++) { Assert.True(set.Contains(i)); } + Assert.False(set.Contains(Start + Count + 1)); } [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] @@ -377,10 +553,40 @@ await Parallel.ForEachAsync(EnumerableRangeAsync(Start, Count, yield), async (it } }); + Assert.False(set.Contains(Start - 1)); for (int i = Start; i < Start + Count; i++) { Assert.True(set.Contains(i)); } + Assert.False(set.Contains(Start + Count + 1)); + } + + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(false)] + [InlineData(true)] + public async Task TaskScheduler_AllCodeExecutedOnCorrectScheduler_For(bool defaultScheduler) + { + TaskScheduler scheduler = defaultScheduler ? + TaskScheduler.Default : + new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler; + + TaskScheduler otherScheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler; + + var cq = new ConcurrentQueue(); + + await Parallel.ForAsync(1, 101, new ParallelOptions { TaskScheduler = scheduler }, async (item, cancellationToken) => + { + Assert.Same(scheduler, TaskScheduler.Current); + await Task.Yield(); + cq.Enqueue(item); + + if (item % 10 == 0) + { + await new SwitchTo(otherScheduler); + } + }); + + Assert.Equal(Enumerable.Range(1, 100), cq.OrderBy(i => i)); } [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] @@ -460,6 +666,17 @@ async IAsyncEnumerable Iterate() Assert.Equal(Enumerable.Range(1, 100), cq.OrderBy(i => i)); } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public async Task Cancellation_CancelsIterationAndReturnsCanceledTask_For() + { + using var cts = new CancellationTokenSource(10); + OperationCanceledException oce = await Assert.ThrowsAnyAsync(() => Parallel.ForAsync(long.MinValue, long.MaxValue, cts.Token, async (item, cancellationToken) => + { + await Task.Yield(); + })); + Assert.Equal(cts.Token, oce.CancellationToken); + } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] public async Task Cancellation_CancelsIterationAndReturnsCanceledTask_Sync() { @@ -518,6 +735,21 @@ await Parallel.ForEachAsync(YieldTokenAsync(default), (item, cancellationToken) }); } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public async Task Cancellation_SameTokenPassedToEveryInvocation_For() + { + var cq = new ConcurrentQueue(); + + await Parallel.ForAsync(1, 101, async (item, cancellationToken) => + { + cq.Enqueue(cancellationToken); + await Task.Yield(); + }); + + Assert.Equal(100, cq.Count); + Assert.Equal(1, cq.Distinct().Count()); + } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] public async Task Cancellation_SameTokenPassedToEveryInvocation_Sync() { @@ -548,6 +780,32 @@ await Parallel.ForEachAsync(EnumerableRangeAsync(1, 100), async (item, cancellat Assert.Equal(1, cq.Distinct().Count()); } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public async Task Cancellation_HasPriorityOverExceptions_For() + { + var tcs = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + + Task t = Parallel.ForAsync(0, long.MaxValue, new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = 2 }, async (item, cancellationToken) => + { + if (item == 0) + { + await tcs.Task; + cts.Cancel(); + throw new FormatException(); + } + else + { + tcs.TrySetResult(); + await Task.Yield(); + } + }); + + OperationCanceledException oce = await Assert.ThrowsAnyAsync(() => t); + Assert.Equal(cts.Token, oce.CancellationToken); + Assert.True(t.IsCanceled); + } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] public async Task Cancellation_HasPriorityOverExceptions_Sync() { @@ -616,6 +874,22 @@ static async IAsyncEnumerable Iterate() Assert.True(t.IsCanceled); } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(false)] + [InlineData(true)] + public async Task Cancellation_FaultsForOceForNonCancellation_For(bool internalToken) + { + var cts = new CancellationTokenSource(); + + Task t = Parallel.ForAsync(long.MinValue, long.MaxValue, new ParallelOptions { CancellationToken = cts.Token }, (item, cancellationToken) => + { + throw new OperationCanceledException(internalToken ? cancellationToken : cts.Token); + }); + + await Assert.ThrowsAnyAsync(() => t); + Assert.True(t.IsFaulted); + } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] [InlineData(false)] [InlineData(true)] @@ -642,6 +916,38 @@ static async IAsyncEnumerable Iterate() Assert.True(t.IsFaulted); } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(0, 4)] + [InlineData(1, 4)] + [InlineData(2, 4)] + [InlineData(3, 4)] + [InlineData(4, 4)] + public async Task Cancellation_InternalCancellationExceptionsArentFilteredOut_For(int numThrowingNonCanceledOce, int total) + { + var cts = new CancellationTokenSource(); + + var barrier = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + int remainingCount = total; + + Task t = Parallel.ForAsync(0, total, new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = total }, async (item, cancellationToken) => + { + // Wait for all operations to be started + if (Interlocked.Decrement(ref remainingCount) == 0) + { + barrier.SetResult(); + } + await barrier.Task; + + throw item < numThrowingNonCanceledOce ? + new OperationCanceledException(cancellationToken) : + throw new FormatException(); + }); + + await Assert.ThrowsAnyAsync(() => t); + Assert.Equal(total, t.Exception.InnerExceptions.Count); + Assert.Equal(numThrowingNonCanceledOce, t.Exception.InnerExceptions.Count(e => e is OperationCanceledException)); + } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] [InlineData(0, 4)] [InlineData(1, 4)] @@ -751,6 +1057,27 @@ static async IAsyncEnumerable Iterate() Assert.True(t.IsFaulted); } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public async Task Exception_FromLoopBody_For() + { + var barrier = new Barrier(2); + Task t = Parallel.ForAsync(1, 3, new ParallelOptions { MaxDegreeOfParallelism = barrier.ParticipantCount }, (item, cancellationToken) => + { + barrier.SignalAndWait(); + throw item switch + { + 1 => new FormatException(), + 2 => new InvalidTimeZoneException(), + _ => new Exception() + }; + }); + await Assert.ThrowsAnyAsync(() => t); + Assert.True(t.IsFaulted); + Assert.Equal(2, t.Exception.InnerExceptions.Count); + Assert.Contains(t.Exception.InnerExceptions, e => e is FormatException); + Assert.Contains(t.Exception.InnerExceptions, e => e is InvalidTimeZoneException); + } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] public async Task Exception_FromLoopBody_Sync() { @@ -853,6 +1180,34 @@ public async Task Exception_FromDisposeAndCancellationCallback_Async() Assert.Contains(t.Exception.InnerExceptions, e => e is InvalidTimeZoneException); } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public async Task Exception_ImplicitlyCancelsOtherWorkers_For() + { + await Assert.ThrowsAsync(() => Parallel.ForAsync(0, long.MaxValue, async (item, cancellationToken) => + { + await Task.Yield(); + if (item == 1000) + { + throw new Exception(); + } + })); + + await Assert.ThrowsAsync(() => Parallel.ForAsync(0, long.MaxValue, new ParallelOptions { MaxDegreeOfParallelism = 2 }, async (item, cancellationToken) => + { + if (item == 0) + { + throw new FormatException(); + } + else + { + Assert.Equal(1, item); + var tcs = new TaskCompletionSource(); + cancellationToken.Register(() => tcs.SetResult()); + await tcs.Task; + } + })); + } + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] public async Task Exception_ImplicitlyCancelsOtherWorkers_Sync() { @@ -958,6 +1313,24 @@ static async IAsyncEnumerable Iterate(Task signal) Assert.IsType(ae.InnerException); } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(false)] + [InlineData(true)] + public async Task ExecutionContext_FlowsToWorkerBodies_For(bool defaultScheduler) + { + TaskScheduler scheduler = defaultScheduler ? + TaskScheduler.Default : + new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler; + + var al = new AsyncLocal(); + al.Value = 42; + await Parallel.ForAsync(0, 100, async (item, cancellationToken) => + { + await Task.Yield(); + Assert.Equal(42, al.Value); + }); + } + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] [InlineData(false)] [InlineData(true)]