Skip to content

Commit

Permalink
Adding "deep cancellation" API surface predicated by a NO_DEEP_CANCEL…
Browse files Browse the repository at this point in the history
…LATION symbol.
  • Loading branch information
bartdesmet committed Nov 17, 2018
1 parent 568e78f commit d596d9f
Show file tree
Hide file tree
Showing 40 changed files with 5,178 additions and 14 deletions.
15 changes: 15 additions & 0 deletions Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.Opt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace System.Linq
Expand All @@ -19,6 +20,13 @@ public virtual IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, ValueTask
return new AsyncEnumerable.SelectEnumerableAsyncIteratorWithTask<TSource, TResult>(this, selector);
}

#if !NO_DEEP_CANCELLATION
public virtual IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, CancellationToken, ValueTask<TResult>> selector)
{
return new AsyncEnumerable.SelectEnumerableAsyncIteratorWithTaskAndCancellation<TSource, TResult>(this, selector);
}
#endif

public virtual IAsyncEnumerable<TSource> Where(Func<TSource, bool> predicate)
{
return new AsyncEnumerable.WhereEnumerableAsyncIterator<TSource>(this, predicate);
Expand All @@ -28,5 +36,12 @@ public virtual IAsyncEnumerable<TSource> Where(Func<TSource, ValueTask<bool>> pr
{
return new AsyncEnumerable.WhereEnumerableAsyncIteratorWithTask<TSource>(this, predicate);
}

#if !NO_DEEP_CANCELLATION
public virtual IAsyncEnumerable<TSource> Where(Func<TSource, CancellationToken, ValueTask<bool>> predicate)
{
return new AsyncEnumerable.WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource>(this, predicate);
}
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace System.Linq
Expand All @@ -11,5 +12,9 @@ public interface IOrderedAsyncEnumerable<out TElement> : IAsyncEnumerable<TEleme
{
IOrderedAsyncEnumerable<TElement> CreateOrderedEnumerable<TKey>(Func<TElement, TKey> keySelector, IComparer<TKey> comparer, bool descending);
IOrderedAsyncEnumerable<TElement> CreateOrderedEnumerable<TKey>(Func<TElement, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending);

#if !NO_DEEP_CANCELLATION
IOrderedAsyncEnumerable<TElement> CreateOrderedEnumerable<TKey>(Func<TElement, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending);
#endif
}
}
112 changes: 112 additions & 0 deletions Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Aggregate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSourc
return AggregateCore(source, accumulator, cancellationToken);
}

#if !NO_DEEP_CANCELLATION
public static Task<TSource> AggregateAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (accumulator == null)
throw Error.ArgumentNull(nameof(accumulator));

return AggregateCore(source, accumulator, cancellationToken);
}
#endif

public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
{
if (source == null)
Expand Down Expand Up @@ -90,6 +102,18 @@ public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsync
return AggregateCore(source, seed, accumulator, cancellationToken);
}

#if !NO_DEEP_CANCELLATION
public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (accumulator == null)
throw Error.ArgumentNull(nameof(accumulator));

return AggregateCore(source, seed, accumulator, cancellationToken);
}
#endif

public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
{
if (source == null)
Expand Down Expand Up @@ -138,6 +162,20 @@ public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this I
return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
}

#if !NO_DEEP_CANCELLATION
public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (accumulator == null)
throw Error.ArgumentNull(nameof(accumulator));
if (resultSelector == null)
throw Error.ArgumentNull(nameof(resultSelector));

return AggregateCore(source, seed, accumulator, resultSelector, cancellationToken);
}
#endif

