Skip to content

Commit

Permalink
Projections rebuild worker WIP 2
Browse files Browse the repository at this point in the history
  • Loading branch information
ustims committed Jul 13, 2023
1 parent f227273 commit 4dffcb6
Show file tree
Hide file tree
Showing 15 changed files with 790 additions and 482 deletions.
132 changes: 69 additions & 63 deletions CloudFabric.EventSourcing.Tests/DynamicProjectionSchemaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ public class OrderListsDynamicProjectionBuilder : ProjectionBuilder,

public OrderListsDynamicProjectionBuilder(
ProjectionRepositoryFactory projectionRepositoryFactory,
ProjectionDocumentSchema projectionDocumentSchema
) : base(projectionRepositoryFactory)
ProjectionDocumentSchema projectionDocumentSchema,
ProjectionOperationIndexSelector indexSelector = ProjectionOperationIndexSelector.Write
) : base(projectionRepositoryFactory, indexSelector)
{
_projectionDocumentSchema = projectionDocumentSchema;
}
Expand Down Expand Up @@ -122,7 +123,7 @@ public abstract class DynamicProjectionSchemaTests
{
// Some projection engines take time to catch events and update projection records
// (like cosmosdb with changefeed event observer)
protected TimeSpan ProjectionsUpdateDelay { get; set; } = TimeSpan.FromMilliseconds(1000);
protected TimeSpan ProjectionsUpdateDelay { get; set; } = TimeSpan.FromMilliseconds(10000);
protected abstract Task<IEventStore> GetEventStore();

protected abstract ProjectionRepositoryFactory GetProjectionRepositoryFactory();
Expand All @@ -148,18 +149,13 @@ public async Task Cleanup()
}
);
await projectionRepository.DeleteAll();

var rebuildStateRepository = GetProjectionRepositoryFactory()
.GetProjectionRepository<ProjectionRebuildState>();

await rebuildStateRepository.DeleteAll();
}
catch
{
}
}

private async Task<(ProjectionsEngine, IProjectionRepository)> PrepareProjection(IEventsObserver eventsObserver, ProjectionDocumentSchema schema)
private async Task<(ProjectionsEngine, IProjectionRepository)> PrepareProjections(IEventsObserver eventsObserver, ProjectionDocumentSchema schema)
{
// Repository containing projections - `view models` of orders
var ordersListProjectionsRepository = GetProjectionRepositoryFactory()
Expand All @@ -179,7 +175,34 @@ public async Task Cleanup()

return (projectionsEngine, ordersListProjectionsRepository);
}


private ProjectionsRebuildProcessor PrepareProjectionsRebuildProcessor(IEventsObserver eventsObserver, ProjectionDocumentSchema projectionDocumentSchema)
{
return new ProjectionsRebuildProcessor(
GetProjectionRepositoryFactory().GetProjectionRepository(null),
async (string connectionId) =>
{
var projectionsEngine = new ProjectionsEngine();
var ordersListProjectionBuilder = new OrderListsDynamicProjectionBuilder(
GetProjectionRepositoryFactory(),
projectionDocumentSchema,
ProjectionOperationIndexSelector.ProjectionRebuild
);
projectionsEngine.AddProjectionBuilder(ordersListProjectionBuilder);
projectionsEngine.SetEventsObserver(eventsObserver);
// no need to listen - we are attaching this projections engine to test event store which is already being observed
// by tests projections engine (see PrepareProjections method)
//await projectionsEngine.StartAsync("TestInstance");
return projectionsEngine;
},
NullLogger<ProjectionsRebuildProcessor>.Instance
);
}

[TestMethod]
public async Task TestPlaceOrderAndAddItemToDynamicProjection()
{
Expand Down Expand Up @@ -214,7 +237,7 @@ public async Task TestPlaceOrderAndAddItemToDynamicProjection()
}
};

