From e4769af7944d506624d148bafb788559307a957d Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Thu, 30 Jan 2025 10:22:33 +0100 Subject: [PATCH] Minor updates --- .../AggregateService/CommandService.cs | 2 +- .../Eventuous.Persistence/ExpectedStreamVersion.cs | 11 ++++++++++- .../StateStore/FoldedEventStream.cs | 4 ++-- .../src/Eventuous.Shared/Eventuous.Shared.csproj | 3 +++ src/Core/src/Eventuous.Shared/Store/StreamName.cs | 1 + .../Channels/ChannelExtensions.cs | 2 +- .../src/Eventuous.EventStore/EsdbEventStore.cs | 14 +++++++++----- 7 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs index 6b196946..66a58b91 100644 --- a/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs @@ -80,7 +80,7 @@ public async Task> Handle(TCommand command, Cancellatio .LoadAggregate(aggregateId, _streamNameMap, true, _factoryRegistry, cancellationToken) .NoContext(), ExpectedState.New => Create(aggregateId), - ExpectedState.Unknown => default, + ExpectedState.Unknown => null, _ => throw new ArgumentOutOfRangeException(nameof(registeredHandler.ExpectedState), "Unknown expected state") }; diff --git a/src/Core/src/Eventuous.Persistence/ExpectedStreamVersion.cs b/src/Core/src/Eventuous.Persistence/ExpectedStreamVersion.cs index 49b39d1c..9dbafcfd 100644 --- a/src/Core/src/Eventuous.Persistence/ExpectedStreamVersion.cs +++ b/src/Core/src/Eventuous.Persistence/ExpectedStreamVersion.cs @@ -8,9 +8,18 @@ public readonly record struct ExpectedStreamVersion(long Value) { public static readonly ExpectedStreamVersion Any = new(-2); } -public record struct StreamReadPosition(long Value) { +public record struct StreamReadPosition { + public StreamReadPosition(long Value) { + if (Value < 0) throw new ArgumentOutOfRangeException(nameof(Value), "StreamReadPosition cannot be negative."); + this.Value = Value; + } + public static readonly StreamReadPosition Start = new(0L); public static readonly StreamReadPosition End = new(long.MaxValue); + public static implicit operator StreamReadPosition(long value) => new(value); + public long Value { get; set; } + + public readonly void Deconstruct(out long value) => value = this.Value; } public record struct StreamTruncatePosition(long Value); \ No newline at end of file diff --git a/src/Core/src/Eventuous.Persistence/StateStore/FoldedEventStream.cs b/src/Core/src/Eventuous.Persistence/StateStore/FoldedEventStream.cs index 1f67f609..112191e1 100644 --- a/src/Core/src/Eventuous.Persistence/StateStore/FoldedEventStream.cs +++ b/src/Core/src/Eventuous.Persistence/StateStore/FoldedEventStream.cs @@ -14,9 +14,9 @@ public FoldedEventStream(StreamName streamName, ExpectedStreamVersion streamVers Events = events; State = events.Aggregate(new T(), (state, o) => state.When(o)); } - + public StreamName StreamName { get; } public ExpectedStreamVersion StreamVersion { get; } public object[] Events { get; } public T State { get; init; } -} +} \ No newline at end of file diff --git a/src/Core/src/Eventuous.Shared/Eventuous.Shared.csproj b/src/Core/src/Eventuous.Shared/Eventuous.Shared.csproj index f16c1f42..f39e3ea8 100644 --- a/src/Core/src/Eventuous.Shared/Eventuous.Shared.csproj +++ b/src/Core/src/Eventuous.Shared/Eventuous.Shared.csproj @@ -22,4 +22,7 @@ + + + diff --git a/src/Core/src/Eventuous.Shared/Store/StreamName.cs b/src/Core/src/Eventuous.Shared/Store/StreamName.cs index 4d57ec10..2f02d9be 100644 --- a/src/Core/src/Eventuous.Shared/Store/StreamName.cs +++ b/src/Core/src/Eventuous.Shared/Store/StreamName.cs @@ -28,6 +28,7 @@ public static StreamName ForState(string entityId) { } public readonly string GetId() => Value[(Value.IndexOf('-') + 1)..]; + public readonly string GetCategory() => Value[..Value.IndexOf('-')]; public static implicit operator string(StreamName streamName) => streamName.Value; diff --git a/src/Core/src/Eventuous.Subscriptions/Channels/ChannelExtensions.cs b/src/Core/src/Eventuous.Subscriptions/Channels/ChannelExtensions.cs index dbda0b6a..b52a1ee1 100644 --- a/src/Core/src/Eventuous.Subscriptions/Channels/ChannelExtensions.cs +++ b/src/Core/src/Eventuous.Subscriptions/Channels/ChannelExtensions.cs @@ -67,7 +67,7 @@ public static async ValueTask Stop( await finalize(ts.Token).NoContext(); } - public static async IAsyncEnumerable ReadAllBatches( + static async IAsyncEnumerable ReadAllBatches( this ChannelReader source, int batchSize, TimeSpan timeSpan, diff --git a/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs b/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs index 9cdddf09..62093ed9 100644 --- a/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs +++ b/src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs @@ -208,7 +208,7 @@ static Task AnyOrNot(ExpectedStreamVersion version, Func> whenAny, => version == ExpectedStreamVersion.Any ? whenAny() : otherwise(); [MethodImpl(MethodImplOptions.AggressiveInlining)] - StreamEvent ToStreamEvent(ResolvedEvent resolvedEvent) { + StreamEvent? ToStreamEvent(ResolvedEvent resolvedEvent) { var deserialized = _serializer.DeserializeEvent( resolvedEvent.Event.Data.Span, resolvedEvent.Event.EventType, @@ -217,12 +217,15 @@ StreamEvent ToStreamEvent(ResolvedEvent resolvedEvent) { return deserialized switch { SuccessfullyDeserialized success => AsStreamEvent(success.Payload), - FailedToDeserialize failed => throw new SerializationException( - $"Can't deserialize {resolvedEvent.Event.EventType}: {failed.Error}" - ), + FailedToDeserialize failed => HandleFailure(failed), _ => throw new SerializationException("Unknown deserialization result") }; + StreamEvent? HandleFailure(FailedToDeserialize failed) { + if (resolvedEvent.Event.EventType.StartsWith('$')) return null; + throw new SerializationException($"Can't deserialize {resolvedEvent.Event.EventType}: {failed.Error}"); + } + Metadata? DeserializeMetadata() { var meta = resolvedEvent.Event.Metadata.Span; @@ -252,8 +255,9 @@ StreamEvent AsStreamEvent(object payload) StreamEvent[] ToStreamEvents(ResolvedEvent[] resolvedEvents) => resolvedEvents - .Where(x => !x.Event.EventType.StartsWith('$')) .Select(ToStreamEvent) + .Where(x => x != null) + .Select(x => x!.Value) .ToArray(); record ErrorInfo(string Message, params object[] Args);