Skip to content

Commit

Permalink
new event projection testing support. Closes GH-1615
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Mar 29, 2021
1 parent 26e9dfa commit 06809b3
Show file tree
Hide file tree
Showing 14 changed files with 1,121 additions and 19 deletions.
249 changes: 249 additions & 0 deletions src/Marten.Testing/Events/event_projection_scenario_tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
using System;
using System.Threading.Tasks;
using Marten.Events.Projections;
using Marten.Events.TestSupport;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace Marten.Testing.Events
{
[Collection("projection_scenario")]
public class event_projection_scenario_tests : OneOffConfigurationsContext
{
public event_projection_scenario_tests() : base("projection_scenario")
{
}

[Fact]
public async Task happy_path_test_with_inline_projection()
{
StoreOptions(opts =>
{
opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Inline);
});

await theStore.Advanced.EventProjectionScenario(scenario =>
{
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"});
scenario.DocumentShouldExist<User>(id1);
scenario.DocumentShouldExist<User>(id2);
scenario.DocumentShouldExist<User>(id3, user => user.UserName.ShouldBe("James"));
scenario.Append(Guid.NewGuid(), new DeleteUser {UserId = id2});
scenario.DocumentShouldExist<User>(id1);
scenario.DocumentShouldNotExist<User>(id2);
scenario.DocumentShouldExist<User>(id3);
});
}

[Fact]
public async Task sad_path_test_with_inline_projection()
{
StoreOptions(opts =>
{
opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Inline);
});

await Exception<ProjectionScenarioException>.ShouldBeThrownByAsync(async () =>
{
await theStore.Advanced.EventProjectionScenario(scenario =>
{
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"});
scenario.DocumentShouldExist<User>(id1);
scenario.DocumentShouldExist<User>(id2);
scenario.DocumentShouldExist<User>(id3, user => user.UserName.ShouldBe("James"));
scenario.Append(Guid.NewGuid(), new DeleteUser {UserId = id2});
// This should have been deleted
scenario.DocumentShouldExist<User>(id2);
});
});


}

[Fact]
public async Task sad_path_test_with_inline_projection_with_document_assertion()
{
StoreOptions(opts =>
{
opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Inline);
});

await Exception<ProjectionScenarioException>.ShouldBeThrownByAsync(async () =>
{
await theStore.Advanced.EventProjectionScenario(scenario =>
{
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"});
scenario.DocumentShouldExist<User>(id1, u => u.FirstName.ShouldBe("WRONG"));
scenario.DocumentShouldExist<User>(id2);
scenario.DocumentShouldExist<User>(id3, user => user.UserName.ShouldBe("James"));
});
});


}






[Fact]
public async Task happy_path_test_with_inline_projection_async()
{
StoreOptions(opts =>
{
opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Async);
});

await theStore.Advanced.EventProjectionScenario(scenario =>
{
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"});
scenario.DocumentShouldExist<User>(id1);
scenario.DocumentShouldExist<User>(id2);
scenario.DocumentShouldExist<User>(id3, user => user.UserName.ShouldBe("James"));
scenario.Append(Guid.NewGuid(), new DeleteUser {UserId = id2});
scenario.DocumentShouldExist<User>(id1);
scenario.DocumentShouldNotExist<User>(id2);
scenario.DocumentShouldExist<User>(id3);
});
}

[Fact]
public async Task sad_path_test_with_inline_projection_async()
{
StoreOptions(opts =>
{
opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Async);
});

await Exception<ProjectionScenarioException>.ShouldBeThrownByAsync(async () =>
{
await theStore.Advanced.EventProjectionScenario(scenario =>
{
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"});
scenario.DocumentShouldExist<User>(id1);
scenario.DocumentShouldExist<User>(id2);
scenario.DocumentShouldExist<User>(id3, user => user.UserName.ShouldBe("James"));
scenario.Append(Guid.NewGuid(), new DeleteUser {UserId = id2});
// This should have been deleted
scenario.DocumentShouldExist<User>(id2);
});
});


}

