Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions .github/workflows/security-analysis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: Code Security Testing

on:
push:
branches:
- main
pull_request:
branches:
- main
schedule:
- cron: "0 20 * * 5"

permissions:
security-events: write

concurrency:
group: security-${{ github.ref }}
cancel-in-progress: true

jobs:
codeQL:
name: Analyze
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis

- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: csharp

- name: Setup dotnet
uses: actions/setup-dotnet@v4
with:
dotnet-version: '8.x.x'

- name: Build
run: dotnet build --configuration Release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
![GitHub License](https://img.shields.io/github/license/pmdevers/streamwave)
![Github Issues Open](https://img.shields.io/github/issues/pmdevers/streamwave)
![Github Pull Request Open](https://img.shields.io/github/issues-pr/pmdevers/streamwave)
[![Scheduled Code Security Testing](https://github.com/pmdevers/streamwave/actions/workflows/security-analysis.yml/badge.svg?event=schedule)](https://github.com/pmdevers/streamwave/actions/workflows/security-analysis.yml)


An aggregate root designed for a streaming environment, addressing the dual write problem by decoupling the domain model from the event stream.
Expand Down
6 changes: 4 additions & 2 deletions src/StreamWave.EntityFramework/AggregateBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ public static IAggregateBuilder<TState, TKey> WithEntityFramework<TContext, TSta
(id) =>
{
var context = s.GetRequiredService<TContext>();
return AggregateStore<TState, TKey>.LoadAsync(context, id);
var serializer = s.GetRequiredService<IEventSerializer>();
return new AggregateStore<TState, TKey>(context, serializer).LoadAsync(id);
}
)
.WithSaver((s) =>
(a) => {
var context = s.GetRequiredService<TContext>();
return AggregateStore<TState, TKey>.SaveAsync(context, a);
var serializer = s.GetRequiredService<IEventSerializer>();
return new AggregateStore<TState, TKey>(context, serializer).SaveAsync(a);
});
return builder;
}
Expand Down
50 changes: 40 additions & 10 deletions src/StreamWave.EntityFramework/AggregateStore.cs
Original file line number Diff line number Diff line change
@@ -1,34 +1,64 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using System.Text;
using System.Text.Json;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace StreamWave.EntityFramework;

public static partial class AggregateStore<TState, TId>
public interface IEventSerializer
{
object? Deserialize(byte[] data, Type type);
byte[] Serialize(object value, Type type);
}

public class DefaultSerializer(JsonSerializerOptions options) : IEventSerializer
{
public JsonSerializerOptions Options { get; } = options;

public object? Deserialize(byte[] data, Type type)
{
using var stream = new MemoryStream(data);
return JsonSerializer.Deserialize(stream, type, Options);
}

public byte[] Serialize(object value, Type type)
{
using var stream = new MemoryStream();
JsonSerializer.Serialize(stream, value, type, Options);
stream.Flush();
return stream.ToArray();
}
}

public class AggregateStore<TState, TId>(DbContext context, IEventSerializer serializer)
where TState : class
where TId : struct
{
public static async Task<IEventStream<TId>> SaveAsync(DbContext context, IAggregate<TState, TId> aggregate)
private readonly IEventSerializer _serializer = serializer;

public async Task<IEventStream<TId>> SaveAsync(IAggregate<TState, TId> aggregate)
{
if (!aggregate.Stream.HasUncommittedChanges)
{
return aggregate.Stream;
}

SaveState(context, aggregate.State);
SaveEvents(context, aggregate.Stream);
SaveState(aggregate.State);
SaveEvents(aggregate.Stream);

await context.SaveChangesAsync();

return aggregate.Stream.Commit();
}

private static void SaveState(DbContext context, TState state)
private void SaveState(TState state)
{
context.Remove(state);
context.Add(state);
}

private static void SaveEvents(DbContext context, IEventStream<TId> stream)
private void SaveEvents(IEventStream<TId> stream)
{
var events = stream.GetUncommittedEvents();

Expand All @@ -42,12 +72,12 @@ private static void SaveEvents(DbContext context, IEventStream<TId> stream)
stream.Id,
version,
e.GetType().AssemblyQualifiedName ?? string.Empty,
JsonSerializer.Serialize(e, e.GetType(), JsonSerializerOptions.Default)
_serializer.Serialize(e, e.GetType())
));
}
}

public static async Task<IEventStream<TId>?> LoadAsync(DbContext context, TId id)
public async Task<IEventStream<TId>?> LoadAsync(TId id)
{
var events = await context.Set<PersistendEvent<TId>>()
.Where(x => x.StreamId.Equals(id))
Expand All @@ -58,7 +88,7 @@ private static void SaveEvents(DbContext context, IEventStream<TId> stream)
return EventStream.Create(id, events);
}

private static Event GetEvent(PersistendEvent<TId> x)
private Event GetEvent(PersistendEvent<TId> x)
{
var eventType = Type.GetType(x.EventName);

Expand All @@ -67,7 +97,7 @@ private static Event GetEvent(PersistendEvent<TId> x)
return new UnkownEventType(x.EventName, x.Payload);
}

var e = JsonSerializer.Deserialize(x.Payload, eventType);
var e = _serializer.Deserialize(x.Payload, eventType);
if (e is Event ev)
{
return ev;
Expand Down
2 changes: 1 addition & 1 deletion src/StreamWave.EntityFramework/PersistendEvent.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
namespace StreamWave.EntityFramework;

public record PersistendEvent<TId>(Guid Id, TId StreamId, int Version, string EventName, string Payload);
public record PersistendEvent<TId>(Guid Id, TId StreamId, int Version, string EventName, byte[] Payload);
7 changes: 1 addition & 6 deletions src/StreamWave.EntityFramework/UnkownEventType.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
namespace StreamWave.EntityFramework;

public static partial class AggregateStore<TState, TId>
where TState : class
where TId : struct
{
public record UnkownEventType(string EventName, string Payload) : Event;
}
public record UnkownEventType(string EventName, byte[] Payload) : Event;
4 changes: 3 additions & 1 deletion src/StreamWave/Aggregate.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace StreamWave;
using StreamWave.Extensions;

namespace StreamWave;

public delegate TState CreateStateDelegate<out TState, in TId>()
where TState : IAggregateState<TId>;
Expand Down
4 changes: 3 additions & 1 deletion src/StreamWave/AggregateBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace StreamWave;
using StreamWave.Extensions;

namespace StreamWave;

public class AggregateBuilder<TState, TId> : IAggregateBuilder<TState, TId>
where TState : IAggregateState<TId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,19 @@
namespace StreamWave;

public interface IEventStream<TId> : IReadOnlyCollection<Event>
{
TId Id { get; }
bool HasUncommittedChanges { get; }
int Version { get; }
int ExpectedVersion { get; }
DateTimeOffset? CreatedOn { get; }
DateTimeOffset? LastModifiedOn { get; }
void Append(Event e);
Event[] GetUncommittedEvents();
IEventStream<TId> Commit();
}


public static class StreamExtensions
{
public static async Task<TState> AggregateAsync<TState, TSource>
(this IEnumerable<TSource> source,
TState aggregate,
Func<TState, TSource, Task<TState>> func)
{
using IEnumerator<TSource> e = source.GetEnumerator();
if (!e.MoveNext())
{
return aggregate;
}

while (e.MoveNext()) aggregate = await func(aggregate, e.Current);
return aggregate;
}
}
namespace StreamWave.Extensions;

public static class EventStreamExtensions
{
public static async Task<TState> AggregateAsync<TState, TSource>
(this IEnumerable<TSource> source,
TState aggregate,
Func<TState, TSource, Task<TState>> func)
{
using IEnumerator<TSource> e = source.GetEnumerator();
if (!e.MoveNext())
{
return aggregate;
}

while (e.MoveNext()) aggregate = await func(aggregate, e.Current);
return aggregate;
}
}
Original file line number Diff line number Diff line change
@@ -1,49 +1,49 @@
using Microsoft.Extensions.DependencyInjection;
namespace StreamWave;
public static class ServiceCollectionExtensions
{
public static IAggregateBuilder<TState, TId> AddAggregate<TState, TId>(this IServiceCollection services, CreateStateDelegate<TState, TId> initialState)
where TState : IAggregateState<TId>
{
var builder = new AggregateBuilder<TState, TId>(initialState);
services.AddSingleton(builder);
services.AddScoped<IAggregateManager<TState, TId>, AggregateManager<TState, TId>>();
return builder;
}
}
public interface IAggregateState<TId>
{
TId Id { get; set; }
}
public interface IAggregateManager<TState, TId>
{
Task<IAggregate<TState, TId>> Create();
Task<IAggregate<TState, TId>> LoadAsync(TId id);
Task SaveAsync(IAggregate<TState, TId> aggregate);
}
public class AggregateManager<TState, TId>(
AggregateBuilder<TState, TId> builder, IServiceProvider serviceProvider) : IAggregateManager<TState, TId>
where TState : IAggregateState<TId>
{
public Task<IAggregate<TState, TId>> Create()
{
var aggregate = builder.Build(serviceProvider);
return Task.FromResult(aggregate);
}
public async Task<IAggregate<TState, TId>> LoadAsync(TId id)
{
var aggregate = builder.Build(serviceProvider);
await aggregate.LoadAsync(id);
return aggregate;
}
public Task SaveAsync(IAggregate<TState, TId> aggregate)
{
return aggregate.SaveAsync();
}
}
using Microsoft.Extensions.DependencyInjection;

namespace StreamWave.Extensions;
public static class ServiceCollectionExtensions
{
public static IAggregateBuilder<TState, TId> AddAggregate<TState, TId>(this IServiceCollection services, CreateStateDelegate<TState, TId> initialState)
where TState : IAggregateState<TId>
{
var builder = new AggregateBuilder<TState, TId>(initialState);
services.AddSingleton(builder);
services.AddScoped<IAggregateManager<TState, TId>, AggregateManager<TState, TId>>();
return builder;
}
}

public interface IAggregateState<TId>
{
TId Id { get; set; }
}

public interface IAggregateManager<TState, TId>
{
Task<IAggregate<TState, TId>> Create();
Task<IAggregate<TState, TId>> LoadAsync(TId id);
Task SaveAsync(IAggregate<TState, TId> aggregate);
}

public class AggregateManager<TState, TId>(
AggregateBuilder<TState, TId> builder, IServiceProvider serviceProvider) : IAggregateManager<TState, TId>
where TState : IAggregateState<TId>
{
public Task<IAggregate<TState, TId>> Create()
{
var aggregate = builder.Build(serviceProvider);
return Task.FromResult(aggregate);
}

public async Task<IAggregate<TState, TId>> LoadAsync(TId id)
{
var aggregate = builder.Build(serviceProvider);
await aggregate.LoadAsync(id);
return aggregate;
}

public Task SaveAsync(IAggregate<TState, TId> aggregate)
{
return aggregate.SaveAsync();
}
}
14 changes: 14 additions & 0 deletions src/StreamWave/IEventStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace StreamWave;

public interface IEventStream<TId> : IReadOnlyCollection<Event>
{
TId Id { get; }
bool HasUncommittedChanges { get; }
int Version { get; }
int ExpectedVersion { get; }
DateTimeOffset? CreatedOn { get; }
DateTimeOffset? LastModifiedOn { get; }
void Append(Event e);
Event[] GetUncommittedEvents();
IEventStream<TId> Commit();
}
Loading