From 06809b3e5d8981baa6606ce20108391224358d69 Mon Sep 17 00:00:00 2001 From: jeremydmiller Date: Mon, 29 Mar 2021 11:35:19 -0500 Subject: [PATCH] new event projection testing support. Closes GH-1615 --- .../Events/event_projection_scenario_tests.cs | 249 +++++++++++ src/Marten/AdvancedOperations.cs | 17 + src/Marten/Events/Daemon/IProjectionDaemon.cs | 9 + src/Marten/Events/Daemon/ProjectionDaemon.cs | 37 ++ src/Marten/Events/IEventStore.cs | 41 +- .../Projections/ProjectionCollection.cs | 5 + .../ProjectionScenario.Assertions.cs | 171 ++++++++ .../ProjectionScenario.EventOperations.cs | 411 ++++++++++++++++++ .../Events/TestSupport/ProjectionScenario.cs | 122 ++++++ .../ProjectionScenarioException.cs | 18 + .../Events/TestSupport/ScenarioAction.cs | 26 ++ .../Events/TestSupport/ScenarioAssertion.cs | 20 + src/Marten/Events/TestSupport/ScenarioStep.cs | 11 + src/Marten/StoreOptions.cs | 3 + 14 files changed, 1121 insertions(+), 19 deletions(-) create mode 100644 src/Marten.Testing/Events/event_projection_scenario_tests.cs create mode 100644 src/Marten/Events/TestSupport/ProjectionScenario.Assertions.cs create mode 100644 src/Marten/Events/TestSupport/ProjectionScenario.EventOperations.cs create mode 100644 src/Marten/Events/TestSupport/ProjectionScenario.cs create mode 100644 src/Marten/Events/TestSupport/ProjectionScenarioException.cs create mode 100644 src/Marten/Events/TestSupport/ScenarioAction.cs create mode 100644 src/Marten/Events/TestSupport/ScenarioAssertion.cs create mode 100644 src/Marten/Events/TestSupport/ScenarioStep.cs diff --git a/src/Marten.Testing/Events/event_projection_scenario_tests.cs b/src/Marten.Testing/Events/event_projection_scenario_tests.cs new file mode 100644 index 0000000000..32234757a5 --- /dev/null +++ b/src/Marten.Testing/Events/event_projection_scenario_tests.cs @@ -0,0 +1,249 @@ +using System; +using System.Threading.Tasks; +using Marten.Events.Projections; +using Marten.Events.TestSupport; +using Marten.Testing.Documents; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace Marten.Testing.Events +{ + [Collection("projection_scenario")] + public class event_projection_scenario_tests : OneOffConfigurationsContext + { + public event_projection_scenario_tests() : base("projection_scenario") + { + } + + [Fact] + public async Task happy_path_test_with_inline_projection() + { + StoreOptions(opts => + { + opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Inline); + }); + + await theStore.Advanced.EventProjectionScenario(scenario => + { + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + var id3 = Guid.NewGuid(); + + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"}); + + scenario.DocumentShouldExist(id1); + scenario.DocumentShouldExist(id2); + scenario.DocumentShouldExist(id3, user => user.UserName.ShouldBe("James")); + + scenario.Append(Guid.NewGuid(), new DeleteUser {UserId = id2}); + + scenario.DocumentShouldExist(id1); + scenario.DocumentShouldNotExist(id2); + scenario.DocumentShouldExist(id3); + + }); + } + + [Fact] + public async Task sad_path_test_with_inline_projection() + { + StoreOptions(opts => + { + opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Inline); + }); + + await Exception.ShouldBeThrownByAsync(async () => + { + await theStore.Advanced.EventProjectionScenario(scenario => + { + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + var id3 = Guid.NewGuid(); + + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"}); + + scenario.DocumentShouldExist(id1); + scenario.DocumentShouldExist(id2); + scenario.DocumentShouldExist(id3, user => user.UserName.ShouldBe("James")); + + scenario.Append(Guid.NewGuid(), new DeleteUser {UserId = id2}); + + // This should have been deleted + scenario.DocumentShouldExist(id2); + + }); + }); + + + } + + [Fact] + public async Task sad_path_test_with_inline_projection_with_document_assertion() + { + StoreOptions(opts => + { + opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Inline); + }); + + await Exception.ShouldBeThrownByAsync(async () => + { + await theStore.Advanced.EventProjectionScenario(scenario => + { + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + var id3 = Guid.NewGuid(); + + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"}); + + scenario.DocumentShouldExist(id1, u => u.FirstName.ShouldBe("WRONG")); + scenario.DocumentShouldExist(id2); + scenario.DocumentShouldExist(id3, user => user.UserName.ShouldBe("James")); + + + }); + }); + + + } + + + + + + + [Fact] + public async Task happy_path_test_with_inline_projection_async() + { + StoreOptions(opts => + { + opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Async); + }); + + await theStore.Advanced.EventProjectionScenario(scenario => + { + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + var id3 = Guid.NewGuid(); + + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"}); + + scenario.DocumentShouldExist(id1); + scenario.DocumentShouldExist(id2); + scenario.DocumentShouldExist(id3, user => user.UserName.ShouldBe("James")); + + scenario.Append(Guid.NewGuid(), new DeleteUser {UserId = id2}); + + scenario.DocumentShouldExist(id1); + scenario.DocumentShouldNotExist(id2); + scenario.DocumentShouldExist(id3); + + }); + } + + [Fact] + public async Task sad_path_test_with_inline_projection_async() + { + StoreOptions(opts => + { + opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Async); + }); + + await Exception.ShouldBeThrownByAsync(async () => + { + await theStore.Advanced.EventProjectionScenario(scenario => + { + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + var id3 = Guid.NewGuid(); + + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"}); + + scenario.DocumentShouldExist(id1); + scenario.DocumentShouldExist(id2); + scenario.DocumentShouldExist(id3, user => user.UserName.ShouldBe("James")); + + scenario.Append(Guid.NewGuid(), new DeleteUser {UserId = id2}); + + // This should have been deleted + scenario.DocumentShouldExist(id2); + + }); + }); + + + } + + [Fact] + public async Task sad_path_test_with_inline_projection_with_document_assertion_async() + { + StoreOptions(opts => + { + opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Async); + }); + + await Exception.ShouldBeThrownByAsync(async () => + { + await theStore.Advanced.EventProjectionScenario(scenario => + { + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + var id3 = Guid.NewGuid(); + + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"}); + scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"}); + + scenario.DocumentShouldExist(id1, u => u.FirstName.ShouldBe("WRONG")); + scenario.DocumentShouldExist(id2); + scenario.DocumentShouldExist(id3, user => user.UserName.ShouldBe("James")); + + + }); + }); + + + } + + + + + } + + public class CreateUser + { + public Guid UserId { get; set; } + public string UserName { get; set; } + } + + public class DeleteUser + { + public Guid UserId { get; set; } + } + + public class UserProjection: EventProjection + { + public User Create(CreateUser create) + { + return new User + { + Id = create.UserId, UserName = create.UserName + }; + } + + public void Project(DeleteUser deletion, IDocumentOperations operations) + { + operations.Delete(deletion.UserId); + } + } +} diff --git a/src/Marten/AdvancedOperations.cs b/src/Marten/AdvancedOperations.cs index d06d5d4fb5..3d976d7a01 100644 --- a/src/Marten/AdvancedOperations.cs +++ b/src/Marten/AdvancedOperations.cs @@ -6,6 +6,7 @@ using Marten.Events; using Marten.Events.Daemon; using Marten.Events.Daemon.Progress; +using Marten.Events.TestSupport; using Marten.Internal; using Marten.Internal.Sessions; using Marten.Linq.QueryHandlers; @@ -180,5 +181,21 @@ public IDocumentSourceCode Load(IProviderGraph providers) return providers.StorageFor(); } } + + + /// + /// Marten's built in test support for event projections. Only use this in testing as + /// it will delete existing event and projected aggregate data + /// + /// + /// + public Task EventProjectionScenario(Action configuration) + { + var scenario = new ProjectionScenario(_store); + configuration(scenario); + + return scenario.Execute(); + } + } } diff --git a/src/Marten/Events/Daemon/IProjectionDaemon.cs b/src/Marten/Events/Daemon/IProjectionDaemon.cs index 70db99435e..8b5a426e40 100644 --- a/src/Marten/Events/Daemon/IProjectionDaemon.cs +++ b/src/Marten/Events/Daemon/IProjectionDaemon.cs @@ -74,5 +74,14 @@ public interface IProjectionDaemon : IDisposable /// /// Task StartDaemon(); + + + /// + /// Use with caution! This will try to wait for all projections to "catch up" to the currently + /// known farthest known sequence of the event store + /// + /// + /// + Task WaitForNonStaleData(TimeSpan timeout); } } diff --git a/src/Marten/Events/Daemon/ProjectionDaemon.cs b/src/Marten/Events/Daemon/ProjectionDaemon.cs index a962daac96..ee31113434 100644 --- a/src/Marten/Events/Daemon/ProjectionDaemon.cs +++ b/src/Marten/Events/Daemon/ProjectionDaemon.cs @@ -51,6 +51,43 @@ public async Task StartDaemon() _hasStarted = true; } + public Task WaitForNonStaleData(TimeSpan timeout) + { + var completion = new TaskCompletionSource(); + var timeoutCancellation = new CancellationTokenSource(timeout); + + + Task.Run(async () => + { + var statistics = await _store.Advanced.FetchEventStoreStatistics(timeoutCancellation.Token); + timeoutCancellation.Token.Register(() => + { + completion.TrySetException(new TimeoutException( + $"The active projection shards did not reach sequence {statistics.EventSequenceNumber} in time")); + }); + + if (CurrentShards().All(x => x.Position >= statistics.EventSequenceNumber)) + { + completion.SetResult(true); + return; + } + + while (!timeoutCancellation.IsCancellationRequested) + { + await Task.Delay(100.Milliseconds(), timeoutCancellation.Token); + + if (CurrentShards().All(x => x.Position >= statistics.EventSequenceNumber)) + { + completion.SetResult(true); + return; + } + } + }, timeoutCancellation.Token); + + return completion.Task; + + } + public async Task StartAll() { if (!_hasStarted) await StartDaemon(); diff --git a/src/Marten/Events/IEventStore.cs b/src/Marten/Events/IEventStore.cs index dbb2a61ff2..21f14a8a58 100644 --- a/src/Marten/Events/IEventStore.cs +++ b/src/Marten/Events/IEventStore.cs @@ -7,7 +7,7 @@ namespace Marten.Events { - public interface IEventStore + public interface IEventOperations { /// /// Append one or more events in order to an existing stream @@ -37,15 +37,6 @@ public interface IEventStore /// StreamAction Append(string stream, params object[] events); - /// - /// Append one or more events in order to an existing stream and verify that maximum event id for the stream - /// matches supplied expected version or transaction is aborted. - /// - /// - /// Expected maximum event version after append - /// - StreamAction Append(Guid stream, long expectedVersion, IEnumerable events); - /// /// Append one or more events in order to an existing stream and verify that maximum event id for the stream /// matches supplied expected version or transaction is aborted. @@ -73,15 +64,6 @@ public interface IEventStore /// StreamAction Append(string stream, long expectedVersion, params object[] events); - /// - /// Creates a new event stream based on a user-supplied Guid and appends the events in order to the new stream - /// - /// - /// - /// - /// - StreamAction StartStream(Guid id, IEnumerable events) where TAggregate : class; - /// /// Creates a new event stream based on a user-supplied Guid and appends the events in order to the new stream /// @@ -236,6 +218,27 @@ public interface IEventStore /// /// StreamAction StartStream(params object[] events); + } + + public interface IEventStore: IEventOperations + { + /// + /// Append one or more events in order to an existing stream and verify that maximum event id for the stream + /// matches supplied expected version or transaction is aborted. + /// + /// + /// Expected maximum event version after append + /// + StreamAction Append(Guid stream, long expectedVersion, IEnumerable events); + + /// + /// Creates a new event stream based on a user-supplied Guid and appends the events in order to the new stream + /// + /// + /// + /// + /// + StreamAction StartStream(Guid id, IEnumerable events) where TAggregate : class; /// /// Synchronously fetches all of the events for the named stream diff --git a/src/Marten/Events/Projections/ProjectionCollection.cs b/src/Marten/Events/Projections/ProjectionCollection.cs index 3d70a58e12..8a37618350 100644 --- a/src/Marten/Events/Projections/ProjectionCollection.cs +++ b/src/Marten/Events/Projections/ProjectionCollection.cs @@ -32,6 +32,11 @@ internal IList BuildAllShards(DocumentStore store) return Projections.SelectMany(x => x.AsyncProjectionShards(store)).ToList(); } + internal bool HasAnyAsyncProjections() + { + return Projections.Any(x => x.Lifecycle == ProjectionLifecycle.Async); + } + internal IEnumerable AllAggregateTypes() { foreach (var kv in _liveAggregators.Enumerate()) diff --git a/src/Marten/Events/TestSupport/ProjectionScenario.Assertions.cs b/src/Marten/Events/TestSupport/ProjectionScenario.Assertions.cs new file mode 100644 index 0000000000..f25023608e --- /dev/null +++ b/src/Marten/Events/TestSupport/ProjectionScenario.Assertions.cs @@ -0,0 +1,171 @@ +using System; +using System.Threading.Tasks; +using LamarCodeGeneration; + +namespace Marten.Events.TestSupport +{ + public partial class ProjectionScenario + { + /// + /// General hook to run + /// + /// + /// + public void AssertAgainstProjectedData(string description, Func assertions) + { + assertion(assertions).Description = description; + } + + /// + /// Verify that a document with the supplied id exists + /// + /// + /// Optional lambda to make additional assertions about the document state + /// The document type + public void DocumentShouldExist(string id, Action assertions = null) + { + assertion(async session => + { + var document = await session.LoadAsync(id); + if (document == null) + { + throw new Exception($"Document {typeof(T).FullNameInCode()} with id '{id}' does not exist"); + } + + assertions?.Invoke(document); + }).Description = $"Document {typeof(T).FullNameInCode()} with id '{id}' should exist"; + } + + /// + /// Verify that a document with the supplied id exists + /// + /// + /// Optional lambda to make additional assertions about the document state + /// The document type + public void DocumentShouldExist(long id, Action assertions = null) + { + assertion(async session => + { + var document = await session.LoadAsync(id); + if (document == null) + { + throw new Exception($"Document {typeof(T).FullNameInCode()} with id '{id}' does not exist"); + } + + assertions?.Invoke(document); + }).Description = $"Document {typeof(T).FullNameInCode()} with id '{id}' should exist"; + } + + /// + /// Verify that a document with the supplied id exists + /// + /// + /// Optional lambda to make additional assertions about the document state + /// The document type + public void DocumentShouldExist(int id, Action assertions = null) + { + assertion(async session => + { + var document = await session.LoadAsync(id); + if (document == null) + { + throw new Exception($"Document {typeof(T).FullNameInCode()} with id '{id}' does not exist"); + } + + assertions?.Invoke(document); + }).Description = $"Document {typeof(T).FullNameInCode()} with id '{id}' should exist"; + } + + /// + /// Verify that a document with the supplied id exists + /// + /// + /// Optional lambda to make additional assertions about the document state + /// The document type + public void DocumentShouldExist(Guid id, Action assertions = null) + { + assertion(async session => + { + var document = await session.LoadAsync(id); + if (document == null) + { + throw new Exception($"Document {typeof(T).FullNameInCode()} with id '{id}' does not exist"); + } + + assertions?.Invoke(document); + }).Description = $"Document {typeof(T).FullNameInCode()} with id '{id}' should exist"; + } + + /// + /// Asserts that a document with a given id has been deleted or does not exist + /// + /// The identity of the document + /// The document type + public void DocumentShouldNotExist(string id) + { + assertion(async session => + { + var document = await session.LoadAsync(id); + if (document != null) + { + throw new Exception( + $"Document {typeof(T).FullNameInCode()} with id '{id}' exists, but should not."); + } + }).Description = $"Document {typeof(T).FullNameInCode()} with id '{id}' should not exist or be deleted"; + } + + /// + /// Asserts that a document with a given id has been deleted or does not exist + /// + /// The identity of the document + /// The document type + public void DocumentShouldNotExist(long id) + { + assertion(async session => + { + var document = await session.LoadAsync(id); + if (document != null) + { + throw new Exception( + $"Document {typeof(T).FullNameInCode()} with id '{id}' exists, but should not."); + } + }).Description = $"Document {typeof(T).FullNameInCode()} with id '{id}' should not exist or be deleted"; + } + + /// + /// Asserts that a document with a given id has been deleted or does not exist + /// + /// The identity of the document + /// The document type + public void DocumentShouldNotExist(int id) + { + assertion(async session => + { + var document = await session.LoadAsync(id); + if (document != null) + { + throw new Exception( + $"Document {typeof(T).FullNameInCode()} with id '{id}' exists, but should not."); + } + }).Description = $"Document {typeof(T).FullNameInCode()} with id '{id}' should not exist or be deleted"; + } + + /// + /// Asserts that a document with a given id has been deleted or does not exist + /// + /// The identity of the document + /// The document type + public void DocumentShouldNotExist(Guid id) + { + assertion(async session => + { + var document = await session.LoadAsync(id); + if (document != null) + { + throw new Exception( + $"Document {typeof(T).FullNameInCode()} with id '{id}' exists, but should not."); + } + }).Description = $"Document {typeof(T).FullNameInCode()} with id '{id}' should not exist or be deleted"; + } + } +} diff --git a/src/Marten/Events/TestSupport/ProjectionScenario.EventOperations.cs b/src/Marten/Events/TestSupport/ProjectionScenario.EventOperations.cs new file mode 100644 index 0000000000..376bf8f537 --- /dev/null +++ b/src/Marten/Events/TestSupport/ProjectionScenario.EventOperations.cs @@ -0,0 +1,411 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Baseline; +using LamarCodeGeneration; + +namespace Marten.Events.TestSupport +{ + public partial class ProjectionScenario + { + /// + /// Make any number of append event operations in the scenario sequence + /// + /// Descriptive explanation of the action in case of failures + /// + public void AppendEvents(string description, Action appendAction) + { + action(appendAction).Description = description; + } + + /// + /// Make any number of append event operations in the scenario sequence + /// + /// + public void AppendEvents(Action appendAction) + { + AppendEvents("Appending events...", appendAction); + } + + public StreamAction Append(Guid stream, IEnumerable events) + { + var step = action(e => e.Append(stream, events)); + if (events.Count() > 3) + { + step.Description = $"Append({stream}, events)"; + } + else + { + step.Description = $"Append({stream}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Append(_store.Events, stream, events); + } + + public StreamAction Append(Guid stream, params object[] events) + { + var step = action(e => e.Append(stream, events)); + if (events.Count() > 3) + { + step.Description = $"Append({stream}, events)"; + } + else + { + step.Description = $"Append({stream}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Append(_store.Events, stream, events); + } + + public StreamAction Append(string stream, IEnumerable events) + { + var step = action(e => e.Append(stream, events)); + if (events.Count() > 3) + { + step.Description = $"Append('{stream}', events)"; + } + else + { + step.Description = $"Append('{stream}', {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Append(_store.Events, stream, events); + } + + public StreamAction Append(string stream, params object[] events) + { + var step = action(e => e.Append(stream, events)); + if (events.Count() > 3) + { + step.Description = $"Append('{stream}', events)"; + } + else + { + step.Description = $"Append('{stream}', {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Append(_store.Events, stream, events); + } + + public StreamAction Append(Guid stream, long expectedVersion, params object[] events) + { + var step = action(e => e.Append(stream, expectedVersion, events)); + if (events.Count() > 3) + { + step.Description = $"Append({stream}, {expectedVersion}, events)"; + } + else + { + step.Description = + $"Append({stream}, {expectedVersion}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Append(_store.Events, stream, events); + } + + public StreamAction Append(string stream, long expectedVersion, IEnumerable events) + { + var step = action(e => e.Append(stream, expectedVersion, events)); + if (events.Count() > 3) + { + step.Description = $"Append(\"{stream}\", {expectedVersion}, events)"; + } + else + { + step.Description = + $"Append(\"{stream}\", {expectedVersion}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Append(_store.Events, stream, events); + } + + public StreamAction Append(string stream, long expectedVersion, params object[] events) + { + var step = action(e => e.Append(stream, expectedVersion, events)); + if (events.Count() > 3) + { + step.Description = $"Append(\"{stream}\", {expectedVersion}, events)"; + } + else + { + step.Description = + $"Append(\"{stream}\", {expectedVersion}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Append(_store.Events, stream, events); + } + + public StreamAction StartStream(Guid id, params object[] events) where TAggregate : class + { + var step = action(e => e.StartStream(id, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream<{typeof(TAggregate).FullNameInCode()}>({id}, events)"; + } + else + { + step.Description = + $"StartStream<{typeof(TAggregate).FullNameInCode()}>({id}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, id, events); + } + + public StreamAction StartStream(Type aggregateType, Guid id, IEnumerable events) + { + var step = action(e => e.StartStream(aggregateType, id, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream({aggregateType.FullNameInCode()}>({id}, events)"; + } + else + { + step.Description = + $"StartStream({aggregateType.FullNameInCode()}, {id}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, id, events); + } + + public StreamAction StartStream(Type aggregateType, Guid id, params object[] events) + { + var step = action(e => e.StartStream(aggregateType, id, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream({aggregateType.FullNameInCode()}>({id}, events)"; + } + else + { + step.Description = + $"StartStream({aggregateType.FullNameInCode()}, {id}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, id, events); + } + + public StreamAction StartStream(string streamKey, IEnumerable events) + where TAggregate : class + { + var step = action(e => e.StartStream(streamKey, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream<{typeof(TAggregate).FullNameInCode()}>(\"{streamKey}\", events)"; + } + else + { + step.Description = + $"StartStream<{typeof(TAggregate).FullNameInCode()}>(\"{streamKey}\", {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamKey, events); + } + + public StreamAction StartStream(string streamKey, params object[] events) where TAggregate : class + { + var step = action(e => e.StartStream(streamKey, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream<{typeof(TAggregate).FullNameInCode()}>(\"{streamKey}\", events)"; + } + else + { + step.Description = + $"StartStream<{typeof(TAggregate).FullNameInCode()}>(\"{streamKey}\", {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamKey, events); + } + + public StreamAction StartStream(Type aggregateType, string streamKey, IEnumerable events) + { + var step = action(e => e.StartStream(aggregateType, streamKey, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream({aggregateType.FullNameInCode()}>(\"{streamKey}\", events)"; + } + else + { + step.Description = + $"StartStream({aggregateType.FullNameInCode()}, \"{streamKey}\", {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamKey, events); + } + + public StreamAction StartStream(Type aggregateType, string streamKey, params object[] events) + { + var step = action(e => e.StartStream(aggregateType, streamKey, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream({aggregateType.FullNameInCode()}>(\"{streamKey}\", events)"; + } + else + { + step.Description = + $"StartStream({aggregateType.FullNameInCode()}, \"{streamKey}\", {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamKey, events); + } + + public StreamAction StartStream(Guid id, IEnumerable events) + { + var step = action(e => e.StartStream(id, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream({id}, events)"; + } + else + { + step.Description = $"StartStream({id}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, id, events); + } + + public StreamAction StartStream(Guid id, params object[] events) + { + var step = action(e => e.StartStream(id, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream({id}, events)"; + } + else + { + step.Description = $"StartStream({id}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, id, events); + } + + public StreamAction StartStream(string streamKey, IEnumerable events) + { + var step = action(e => e.StartStream(streamKey, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream(\"{streamKey}\", events)"; + } + else + { + step.Description = $"StartStream(\"{streamKey}\", {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamKey, events); + } + + public StreamAction StartStream(string streamKey, params object[] events) + { + var step = action(e => e.StartStream(streamKey, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream(\"{streamKey}\", events)"; + } + else + { + step.Description = $"StartStream(\"{streamKey}\", {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamKey, events); + } + + public StreamAction StartStream(IEnumerable events) where TAggregate : class + { + var streamId = Guid.NewGuid(); + var step = action(e => e.StartStream(streamId, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream<{typeof(TAggregate).FullNameInCode()}>(events)"; + } + else + { + step.Description = + $"StartStream<{typeof(TAggregate).FullNameInCode()}>({events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamId, events); + } + + public StreamAction StartStream(params object[] events) where TAggregate : class + { + var streamId = Guid.NewGuid(); + var step = action(e => e.StartStream(streamId, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream<{typeof(TAggregate).FullNameInCode()}>(events)"; + } + else + { + step.Description = + $"StartStream<{typeof(TAggregate).FullNameInCode()}>({events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamId, events); + } + + public StreamAction StartStream(Type aggregateType, IEnumerable events) + { + var streamId = Guid.NewGuid(); + var step = action(e => e.StartStream(aggregateType, streamId, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream({aggregateType.FullNameInCode()}>(events)"; + } + else + { + step.Description = + $"StartStream({aggregateType.FullNameInCode()}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamId, events); + } + + public StreamAction StartStream(Type aggregateType, params object[] events) + { + var streamId = Guid.NewGuid(); + var step = action(e => e.StartStream(aggregateType, streamId, events)); + if (events.Count() > 3) + { + step.Description = $"StartStream({aggregateType.FullNameInCode()}>(events)"; + } + else + { + step.Description = + $"StartStream({aggregateType.FullNameInCode()}, {events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamId, events); + } + + public StreamAction StartStream(IEnumerable events) + { + var streamId = Guid.NewGuid(); + var step = action(e => e.StartStream(streamId, events)); + if (events.Count() > 3) + { + step.Description = "StartStream(events)"; + } + else + { + step.Description = $"StartStream({events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamId, events); + } + + public StreamAction StartStream(params object[] events) + { + var streamId = Guid.NewGuid(); + var step = action(e => e.StartStream(streamId, events)); + if (events.Count() > 3) + { + step.Description = "StartStream(events)"; + } + else + { + step.Description = $"StartStream({events.Select(x => x.ToString()).Join(", ")})"; + } + + return StreamAction.Start(_store.Events, streamId, events); + } + } +} diff --git a/src/Marten/Events/TestSupport/ProjectionScenario.cs b/src/Marten/Events/TestSupport/ProjectionScenario.cs new file mode 100644 index 0000000000..21c5b73cbe --- /dev/null +++ b/src/Marten/Events/TestSupport/ProjectionScenario.cs @@ -0,0 +1,122 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Baseline; +using Baseline.Dates; +using Marten.Events.Daemon; + +namespace Marten.Events.TestSupport +{ + public partial class ProjectionScenario: IEventOperations + { + private readonly Queue _steps = new(); + private readonly DocumentStore _store; + + + public ProjectionScenario(DocumentStore store) + { + _store = store; + } + + internal IProjectionDaemon Daemon { get; private set; } + + internal ScenarioStep NextStep => _steps.Any() ? _steps.Peek() : null; + + internal IDocumentSession Session { get; private set; } + + /// + /// Disable the scenario from "cleaning" out any existing + /// event and projected document data before running the scenario + /// + public bool DoNotDeleteExistingData { get; set; } + + + + internal Task WaitForNonStaleData() + { + if (Daemon == null) + { + return Task.CompletedTask; + } + + return Daemon.WaitForNonStaleData(30.Seconds()); + } + + + private ScenarioStep action(Action action) + { + var step = new ScenarioAction(action); + _steps.Enqueue(step); + + return step; + } + + private ScenarioStep assertion(Func check) + { + var step = new ScenarioAssertion(check); + _steps.Enqueue(step); + + return step; + } + + internal async Task Execute() + { + if (!DoNotDeleteExistingData) + { + _store.Advanced.Clean.DeleteAllEventData(); + foreach (var storageType in + _store.Events.Projections.Projections.SelectMany(x => x.Options.StorageTypes)) + _store.Advanced.Clean.DeleteDocumentsFor(storageType); + } + + if (_store.Events.Projections.HasAnyAsyncProjections()) + { + Daemon = _store.BuildProjectionDaemon(); + await Daemon.StartAll(); + } + + Session = _store.LightweightSession(); + + try + { + var exceptions = new List(); + var number = 0; + var descriptions = new List(); + + while (_steps.Any()) + { + number++; + var step = _steps.Dequeue(); + + try + { + await step.Execute(this); + descriptions.Add($"{number.ToString().PadLeft(3)}. {step.Description}"); + } + catch (Exception e) + { + descriptions.Add($"FAILED: {number.ToString().PadLeft(3)}. {step.Description}"); + descriptions.Add(e.ToString()); + exceptions.Add(e); + } + } + + if (exceptions.Any()) + { + throw new ProjectionScenarioException(descriptions, exceptions); + } + } + finally + { + if (Daemon != null) + { + await Daemon.StopAll(); + Daemon.SafeDispose(); + } + + Session?.SafeDispose(); + } + } + } +} diff --git a/src/Marten/Events/TestSupport/ProjectionScenarioException.cs b/src/Marten/Events/TestSupport/ProjectionScenarioException.cs new file mode 100644 index 0000000000..a2b85f32cd --- /dev/null +++ b/src/Marten/Events/TestSupport/ProjectionScenarioException.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using Baseline; + +namespace Marten.Events.TestSupport +{ + /// + /// Thrown when a ProjectionScenario fails + /// + public class ProjectionScenarioException: AggregateException + { + public ProjectionScenarioException(List descriptions, List exceptions): base( + $"Event Projection Scenario Failure{Environment.NewLine}{descriptions.Join(Environment.NewLine)}", + exceptions) + { + } + } +} diff --git a/src/Marten/Events/TestSupport/ScenarioAction.cs b/src/Marten/Events/TestSupport/ScenarioAction.cs new file mode 100644 index 0000000000..4a6974667c --- /dev/null +++ b/src/Marten/Events/TestSupport/ScenarioAction.cs @@ -0,0 +1,26 @@ +using System; +using System.Threading.Tasks; + +namespace Marten.Events.TestSupport +{ + internal class ScenarioAction: ScenarioStep + { + private readonly Action _action; + + public ScenarioAction(Action action) + { + _action = action; + } + + public override async Task Execute(ProjectionScenario scenario) + { + _action(scenario.Session.Events); + + if (scenario.NextStep is ScenarioAssertion) + { + await scenario.Session.SaveChangesAsync(); + await scenario.WaitForNonStaleData(); + } + } + } +} \ No newline at end of file diff --git a/src/Marten/Events/TestSupport/ScenarioAssertion.cs b/src/Marten/Events/TestSupport/ScenarioAssertion.cs new file mode 100644 index 0000000000..8a952240b7 --- /dev/null +++ b/src/Marten/Events/TestSupport/ScenarioAssertion.cs @@ -0,0 +1,20 @@ +using System; +using System.Threading.Tasks; + +namespace Marten.Events.TestSupport +{ + internal class ScenarioAssertion: ScenarioStep + { + private readonly Func _check; + + public ScenarioAssertion(Func check) + { + _check = check; + } + + public override Task Execute(ProjectionScenario scenario) + { + return _check(scenario.Session); + } + } +} \ No newline at end of file diff --git a/src/Marten/Events/TestSupport/ScenarioStep.cs b/src/Marten/Events/TestSupport/ScenarioStep.cs new file mode 100644 index 0000000000..b15742b48d --- /dev/null +++ b/src/Marten/Events/TestSupport/ScenarioStep.cs @@ -0,0 +1,11 @@ +using System.Threading.Tasks; + +namespace Marten.Events.TestSupport +{ + internal abstract class ScenarioStep + { + public string Description { get; set; } + + public abstract Task Execute(ProjectionScenario scenario); + } +} \ No newline at end of file diff --git a/src/Marten/StoreOptions.cs b/src/Marten/StoreOptions.cs index 4036ef3735..de5a353371 100644 --- a/src/Marten/StoreOptions.cs +++ b/src/Marten/StoreOptions.cs @@ -6,6 +6,7 @@ using Baseline; using Marten.Events; using Marten.Events.Daemon; +using Marten.Events.TestSupport; using Marten.Exceptions; using Marten.Internal; using Marten.Internal.CompiledQueries; @@ -562,5 +563,7 @@ public EnumStorage DuplicatedFieldEnumStorage /// public DdlRules DdlRules { get; } = new DdlRules(); + } + }