diff --git a/CHANGELOG.md b/CHANGELOG.md index ee75fb8b8..2d366014f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,15 @@ Release Notes ==== +# 07-15-2024 +DotNext 5.8.0 +* Added `FirstOrNone` and `LastOrNone` extension methods back from .NEXT 4.x as requested in [247](https://github.com/dotnet/dotNext/issues/247) + +DotNext.Threading 5.10.0 +* Added `TaskQueue` class +* Added `Completion` optional property to [TaskCompletionPipe<T>](https://dotnet.github.io/dotNext/api/DotNext.Threading.Tasks.TaskCompletionPipe-1.html) that allows to synchronize on full completion of the pipe +* Added one-shot static methods to [TaskCompletionPipe](https://dotnet.github.io/dotNext/api/DotNext.Threading.Tasks.TaskCompletionPipe.html) to take `IAsyncEnumerable` over tasks as they complete + # 07-09-2024 DotNext.IO 5.7.1 * Improved performance of `FileWriter` in some corner cases diff --git a/README.md b/README.md index 44081df19..69f40b0b8 100644 --- a/README.md +++ b/README.md @@ -44,16 +44,15 @@ All these things are implemented in 100% managed code on top of existing .NET AP * [NuGet Packages](https://www.nuget.org/profiles/rvsakno) # What's new -Release Date: 07-09-2024 +Release Date: 07-15-2024 -DotNext.IO 5.7.1 -* Improved performance of `FileWriter` in some corner cases +DotNext 5.8.0 +* Added `FirstOrNone` and `LastOrNone` extension methods back from .NEXT 4.x as requested in [247](https://github.com/dotnet/dotNext/issues/247) -DotNext.Net.Cluster 5.7.3 -* Fixed [244](https://github.com/dotnet/dotNext/issues/244) - -DotNext.AspNetCore.Cluster 5.7.3 -* Fixed [244](https://github.com/dotnet/dotNext/issues/244) +DotNext.Threading 5.10.0 +* Added `TaskQueue` class +* Added `Completion` optional property to [TaskCompletionPipe<T>](https://dotnet.github.io/dotNext/api/DotNext.Threading.Tasks.TaskCompletionPipe-1.html) that allows to synchronize on full completion of the pipe +* Added one-shot static methods to [TaskCompletionPipe](https://dotnet.github.io/dotNext/api/DotNext.Threading.Tasks.TaskCompletionPipe.html) to take `IAsyncEnumerable` over tasks as they complete Changelog for previous versions located [here](./CHANGELOG.md). diff --git a/src/DotNext.Tests/Collections/Generic/AsyncEnumerableTests.cs b/src/DotNext.Tests/Collections/Generic/AsyncEnumerableTests.cs index 23721f896..2ebb718aa 100644 --- a/src/DotNext.Tests/Collections/Generic/AsyncEnumerableTests.cs +++ b/src/DotNext.Tests/Collections/Generic/AsyncEnumerableTests.cs @@ -35,11 +35,25 @@ public static async Task ForEachTestAsync() Equal(3, counter.value); counter.value = 0; - list = new int[] { 1, 2, 10, 11, 15 }.ToAsyncEnumerable(); + list = new[] { 1, 2, 10, 11, 15 }.ToAsyncEnumerable(); await list.ForEachAsync(counter.Accept); Equal(5, counter.value); } + [Fact] + public static async Task ForEachTest1Async() + { + var list = new List { 1, 10, 20 }.ToAsyncEnumerable(); + var counter = new CollectionTests.Counter(); + await list.ForEachAsync(counter.AcceptAsync); + Equal(3, counter.value); + counter.value = 0; + + list = new[] { 1, 2, 10, 11, 15 }.ToAsyncEnumerable(); + await list.ForEachAsync(counter.AcceptAsync); + Equal(5, counter.value); + } + [Fact] public static async Task FirstOrNullTestAsync() { diff --git a/src/DotNext.Tests/Collections/Generic/CollectionTests.cs b/src/DotNext.Tests/Collections/Generic/CollectionTests.cs index 49cddc42d..1e56a7884 100644 --- a/src/DotNext.Tests/Collections/Generic/CollectionTests.cs +++ b/src/DotNext.Tests/Collections/Generic/CollectionTests.cs @@ -1,4 +1,6 @@ using System.Collections.Concurrent; +using System.Collections.Immutable; +using System.Runtime.InteropServices; namespace DotNext.Collections.Generic; @@ -50,6 +52,21 @@ internal sealed class Counter public int value; public void Accept(T item) => value += 1; + + public ValueTask AcceptAsync(T item, CancellationToken token) + { + ValueTask task = ValueTask.CompletedTask; + try + { + Accept(item); + } + catch (Exception e) + { + task = ValueTask.FromException(e); + } + + return task; + } } [Fact] @@ -64,6 +81,20 @@ public static void ForEachTest() array2.ForEach(counter.Accept); Equal(5, counter.value); } + + [Fact] + public static async Task ForEachTestAsync() + { + IList list = new List { 1, 10, 20 }; + var counter = new Counter(); + await list.ForEachAsync(counter.AcceptAsync); + Equal(3, counter.value); + counter.value = 0; + + var array2 = new int[] { 1, 2, 10, 11, 15 }; + await array2.ForEachAsync(counter.AcceptAsync); + Equal(5, counter.value); + } [Fact] public static void ElementAtIndex() @@ -260,4 +291,57 @@ public static void CopyString() using var copy = "abcd".Copy(); Equal("abcd", copy.Memory.ToString()); } + + [Fact] + public static void FirstOrNone() + { + Equal(5, new[] { 5, 6 }.FirstOrNone()); + Equal(5, new List { 5, 6 }.FirstOrNone()); + Equal(5, new LinkedList([5, 6]).FirstOrNone()); + Equal('5', "56".FirstOrNone()); + Equal(5, ImmutableArray.Create([5, 6]).FirstOrNone()); + Equal(5, GetValues().FirstOrNone()); + + Equal(Optional.None, Array.Empty().FirstOrNone()); + Equal(Optional.None, new List().FirstOrNone()); + Equal(Optional.None, new LinkedList().FirstOrNone()); + Equal(Optional.None, string.Empty.FirstOrNone()); + Equal(Optional.None, ImmutableArray.Empty.FirstOrNone()); + Equal(Optional.None, EmptyEnumerable().FirstOrNone()); + + static IEnumerable GetValues() + { + yield return 5; + yield return 6; + } + } + + [Fact] + public static void LastOrNone() + { + Equal(6, new[] { 5, 6 }.LastOrNone()); + Equal(6, new List { 5, 6 }.LastOrNone()); + Equal(6, new LinkedList([5, 6]).LastOrNone()); + Equal('6', "56".LastOrNone()); + Equal(6, ImmutableArray.Create([5, 6]).LastOrNone()); + Equal(6, GetValues().LastOrNone()); + + Equal(Optional.None, Array.Empty().LastOrNone()); + Equal(Optional.None, new List().LastOrNone()); + Equal(Optional.None, new LinkedList().LastOrNone()); + Equal(Optional.None, string.Empty.LastOrNone()); + Equal(Optional.None, ImmutableArray.Empty.LastOrNone()); + Equal(Optional.None, EmptyEnumerable().LastOrNone()); + + static IEnumerable GetValues() + { + yield return 5; + yield return 6; + } + } + + static IEnumerable EmptyEnumerable() + { + yield break; + } } \ No newline at end of file diff --git a/src/DotNext.Tests/Threading/AsyncBarrierTests.cs b/src/DotNext.Tests/Threading/AsyncBarrierTests.cs index 6804c6ef3..7f94689f7 100644 --- a/src/DotNext.Tests/Threading/AsyncBarrierTests.cs +++ b/src/DotNext.Tests/Threading/AsyncBarrierTests.cs @@ -1,3 +1,5 @@ +using static System.Threading.Timeout; + namespace DotNext.Threading; public sealed class AsyncBarrierTests : Test @@ -51,7 +53,7 @@ public static async Task PhaseCompletion() ICollection tasks = new LinkedList(); Equal(0, barrier.CurrentPhaseNumber); tasks.Add(barrier.SignalAndWaitAsync().AsTask()); - tasks.Add(barrier.SignalAndWaitAsync().AsTask()); + tasks.Add(barrier.SignalAndWaitAsync(InfiniteTimeSpan).AsTask()); tasks.Add(barrier.SignalAndWaitAsync().AsTask()); await Task.WhenAll(tasks); Equal(1, barrier.CurrentPhaseNumber); diff --git a/src/DotNext.Tests/Threading/AsyncBridgeTests.cs b/src/DotNext.Tests/Threading/AsyncBridgeTests.cs index 313687f0f..1d46d7263 100644 --- a/src/DotNext.Tests/Threading/AsyncBridgeTests.cs +++ b/src/DotNext.Tests/Threading/AsyncBridgeTests.cs @@ -133,4 +133,49 @@ public static async Task Interruption() var e = await ThrowsAsync(Func.Constant(task)); Equal(interruptionReason, e.Reason); } + + [Fact] + public static void CompletedTaskAsToken() + { + var token = Task.CompletedTask.AsCancellationToken(); + True(token.IsCancellationRequested); + + token = Task.CompletedTask.AsCancellationToken(out var diposeSource); + True(token.IsCancellationRequested); + False(diposeSource()); + } + + [Fact] + public static async Task TaskAsToken() + { + var source = new TaskCompletionSource(); + var token = source.Task.AsCancellationToken(); + False(token.IsCancellationRequested); + + source.SetResult(); + await token.WaitAsync(); + } + + [Fact] + public static void DisposeTaskTokenBeforeCompletion() + { + var source = new TaskCompletionSource(); + var token = source.Task.AsCancellationToken(out var disposeTokenSource); + False(token.IsCancellationRequested); + + True(disposeTokenSource()); + source.SetResult(); + } + + [Fact] + public static async Task DisposeTaskTokenAfterCompletion() + { + var source = new TaskCompletionSource(); + var token = source.Task.AsCancellationToken(out var disposeTokenSource); + False(token.IsCancellationRequested); + + source.SetResult(); + await token.WaitAsync(); + False(disposeTokenSource()); + } } \ No newline at end of file diff --git a/src/DotNext.Tests/Threading/AsyncTriggerTests.cs b/src/DotNext.Tests/Threading/AsyncTriggerTests.cs index 993e7f1d9..49cd74b1d 100644 --- a/src/DotNext.Tests/Threading/AsyncTriggerTests.cs +++ b/src/DotNext.Tests/Threading/AsyncTriggerTests.cs @@ -56,6 +56,22 @@ public static async Task SignalAndWait() await task2; } + + [Fact] + public static async Task SignalAndWaitWithTimeout() + { + using var trigger = new AsyncTrigger(); + + var task1 = trigger.WaitAsync(); + var task2 = trigger.SignalAndWaitAsync(false, true, DefaultTimeout); + + await task1; + False(task2.IsCompleted); + + True(trigger.Signal()); + + True(await task2); + } [Fact] public static async Task SignalEmptyQueue() diff --git a/src/DotNext.Tests/Threading/Tasks/TaskCompletionPipeTests.cs b/src/DotNext.Tests/Threading/Tasks/TaskCompletionPipeTests.cs index 7b4038d9e..8b3d97364 100644 --- a/src/DotNext.Tests/Threading/Tasks/TaskCompletionPipeTests.cs +++ b/src/DotNext.Tests/Threading/Tasks/TaskCompletionPipeTests.cs @@ -158,4 +158,56 @@ public static async Task WrongIteratorVersion() pipe.Add(Task.FromResult(42)); False(await enumerator.MoveNextAsync()); } + + [Fact] + public static async Task CompletedTaskGroupToCollection() + { + await foreach (var t in TaskCompletionPipe.Create([Task.CompletedTask, Task.CompletedTask])) + { + True(t.IsCompleted); + } + } + + [Fact] + public static async Task TaskGroupToCollection() + { + var source1 = new TaskCompletionSource(); + var source2 = new TaskCompletionSource(); + await using var consumer = TaskCompletionPipe.GetConsumer([source1.Task, source2.Task]).GetAsyncEnumerator(); + + source1.SetResult(42); + True(await consumer.MoveNextAsync()); + Equal(42, consumer.Current); + + source2.SetResult(43); + True(await consumer.MoveNextAsync()); + Equal(43, consumer.Current); + + False(await consumer.MoveNextAsync()); + } + + [Fact] + public static async Task CompletionTask() + { + var pipe = new TaskCompletionPipe { IsCompletionTaskSupported = true }; + True(pipe.IsCompletionTaskSupported); + + var source1 = new TaskCompletionSource(); + var source2 = new TaskCompletionSource(); + + pipe.Add([source1.Task, source2.Task], complete: true); + + source1.SetResult(); + source2.SetResult(); + + await pipe.Completion.WaitAsync(DefaultTimeout); + + var count = 0; + while (pipe.TryRead(out _)) + { + count++; + } + + Equal(2, count); + } } \ No newline at end of file diff --git a/src/DotNext.Tests/Threading/Tasks/TaskQueueTests.cs b/src/DotNext.Tests/Threading/Tasks/TaskQueueTests.cs new file mode 100644 index 000000000..b0b7f4704 --- /dev/null +++ b/src/DotNext.Tests/Threading/Tasks/TaskQueueTests.cs @@ -0,0 +1,142 @@ +namespace DotNext.Threading.Tasks; + +public class TaskQueueTests : Test +{ + [Fact] + public static async Task EmptyQueue() + { + var queue = new TaskQueue(10); + True(queue.CanEnqueue); + Null(queue.HeadTask); + False(queue.TryDequeue(out _)); + Null(await queue.TryDequeueAsync()); + } + + [Fact] + public static async Task QueueOverflow() + { + var queue = new TaskQueue(3); + True(queue.TryEnqueue(Task.CompletedTask)); + True(queue.TryEnqueue(Task.CompletedTask)); + True(queue.TryEnqueue(Task.CompletedTask)); + False(queue.CanEnqueue); + NotNull(queue.HeadTask); + + var enqueueTask = queue.EnqueueAsync(Task.CompletedTask).AsTask(); + False(enqueueTask.IsCompleted); + + True(queue.TryDequeue(out var task)); + True(task.IsCompleted); + + await enqueueTask.WaitAsync(DefaultTimeout); + queue.Clear(); + } + + [Fact] + public static async Task EnumerateCompletedTasks() + { + var queue = new TaskQueue(3); + True(queue.TryEnqueue(Task.CompletedTask)); + True(queue.TryEnqueue(Task.CompletedTask)); + True(queue.TryEnqueue(Task.CompletedTask)); + + var count = 0; + await foreach (var task in queue) + { + Same(Task.CompletedTask, task); + count++; + } + + Equal(3, count); + } + + [Fact] + public static async Task TryDequeueCompletedTasks() + { + var queue = new TaskQueue(3); + True(queue.TryEnqueue(Task.CompletedTask)); + True(queue.TryEnqueue(Task.CompletedTask)); + True(queue.TryEnqueue(Task.CompletedTask)); + + var count = 0; + while (await queue.TryDequeueAsync() is { } task) + { + Same(Task.CompletedTask, task); + count++; + } + + Equal(3, count); + } + + [Fact] + public static async Task EnumerateTasks() + { + var queue = new TaskQueue(3); + await queue.EnqueueAsync(Task.Delay(10)); + await queue.EnqueueAsync(Task.Delay(15)); + await queue.EnqueueAsync(Task.Delay(20)); + + var count = 0; + await foreach (var task in queue) + { + True(task.IsCompleted); + count++; + } + + Equal(3, count); + } + + [Fact] + public static async Task DelayedDequeue() + { + var queue = new TaskQueue(3); + var enqueueTask = queue.DequeueAsync().AsTask(); + False(enqueueTask.IsCompleted); + + True(queue.TryEnqueue(Task.CompletedTask)); + + await enqueueTask.WaitAsync(DefaultTimeout); + Null(await queue.TryDequeueAsync()); + } + + [Fact] + public static async Task DequeueCancellation() + { + var source = new TaskCompletionSource(); + var queue = new TaskQueue(3); + True(queue.TryEnqueue(source.Task)); + + await ThrowsAnyAsync(queue.DequeueAsync(new(canceled: true)).AsTask); + } + + [Fact] + public static async Task FailedTask() + { + var source = new TaskCompletionSource(); + var queue = new TaskQueue(3); + True(queue.TryEnqueue(source.Task)); + + var dequeueTask = queue.DequeueAsync().AsTask(); + False(dequeueTask.IsCompleted); + + source.SetException(new Exception()); + Same(source.Task, await dequeueTask); + } + + [Fact] + public static async Task EnsureFreeSpace() + { + var queue = new TaskQueue(3); + await queue.EnsureFreeSpaceAsync(); + + True(queue.TryEnqueue(Task.CompletedTask)); + True(queue.TryEnqueue(Task.CompletedTask)); + True(queue.TryEnqueue(Task.CompletedTask)); + + var task = queue.EnsureFreeSpaceAsync().AsTask(); + False(task.IsCompleted); + + True(queue.TryDequeue(out _)); + await task.WaitAsync(DefaultTimeout); + } +} \ No newline at end of file diff --git a/src/DotNext.Threading/DotNext.Threading.csproj b/src/DotNext.Threading/DotNext.Threading.csproj index 5dae6a67c..a854b128c 100644 --- a/src/DotNext.Threading/DotNext.Threading.csproj +++ b/src/DotNext.Threading/DotNext.Threading.csproj @@ -7,7 +7,7 @@ true true nullablePublicOnly - 5.9.0 + 5.10.0 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/DotNext.Threading/ExceptionMessages.cs b/src/DotNext.Threading/ExceptionMessages.cs index a9736dcc7..410da3378 100644 --- a/src/DotNext.Threading/ExceptionMessages.cs +++ b/src/DotNext.Threading/ExceptionMessages.cs @@ -11,8 +11,6 @@ internal static class ExceptionMessages { private static readonly ResourceManager Resources = new("DotNext.ExceptionMessages", Assembly.GetExecutingAssembly()); - internal static string CollectionIsEmpty => (string)Resources.Get(); - internal static string NotInLock => (string)Resources.Get(); internal static string TokenNotCancelable => (string)Resources.Get(); diff --git a/src/DotNext.Threading/ExceptionMessages.restext b/src/DotNext.Threading/ExceptionMessages.restext index 5d9717e9b..9d59e5c86 100644 --- a/src/DotNext.Threading/ExceptionMessages.restext +++ b/src/DotNext.Threading/ExceptionMessages.restext @@ -1,4 +1,3 @@ -CollectionIsEmpty=Collection is empty NotInLock=The current async flow has not entered the lock TokenNotCancelable=The token is not cancelable UnsupportedLockAcquisition=Lock cannot be acquired for this type of object diff --git a/src/DotNext.Threading/Threading/AsyncBarrier.cs b/src/DotNext.Threading/Threading/AsyncBarrier.cs index efd44f35f..6db4af6c4 100644 --- a/src/DotNext.Threading/Threading/AsyncBarrier.cs +++ b/src/DotNext.Threading/Threading/AsyncBarrier.cs @@ -9,7 +9,7 @@ namespace DotNext.Threading; /// This is asynchronous version of with small differences: /// /// Post-phase action is presented by virtual method . -/// It it possible to wait for phase completion without signal. +/// It is possible to wait for phase completion without signal. /// It is possible to signal without waiting of phase completion. /// Post-phase action is asynchronous. /// Number of phases is limited by data type. diff --git a/src/DotNext.Threading/Threading/AsyncBridge.CancellationToken.cs b/src/DotNext.Threading/Threading/AsyncBridge.CancellationToken.cs index 39b4e8d6c..ff69ba67e 100644 --- a/src/DotNext.Threading/Threading/AsyncBridge.CancellationToken.cs +++ b/src/DotNext.Threading/Threading/AsyncBridge.CancellationToken.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Collections.Concurrent; +using System.Diagnostics.CodeAnalysis; using System.Runtime.InteropServices; using Debug = System.Diagnostics.Debug; using Unsafe = System.Runtime.CompilerServices.Unsafe; @@ -184,4 +185,48 @@ private static CancellationTokenValueTaskPool TokenPool return Unsafe.As(CancellationTokenValueTaskCompletionCallback.Target); } } + + private sealed class TaskToCancellationTokenCallback + { + private volatile WeakReference? sourceRef; + + internal TaskToCancellationTokenCallback(out CancellationToken token) + { + var source = new CancellationTokenSource(); + token = source.Token; + sourceRef = new(source); + } + + private bool TryStealSource([NotNullWhen(true)] out CancellationTokenSource? source) + { + source = null; + return Interlocked.Exchange(ref sourceRef, null) is { } weakRef + && weakRef.TryGetTarget(out source); + } + + internal bool TryDispose() + { + if (TryStealSource(out var source)) + { + source.Dispose(); + } + + return source is not null; + } + + internal void CancelAndDispose() + { + if (TryStealSource(out var source)) + { + try + { + source.Cancel(); + } + finally + { + source.Dispose(); + } + } + } + } } \ No newline at end of file diff --git a/src/DotNext.Threading/Threading/AsyncBridge.cs b/src/DotNext.Threading/Threading/AsyncBridge.cs index 67a631f4e..689ceb439 100644 --- a/src/DotNext.Threading/Threading/AsyncBridge.cs +++ b/src/DotNext.Threading/Threading/AsyncBridge.cs @@ -185,6 +185,59 @@ public static ValueTask WaitAsync(this WaitHandle handle, CancellationToken toke .As>() .Invoke(InfiniteTimeSpan, token); + /// + /// Returns a cancellation token that gets signaled when the task completes. + /// + /// The task to observe. + /// The token that represents completion state of the task. + /// is . + public static CancellationToken AsCancellationToken(this Task task) + { + ArgumentNullException.ThrowIfNull(task); + + CancellationToken result; + if (task.IsCompleted) + { + result = new(canceled: true); + } + else + { + task.ConfigureAwait(false).GetAwaiter().UnsafeOnCompleted(new TaskToCancellationTokenCallback(out result).CancelAndDispose); + } + + return result; + } + + /// + /// Returns a cancellation token that gets signaled when the task completes. + /// + /// The task to observe. + /// + /// A delegate that can be used to destroy the source of the returned token if no longer needed. + /// It returns if token was not canceled by the task; otherwise, . + /// + /// The token that represents completion state of the task. + /// is . + public static CancellationToken AsCancellationToken(this Task task, out Func disposeTokenSource) + { + ArgumentNullException.ThrowIfNull(task); + + CancellationToken result; + if (task.IsCompleted) + { + result = new(canceled: true); + disposeTokenSource = Func.Constant(false); + } + else + { + var callback = new TaskToCancellationTokenCallback(out result); + disposeTokenSource = callback.TryDispose; + task.ConfigureAwait(false).GetAwaiter().UnsafeOnCompleted(callback.CancelAndDispose); + } + + return result; + } + /// /// Gets or sets the capacity of the internal pool used to create awaitable tasks returned /// from the public methods in this class. diff --git a/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.Consumer.cs b/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.Consumer.cs index f39f19d3f..95e1c3475 100644 --- a/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.Consumer.cs +++ b/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.Consumer.cs @@ -1,8 +1,11 @@ using System.Runtime.InteropServices; +using DotNext.Collections.Generic; using Debug = System.Diagnostics.Debug; namespace DotNext.Threading.Tasks; +using static Collections.Generic.AsyncEnumerable; + /// /// Provides various extension methods for class. /// @@ -17,6 +20,54 @@ public static class TaskCompletionPipe public static Consumer GetConsumer(this TaskCompletionPipe> pipe) => new(pipe); + /// + /// Gets a collection over tasks to be available as they complete. + /// + /// A collection of tasks. + /// The result type of tasks. + /// A collection over task results to be available as they complete. + public static Consumer GetConsumer(this ReadOnlySpan> tasks) + { + Consumer result; + if (tasks.IsEmpty) + { + result = default; + } + else + { + var pipe = new TaskCompletionPipe>(); + pipe.Add(tasks, complete: true); + result = new(pipe); + } + + return result; + } + + /// + /// Creates a collection over tasks to be available as they complete. + /// + /// A collection of tasks. + /// The type of tasks. + /// A collection over tasks to be available as they complete. + public static IAsyncEnumerable Create(ReadOnlySpan tasks) + where T : Task + { + IAsyncEnumerable result; + + if (tasks.IsEmpty) + { + result = Empty(); + } + else + { + var pipe = new TaskCompletionPipe(); + pipe.Add(tasks, complete: true); + result = pipe; + } + + return result; + } + private static async IAsyncEnumerator GetAsyncEnumerator(TaskCompletionPipe> pipe, uint expectedVersion, CancellationToken token) { while (await pipe.TryDequeue(expectedVersion, out var task, token).ConfigureAwait(false)) @@ -48,6 +99,6 @@ internal Consumer(TaskCompletionPipe> pipe) /// The token that can be used to cancel the operation. /// The asynchronous enumerator over completed tasks. public IAsyncEnumerator GetAsyncEnumerator(CancellationToken token = default) - => GetAsyncEnumerator(pipe, pipe.Version, token); + => pipe is null ? Empty().GetAsyncEnumerator(token) : GetAsyncEnumerator(pipe, pipe.Version, token); } } \ No newline at end of file diff --git a/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.Queue.cs b/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.Queue.cs index 43920f1f9..3ec5909f3 100644 --- a/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.Queue.cs +++ b/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.Queue.cs @@ -69,10 +69,10 @@ internal void Invoke() private LinkedTaskNode? firstTask, lastTask; - private ManualResetCompletionSource? EnqueueCompletedTask(LinkedTaskNode node) + private void AddCompletedTaskNode(LinkedTaskNode node) { Debug.Assert(Monitor.IsEntered(SyncRoot)); - Debug.Assert(node is { Task: { IsCompleted: true } }); + Debug.Assert(node.Task.IsCompleted); if (lastTask is null) { @@ -82,14 +82,25 @@ internal void Invoke() { lastTask = lastTask.Next = node; } + } + + private ManualResetCompletionSource? EnqueueCompletedTask(LinkedTaskNode node) + { + AddCompletedTaskNode(node); - scheduledTasksCount--; + if (--scheduledTasksCount is 0U && completionRequested && completedAll is not null) + completedAll.TrySetResult(); // Detaches continuation to call later out of monitor lock. // This approach increases response time (the time needed to submit completed task asynchronously), // but also improves throughput (number of submitted tasks per second). // Typically, the pipe has single consumer and multiple producers. In that // case, improved throughput is most preferred. + return TryDetachSuspendedCaller(); + } + + private LinkedValueTaskCompletionSource? TryDetachSuspendedCaller() + { for (LinkedValueTaskCompletionSource? current = waitQueue.First, next; current is not null; current = next) { next = current.Next; diff --git a/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.cs b/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.cs index e28027ab4..3da86c9c1 100644 --- a/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.cs +++ b/src/DotNext.Threading/Threading/Tasks/TaskCompletionPipe.cs @@ -18,11 +18,30 @@ public partial class TaskCompletionPipe : IAsyncEnumerable, IResettable // Allows to skip scheduled tasks in case of reuse private uint version; + // if null then completion not enabled + private TaskCompletionSource? completedAll; + /// /// Initializes a new pipe. /// public TaskCompletionPipe() => pool = new(OnCompleted); + /// + /// Gets or sets a value indicating that this pipe supports property. + /// + public bool IsCompletionTaskSupported + { + get => Volatile.Read(in completedAll) is not null; + init => completedAll = value ? new(TaskCreationOptions.RunContinuationsAsynchronously) : null; + } + + /// + /// Gets a task that turns into completed state when all submitted tasks are completed. + /// + /// is . + /// was called before completion. + public Task Completion => Volatile.Read(in completedAll)?.Task ?? Task.FromException(new NotSupportedException()); + private object SyncRoot => this; private void OnCompleted(Signal signal) @@ -49,9 +68,15 @@ public void Complete() throw new InvalidOperationException(); completionRequested = true; - suspendedCallers = scheduledTasksCount is 0U - ? DetachWaitQueue()?.SetResult(false, out _) - : null; + if (scheduledTasksCount is 0U) + { + completedAll?.TrySetResult(); + suspendedCallers = DetachWaitQueue()?.SetResult(false, out _); + } + else + { + suspendedCallers = null; + } } suspendedCallers?.Unwind(); @@ -96,7 +121,7 @@ private bool TryAdd(T task, out uint currentVersion, object? userData) /// The task to add. /// is . /// The pipe is closed. - public void Add(T task) + public void Add(T task) // TODO: Remove in future with optional param => Add(task, userData: null); /// @@ -114,7 +139,66 @@ public void Add(T task, object? userData) ArgumentNullException.ThrowIfNull(task); if (!TryAdd(task, out var expectedVersion, userData)) - task.ConfigureAwait(false).GetAwaiter().OnCompleted(new LazyLinkedTaskNode(task, this, expectedVersion) { UserData = userData }.Invoke); + EnqueueCompletion(task, expectedVersion, userData); + } + + private void EnqueueCompletion(T task, uint expectedVersion, object? userData) + { + task + .ConfigureAwait(false) + .GetAwaiter() + .UnsafeOnCompleted(new LazyLinkedTaskNode(task, this, expectedVersion) { UserData = userData }.Invoke); + } + + /// + /// Submits a group of tasks and mark this pipe as completed. + /// + /// A group of tasks. + /// to submit tasks and complete the pipe; to submit tasks. + /// Arbitrary object associated with the tasks. + /// The pipe is closed. + public void Add(ReadOnlySpan tasks, bool complete = false, object? userData = null) + { + LinkedValueTaskCompletionSource? suspendedCaller; + lock (SyncRoot) + { + if (completionRequested) + throw new InvalidOperationException(); + + scheduledTasksCount += (uint)tasks.Length; + var completionDetected = false; + foreach (var task in tasks) + { + if (task.IsCompleted) + { + AddCompletedTaskNode(new(task) { UserData = userData }); + scheduledTasksCount--; + completionDetected = true; + } + else + { + EnqueueCompletion(task, version, userData); + } + } + + completionRequested = complete; + + if (scheduledTasksCount is 0U && complete) + { + completedAll?.TrySetResult(); + suspendedCaller = DetachWaitQueue()?.SetResult(false, out _); + } + else if (completionDetected) + { + suspendedCaller = TryDetachSuspendedCaller(); + } + else + { + suspendedCaller = null; + } + } + + suspendedCaller?.Unwind(); } /// @@ -134,6 +218,11 @@ public void Reset() completionRequested = false; ClearTaskQueue(); suspendedCallers = DetachWaitQueue()?.SetResult(false, out _); + if (completedAll is not null) + { + completedAll.TrySetException(new PendingTaskInterruptedException()); + completedAll = new(TaskCreationOptions.RunContinuationsAsynchronously); + } } suspendedCallers?.Unwind(); diff --git a/src/DotNext.Threading/Threading/Tasks/TaskQueue.cs b/src/DotNext.Threading/Threading/Tasks/TaskQueue.cs new file mode 100644 index 000000000..18e92f1ff --- /dev/null +++ b/src/DotNext.Threading/Threading/Tasks/TaskQueue.cs @@ -0,0 +1,352 @@ +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +namespace DotNext.Threading.Tasks; + +/// +/// Represents a queue of scheduled tasks. +/// +/// +/// The queue returns tasks in the order as they added (FIFO) in contrast +/// to . +/// +/// The type of tasks in the queue. +public class TaskQueue : IAsyncEnumerable, IResettable + where T : Task +{ + private readonly T?[] array; + private int tail, head, count; + private Signal? signal; + + /// + /// Initializes a new empty queue. + /// + /// The maximum number of tasks in the queue. + /// is less than or equal to zero. + public TaskQueue(int capacity) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(capacity); + + array = new T[capacity]; + } + + private ref T? this[int index] + { + get + { + Debug.Assert((uint)index < (uint)array.Length); + + // Perf: skip bounds and variance check + return ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(array), index); + } + } + + /// + /// Gets a head of this queue. + /// + public T? HeadTask + { + get + { + lock (array) + { + return count > 0 ? this[head] : null; + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ChangeCount([ConstantExpected] bool increment) + { + Debug.Assert(Monitor.IsEntered(array)); + + count += increment ? +1 : -1; + if (signal?.TrySetResult() ?? false) + signal = null; + } + + private void MoveNext(ref int index) + { + Debug.Assert(Monitor.IsEntered(array)); + + var value = index + 1; + index = value == array.Length ? 0 : value; + } + + /// + /// Gets a value indicating that the queue has free space to place a task. + /// + public bool CanEnqueue + { + get + { + lock (array) + { + return count < array.Length; + } + } + } + + /// + /// Ensures that the queue has free space to enqueue a task. + /// + /// The token that can be used to cancel the operation. + /// The task representing asynchronous execution of the operation. + /// The operation has been canceled. + public ValueTask EnsureFreeSpaceAsync(CancellationToken token = default) + { + Task task; + lock (array) + { + if (count < array.Length) + { + task = Task.CompletedTask; + } + else + { + signal ??= new(); + task = signal.Task; + } + } + + return new(task.WaitAsync(token)); + } + + /// + /// Tries to enqueue the task. + /// + /// The task to enqueue. + /// if the task is enqueued successfully; if this queue is full. + public bool TryEnqueue(T task) + { + ArgumentNullException.ThrowIfNull(task); + + bool result; + lock (array) + { + if (result = count < array.Length) + { + this[tail] = task; + MoveNext(ref tail); + ChangeCount(increment: true); + } + } + + return result; + } + + private bool TryEnqueue(T task, out Task waitTask) + { + bool result; + lock (array) + { + if (result = count < array.Length) + { + this[tail] = task; + MoveNext(ref tail); + ChangeCount(increment: true); + waitTask = Task.CompletedTask; + } + else + { + signal ??= new(); + waitTask = signal.Task; + } + } + + return result; + } + + /// + /// Enqueues the task. + /// + /// + /// The caller suspends if the queue is full. + /// + /// The task to enqueue. + /// The token that can be used to cancel the operation. + /// The operation has been canceled. + public async ValueTask EnqueueAsync(T task, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(task); + + while (!TryEnqueue(task, out var waitTask)) + { + await waitTask.WaitAsync(token).ConfigureAwait(false); + } + } + + private T? TryPeekOrDequeue(out int head, out Task enqueueTask, out bool completed) + { + T? result; + lock (array) + { + if (count > 0) + { + result = this[head = this.head]; + enqueueTask = Task.CompletedTask; + if (completed = result is { IsCompleted: true }) + { + MoveNext(ref head); + ChangeCount(increment: false); + } + } + else + { + head = default; + result = null; + completed = default; + signal ??= new(); + enqueueTask = signal.Task; + } + } + + return result; + } + + private bool TryDequeue(int expectedHead, T task) + { + bool result; + lock (array) + { + ref var element = ref this[expectedHead]; + if (result = count > 0 && head == expectedHead && ReferenceEquals(element, task)) + { + MoveNext(ref head); + element = null; + ChangeCount(increment: false); + } + } + + return result; + } + + /// + /// Tries to dequeue the completed task. + /// + /// The completed task. + /// if is completed; otherwise, . + public bool TryDequeue([NotNullWhen(true)] out T? task) + { + lock (array) + { + ref var element = ref this[head]; + task = element; + if (count > 0 && task is { IsCompleted: true }) + { + MoveNext(ref head); + element = null; + ChangeCount(increment: false); + } + else + { + task = null; + } + } + + return task is not null; + } + + /// + /// Dequeues the task asynchronously. + /// + /// The caller suspends if the queue is empty. + /// The token that can be used to cancel the operation. + /// The completed task. + /// The operation has been canceled. + public async ValueTask DequeueAsync(CancellationToken token = default) + { + for (var filter = token.CanBeCanceled ? null : Predicate.Constant(true);;) + { + if (TryPeekOrDequeue(out var expectedHead, out var enqueueTask, out var completed) is not { } task) + { + await enqueueTask.WaitAsync(token).ConfigureAwait(false); + continue; + } + + if (!completed) + { + await task.WaitAsync(token).SuspendException(filter ??= token.SuspendAllExceptCancellation).ConfigureAwait(false); + + if (!TryDequeue(expectedHead, task)) + continue; + } + + return task; + } + } + + /// + /// Tries to dequeue the completed task. + /// + /// The token that can be used to cancel the operation. + /// The completed task; or if the queue is empty. + /// The operation has been canceled. + public async ValueTask TryDequeueAsync(CancellationToken token = default) + { + for (var filter = token.CanBeCanceled ? null : Predicate.Constant(true);;) + { + T? task; + if ((task = TryPeekOrDequeue(out var expectedHead, out _, out var completed)) is not null && !completed) + { + await task.WaitAsync(token).SuspendException(filter ??= token.SuspendAllExceptCancellation).ConfigureAwait(false); + + if (!TryDequeue(expectedHead, task)) + continue; + } + + return task; + } + } + + /// + /// Gets consuming enumerator over tasks in the queue. + /// + /// + /// The enumerator stops if the queue is empty. + /// + /// The token that can be used to cancel the operation. + /// The enumerator over completed tasks. + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken token) + { + for (var filter = token.CanBeCanceled ? null : Predicate.Constant(true); + TryPeekOrDequeue(out var expectedHead, out _, out var completed) is { } task;) + { + if (!completed) + { + await task.WaitAsync(token).SuspendException(filter ??= token.SuspendAllExceptCancellation).ConfigureAwait(false); + if (!TryDequeue(expectedHead, task)) + continue; + } + + yield return task; + } + } + + /// + /// Clears the queue. + /// + public void Clear() + { + lock (array) + { + head = tail = count = 0; + Array.Clear(array); + if (signal?.TrySetResult() ?? false) + signal = null; + } + } + + /// + void IResettable.Reset() => Clear(); + + private sealed class Signal() : TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); +} + +file static class CancellationTokenExtensions +{ + internal static bool SuspendAllExceptCancellation(this object token, Exception e) + => e is not OperationCanceledException canceledEx || !canceledEx.CancellationToken.Equals(token); +} \ No newline at end of file diff --git a/src/DotNext/Collections/Generic/AsyncEnumerable.cs b/src/DotNext/Collections/Generic/AsyncEnumerable.cs index 355789a4d..5eb5bcdf2 100644 --- a/src/DotNext/Collections/Generic/AsyncEnumerable.cs +++ b/src/DotNext/Collections/Generic/AsyncEnumerable.cs @@ -8,7 +8,7 @@ namespace DotNext.Collections.Generic; public static partial class AsyncEnumerable { /// - /// Applies specified action to each collection element asynchronously. + /// Applies specified action to each element of the collection asynchronously. /// /// Type of elements in the collection. /// A collection to enumerate. Cannot be . @@ -23,7 +23,7 @@ public static async ValueTask ForEachAsync(this IAsyncEnumerable collectio } /// - /// Applies specified action to each collection element asynchronously. + /// Applies the specified action to each element of the collection asynchronously. /// /// Type of elements in the collection. /// A collection to enumerate. Cannot be . @@ -38,8 +38,8 @@ public static async ValueTask ForEachAsync(this IAsyncEnumerable collectio } /// - /// Obtains first value type in the sequence; or - /// if sequence is empty. + /// Obtains the first value of a sequence; or + /// if the sequence is empty. /// /// Type of elements in the sequence. /// A sequence to check. Cannot be . @@ -55,8 +55,8 @@ public static async ValueTask ForEachAsync(this IAsyncEnumerable collectio } /// - /// Obtains the last value type in the sequence; or - /// if sequence is empty. + /// Obtains the last value of a sequence; or + /// if the sequence is empty. /// /// Type of elements in the sequence. /// A sequence to check. Cannot be . @@ -74,8 +74,8 @@ public static async ValueTask ForEachAsync(this IAsyncEnumerable collectio } /// - /// Obtains first element in the sequence; or - /// if sequence is empty. + /// Obtains the first element of a sequence; or + /// if the sequence is empty. /// /// Type of elements in the sequence. /// A sequence to check. Cannot be . @@ -90,8 +90,8 @@ public static async ValueTask> FirstOrNoneAsync(this IAsyncEnumer } /// - /// Obtains the last element in the sequence; or - /// if sequence is empty. + /// Obtains the last element of a sequence; or + /// if the sequence is empty. /// /// Type of elements in the sequence. /// A sequence to check. Cannot be . @@ -118,6 +118,8 @@ public static async ValueTask> LastOrNoneAsync(this IAsyncEnumera /// The operation has been canceled. public static async ValueTask> FirstOrNoneAsync(this IAsyncEnumerable seq, Predicate filter, CancellationToken token = default) { + ArgumentNullException.ThrowIfNull(filter); + await foreach (var item in seq.WithCancellation(token).ConfigureAwait(false)) { if (filter(item)) @@ -137,11 +139,10 @@ public static async ValueTask> FirstOrNoneAsync(this IAsyncEnumer /// The operation has been canceled. public static async ValueTask SkipAsync(this IAsyncEnumerator enumerator, int count) { - while (count > 0) + for (; count > 0; count--) { if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) return false; - count--; } return true; @@ -161,11 +162,9 @@ public static async ValueTask> ElementAtAsync(this IAsyncEnumerab var enumerator = collection.GetAsyncEnumerator(token); await using (enumerator.ConfigureAwait(false)) { - await enumerator.SkipAsync(index).ConfigureAwait(false); - - return await enumerator.MoveNextAsync().ConfigureAwait(false) ? - enumerator.Current : - Optional.None; + return await enumerator.SkipAsync(index).ConfigureAwait(false) && await enumerator.MoveNextAsync().ConfigureAwait(false) + ? enumerator.Current + : Optional.None; } } diff --git a/src/DotNext/Collections/Generic/Collection.cs b/src/DotNext/Collections/Generic/Collection.cs index 3a2747efb..8fd65623f 100644 --- a/src/DotNext/Collections/Generic/Collection.cs +++ b/src/DotNext/Collections/Generic/Collection.cs @@ -165,6 +165,67 @@ public static async ValueTask ForEachAsync(this IEnumerable collection, Fu await action.Invoke(item, token).ConfigureAwait(false); } + /// + /// Obtains the first element of a sequence; or + /// if the sequence is empty. + /// + /// The collection to return the first element of. + /// The type of the element of a collection. + /// The first element; or + public static Optional FirstOrNone(this IEnumerable collection) + { + return collection switch + { + null => throw new ArgumentNullException(nameof(collection)), + List list => Span.FirstOrNone(CollectionsMarshal.AsSpan(list)), + T[] array => Span.FirstOrNone(array), + string str => Unsafe.BitCast, Optional>(Span.FirstOrNone(str)), + LinkedList list => list.First is { } first ? first.Value : Optional.None, + IList list => list.Count > 0 ? list[0] : Optional.None, + IReadOnlyList readOnlyList => readOnlyList.Count > 0 ? readOnlyList[0] : Optional.None, + _ => FirstOrNoneSlow(collection), + }; + + static Optional FirstOrNoneSlow(IEnumerable collection) + { + using var enumerator = collection.GetEnumerator(); + return enumerator.MoveNext() ? enumerator.Current : Optional.None; + } + } + + /// + /// Obtains the last element of a sequence; or + /// if the sequence is empty. + /// + /// The collection to return the first element of. + /// The type of the element of a collection. + /// The first element; or + public static Optional LastOrNone(this IEnumerable collection) + { + return collection switch + { + null => throw new ArgumentNullException(nameof(collection)), + List list => Span.LastOrNone(CollectionsMarshal.AsSpan(list)), + T[] array => Span.LastOrNone(array), + string str => Unsafe.BitCast, Optional>(Span.LastOrNone(str)), + LinkedList list => list.Last is { } last ? last.Value : Optional.None, + IList list => list.Count > 0 ? list[^1] : Optional.None, + IReadOnlyList readOnlyList => readOnlyList.Count > 0 ? readOnlyList[^1] : Optional.None, + _ => LastOrNoneSlow(collection), + }; + + static Optional LastOrNoneSlow(IEnumerable collection) + { + var result = Optional.None(); + foreach (var item in collection) + { + result = item; + } + + return result; + } + } + /// /// Obtains element at the specified index in the sequence. /// @@ -181,6 +242,7 @@ public static bool ElementAt(this IEnumerable collection, int index, [Mayb { return collection switch { + null => throw new ArgumentNullException(nameof(collection)), List list => Span.ElementAt(CollectionsMarshal.AsSpan(list), index, out element), T[] array => Span.ElementAt(array, index, out element), LinkedList list => NodeValueAt(list, index, out element), @@ -211,14 +273,15 @@ static bool NodeValueAt(LinkedList list, int matchIndex, [MaybeNullWhen(false static bool ElementAtSlow(IEnumerable collection, int index, [MaybeNullWhen(false)] out T element) { using var enumerator = collection.GetEnumerator(); - enumerator.Skip(index); - if (enumerator.MoveNext()) + + // enumerator.Skip(index + 1) may overflow, replace it with two calls + if (enumerator.Skip(index) && enumerator.MoveNext()) { element = enumerator.Current; return true; } - element = default!; + element = default; return false; } @@ -230,7 +293,7 @@ static bool ListElementAt(IList list, int index, [MaybeNullWhen(false)] out T return true; } - element = default!; + element = default; return false; } @@ -242,7 +305,7 @@ static bool ReadOnlyListElementAt(IReadOnlyList list, int index, [MaybeNullWh return true; } - element = default!; + element = default; return false; } } diff --git a/src/DotNext/Collections/Generic/Enumerator.cs b/src/DotNext/Collections/Generic/Enumerator.cs index c451ddfff..abadeb7d8 100644 --- a/src/DotNext/Collections/Generic/Enumerator.cs +++ b/src/DotNext/Collections/Generic/Enumerator.cs @@ -19,12 +19,10 @@ public static partial class Enumerator /// , if current element is available; otherwise, . public static bool Skip(this IEnumerator enumerator, int count) { - while (count > 0) + for (; count > 0; count--) { if (!enumerator.MoveNext()) return false; - - count--; } return true; @@ -41,12 +39,10 @@ public static bool Skip(this IEnumerator enumerator, int count) public static bool Skip(this ref TEnumerator enumerator, int count) where TEnumerator : struct, IEnumerator { - while (count > 0) + for (; count > 0; count--) { if (!enumerator.MoveNext()) return false; - - count--; } return true; diff --git a/src/DotNext/DotNext.csproj b/src/DotNext/DotNext.csproj index 5c78d8cf4..1b52222ee 100644 --- a/src/DotNext/DotNext.csproj +++ b/src/DotNext/DotNext.csproj @@ -11,7 +11,7 @@ .NET Foundation and Contributors .NEXT Family of Libraries - 5.7.0 + 5.8.0 DotNext MIT diff --git a/src/DotNext/Span.cs b/src/DotNext/Span.cs index b203057b6..9644d7193 100644 --- a/src/DotNext/Span.cs +++ b/src/DotNext/Span.cs @@ -419,11 +419,10 @@ public static void CopyTo(this Span source, Span destination, out int w /// The source span. /// A function to test each element for a condition. /// The first element in the span that matches to the specified filter; or . - /// is . - public static Optional FirstOrNone(this ReadOnlySpan span, Predicate filter) + public static Optional FirstOrNone(this ReadOnlySpan span, Predicate? filter = null) { - ArgumentNullException.ThrowIfNull(filter); - + filter ??= Predicate.Constant(true); + for (var i = 0; i < span.Length; i++) { var item = span[i]; @@ -434,6 +433,13 @@ public static Optional FirstOrNone(this ReadOnlySpan span, Predicate return Optional.None; } + internal static Optional LastOrNone(ReadOnlySpan span) + { + ref var elementRef = ref MemoryMarshal.GetReference(span); + var length = span.Length; + return length > 0 ? Unsafe.Add(ref elementRef, length - 1) : Optional.None; + } + internal static bool ElementAt(ReadOnlySpan span, int index, [MaybeNullWhen(false)] out T element) { if ((uint)index < (uint)span.Length)