var (projectionsEngine, ordersListProjectionsRepository) = await PrepareProjection(orderRepositoryEventsObserver, ordersProjectionSchema);
var (projectionsEngine, ordersListProjectionsRepository) = await PrepareProjections(orderRepositoryEventsObserver, ordersProjectionSchema);

await ordersListProjectionsRepository.EnsureIndex();

Expand Down Expand Up @@ -325,7 +348,7 @@ public async Task TestArrayAttributeDynamicProjection()
}
};

var (projectionsEngine, ordersListProjectionsRepository) = await PrepareProjection(orderRepositoryEventsObserver, ordersProjectionSchema);
var (projectionsEngine, ordersListProjectionsRepository) = await PrepareProjections(orderRepositoryEventsObserver, ordersProjectionSchema);

await ordersListProjectionsRepository.EnsureIndex();

Expand Down Expand Up @@ -427,23 +450,12 @@ public async Task TestPlaceOrderAndAddItemToDynamicProjectionWithCreatingNewProj
}
};

var (projectionsEngine, ordersListProjectionsRepository) = await PrepareProjection(orderRepositoryEventsObserver, ordersProjectionSchema);
var builder = new ProjectionsRebuildProcessor(
GetProjectionRepositoryFactory().GetProjectionRepository(null),
(string connectionId) =>
{
var projectionsBuilder = new ProjectionsEngine();
var (projectionsEngine, ordersListProjectionsRepository) = await PrepareProjections(orderRepositoryEventsObserver, ordersProjectionSchema);
var projectionsRebuildProcessor = PrepareProjectionsRebuildProcessor(orderRepositoryEventsObserver, ordersProjectionSchema);

projectionsBuilder.SetEventsObserver(orderRepositoryEventsObserver);
return projectionsEngine;
},
NullLogger<ProjectionsRebuildProcessor>.Instance
);

await ordersListProjectionsRepository.EnsureIndex();
await builder.DetectProjectionsToRebuild();
await projectionsRebuildProcessor.RebuildProjectionsThatRequireRebuild();

var userId = Guid.NewGuid();
var userInfo = new EventUserInfo(userId);
var id = Guid.NewGuid();
Expand Down Expand Up @@ -471,25 +483,32 @@ public async Task TestPlaceOrderAndAddItemToDynamicProjectionWithCreatingNewProj

await orderRepository.SaveOrder(userInfo, order);

await Task.Delay(ProjectionsUpdateDelay);

var orderProjection = await ordersListProjectionsRepository.Single(id, PartitionKeys.GetOrderPartitionKey());
var orderProjection = await TestHelpers.RepeatUntilNotNull(() => // repeat getting an item until it appears in the projection
ordersListProjectionsRepository.Single(id, PartitionKeys.GetOrderPartitionKey()),
TimeSpan.FromSeconds(10)
);

Debug.Assert(orderProjection != null, nameof(orderProjection) + " != null");

orderProjection["Id"].Should().Be(order.Id);
orderProjection["ItemsCount"].Should().Be(3);

order.AddItem(new OrderItem(DateTime.UtcNow, "Caverna", 12m));

var itemsCountShouldBe = 4; // since we added fourth item

await orderRepository.SaveOrder(userInfo, order);

await Task.Delay(ProjectionsUpdateDelay);

var orderProjectionWithNewItem = await ordersListProjectionsRepository.Single(id, PartitionKeys.GetOrderPartitionKey());
Debug.Assert(orderProjectionWithNewItem != null, nameof(orderProjectionWithNewItem) + " != null");

var orderProjectionWithNewItem = await TestHelpers.RepeatUntil( // projection does not update immediately, we will need to
() => ordersListProjectionsRepository.Single(id, PartitionKeys.GetOrderPartitionKey()), // wait a couple of seconds depending on hardware
a => a != null && a["ItemsCount"] as dynamic == itemsCountShouldBe,
ProjectionsUpdateDelay
);

Debug.Assert(orderProjectionWithNewItem != null, nameof(orderProjection) + " != null");