private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
{
var acc = seed;
Expand Down Expand Up @@ -206,6 +244,29 @@ private static async Task<TResult> AggregateCore<TSource, TResult>(IAsyncEnumera
return acc;
}

#if !NO_DEEP_CANCELLATION
private static async Task<TResult> AggregateCore<TSource, TResult>(IAsyncEnumerable<TSource> source, TResult seed, Func<TResult, TSource, CancellationToken, ValueTask<TResult>> accumulator, CancellationToken cancellationToken)
{
var acc = seed;

var e = source.GetAsyncEnumerator(cancellationToken);

try
{
while (await e.MoveNextAsync().ConfigureAwait(false))
{
acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
}
}
finally
{
await e.DisposeAsync().ConfigureAwait(false);
}

return acc;
}
#endif

private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
{
var acc = seed;
Expand All @@ -227,6 +288,29 @@ private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(
return await resultSelector(acc).ConfigureAwait(false);
}

#if !NO_DEEP_CANCELLATION
private static async Task<TResult> AggregateCore<TSource, TAccumulate, TResult>(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, ValueTask<TResult>> resultSelector, CancellationToken cancellationToken)
{
var acc = seed;

var e = source.GetAsyncEnumerator(cancellationToken);

try
{
while (await e.MoveNextAsync().ConfigureAwait(false))
{
acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
}
}
finally
{
await e.DisposeAsync().ConfigureAwait(false);
}

return await resultSelector(acc, cancellationToken).ConfigureAwait(false);
}
#endif

private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
Expand All @@ -252,5 +336,33 @@ private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSour
await e.DisposeAsync().ConfigureAwait(false);
}
}

#if !NO_DEEP_CANCELLATION
private static async Task<TSource> AggregateCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);

try
{
if (!await e.MoveNextAsync().ConfigureAwait(false))
{
throw Error.NoElements();
}

var acc = e.Current;

while (await e.MoveNextAsync().ConfigureAwait(false))
{
acc = await accumulator(acc, e.Current, cancellationToken).ConfigureAwait(false);
}

return acc;
}
finally
{
await e.DisposeAsync().ConfigureAwait(false);
}
}
#endif
}
}
34 changes: 34 additions & 0 deletions Ix.NET/Source/System.Linq.Async/System/Linq/Operators/All.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ public static Task<bool> AllAsync<TSource>(this IAsyncEnumerable<TSource> source
return AllCore(source, predicate, cancellationToken);
}

#if !NO_DEEP_CANCELLATION
public static Task<bool> AllAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate, CancellationToken cancellationToken)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (predicate == null)
throw Error.ArgumentNull(nameof(predicate));

return AllCore(source, predicate, cancellationToken);
}
#endif

private static async Task<bool> AllCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
Expand Down Expand Up @@ -89,5 +101,27 @@ private static async Task<bool> AllCore<TSource>(IAsyncEnumerable<TSource> sourc

return true;
}

#if !NO_DEEP_CANCELLATION
private static async Task<bool> AllCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);

try
{
while (await e.MoveNextAsync().ConfigureAwait(false))
{
if (!await predicate(e.Current, cancellationToken).ConfigureAwait(false))
return false;
}
}
finally
{
await e.DisposeAsync().ConfigureAwait(false);
}

return true;
}
#endif
}
}
34 changes: 34 additions & 0 deletions Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Any.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ public static Task<bool> AnyAsync<TSource>(this IAsyncEnumerable<TSource> source
return AnyCore(source, predicate, cancellationToken);
}

#if !NO_DEEP_CANCELLATION
public static Task<bool> AnyAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate, CancellationToken cancellationToken)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (predicate == null)
throw Error.ArgumentNull(nameof(predicate));

return AnyCore(source, predicate, cancellationToken);
}
#endif

private static async Task<bool> AnyCore<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
Expand Down Expand Up @@ -119,5 +131,27 @@ private static async Task<bool> AnyCore<TSource>(IAsyncEnumerable<TSource> sourc

return false;
}

#if !NO_DEEP_CANCELLATION
private static async Task<bool> AnyCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);

try
{
while (await e.MoveNextAsync().ConfigureAwait(false))
{
if (await predicate(e.Current, cancellationToken).ConfigureAwait(false))
return true;
}
}
finally
{
await e.DisposeAsync().ConfigureAwait(false);
}

return false;
}
#endif
}
}
Loading

0 comments on commit d596d9f

Please sign in to comment.