Skip to content

Commit

Permalink
Simplfied AsyncEnumerable batching
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed May 24, 2024
1 parent 1b01820 commit 0a8021b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 71 deletions.
16 changes: 6 additions & 10 deletions Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Polly.Wrap;

namespace Core.EventStoreDB.Subscriptions;

using static ISubscriptionCheckpointRepository;

public class EventStoreDBSubscriptionToAllOptions
Expand All @@ -25,7 +26,7 @@ public class EventStoreDBSubscriptionToAllOptions
public bool IgnoreDeserializationErrors { get; set; } = true;

public int BatchSize { get; set; } = 1;
public int BatchDeadline { get; set; } = 50;
public TimeSpan BatchDeadline { get; set; } = TimeSpan.FromMilliseconds(50);
}

public class EventStoreDBSubscriptionToAll(
Expand Down Expand Up @@ -67,19 +68,15 @@ await RetryPolicy.ExecuteAsync(token =>

private async Task OnSubscribe(Checkpoint checkpoint, CancellationToken ct)
{
var subscription = eventStoreClient.SubscribeToAll(
checkpoint != Checkpoint.None ? FromAll.After(checkpoint) : FromAll.Start,
Options.ResolveLinkTos,
Options.FilterOptions,
Options.Credentials,
ct
);
var subscription = eventStoreClient
.SubscribeToAll(FromAll.Start, cancellationToken: ct)
.Batch(Options.BatchSize, Options.BatchDeadline, ct);

Status = ProcessingStatus.Started;

logger.LogInformation("Subscription to all '{SubscriptionId}' started", SubscriptionId);

await foreach(var events in subscription.Batch(Options.BatchSize, Options.BatchDeadline, ct))
await foreach (var events in subscription)
{
var batch = new EventBatch(Options.SubscriptionId, events.ToArray());
var result = await ProcessBatch(batch, checkpoint, ct).ConfigureAwait(false);
Expand Down Expand Up @@ -149,5 +146,4 @@ private static bool IsCancelledByUser(RpcException rpcException) =>
private static bool IsCancelledByUser(Exception exception) =>
exception is OperationCanceledException
|| exception is RpcException rpcException && IsCancelledByUser(rpcException);

}
68 changes: 7 additions & 61 deletions Core/Extensions/AsyncEnumerableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,70 +5,16 @@ namespace Core.Extensions;

public static class AsyncEnumerableExtensions
{
public static async Task Pipe<T, TResult>(
this IAsyncEnumerable<T> enumerable,
ChannelWriter<TResult> cw,
Func<List<T>, TResult> transform,
int batchSize,
int timeout,
CancellationToken ct
)
{
var channel = Channel.CreateUnbounded<T>(
new UnboundedChannelOptions
{
SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = false
}
);

channel.Reader.Batch(batchSize).WithTimeout(timeout).PipeAsync(async batch =>
{
await cw.WriteAsync(transform(batch), ct).ConfigureAwait(false);

return batch;
}, cancellationToken: ct);

await foreach (var @event in enumerable.WithCancellation(ct))
{
await channel.Writer.WriteAsync(@event, ct).ConfigureAwait(false);
}
}

public static IAsyncEnumerable<List<T>> Batch<T>(
this IAsyncEnumerable<T> enumerable,
int batchSize,
int timeout,
TimeSpan deadline,
CancellationToken ct
)
{
var channel = Channel.CreateUnbounded<T>(
new UnboundedChannelOptions
{
SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = false
}
);

// Start the producer in the background
_ = ProduceAsync(enumerable, channel, ct).ContinueWith(async t =>
{
if (t.IsFaulted)
{
await channel.CompleteAsync(t.Exception).ConfigureAwait(false);
}
}, ct, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Current);


return channel.Reader.Batch(batchSize).WithTimeout(timeout).AsAsyncEnumerable(cancellationToken: ct);
}


private static async Task ProduceAsync<T>(this IAsyncEnumerable<T> enumerable, Channel<T> channel, CancellationToken ct)
{
await foreach (var @event in enumerable.WithCancellation(ct))
{
await channel.Writer.WriteAsync(@event, ct).ConfigureAwait(false);
}
) =>
enumerable
.ToChannel(cancellationToken: ct)
.Batch(batchSize)
.WithTimeout(deadline)
.AsAsyncEnumerable(cancellationToken: ct);

await channel.CompleteAsync().ConfigureAwait(false);
}
}

0 comments on commit 0a8021b

Please sign in to comment.