orderProjectionWithNewItem["Id"].Should().Be(order.Id);
orderProjectionWithNewItem["ItemsCount"].Should().Be(4);
orderProjectionWithNewItem["ItemsCount"].Should().Be(itemsCountShouldBe);

await projectionsEngine.StopAsync();

Expand All @@ -502,18 +521,16 @@ public async Task TestPlaceOrderAndAddItemToDynamicProjectionWithCreatingNewProj
PropertyType = TypeCode.Decimal
});

(projectionsEngine, ordersListProjectionsRepository) = await PrepareProjection(orderRepositoryEventsObserver, ordersProjectionSchema);
(projectionsEngine, ordersListProjectionsRepository) = await PrepareProjections(orderRepositoryEventsObserver, ordersProjectionSchema);

await ordersListProjectionsRepository.EnsureIndex();
await builder.DetectProjectionsToRebuild();
//await builder.RebuildProjectionsThatRequireRebuild();

var addItem = new OrderItem(DateTime.UtcNow, "Twilight Struggle", 6.95m);
order.AddItem(addItem);

await orderRepository.SaveOrder(userInfo, order);

await Task.Delay(ProjectionsUpdateDelay);


var orderProjectionWithNewSchemaTotalPrice = await ordersListProjectionsRepository
.Single(id, PartitionKeys.GetOrderPartitionKey());
Debug.Assert(orderProjectionWithNewSchemaTotalPrice != null, nameof(orderProjectionWithNewSchemaTotalPrice) + " != null");
Expand All @@ -523,7 +540,8 @@ public async Task TestPlaceOrderAndAddItemToDynamicProjectionWithCreatingNewProj

// Important! After we added a new projection field it's required
// to re-run all projection builders from the first event (rebuild all projections)
// Since we didn't rebuild projections, new field will only have data for events that happened after the field was added.
// Since we didn't rebuild projections, new field will only have data for events that happened after the field was added
// Hence total price is nly the price of last added item.
orderProjectionWithNewSchemaTotalPrice["TotalPrice"].Should().Be(6.95m);

var query = new ProjectionQuery();
Expand All @@ -532,28 +550,15 @@ public async Task TestPlaceOrderAndAddItemToDynamicProjectionWithCreatingNewProj
new Filter("TotalPrice", FilterOperator.Greater, 6m)
};

var searchResult = await ordersListProjectionsRepository.Query(query);
var searchResult = await TestHelpers.RepeatUntil(
() => ordersListProjectionsRepository.Query(query),
(r) => r.Records.Count == 1,
ProjectionsUpdateDelay
);
searchResult.Records.Count.Should().Be(1);



await builder.DetectProjectionsToRebuild();

// await projectionsEngine.StartRebuildAsync("rebuild", PartitionKeys.GetOrderPartitionKey());
//
// // wait for the rebuild state to be indexed
// await Task.Delay(ProjectionsUpdateDelay);
//
// // wait for the rebuild to finish
// ProjectionRebuildState rebuildState;
// do
// {
// rebuildState = await projectionsEngine.GetRebuildState("rebuild", PartitionKeys.GetOrderPartitionKey());
// await Task.Delay(10);
// } while (rebuildState.Status != RebuildStatus.Completed && rebuildState.Status != RebuildStatus.Failed);
//
// rebuildState.Status.Should().Be(RebuildStatus.Completed);

await projectionsRebuildProcessor.RebuildProjectionsThatRequireRebuild();

var orderProjectionWithNewSchemaTotalPriceAfterRebuild = await ordersListProjectionsRepository
.Single(id, PartitionKeys.GetOrderPartitionKey());
Debug.Assert(orderProjectionWithNewSchemaTotalPriceAfterRebuild != null, nameof(orderProjectionWithNewSchemaTotalPriceAfterRebuild) + " != null");
Expand All @@ -563,6 +568,7 @@ public async Task TestPlaceOrderAndAddItemToDynamicProjectionWithCreatingNewProj

