Skip to content

Commit

Permalink
Simplfied AsyncEnumerable batching
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed May 24, 2024
1 parent 1b01820 commit a65bffe
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 74 deletions.
29 changes: 18 additions & 11 deletions Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text.RegularExpressions;
using Core.Events;
using Core.EventStoreDB.Subscriptions.Batch;
using Core.EventStoreDB.Subscriptions.Checkpoints;
Expand All @@ -10,22 +11,27 @@
using Polly.Wrap;

namespace Core.EventStoreDB.Subscriptions;

using static ISubscriptionCheckpointRepository;

public class EventStoreDBSubscriptionToAllOptions
{
public static readonly IEventFilter ExcludeSystemAndCheckpointEvents =
EventTypeFilter.RegularExpression(
@"^(?!\$)(?!Core\.EventStoreDB\.Subscriptions\.Checkpoints\.CheckpointStored$).+");

public required string SubscriptionId { get; init; }

public SubscriptionFilterOptions FilterOptions { get; set; } =
new(EventTypeFilter.ExcludeSystemEvents());
new(ExcludeSystemAndCheckpointEvents);

public Action<EventStoreClientOperationOptions>? ConfigureOperation { get; set; }
public UserCredentials? Credentials { get; set; }
public bool ResolveLinkTos { get; set; }
public bool IgnoreDeserializationErrors { get; set; } = true;

public int BatchSize { get; set; } = 1;
public int BatchDeadline { get; set; } = 50;
public TimeSpan BatchDeadline { get; set; } = TimeSpan.FromMilliseconds(50);
}

public class EventStoreDBSubscriptionToAll(
Expand Down Expand Up @@ -67,19 +73,21 @@ await RetryPolicy.ExecuteAsync(token =>

private async Task OnSubscribe(Checkpoint checkpoint, CancellationToken ct)
{
var subscription = eventStoreClient.SubscribeToAll(
checkpoint != Checkpoint.None ? FromAll.After(checkpoint) : FromAll.Start,
Options.ResolveLinkTos,
Options.FilterOptions,
Options.Credentials,
ct
);
var subscription = eventStoreClient
.SubscribeToAll(
checkpoint != Checkpoint.None ? FromAll.After(checkpoint) : FromAll.Start,
Options.ResolveLinkTos,
Options.FilterOptions,
Options.Credentials,
ct
)
.Batch(Options.BatchSize, Options.BatchDeadline, ct);

Status = ProcessingStatus.Started;

logger.LogInformation("Subscription to all '{SubscriptionId}' started", SubscriptionId);

await foreach(var events in subscription.Batch(Options.BatchSize, Options.BatchDeadline, ct))
await foreach (var events in subscription)
{
var batch = new EventBatch(Options.SubscriptionId, events.ToArray());
var result = await ProcessBatch(batch, checkpoint, ct).ConfigureAwait(false);
Expand Down Expand Up @@ -149,5 +157,4 @@ private static bool IsCancelledByUser(RpcException rpcException) =>
private static bool IsCancelledByUser(Exception exception) =>
exception is OperationCanceledException
|| exception is RpcException rpcException && IsCancelledByUser(rpcException);

}
4 changes: 2 additions & 2 deletions Core.Testing/TestWebApplicationFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ protected override IHost CreateHost(IHostBuilder builder)
Environment.SetEnvironmentVariable("SchemaName", schemaName);

return Policy.Handle<Exception>()
.WaitAndRetry(5, _=>TimeSpan.FromMilliseconds(500))
.Execute(() =>base.CreateHost(builder));
.WaitAndRetry(5, _ => TimeSpan.FromMilliseconds(500))
.Execute(() => base.CreateHost(builder));
}

public void PublishedExternalEventsOfType<TEvent>() where TEvent : IExternalEvent =>
Expand Down
68 changes: 7 additions & 61 deletions Core/Extensions/AsyncEnumerableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,70 +5,16 @@ namespace Core.Extensions;

public static class AsyncEnumerableExtensions
{
public static async Task Pipe<T, TResult>(
this IAsyncEnumerable<T> enumerable,
ChannelWriter<TResult> cw,
Func<List<T>, TResult> transform,
int batchSize,
int timeout,
CancellationToken ct
)
{
var channel = Channel.CreateUnbounded<T>(
new UnboundedChannelOptions
{
SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = false
}
);

channel.Reader.Batch(batchSize).WithTimeout(timeout).PipeAsync(async batch =>
{
await cw.WriteAsync(transform(batch), ct).ConfigureAwait(false);

return batch;
}, cancellationToken: ct);

await foreach (var @event in enumerable.WithCancellation(ct))
{
await channel.Writer.WriteAsync(@event, ct).ConfigureAwait(false);
}
}

public static IAsyncEnumerable<List<T>> Batch<T>(
this IAsyncEnumerable<T> enumerable,
int batchSize,
int timeout,
TimeSpan deadline,
CancellationToken ct
)
{
var channel = Channel.CreateUnbounded<T>(
new UnboundedChannelOptions
{
SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = false
}
);

// Start the producer in the background
_ = ProduceAsync(enumerable, channel, ct).ContinueWith(async t =>
{
if (t.IsFaulted)
{
await channel.CompleteAsync(t.Exception).ConfigureAwait(false);
}
}, ct, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Current);


return channel.Reader.Batch(batchSize).WithTimeout(timeout).AsAsyncEnumerable(cancellationToken: ct);
}


private static async Task ProduceAsync<T>(this IAsyncEnumerable<T> enumerable, Channel<T> channel, CancellationToken ct)
{
await foreach (var @event in enumerable.WithCancellation(ct))
{
await channel.Writer.WriteAsync(@event, ct).ConfigureAwait(false);
}
) =>
enumerable
.ToChannel(cancellationToken: ct)
.Batch(batchSize)
.WithTimeout(deadline)
.AsAsyncEnumerable(cancellationToken: ct);

await channel.CompleteAsync().ConfigureAwait(false);
}
}
1 change: 1 addition & 0 deletions Sample/ECommerce/Orders/Orders/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public static IServiceCollection AddOrdersModule(this IServiceCollection service
)
.AddMarten(config, options =>
{
options.Projections.DaemonLockId = 44444;
options.ConfigureOrders();
options.DisableNpgsqlLogging = true;
})
Expand Down

0 comments on commit a65bffe

Please sign in to comment.