diff --git a/.github/workflows/security-analysis.yml b/.github/workflows/security-analysis.yml new file mode 100644 index 0000000..da71f37 --- /dev/null +++ b/.github/workflows/security-analysis.yml @@ -0,0 +1,44 @@ +name: Code Security Testing + +on: + push: + branches: + - main + pull_request: + branches: + - main + schedule: + - cron: "0 20 * * 5" + +permissions: + security-events: write + +concurrency: + group: security-${{ github.ref }} + cancel-in-progress: true + +jobs: + codeQL: + name: Analyze + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis + + - name: Initialize CodeQL + uses: github/codeql-action/init@v3 + with: + languages: csharp + + - name: Setup dotnet + uses: actions/setup-dotnet@v4 + with: + dotnet-version: '8.x.x' + + - name: Build + run: dotnet build --configuration Release + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v3 diff --git a/README.md b/README.md index 06d0376..bbb58d7 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ ![GitHub License](https://img.shields.io/github/license/pmdevers/streamwave) ![Github Issues Open](https://img.shields.io/github/issues/pmdevers/streamwave) ![Github Pull Request Open](https://img.shields.io/github/issues-pr/pmdevers/streamwave) +[![Scheduled Code Security Testing](https://github.com/pmdevers/streamwave/actions/workflows/security-analysis.yml/badge.svg?event=schedule)](https://github.com/pmdevers/streamwave/actions/workflows/security-analysis.yml) An aggregate root designed for a streaming environment, addressing the dual write problem by decoupling the domain model from the event stream. diff --git a/src/StreamWave.EntityFramework/AggregateBuilderExtensions.cs b/src/StreamWave.EntityFramework/AggregateBuilderExtensions.cs index 485a2cb..88935da 100644 --- a/src/StreamWave.EntityFramework/AggregateBuilderExtensions.cs +++ b/src/StreamWave.EntityFramework/AggregateBuilderExtensions.cs @@ -13,13 +13,15 @@ public static IAggregateBuilder WithEntityFramework { var context = s.GetRequiredService(); - return AggregateStore.LoadAsync(context, id); + var serializer = s.GetRequiredService(); + return new AggregateStore(context, serializer).LoadAsync(id); } ) .WithSaver((s) => (a) => { var context = s.GetRequiredService(); - return AggregateStore.SaveAsync(context, a); + var serializer = s.GetRequiredService(); + return new AggregateStore(context, serializer).SaveAsync(a); }); return builder; } diff --git a/src/StreamWave.EntityFramework/AggregateStore.cs b/src/StreamWave.EntityFramework/AggregateStore.cs index 785ede1..fec8b28 100644 --- a/src/StreamWave.EntityFramework/AggregateStore.cs +++ b/src/StreamWave.EntityFramework/AggregateStore.cs @@ -1,34 +1,64 @@ using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using System.Text; using System.Text.Json; +using static System.Runtime.InteropServices.JavaScript.JSType; namespace StreamWave.EntityFramework; -public static partial class AggregateStore +public interface IEventSerializer +{ + object? Deserialize(byte[] data, Type type); + byte[] Serialize(object value, Type type); +} + +public class DefaultSerializer(JsonSerializerOptions options) : IEventSerializer +{ + public JsonSerializerOptions Options { get; } = options; + + public object? Deserialize(byte[] data, Type type) + { + using var stream = new MemoryStream(data); + return JsonSerializer.Deserialize(stream, type, Options); + } + + public byte[] Serialize(object value, Type type) + { + using var stream = new MemoryStream(); + JsonSerializer.Serialize(stream, value, type, Options); + stream.Flush(); + return stream.ToArray(); + } +} + +public class AggregateStore(DbContext context, IEventSerializer serializer) where TState : class where TId : struct { - public static async Task> SaveAsync(DbContext context, IAggregate aggregate) + private readonly IEventSerializer _serializer = serializer; + + public async Task> SaveAsync(IAggregate aggregate) { if (!aggregate.Stream.HasUncommittedChanges) { return aggregate.Stream; } - SaveState(context, aggregate.State); - SaveEvents(context, aggregate.Stream); + SaveState(aggregate.State); + SaveEvents(aggregate.Stream); await context.SaveChangesAsync(); return aggregate.Stream.Commit(); } - private static void SaveState(DbContext context, TState state) + private void SaveState(TState state) { context.Remove(state); context.Add(state); } - private static void SaveEvents(DbContext context, IEventStream stream) + private void SaveEvents(IEventStream stream) { var events = stream.GetUncommittedEvents(); @@ -42,12 +72,12 @@ private static void SaveEvents(DbContext context, IEventStream stream) stream.Id, version, e.GetType().AssemblyQualifiedName ?? string.Empty, - JsonSerializer.Serialize(e, e.GetType(), JsonSerializerOptions.Default) + _serializer.Serialize(e, e.GetType()) )); } } - public static async Task?> LoadAsync(DbContext context, TId id) + public async Task?> LoadAsync(TId id) { var events = await context.Set>() .Where(x => x.StreamId.Equals(id)) @@ -58,7 +88,7 @@ private static void SaveEvents(DbContext context, IEventStream stream) return EventStream.Create(id, events); } - private static Event GetEvent(PersistendEvent x) + private Event GetEvent(PersistendEvent x) { var eventType = Type.GetType(x.EventName); @@ -67,7 +97,7 @@ private static Event GetEvent(PersistendEvent x) return new UnkownEventType(x.EventName, x.Payload); } - var e = JsonSerializer.Deserialize(x.Payload, eventType); + var e = _serializer.Deserialize(x.Payload, eventType); if (e is Event ev) { return ev; diff --git a/src/StreamWave.EntityFramework/PersistendEvent.cs b/src/StreamWave.EntityFramework/PersistendEvent.cs index 7325404..552f322 100644 --- a/src/StreamWave.EntityFramework/PersistendEvent.cs +++ b/src/StreamWave.EntityFramework/PersistendEvent.cs @@ -1,3 +1,3 @@ namespace StreamWave.EntityFramework; -public record PersistendEvent(Guid Id, TId StreamId, int Version, string EventName, string Payload); +public record PersistendEvent(Guid Id, TId StreamId, int Version, string EventName, byte[] Payload); diff --git a/src/StreamWave.EntityFramework/UnkownEventType.cs b/src/StreamWave.EntityFramework/UnkownEventType.cs index f58fda3..2ffcb9a 100644 --- a/src/StreamWave.EntityFramework/UnkownEventType.cs +++ b/src/StreamWave.EntityFramework/UnkownEventType.cs @@ -1,8 +1,3 @@ namespace StreamWave.EntityFramework; -public static partial class AggregateStore - where TState : class - where TId : struct -{ - public record UnkownEventType(string EventName, string Payload) : Event; -} +public record UnkownEventType(string EventName, byte[] Payload) : Event; diff --git a/src/StreamWave/Aggregate.cs b/src/StreamWave/Aggregate.cs index 9089696..89e5c80 100644 --- a/src/StreamWave/Aggregate.cs +++ b/src/StreamWave/Aggregate.cs @@ -1,4 +1,6 @@ -namespace StreamWave; +using StreamWave.Extensions; + +namespace StreamWave; public delegate TState CreateStateDelegate() where TState : IAggregateState; diff --git a/src/StreamWave/AggregateBuilder.cs b/src/StreamWave/AggregateBuilder.cs index 3a94d66..24a0f22 100644 --- a/src/StreamWave/AggregateBuilder.cs +++ b/src/StreamWave/AggregateBuilder.cs @@ -1,4 +1,6 @@ -namespace StreamWave; +using StreamWave.Extensions; + +namespace StreamWave; public class AggregateBuilder : IAggregateBuilder where TState : IAggregateState diff --git a/src/StreamWave/IStream.cs b/src/StreamWave/Extensions/EventStreamExtensions.cs similarity index 53% rename from src/StreamWave/IStream.cs rename to src/StreamWave/Extensions/EventStreamExtensions.cs index 716952c..597383e 100644 --- a/src/StreamWave/IStream.cs +++ b/src/StreamWave/Extensions/EventStreamExtensions.cs @@ -1,33 +1,19 @@ -namespace StreamWave; - -public interface IEventStream : IReadOnlyCollection -{ - TId Id { get; } - bool HasUncommittedChanges { get; } - int Version { get; } - int ExpectedVersion { get; } - DateTimeOffset? CreatedOn { get; } - DateTimeOffset? LastModifiedOn { get; } - void Append(Event e); - Event[] GetUncommittedEvents(); - IEventStream Commit(); -} - - -public static class StreamExtensions -{ - public static async Task AggregateAsync - (this IEnumerable source, - TState aggregate, - Func> func) - { - using IEnumerator e = source.GetEnumerator(); - if (!e.MoveNext()) - { - return aggregate; - } - - while (e.MoveNext()) aggregate = await func(aggregate, e.Current); - return aggregate; - } -} +namespace StreamWave.Extensions; + +public static class EventStreamExtensions +{ + public static async Task AggregateAsync + (this IEnumerable source, + TState aggregate, + Func> func) + { + using IEnumerator e = source.GetEnumerator(); + if (!e.MoveNext()) + { + return aggregate; + } + + while (e.MoveNext()) aggregate = await func(aggregate, e.Current); + return aggregate; + } +} diff --git a/src/StreamWave/ServiceCollectionExtensions.cs b/src/StreamWave/Extensions/ServiceCollectionExtensions.cs similarity index 95% rename from src/StreamWave/ServiceCollectionExtensions.cs rename to src/StreamWave/Extensions/ServiceCollectionExtensions.cs index 0061d10..9f3faad 100644 --- a/src/StreamWave/ServiceCollectionExtensions.cs +++ b/src/StreamWave/Extensions/ServiceCollectionExtensions.cs @@ -1,49 +1,49 @@ -using Microsoft.Extensions.DependencyInjection; - -namespace StreamWave; -public static class ServiceCollectionExtensions -{ - public static IAggregateBuilder AddAggregate(this IServiceCollection services, CreateStateDelegate initialState) - where TState : IAggregateState - { - var builder = new AggregateBuilder(initialState); - services.AddSingleton(builder); - services.AddScoped, AggregateManager>(); - return builder; - } -} - -public interface IAggregateState -{ - TId Id { get; set; } -} - -public interface IAggregateManager -{ - Task> Create(); - Task> LoadAsync(TId id); - Task SaveAsync(IAggregate aggregate); -} - -public class AggregateManager( - AggregateBuilder builder, IServiceProvider serviceProvider) : IAggregateManager - where TState : IAggregateState -{ - public Task> Create() - { - var aggregate = builder.Build(serviceProvider); - return Task.FromResult(aggregate); - } - - public async Task> LoadAsync(TId id) - { - var aggregate = builder.Build(serviceProvider); - await aggregate.LoadAsync(id); - return aggregate; - } - - public Task SaveAsync(IAggregate aggregate) - { - return aggregate.SaveAsync(); - } -} +using Microsoft.Extensions.DependencyInjection; + +namespace StreamWave.Extensions; +public static class ServiceCollectionExtensions +{ + public static IAggregateBuilder AddAggregate(this IServiceCollection services, CreateStateDelegate initialState) + where TState : IAggregateState + { + var builder = new AggregateBuilder(initialState); + services.AddSingleton(builder); + services.AddScoped, AggregateManager>(); + return builder; + } +} + +public interface IAggregateState +{ + TId Id { get; set; } +} + +public interface IAggregateManager +{ + Task> Create(); + Task> LoadAsync(TId id); + Task SaveAsync(IAggregate aggregate); +} + +public class AggregateManager( + AggregateBuilder builder, IServiceProvider serviceProvider) : IAggregateManager + where TState : IAggregateState +{ + public Task> Create() + { + var aggregate = builder.Build(serviceProvider); + return Task.FromResult(aggregate); + } + + public async Task> LoadAsync(TId id) + { + var aggregate = builder.Build(serviceProvider); + await aggregate.LoadAsync(id); + return aggregate; + } + + public Task SaveAsync(IAggregate aggregate) + { + return aggregate.SaveAsync(); + } +} diff --git a/src/StreamWave/IEventStream.cs b/src/StreamWave/IEventStream.cs new file mode 100644 index 0000000..157b476 --- /dev/null +++ b/src/StreamWave/IEventStream.cs @@ -0,0 +1,14 @@ +namespace StreamWave; + +public interface IEventStream : IReadOnlyCollection +{ + TId Id { get; } + bool HasUncommittedChanges { get; } + int Version { get; } + int ExpectedVersion { get; } + DateTimeOffset? CreatedOn { get; } + DateTimeOffset? LastModifiedOn { get; } + void Append(Event e); + Event[] GetUncommittedEvents(); + IEventStream Commit(); +} diff --git a/src/StreamWave/IStreamManager.cs b/src/StreamWave/IStreamManager.cs deleted file mode 100644 index 92a9402..0000000 --- a/src/StreamWave/IStreamManager.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System.IO; - -namespace StreamWave; - -public interface IStreamManager -{ - Task?> LoadAsync(TId streamId); - Task> SaveAsync(IEventStream stream); -} - -public class StreamManager(TId streamId, Event[] events) : IStreamManager -{ - private readonly TId _streamId = streamId; - private readonly Event[] _events = events; - - public async Task?> LoadAsync(TId streamId) - { - if (streamId?.Equals(_streamId) == true) - return null; - - await Task.CompletedTask; - - return EventStream.Create(streamId, _events); - } - - public async Task> SaveAsync(IEventStream stream) - { - await Task.CompletedTask; - return EventStream.Create(stream.Id, stream.GetUncommittedEvents()); - } -} diff --git a/src/StreamWave/SystemTime.cs b/src/StreamWave/SystemTime.cs index 348547d..95e7939 100644 --- a/src/StreamWave/SystemTime.cs +++ b/src/StreamWave/SystemTime.cs @@ -1,16 +1,17 @@ -namespace StreamWave; +using System.Diagnostics.CodeAnalysis; + +namespace StreamWave; /// /// Used for getting DateTime.Now(), time is changeable for unit testing /// +[SuppressMessage("Major Code Smell", "S6354:Use a testable date/time provider", Justification = "this is the testable date/time provider!")] public static class SystemTime { /// Normally this is a pass-through to DateTime.Now, but it can be overridden with SetDateTime( .. ) for testing or debugging. /// -#pragma warning disable S6354 // Use a testable date/time provider public static Func Now { get; private set; } = () => DateTime.Now; -#pragma warning restore S6354 // Use a testable date/time provider - + /// Set time to return when SystemTime.Now() is called. /// public static void SetDateTime(DateTime dateTimeNow) @@ -22,8 +23,6 @@ public static void SetDateTime(DateTime dateTimeNow) /// public static void ResetDateTime() { -#pragma warning disable S6354 // Use a testable date/time provider Now = () => DateTime.Now; -#pragma warning restore S6354 // Use a testable date/time provider } } diff --git a/test/StreamWave.EntityFramework.Tests/UnitTest1.cs b/test/StreamWave.EntityFramework.Tests/UnitTest1.cs index 4475f6e..e9c7d00 100644 --- a/test/StreamWave.EntityFramework.Tests/UnitTest1.cs +++ b/test/StreamWave.EntityFramework.Tests/UnitTest1.cs @@ -1,5 +1,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; +using StreamWave.Extensions; +using System.Text.Json; namespace StreamWave.EntityFramework.Tests; @@ -10,19 +12,23 @@ public async Task register_storage_full_specs() { var services = new ServiceCollection(); services.AddDbContext(x => x.UseInMemoryDatabase("test"), contextLifetime: ServiceLifetime.Transient); + services.AddSingleton(JsonSerializerOptions.Default); + services.AddScoped(); services.AddAggregate(() => new()) .WithLoader((s) => (id) => { var context = s.GetRequiredService(); - return AggregateStore.LoadAsync(context, id); + var serializer = s.GetRequiredService(); + return new AggregateStore(context, serializer).LoadAsync(id); } ) .WithSaver((s) => - (a) => { + (a) => { var context = s.GetRequiredService(); - return AggregateStore.SaveAsync(context, a); + var serializer = s.GetRequiredService(); + return new AggregateStore(context, serializer).SaveAsync(a); }) .WithApplier((s, e) => { @@ -51,6 +57,9 @@ public async Task register_storage_with_extension() var services = new ServiceCollection(); services.AddDbContext(x => x.UseInMemoryDatabase("test"), contextLifetime: ServiceLifetime.Transient); + services.AddSingleton(JsonSerializerOptions.Default); + services.AddScoped(); + services.AddAggregate(() => new TestState() { Id = Guid.NewGuid() }) .WithEntityFramework() .WithApplier((s, e) => diff --git a/test/StreamWave.Tests/RegistrationTest.cs b/test/StreamWave.Tests/RegistrationTest.cs index 72f52d6..a1f00bc 100644 --- a/test/StreamWave.Tests/RegistrationTest.cs +++ b/test/StreamWave.Tests/RegistrationTest.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.DependencyInjection; +using StreamWave.Extensions; namespace StreamWave.Tests; diff --git a/test/StreamWave.Tests/TestState.cs b/test/StreamWave.Tests/TestState.cs index 2cee748..8de93f2 100644 --- a/test/StreamWave.Tests/TestState.cs +++ b/test/StreamWave.Tests/TestState.cs @@ -1,4 +1,6 @@ -namespace StreamWave.Tests; +using StreamWave.Extensions; + +namespace StreamWave.Tests; public class TestState : IAggregateState {