// Projections were rebuilt, that means TotalPrice should have all items summed up
orderProjectionWithNewSchemaTotalPriceAfterRebuild["TotalPrice"].Should().Be(42.39m);

}

[TestMethod]
Expand All @@ -571,4 +577,4 @@ public async Task TestPlaceOrderAndAddItemtoDynamicProjectionWithRemovingProject
// todo: same as previous, but with additional step:
// removing a projection field and then making sure it's removed from underlying projection storage (new column in postgresql/new index in elastic etc)
}
}
}
55 changes: 55 additions & 0 deletions CloudFabric.EventSourcing.Tests/TestHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
namespace CloudFabric.EventSourcing.Tests;

public static class TestHelpers
{
public static async Task<T?> RepeatUntilNotNull<T>(Func<Task<T?>> lambdaToRepeat, TimeSpan timeout, int millisecondsRetryDelay = 500)
{
var startTime = DateTime.UtcNow;
var result = default(T);

while (result == null || DateTime.UtcNow - startTime > timeout)
{
result = await lambdaToRepeat();
if (result != null)
{
return result;
}

await Task.Delay(millisecondsRetryDelay);
}

throw new Exception("Function failed to return non-null value within timeout");
}

/// <summary>
/// Calls `functionToRepeat` and passes result value to `conditionCheckFunction` until it returns true.
/// Returns result of `functionToRepeat` when `conditionCheckFunction` returns true or null when timeout passes.
/// </summary>
/// <param name="functionToRepeat"></param>
/// <param name="conditionCheckFunction"></param>
/// <param name="timeout"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public static async Task<T?> RepeatUntil<T>(Func<Task<T?>> functionToRepeat, Func<T?, bool> conditionCheckFunction, TimeSpan timeout, int millisecondsRetryDelay = 500)
{
var startTime = DateTime.UtcNow;
var result = default(T);
bool? conditionResult = null;

while (DateTime.UtcNow - startTime < timeout)
{
result = await functionToRepeat();

conditionResult = conditionCheckFunction(result);
if (conditionResult == true)
{
return result;
}

await Task.Delay(millisecondsRetryDelay);
}

throw new Exception("Function failed to return non-null value within timeout");
}
}
29 changes: 27 additions & 2 deletions CloudFabric.EventSourcing.Tests/TestsBaseWithProjections.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using CloudFabric.EventSourcing.EventStore;
using CloudFabric.Projections;
using CloudFabric.Projections.Worker;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace CloudFabric.EventSourcing.Tests;
Expand All @@ -23,9 +25,20 @@ public abstract class TestsBaseWithProjections<TProjectionDocument, TProjectionB
public async Task Initialize()
{
var store = await GetEventStore();

// Repository containing projections - `view models` of orders
ProjectionsRepository = GetProjectionRepositoryFactory().GetProjectionRepository<TProjectionDocument>();
await ProjectionsRepository.EnsureIndex();

await store.DeleteAll();

try
{
await ProjectionsRepository.DeleteAll();
}
catch
{
}

var repositoryEventsObserver = GetEventStoreEventsObserver();

// Projections engine - takes events from events observer and passes them to multiple projection builders
Expand All @@ -38,6 +51,18 @@ public async Task Initialize()
ProjectionsEngine.AddProjectionBuilder(ProjectionBuilder);

await ProjectionsEngine.StartAsync("Test");

var projectionsRebuildProcessor = new ProjectionsRebuildProcessor(
GetProjectionRepositoryFactory().GetProjectionRepository(null),
async (string connectionId) =>
{
return ProjectionsEngine;
},
NullLogger<ProjectionsRebuildProcessor>.Instance
);

await ProjectionsRepository.EnsureIndex();
await projectionsRebuildProcessor.RebuildProjectionsThatRequireRebuild();
}

[TestCleanup]
Expand All @@ -60,4 +85,4 @@ public async Task Cleanup()
{
}
}
}
}
Loading

0 comments on commit 4dffcb6

Please sign in to comment.