[Fact]
public async Task sad_path_test_with_inline_projection_with_document_assertion_async()
{
StoreOptions(opts =>
{
opts.Events.Projections.Add(new UserProjection(), ProjectionLifecycle.Async);
});

await Exception<ProjectionScenarioException>.ShouldBeThrownByAsync(async () =>
{
await theStore.Advanced.EventProjectionScenario(scenario =>
{
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id1, UserName = "Kareem"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id2, UserName = "Magic"});
scenario.Append(Guid.NewGuid(), new CreateUser {UserId = id3, UserName = "James"});
scenario.DocumentShouldExist<User>(id1, u => u.FirstName.ShouldBe("WRONG"));
scenario.DocumentShouldExist<User>(id2);
scenario.DocumentShouldExist<User>(id3, user => user.UserName.ShouldBe("James"));
});
});


}




}

public class CreateUser
{
public Guid UserId { get; set; }
public string UserName { get; set; }
}

public class DeleteUser
{
public Guid UserId { get; set; }
}

public class UserProjection: EventProjection
{
public User Create(CreateUser create)
{
return new User
{
Id = create.UserId, UserName = create.UserName
};
}

public void Project(DeleteUser deletion, IDocumentOperations operations)
{
operations.Delete<User>(deletion.UserId);
}
}
}
17 changes: 17 additions & 0 deletions src/Marten/AdvancedOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Marten.Events;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Progress;
using Marten.Events.TestSupport;
using Marten.Internal;
using Marten.Internal.Sessions;
using Marten.Linq.QueryHandlers;
Expand Down Expand Up @@ -180,5 +181,21 @@ public IDocumentSourceCode Load(IProviderGraph providers)
return providers.StorageFor<T>();
}
}


/// <summary>
/// Marten's built in test support for event projections. Only use this in testing as
/// it will delete existing event and projected aggregate data
/// </summary>
/// <param name="configuration"></param>
/// <returns></returns>
public Task EventProjectionScenario(Action<ProjectionScenario> configuration)
{
var scenario = new ProjectionScenario(_store);
configuration(scenario);

return scenario.Execute();
}

}
}
9 changes: 9 additions & 0 deletions src/Marten/Events/Daemon/IProjectionDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,14 @@ public interface IProjectionDaemon : IDisposable
/// </summary>
/// <returns></returns>
Task StartDaemon();


/// <summary>
/// Use with caution! This will try to wait for all projections to "catch up" to the currently
/// known farthest known sequence of the event store
/// </summary>
/// <param name="timeout"></param>
/// <returns></returns>
Task WaitForNonStaleData(TimeSpan timeout);
}
}
37 changes: 37 additions & 0 deletions src/Marten/Events/Daemon/ProjectionDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,43 @@ public async Task StartDaemon()
_hasStarted = true;
}

public Task WaitForNonStaleData(TimeSpan timeout)
{
var completion = new TaskCompletionSource<bool>();
var timeoutCancellation = new CancellationTokenSource(timeout);


Task.Run(async () =>
{
var statistics = await _store.Advanced.FetchEventStoreStatistics(timeoutCancellation.Token);
timeoutCancellation.Token.Register(() =>
{
completion.TrySetException(new TimeoutException(
$"The active projection shards did not reach sequence {statistics.EventSequenceNumber} in time"));
});
if (CurrentShards().All(x => x.Position >= statistics.EventSequenceNumber))
{
completion.SetResult(true);
return;
}
while (!timeoutCancellation.IsCancellationRequested)
{
await Task.Delay(100.Milliseconds(), timeoutCancellation.Token);
if (CurrentShards().All(x => x.Position >= statistics.EventSequenceNumber))
{
completion.SetResult(true);
return;
}
}
}, timeoutCancellation.Token);

return completion.Task;

}

public async Task StartAll()
{
if (!_hasStarted) await StartDaemon();
Expand Down
Loading

0 comments on commit 06809b3

Please sign in to comment.