-
Notifications
You must be signed in to change notification settings - Fork 8
/
AsyncCoroutineProxy.cs
76 lines (66 loc) · 2.43 KB
/
AsyncCoroutineProxy.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// https://github.com/noseratio/coroutines-talk
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Coroutines
{
/// <summary>
/// Consumer-facing contract of a coroutine proxy: asynchronously hands out the
/// sequence of items as an <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
/// <typeparam name="T">Type of the items in the exposed sequence.</typeparam>
public interface IAsyncCoroutineProxy<T>
{
    /// <summary>
    /// Asynchronously obtains the item sequence exposed by this proxy.
    /// </summary>
    /// <param name="token">
    /// Cancellation token supplied by the caller; optional, defaults to
    /// <c>default</c>. How it is honored is up to the implementation.
    /// </param>
    /// <returns>
    /// A task whose result is the <see cref="IAsyncEnumerable{T}"/> to iterate.
    /// </returns>
    Task<IAsyncEnumerable<T>> AsAsyncEnumerable(CancellationToken token = default);
}
/// <summary>
/// Bridges a producer coroutine and its consumers: <see cref="RunAsync"/> pumps the
/// coroutine's items into an unbounded channel, and consumers obtain the reader side
/// of that channel through <see cref="IAsyncCoroutineProxy{T}.AsAsyncEnumerable"/>.
/// </summary>
/// <typeparam name="T">Type of the items produced by the coroutine.</typeparam>
public class AsyncCoroutineProxy<T> : IAsyncCoroutineProxy<T>
{
    // Completed by RunAsync with the reader-side stream of the channel.
    // RunContinuationsAsynchronously keeps the continuations of waiting consumers
    // from running inline inside RunAsync's SetResult call.
    readonly TaskCompletionSource<IAsyncEnumerable<T>> _proxyTcs =
        new TaskCompletionSource<IAsyncEnumerable<T>>(
            TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>Creates a proxy whose stream is not yet available.</summary>
    public AsyncCoroutineProxy()
    {
    }

    /// <summary>
    /// Waits until <see cref="RunAsync"/> has published the item stream and returns it.
    /// </summary>
    /// <param name="token">
    /// Cancels only this caller's wait. The shared completion source is left untouched,
    /// so other consumers and a later <see cref="RunAsync"/> call are unaffected.
    /// </param>
    /// <returns>The stream that reads everything written by the coroutine.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="token"/> was cancelled before the stream became available.
    /// </exception>
    async Task<IAsyncEnumerable<T>> IAsyncCoroutineProxy<T>.AsAsyncEnumerable(CancellationToken token)
    {
        var proxyTask = _proxyTcs.Task;

        // Fast path: the stream is already published, or the wait cannot be cancelled.
        if (proxyTask.IsCompleted || !token.CanBeCanceled)
        {
            return await proxyTask;
        }

        // Per-call cancellation source: cancelling the shared _proxyTcs here would
        // cancel every other consumer as well and make RunAsync's SetResult throw.
        var cancellationTcs = new TaskCompletionSource<IAsyncEnumerable<T>>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        using var registration = token.Register(
            () => cancellationTcs.TrySetCanceled(token),
            useSynchronizationContext: false);

        // Whichever finishes first decides the outcome; awaiting it propagates
        // either the published stream or the cancellation.
        var winner = await Task.WhenAny(proxyTask, cancellationTcs.Task);
        return await winner;
    }

    /// <summary>
    /// Runs <paramref name="coroutine"/> and forwards every item it yields into the
    /// channel that backs the stream handed out to consumers.
    /// </summary>
    /// <param name="coroutine">Factory that creates the item sequence for the given token.</param>
    /// <param name="token">
    /// Token checked on entry and passed to the coroutine, the enumeration,
    /// the channel writes and the reader-side stream.
    /// </param>
    /// <returns>
    /// A task that completes when the coroutine has finished and the channel has been
    /// completed; any exception raised while pumping is recorded on the channel and
    /// rethrown through this task.
    /// </returns>
    public async Task RunAsync(Func<CancellationToken, IAsyncEnumerable<T>> coroutine, CancellationToken token)
    {
        token.ThrowIfCancellationRequested();

        var channel = Channel.CreateUnbounded<T>();
        var writer = channel.Writer;
        var proxy = channel.Reader.ReadAllAsync(token);

        // Publish the reader side before pumping so consumers can start reading
        // while items are still being produced. SetResult throws
        // InvalidOperationException if a stream has already been published.
        _proxyTcs.SetResult(proxy);

        try
        {
            await foreach (var item in coroutine(token).WithCancellation(token))
            {
                await writer.WriteAsync(item, token);
            }
            writer.Complete();
        }
        catch (Exception ex)
        {
            // Complete the channel with the failure as its completion error,
            // then let the exception surface through the returned task too.
            writer.Complete(ex);
            throw;
        }
    }
}
/// <summary>
/// Convenience extensions for consuming an <see cref="IAsyncCoroutineProxy{T}"/>
/// and for pulling single items from an <see cref="IAsyncEnumerator{T}"/>.
/// </summary>
public static class AsyncCoroutineExtensions
{
    /// <summary>
    /// Awaits the proxy's item sequence and returns an enumerator over it.
    /// </summary>
    /// <param name="this">Proxy to obtain the sequence from.</param>
    /// <param name="token">
    /// Token passed both to <c>AsAsyncEnumerable</c> and to <c>GetAsyncEnumerator</c>.
    /// </param>
    /// <returns>An enumerator over the proxy's sequence.</returns>
    public static async ValueTask<IAsyncEnumerator<T>> AsAsyncEnumerator<T>(
        this IAsyncCoroutineProxy<T> @this,
        CancellationToken token = default)
    {
        var sequence = await @this.AsAsyncEnumerable(token);
        return sequence.GetAsyncEnumerator(token);
    }

    /// <summary>
    /// Advances the enumerator by one step and returns the item it lands on.
    /// </summary>
    /// <param name="this">Enumerator to advance.</param>
    /// <returns>The enumerator's current item after a successful move.</returns>
    /// <exception cref="IndexOutOfRangeException">
    /// <c>MoveNextAsync</c> returned <c>false</c>, i.e. there is no further item.
    /// </exception>
    public static async ValueTask<T> GetNextAsync<T>(this IAsyncEnumerator<T> @this)
    {
        var advanced = await @this.MoveNextAsync();
        if (advanced)
        {
            return @this.Current;
        }

        throw new IndexOutOfRangeException(nameof(GetNextAsync));
    }
}
}