Skip to content

Commit

Permalink
Minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed Jan 30, 2025
1 parent 286aa67 commit e4769af
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
.LoadAggregate<TAggregate, TState, TId>(aggregateId, _streamNameMap, true, _factoryRegistry, cancellationToken)
.NoContext(),
ExpectedState.New => Create(aggregateId),
ExpectedState.Unknown => default,
ExpectedState.Unknown => null,
_ => throw new ArgumentOutOfRangeException(nameof(registeredHandler.ExpectedState), "Unknown expected state")
};

Expand Down
11 changes: 10 additions & 1 deletion src/Core/src/Eventuous.Persistence/ExpectedStreamVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,18 @@ public readonly record struct ExpectedStreamVersion(long Value) {
public static readonly ExpectedStreamVersion Any = new(-2);
}

public record struct StreamReadPosition(long Value) {
public record struct StreamReadPosition {
public StreamReadPosition(long Value) {
if (Value < 0) throw new ArgumentOutOfRangeException(nameof(Value), "StreamReadPosition cannot be negative.");
this.Value = Value;
}

public static readonly StreamReadPosition Start = new(0L);
public static readonly StreamReadPosition End = new(long.MaxValue);
public static implicit operator StreamReadPosition(long value) => new(value);
public long Value { get; set; }

public readonly void Deconstruct(out long value) => value = this.Value;
}

public record struct StreamTruncatePosition(long Value);
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public FoldedEventStream(StreamName streamName, ExpectedStreamVersion streamVers
Events = events;
State = events.Aggregate(new T(), (state, o) => state.When(o));
}

public StreamName StreamName { get; }
public ExpectedStreamVersion StreamVersion { get; }
public object[] Events { get; }
public T State { get; init; }
}
}
3 changes: 3 additions & 0 deletions src/Core/src/Eventuous.Shared/Eventuous.Shared.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@
<ItemGroup>
<InternalsVisibleTo Include="Benchmarks" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Eventuous.Domain\Eventuous.Domain.csproj" />
</ItemGroup>
</Project>
1 change: 1 addition & 0 deletions src/Core/src/Eventuous.Shared/Store/StreamName.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static StreamName ForState<TState>(string entityId) {
}

public readonly string GetId() => Value[(Value.IndexOf('-') + 1)..];
public readonly string GetCategory() => Value[..Value.IndexOf('-')];

public static implicit operator string(StreamName streamName) => streamName.Value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static async ValueTask Stop<T>(
await finalize(ts.Token).NoContext();
}

public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
this ChannelReader<T> source,
int batchSize,
TimeSpan timeSpan,
Expand Down
14 changes: 9 additions & 5 deletions src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ static Task<T> AnyOrNot<T>(ExpectedStreamVersion version, Func<Task<T>> whenAny,
=> version == ExpectedStreamVersion.Any ? whenAny() : otherwise();

[MethodImpl(MethodImplOptions.AggressiveInlining)]
StreamEvent ToStreamEvent(ResolvedEvent resolvedEvent) {
StreamEvent? ToStreamEvent(ResolvedEvent resolvedEvent) {
var deserialized = _serializer.DeserializeEvent(
resolvedEvent.Event.Data.Span,
resolvedEvent.Event.EventType,
Expand All @@ -217,12 +217,15 @@ StreamEvent ToStreamEvent(ResolvedEvent resolvedEvent) {

return deserialized switch {
SuccessfullyDeserialized success => AsStreamEvent(success.Payload),
FailedToDeserialize failed => throw new SerializationException(
$"Can't deserialize {resolvedEvent.Event.EventType}: {failed.Error}"
),
FailedToDeserialize failed => HandleFailure(failed),
_ => throw new SerializationException("Unknown deserialization result")
};

StreamEvent? HandleFailure(FailedToDeserialize failed) {
if (resolvedEvent.Event.EventType.StartsWith('$')) return null;
throw new SerializationException($"Can't deserialize {resolvedEvent.Event.EventType}: {failed.Error}");
}

Metadata? DeserializeMetadata() {
var meta = resolvedEvent.Event.Metadata.Span;

Expand Down Expand Up @@ -252,8 +255,9 @@ StreamEvent AsStreamEvent(object payload)

StreamEvent[] ToStreamEvents(ResolvedEvent[] resolvedEvents)
=> resolvedEvents
.Where(x => !x.Event.EventType.StartsWith('$'))
.Select(ToStreamEvent)
.Where(x => x != null)
.Select(x => x!.Value)
.ToArray();

record ErrorInfo(string Message, params object[] Args);
Expand Down

0 comments on commit e4769af

Please sign in to comment.