From e25679f0119a6ec15c88d23058602c34efa2cc2c Mon Sep 17 00:00:00 2001 From: Ustimenko Sergey Date: Mon, 29 May 2023 14:23:10 +0300 Subject: [PATCH 1/4] Update ElasticSearchFilterFactory.cs Fix bool query for elastic --- .../Helpers/ElasticSearchFilterFactory.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs b/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs index 22107ee..44d1cf8 100644 --- a/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs +++ b/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs @@ -146,6 +146,11 @@ private static string ConstructOneConditionFilter(Queries.Filter filter) var filterOperator = ""; var filterValue = filter.Value?.ToString()?.EscapeElasticUnsupportedCharacters(); + if (filter.Value is bool) + { + filterValue = filterValue.ToLower(); + } + switch (filter.Operator) { case FilterOperator.ArrayContains: From c7e2a330614789aacbd13781e205dffde3b11cf8 Mon Sep 17 00:00:00 2001 From: Ustimenko Sergey Date: Mon, 29 May 2023 14:23:43 +0300 Subject: [PATCH 2/4] Update build.yml --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 07c092a..c00ca42 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,7 +7,7 @@ on: branches: [ "main" ] env: - PACKAGE_VERSION: 0.1.0 + PACKAGE_VERSION: 0.1.1 jobs: From 2cfdccfbd386cf017a1c3304634244edaef8cce6 Mon Sep 17 00:00:00 2001 From: Sergey Ustimenko Date: Mon, 29 May 2023 16:32:56 +0300 Subject: [PATCH 3/4] Filter query string serialization updates --- .github/workflows/build.yml | 2 +- .../FilterConnectorQueryStringExtensions.cs | 4 +- .../Queries/FilterQueryStringExtensions.cs | 37 +++++++++++------- .../ProjectionQueryQueryStringExtensions.cs | 38 ++++++++++++++++--- .../Helpers/ElasticSearchFilterFactory.cs | 2 +- 5 files changed, 60 insertions(+), 23 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c00ca42..b4e7e6b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,7 +7,7 @@ on: branches: [ "main" ] env: - PACKAGE_VERSION: 0.1.1 + PACKAGE_VERSION: 0.1.2 jobs: diff --git a/CloudFabric.Projections/Queries/FilterConnectorQueryStringExtensions.cs b/CloudFabric.Projections/Queries/FilterConnectorQueryStringExtensions.cs index 51896f2..3b2ec69 100644 --- a/CloudFabric.Projections/Queries/FilterConnectorQueryStringExtensions.cs +++ b/CloudFabric.Projections/Queries/FilterConnectorQueryStringExtensions.cs @@ -4,12 +4,12 @@ public static class FilterConnectorQueryStringExtensions { public static string Serialize(this FilterConnector filterConnector) { - return $"{filterConnector.Logic}+{filterConnector.Filter.Serialize()}"; + return $"{filterConnector.Logic}{ProjectionQueryQueryStringExtensions.FILTER_LOGIC_JOIN_CHARACTER}{filterConnector.Filter.Serialize()}"; } public static FilterConnector Deserialize(string serialized) { - var separator = "+"; + var separator = $"{ProjectionQueryQueryStringExtensions.FILTER_LOGIC_JOIN_CHARACTER}"; var logicEnd = serialized.IndexOf(separator, StringComparison.Ordinal); diff --git a/CloudFabric.Projections/Queries/FilterQueryStringExtensions.cs b/CloudFabric.Projections/Queries/FilterQueryStringExtensions.cs index 75abd18..15b910e 100644 --- a/CloudFabric.Projections/Queries/FilterQueryStringExtensions.cs +++ b/CloudFabric.Projections/Queries/FilterQueryStringExtensions.cs @@ -64,20 +64,27 @@ public static string Serialize(this Filter filter, bool useShortcuts = true) if (filter.Filters != null && filter.Filters.Count > 0) { - filtersSerialized = string.Join(".", filter.Filters.Select(f => f.Serialize())); + filtersSerialized = string.Join( + ProjectionQueryQueryStringExtensions.FILTER_NESTED_FILTERS_JOIN_CHARACTER, + filter.Filters.Select(f => f.Serialize()) + ); } + const char PROPS_JOIN = ProjectionQueryQueryStringExtensions.FILTER_PROPERTIES_JOIN_CHARACTER; + return $"{(string.IsNullOrEmpty(filter.PropertyName) ? "*" : SanitizeValue(filter.PropertyName))}" + - $"|{(string.IsNullOrEmpty(filter.Operator) ? "*" : filter.Operator)}" + - $"|{System.Net.WebUtility.UrlEncode(valueSerialized)}" + - $"|{(filter.Visible ? 'T' : 'F')}" + - $"|{System.Net.WebUtility.UrlEncode(filter.Tag)}" + - $"|{filtersSerialized}"; + $"{PROPS_JOIN}{(string.IsNullOrEmpty(filter.Operator) ? "*" : filter.Operator)}" + + $"{PROPS_JOIN}{System.Net.WebUtility.UrlEncode(valueSerialized)}" + + $"{PROPS_JOIN}{(filter.Visible.ToString().ToLower())}" + + $"{PROPS_JOIN}{System.Net.WebUtility.UrlEncode(filter.Tag)}" + + $"{PROPS_JOIN}{filtersSerialized}"; } public static Filter Deserialize(string f) { - if (f.IndexOf("|", StringComparison.Ordinal) < 0) + const char PROPS_JOIN = ProjectionQueryQueryStringExtensions.FILTER_PROPERTIES_JOIN_CHARACTER; + + if (f.IndexOf(PROPS_JOIN, StringComparison.Ordinal) < 0) { if (FacetFilterShortcuts.ContainsKey(f)) { @@ -85,19 +92,19 @@ public static Filter Deserialize(string f) } } - var propertyNameEnd = f.IndexOf("|", StringComparison.Ordinal); + var propertyNameEnd = f.IndexOf($"{PROPS_JOIN}", StringComparison.Ordinal); var propertyName = DesanitizeValue(f.Substring(0, propertyNameEnd)); - var operatorEnd = f.IndexOf("|", propertyNameEnd + 1, StringComparison.Ordinal); + var operatorEnd = f.IndexOf($"{PROPS_JOIN}", propertyNameEnd + 1, StringComparison.Ordinal); var operatorValue = f.Substring(propertyNameEnd + 1, operatorEnd - propertyNameEnd - 1); - var valueEnd = f.IndexOf("|", operatorEnd + 1, StringComparison.Ordinal); + var valueEnd = f.IndexOf($"{PROPS_JOIN}", operatorEnd + 1, StringComparison.Ordinal); var value = f.Substring(operatorEnd + 1, valueEnd - operatorEnd - 1); - var visibleEnd = f.IndexOf("|", valueEnd + 1, StringComparison.Ordinal); - var visible = f.Substring(valueEnd + 1, visibleEnd - valueEnd - 1) == "T"; + var visibleEnd = f.IndexOf($"{PROPS_JOIN}", valueEnd + 1, StringComparison.Ordinal); + var visible = f.Substring(valueEnd + 1, visibleEnd - valueEnd - 1) == "true"; - var tagEnd = f.IndexOf("|", visibleEnd + 1, StringComparison.Ordinal); + var tagEnd = f.IndexOf($"{PROPS_JOIN}", visibleEnd + 1, StringComparison.Ordinal); var tag = f.Substring(visibleEnd + 1, tagEnd - visibleEnd - 1); tag = System.Net.WebUtility.UrlDecode(tag); @@ -123,7 +130,9 @@ public static Filter Deserialize(string f) var filters = new List(); - var filtersSerializedList = f.Substring(tagEnd + 1).Split('.'); + var filtersSerializedList = f.Substring(tagEnd + 1) + .Split(ProjectionQueryQueryStringExtensions.FILTER_NESTED_FILTERS_JOIN_CHARACTER); + if (filtersSerializedList.Length > 0) { filters = filtersSerializedList diff --git a/CloudFabric.Projections/Queries/ProjectionQueryQueryStringExtensions.cs b/CloudFabric.Projections/Queries/ProjectionQueryQueryStringExtensions.cs index 4bf6b6a..a041337 100644 --- a/CloudFabric.Projections/Queries/ProjectionQueryQueryStringExtensions.cs +++ b/CloudFabric.Projections/Queries/ProjectionQueryQueryStringExtensions.cs @@ -2,6 +2,33 @@ namespace CloudFabric.Projections.Queries; public static class ProjectionQueryQueryStringExtensions { + /// + /// Top-level filters are joined using this character + /// + public const char FILTERS_JOIN_CHARACTER = '!'; + + /// + /// Individual filter properties are joined using this character. + /// Example: my_boolean_property|eq|true + /// + public const char FILTER_PROPERTIES_JOIN_CHARACTER = '|'; + + /// + /// Character used to join filter connector and the filter it's attached to. + /// + /// Example: AND+my_boolean_property|eq|true + /// + public const char FILTER_LOGIC_JOIN_CHARACTER = '$'; + + /// + /// Character used to join array of nested filters. + /// + /// Example: my_boolean_property|eq|true|and$my_int_property|gt|100000000.or$my_string_property|eq|'yo' + /// ^- main property filter ^- start of nester filter array ^- second filter is joined with . + /// (joined using | like all props) + /// + public const char FILTER_NESTED_FILTERS_JOIN_CHARACTER = '.'; + public static string SerializeToQueryString( this ProjectionQuery projectionQuery, string? searchText = null, @@ -39,7 +66,7 @@ public static string SerializeFiltersToQueryString(this ProjectionQuery projecti if (filtersSerialized.Count > 0) { - return "sv1_" + string.Join("!", filtersSerialized); + return "sv1_" + string.Join(FILTERS_JOIN_CHARACTER, filtersSerialized); } else { @@ -55,7 +82,7 @@ public static void DeserializeFiltersQueryString(this ProjectionQuery projection } var searchVersionPlaceholder = "sv"; - var version = ""; + var version = "1"; if (filters.IndexOf(searchVersionPlaceholder) == 0) { @@ -63,14 +90,15 @@ public static void DeserializeFiltersQueryString(this ProjectionQuery projection var versionLength = end - searchVersionPlaceholder.Length; version = filters.Substring(searchVersionPlaceholder.Length, versionLength); + + // remove version from filters string + filters = filters.Substring(filters.IndexOf("_") + 1); } switch (version) { case "1": - // remove version from filters string - filters = filters.Substring(filters.IndexOf("_") + 1); - var filtersList = filters.Split('!').Where(f => f.Length > 0).ToList(); + var filtersList = filters.Split(FILTERS_JOIN_CHARACTER).Where(f => f.Length > 0).ToList(); if (filtersList.Count > 0) { diff --git a/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs b/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs index 44d1cf8..7798afe 100644 --- a/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs +++ b/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs @@ -150,7 +150,7 @@ private static string ConstructOneConditionFilter(Queries.Filter filter) { filterValue = filterValue.ToLower(); } - + switch (filter.Operator) { case FilterOperator.ArrayContains: From b1883053d9d77cc0dcf93ac4725d2f18bfbf7aa3 Mon Sep 17 00:00:00 2001 From: scand1n <115148869+scand1n@users.noreply.github.com> Date: Wed, 9 Aug 2023 11:38:23 +0300 Subject: [PATCH 4/4] Add store item logic to EventStore (#60) Add store item logic to EventStore --- .github/workflows/build.yml | 2 +- .../Extensions/ServiceCollectionExtensions.cs | 12 +- .../Extensions/ServiceCollectionExtensions.cs | 14 +- .../Extensions/ServiceCollectionExtensions.cs | 22 +- .../Enums/ItemsEventStoreNameSuffix.cs | 6 + ...ions.cs => EventStoreSerializerOptions.cs} | 2 +- .../IEventStore.cs | 4 + .../Persistence/EventWrapper.cs | 2 +- .../Persistence/ItemWrapper.cs | 15 ++ .../Domain/TestItem.cs | 19 ++ CloudFabric.EventSourcing.Tests/ItemTests.cs | 102 ++++++++++ .../CosmosDbEventStore.cs | 127 ++++++++++-- .../InMemoryEventStore.cs | 46 ++++- .../PostgresqlEventStore.cs | 192 ++++++++++++++++-- .../ItemTestsCosmosDb.cs | 102 ++++++++++ .../OrderTestsCosmosDb.cs | 3 +- ...namicProjectionsOrderTestsElasticSearch.cs | 3 +- ...OrderStringComparisonTestsElasticSearch.cs | 3 +- .../OrderTestsElasticSearch.cs | 3 +- .../QueryStringTestsElasticSearch.cs | 3 +- .../DynamicProjectionsOrderTestsInMemory.cs | 7 +- .../ItemTestInMemory.cs | 25 +++ .../OrderStringComparisonTestsInMemory.cs | 7 +- .../OrderTestsInMemory.cs | 5 +- .../QueryStringTestsInMemory.cs | 7 +- .../DynamicProjectionsOrderTestsPostgresql.cs | 5 +- .../ItemtestsPostgresql.cs | 27 +++ .../OrderStringComparisonTestsPostgresql.cs | 3 +- .../OrderTestsPostgresql.cs | 3 +- .../QueryStringTestsPostgresql.cs | 3 +- .../CosmosDbProjectionRepository.cs | 2 +- 31 files changed, 701 insertions(+), 75 deletions(-) create mode 100644 CloudFabric.EventSourcing.EventStore/Enums/ItemsEventStoreNameSuffix.cs rename CloudFabric.EventSourcing.EventStore/{EventSerializerOptions.cs => EventStoreSerializerOptions.cs} (84%) create mode 100644 CloudFabric.EventSourcing.EventStore/Persistence/ItemWrapper.cs create mode 100644 CloudFabric.EventSourcing.Tests/Domain/TestItem.cs create mode 100644 CloudFabric.EventSourcing.Tests/ItemTests.cs create mode 100644 Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/ItemTestsCosmosDb.cs create mode 100644 Implementations/CloudFabric.EventSourcing.Tests.InMemory/ItemTestInMemory.cs create mode 100644 Implementations/CloudFabric.EventSourcing.Tests.Postgresql/ItemtestsPostgresql.cs diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b4e7e6b..4f8d3bb 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,7 +7,7 @@ on: branches: [ "main" ] env: - PACKAGE_VERSION: 0.1.2 + PACKAGE_VERSION: 0.1.3 jobs: diff --git a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.CosmosDb/Extensions/ServiceCollectionExtensions.cs b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.CosmosDb/Extensions/ServiceCollectionExtensions.cs index 2509c84..04ed64c 100644 --- a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.CosmosDb/Extensions/ServiceCollectionExtensions.cs +++ b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.CosmosDb/Extensions/ServiceCollectionExtensions.cs @@ -14,7 +14,8 @@ public static IEventSourcingBuilder AddCosmosDbEventStore( string connectionString, CosmosClientOptions cosmosClientOptions, string databaseId, - string containerId, + string eventsContainerId, + string itemsContainerId, CosmosClient leaseClient, string leaseDatabaseId, string leaseContainerId, @@ -23,13 +24,13 @@ string processorName { var cosmosClient = new CosmosClient(connectionString, cosmosClientOptions); - var eventStore = new CosmosDbEventStore(cosmosClient, databaseId, containerId); + var eventStore = new CosmosDbEventStore(cosmosClient, databaseId, eventsContainerId, itemsContainerId); eventStore.Initialize().Wait(); var eventStoreObserver = new CosmosDbEventStoreChangeFeedObserver( cosmosClient, databaseId, - containerId, + eventsContainerId, leaseClient, leaseDatabaseId, leaseContainerId, @@ -52,10 +53,11 @@ public static IEventSourcingBuilder AddCosmosDbEventStore( this IServiceCollection services, CosmosClient client, string databaseId, - string containerId + string eventsContainerId, + string itemsContainerId ) { - var eventStore = new CosmosDbEventStore(client, databaseId, containerId); + var eventStore = new CosmosDbEventStore(client, databaseId, eventsContainerId, itemsContainerId); eventStore.Initialize().Wait(); return new EventSourcingBuilder diff --git a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.InMemory/Extensions/ServiceCollectionExtensions.cs b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.InMemory/Extensions/ServiceCollectionExtensions.cs index 0f05e35..dcd13d7 100644 --- a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.InMemory/Extensions/ServiceCollectionExtensions.cs +++ b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.InMemory/Extensions/ServiceCollectionExtensions.cs @@ -1,3 +1,4 @@ +using System.Runtime.CompilerServices; using CloudFabric.EventSourcing.Domain; using CloudFabric.EventSourcing.EventStore.InMemory; using CloudFabric.Projections; @@ -11,10 +12,11 @@ public static class ServiceCollectionExtensions { public static IEventSourcingBuilder AddInMemoryEventStore( this IServiceCollection services, - Dictionary<(Guid, string), List> eventsContainer + Dictionary<(Guid, string), List> eventsContainer, + Dictionary<(string, string), string> itemsContainer ) { - var eventStore = new InMemoryEventStore(eventsContainer); + var eventStore = new InMemoryEventStore(eventsContainer, itemsContainer); eventStore.Initialize().Wait(); // add events observer for projections @@ -32,6 +34,14 @@ public static IEventSourcingBuilder AddInMemoryEventStore( }; } + public static IEventSourcingBuilder AddInMemoryEventStore(this IServiceCollection services) + { + return services.AddInMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); + } + public static IEventSourcingBuilder AddRepository(this IEventSourcingBuilder builder) where TRepo : class { diff --git a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.Postgresql/Extensions/ServiceCollectionExtensions.cs b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.Postgresql/Extensions/ServiceCollectionExtensions.cs index a0b1df0..ffded61 100644 --- a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.Postgresql/Extensions/ServiceCollectionExtensions.cs +++ b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.Postgresql/Extensions/ServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ using CloudFabric.EventSourcing.Domain; +using CloudFabric.EventSourcing.EventStore.Enums; using CloudFabric.EventSourcing.EventStore.Postgresql; using CloudFabric.Projections; using CloudFabric.Projections.Postgresql; @@ -12,10 +13,11 @@ public static class ServiceCollectionExtensions public static IEventSourcingBuilder AddPostgresqlEventStore( this IServiceCollection services, string eventsConnectionString, - string tableName + string eventsTableName, + string itemsTableName ) { - var eventStore = new PostgresqlEventStore(eventsConnectionString, tableName); + var eventStore = new PostgresqlEventStore(eventsConnectionString, eventsTableName, itemsTableName); eventStore.Initialize().Wait(); // add events observer for projections @@ -33,6 +35,22 @@ string tableName }; } + /// + /// This extension overload initialize event store with default item event store table name. + /// + public static IEventSourcingBuilder AddPostgresqlEventStore( + this IServiceCollection services, + string eventsConnectionString, + string eventsTableName + ) + { + return services.AddPostgresqlEventStore( + eventsConnectionString, + eventsTableName, + string.Concat(eventsTableName, ItemsEventStoreNameSuffix.TableNameSuffix) + ); + } + public static IEventSourcingBuilder AddRepository(this IEventSourcingBuilder builder) where TRepo : class { diff --git a/CloudFabric.EventSourcing.EventStore/Enums/ItemsEventStoreNameSuffix.cs b/CloudFabric.EventSourcing.EventStore/Enums/ItemsEventStoreNameSuffix.cs new file mode 100644 index 0000000..3d88648 --- /dev/null +++ b/CloudFabric.EventSourcing.EventStore/Enums/ItemsEventStoreNameSuffix.cs @@ -0,0 +1,6 @@ +namespace CloudFabric.EventSourcing.EventStore.Enums; + +public static class ItemsEventStoreNameSuffix +{ + public const string TableNameSuffix = "-item"; +} diff --git a/CloudFabric.EventSourcing.EventStore/EventSerializerOptions.cs b/CloudFabric.EventSourcing.EventStore/EventStoreSerializerOptions.cs similarity index 84% rename from CloudFabric.EventSourcing.EventStore/EventSerializerOptions.cs rename to CloudFabric.EventSourcing.EventStore/EventStoreSerializerOptions.cs index 89fd0fa..db82753 100644 --- a/CloudFabric.EventSourcing.EventStore/EventSerializerOptions.cs +++ b/CloudFabric.EventSourcing.EventStore/EventStoreSerializerOptions.cs @@ -2,7 +2,7 @@ namespace CloudFabric.EventSourcing.EventStore; -public static class EventSerializerOptions +public static class EventStoreSerializerOptions { public static JsonSerializerOptions Options { diff --git a/CloudFabric.EventSourcing.EventStore/IEventStore.cs b/CloudFabric.EventSourcing.EventStore/IEventStore.cs index f07c27c..c75a1b5 100755 --- a/CloudFabric.EventSourcing.EventStore/IEventStore.cs +++ b/CloudFabric.EventSourcing.EventStore/IEventStore.cs @@ -23,4 +23,8 @@ Task AppendToStreamAsync( Task DeleteAll(CancellationToken cancellationToken = default); Task HardDeleteAsync(Guid streamId, string partitionKey, CancellationToken cancellationToken = default); + + Task UpsertItem(string id, string partitionKey, T item, CancellationToken cancellationToken = default); + + Task LoadItem(string id, string partitionKey, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/CloudFabric.EventSourcing.EventStore/Persistence/EventWrapper.cs b/CloudFabric.EventSourcing.EventStore/Persistence/EventWrapper.cs index f60f56a..cae49c5 100755 --- a/CloudFabric.EventSourcing.EventStore/Persistence/EventWrapper.cs +++ b/CloudFabric.EventSourcing.EventStore/Persistence/EventWrapper.cs @@ -36,7 +36,7 @@ public IEvent GetEvent() throw new Exception("Couldn't find event type. Make sure it's in correct namespace."); } - var e = (IEvent?)EventData.Deserialize(eventType, EventSerializerOptions.Options); + var e = (IEvent?)EventData.Deserialize(eventType, EventStoreSerializerOptions.Options); if (e == null) { diff --git a/CloudFabric.EventSourcing.EventStore/Persistence/ItemWrapper.cs b/CloudFabric.EventSourcing.EventStore/Persistence/ItemWrapper.cs new file mode 100644 index 0000000..60c38ab --- /dev/null +++ b/CloudFabric.EventSourcing.EventStore/Persistence/ItemWrapper.cs @@ -0,0 +1,15 @@ +using System.Text.Json.Serialization; + +namespace CloudFabric.EventSourcing.EventStore.Persistence; + +public class ItemWrapper +{ + [JsonPropertyName("id")] + public string? Id { get; set; } + + [JsonPropertyName("partition_key")] + public string? PartitionKey { get; set; } + + [JsonPropertyName("data")] + public string? ItemData { get; set; } +} diff --git a/CloudFabric.EventSourcing.Tests/Domain/TestItem.cs b/CloudFabric.EventSourcing.Tests/Domain/TestItem.cs new file mode 100644 index 0000000..1c9173a --- /dev/null +++ b/CloudFabric.EventSourcing.Tests/Domain/TestItem.cs @@ -0,0 +1,19 @@ +namespace CloudFabric.EventSourcing.Tests.Domain; + +public class TestItem +{ + public Guid Id { get; set; } + + public string Name { get; set; } + + public Dictionary Properties { get; set; } +} + +public class TestNestedItemClass +{ + public Guid Id { get; set; } = Guid.NewGuid(); + + public string Name { get; set; } = nameof(Name).ToLowerInvariant(); + + public DateTime date { get; } = DateTime.MinValue; +} \ No newline at end of file diff --git a/CloudFabric.EventSourcing.Tests/ItemTests.cs b/CloudFabric.EventSourcing.Tests/ItemTests.cs new file mode 100644 index 0000000..5d20dca --- /dev/null +++ b/CloudFabric.EventSourcing.Tests/ItemTests.cs @@ -0,0 +1,102 @@ +using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.Tests.Domain; +using FluentAssertions; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CloudFabric.EventSourcing.Tests; +public abstract class ItemTests +{ + protected abstract Task GetEventStore(); + private IEventStore _eventStore; + + [TestInitialize] + public async Task Initialize() + { + _eventStore = await GetEventStore(); + } + + [TestCleanup] + public async Task Cleanup() + { + await _eventStore.DeleteAll(); + } + + [TestMethod] + public async Task SaveItem() + { + var item = new TestItem + { + Id = Guid.NewGuid(), + Name = "Item1", + Properties = new Dictionary + { + { Guid.NewGuid().ToString(), new TestNestedItemClass() }, + { Guid.NewGuid().ToString(), new TestNestedItemClass() } + } + }; + + Func action = async () => await _eventStore.UpsertItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}", item); + await action.Should().NotThrowAsync(); + } + + [TestMethod] + public async Task LoadItem() + { + var item = new TestItem + { + Id = Guid.NewGuid(), + Name = "Item1", + Properties = new Dictionary + { + { Guid.NewGuid().ToString(), new TestNestedItemClass() } + } + }; + + await _eventStore.UpsertItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}", item); + + var loadedItem = await _eventStore.LoadItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}"); + + loadedItem.Id.Should().Be(item.Id); + loadedItem.Name.Should().Be(item.Name); + loadedItem.Properties.Keys.Should().BeEquivalentTo(item.Properties.Keys); + loadedItem.Properties.Values.Should().BeEquivalentTo(item.Properties.Values); + } + + [TestMethod] + public async Task LoadItem_NullIfNotFound() + { + var loadedItem = await _eventStore.LoadItem($"{Guid.NewGuid()}", $"{Guid.NewGuid()}"); + + loadedItem.Should().BeNull(); + } + + [TestMethod] + public async Task UpdateItem() + { + var item = new TestItem + { + Id = Guid.NewGuid(), + Name = "Item1", + Properties = new Dictionary + { + { Guid.NewGuid().ToString(), new TestNestedItemClass() }, + { Guid.NewGuid().ToString(), new TestNestedItemClass() } + } + }; + + await _eventStore.UpsertItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}", item); + + string propertyGuid = Guid.NewGuid().ToString(); + + item.Properties = new() + { + { propertyGuid, new TestNestedItemClass() } + }; + + Func action = async () => await _eventStore.UpsertItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}", item); + await action.Should().NotThrowAsync(); + + var updatedItem = await _eventStore.LoadItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}"); + updatedItem.Properties.ContainsKey(propertyGuid).Should().BeTrue(); + } +} diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStore.cs b/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStore.cs index f1779fb..4e4a389 100755 --- a/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStore.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStore.cs @@ -7,30 +7,35 @@ namespace CloudFabric.EventSourcing.EventStore.CosmosDb; public class CosmosDbEventStore : IEventStore { private readonly CosmosClient _client; - private readonly string _containerId; + private readonly string _eventsContainerId; + private readonly string _itemsContainerId; private readonly string _databaseId; public CosmosDbEventStore( string connectionString, CosmosClientOptions cosmosClientOptions, string databaseId, - string containerId + string eventsContainerId, + string itemsContainerId ) { _client = new CosmosClient(connectionString, cosmosClientOptions); _databaseId = databaseId; - _containerId = containerId; + _eventsContainerId = eventsContainerId; + _itemsContainerId = itemsContainerId; } public CosmosDbEventStore( CosmosClient client, string databaseId, - string containerId + string eventsContainerId, + string itemsContainerId ) { _client = client; _databaseId = databaseId; - _containerId = containerId; + _eventsContainerId = eventsContainerId; + _itemsContainerId = itemsContainerId; } public Task Initialize(CancellationToken cancellationToken = default) @@ -40,15 +45,40 @@ public Task Initialize(CancellationToken cancellationToken = default) public async Task DeleteAll(CancellationToken cancellationToken = default) { - var container = _client.GetContainer(_databaseId, _containerId); - await container.DeleteContainerAsync(cancellationToken: cancellationToken); + var eventsContainer = _client.GetContainer(_databaseId, _eventsContainerId); + + try + { + await eventsContainer.DeleteContainerAsync(cancellationToken: cancellationToken); + } + catch (CosmosException ex) + { + if (ex.StatusCode != System.Net.HttpStatusCode.NotFound) + { + throw; + } + } + + var itemsContainer = _client.GetContainer(_databaseId, _itemsContainerId); + + try + { + await itemsContainer.DeleteContainerAsync(cancellationToken: cancellationToken); + } + catch (CosmosException ex) + { + if (ex.StatusCode != System.Net.HttpStatusCode.NotFound) + { + throw; + } + } } public async Task HardDeleteAsync(Guid streamId, string partitionKey, CancellationToken cancellationToken = default) { - var container = _client.GetContainer(_databaseId, _containerId); + var container = _client.GetContainer(_databaseId, _eventsContainerId); - var sqlQueryText = $"SELECT * FROM {_containerId} e" + " WHERE e.stream.id = @streamId"; + var sqlQueryText = $"SELECT * FROM {_eventsContainerId} e" + " WHERE e.stream.id = @streamId"; QueryDefinition queryDefinition = new QueryDefinition(sqlQueryText) .WithParameter("@streamId", streamId); @@ -108,9 +138,9 @@ await Parallel.ForEachAsync(batches, async (batch, cancellationToken) => public async Task LoadStreamAsyncOrThrowNotFound(Guid streamId, string partitionKey, CancellationToken cancellationToken = default) { - Container container = _client.GetContainer(_databaseId, _containerId); + Container container = _client.GetContainer(_databaseId, _eventsContainerId); - var sqlQueryText = $"SELECT * FROM {_containerId} e" + " WHERE e.stream.id = @streamId " + " ORDER BY e.stream.version"; + var sqlQueryText = $"SELECT * FROM {_eventsContainerId} e" + " WHERE e.stream.id = @streamId " + " ORDER BY e.stream.version"; QueryDefinition queryDefinition = new QueryDefinition(sqlQueryText) .WithParameter("@streamId", streamId); @@ -143,9 +173,9 @@ public async Task LoadStreamAsyncOrThrowNotFound(Guid streamId, str public async Task LoadStreamAsync(Guid streamId, string partitionKey, CancellationToken cancellationToken = default) { - Container container = _client.GetContainer(_databaseId, _containerId); + Container container = _client.GetContainer(_databaseId, _eventsContainerId); - var sqlQueryText = $"SELECT * FROM {_containerId} e" + " WHERE e.stream.id = @streamId " + " ORDER BY e.stream.version"; + var sqlQueryText = $"SELECT * FROM {_eventsContainerId} e" + " WHERE e.stream.id = @streamId " + " ORDER BY e.stream.version"; QueryDefinition queryDefinition = new QueryDefinition(sqlQueryText) .WithParameter("@streamId", streamId); @@ -173,9 +203,9 @@ public async Task LoadStreamAsync(Guid streamId, string partitionKe public async Task LoadStreamAsync(Guid streamId, string partitionKey, int fromVersion, CancellationToken cancellationToken = default) { - Container container = _client.GetContainer(_databaseId, _containerId); + Container container = _client.GetContainer(_databaseId, _eventsContainerId); - var sqlQueryText = $"SELECT * FROM {_containerId} e" + + var sqlQueryText = $"SELECT * FROM {_eventsContainerId} e" + " WHERE e.stream.id = @streamId AND e.stream.version >= @fromVersion" + " ORDER BY e.stream.version"; @@ -216,7 +246,7 @@ public async Task AppendToStreamAsync( throw new ArgumentException("Partition keys for all events in the stream must be the same"); } - Container container = _client.GetContainer(_databaseId, _containerId); + Container container = _client.GetContainer(_databaseId, _eventsContainerId); PartitionKey cosmosPartitionKey = new PartitionKey(events.First().PartitionKey); @@ -250,11 +280,70 @@ IEnumerable events Version = ++expectedVersion }, EventType = e.GetType().AssemblyQualifiedName, - EventData = JsonSerializer.SerializeToElement(e, e.GetType(), EventSerializerOptions.Options), - UserInfo = JsonSerializer.SerializeToElement(eventUserInfo, eventUserInfo.GetType(), EventSerializerOptions.Options) + EventData = JsonSerializer.SerializeToElement(e, e.GetType(), EventStoreSerializerOptions.Options), + UserInfo = JsonSerializer.SerializeToElement(eventUserInfo, eventUserInfo.GetType(), EventStoreSerializerOptions.Options) } ); - return JsonSerializer.Serialize(items, EventSerializerOptions.Options); + return JsonSerializer.Serialize(items, EventStoreSerializerOptions.Options); + } + + #region Item Functionality + + public async Task UpsertItem(string id, string partitionKey, T item, CancellationToken cancellationToken = default) + { + Container container = _client.GetContainer(_databaseId, _itemsContainerId); + + PartitionKey cosmosPartitionKey = new PartitionKey(partitionKey); + + var response = await container.UpsertItemAsync( + new ItemWrapper + { + Id = id, + PartitionKey = partitionKey, + ItemData = JsonSerializer.Serialize(item, EventStoreSerializerOptions.Options) + }, + cosmosPartitionKey, + null, + cancellationToken + ); + + if (response.StatusCode != System.Net.HttpStatusCode.OK && response.StatusCode != System.Net.HttpStatusCode.Created) + { + throw new Exception($"Cosmos Db returned status {response.StatusCode}."); + } } + + public async Task LoadItem(string id, string partitionKey, CancellationToken cancellationToken = default) + { + Container container = _client.GetContainer(_databaseId, _itemsContainerId); + + PartitionKey cosmosPartitionKey = new PartitionKey(partitionKey); + + var sqlQueryText = $"SELECT * FROM {_itemsContainerId} i" + " WHERE i.id = @id OFFSET 0 LIMIT 1"; + + QueryDefinition queryDefinition = new QueryDefinition(sqlQueryText) + .WithParameter("@id", id); + + FeedIterator feedIterator = container.GetItemQueryIterator( + queryDefinition, + requestOptions: new QueryRequestOptions { PartitionKey = cosmosPartitionKey } + ); + + while (feedIterator.HasMoreResults) + { + FeedResponse response = await feedIterator.ReadNextAsync(cancellationToken); + + if (response.Count == 0) + { + return default; + } + + return JsonSerializer.Deserialize(response.First().ItemData, EventStoreSerializerOptions.Options); + } + + return default; + } + + #endregion } diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.InMemory/InMemoryEventStore.cs b/Implementations/CloudFabric.EventSourcing.EventStore.InMemory/InMemoryEventStore.cs index dfe531c..988166a 100755 --- a/Implementations/CloudFabric.EventSourcing.EventStore.InMemory/InMemoryEventStore.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.InMemory/InMemoryEventStore.cs @@ -11,13 +11,16 @@ public class EventAddedEventArgs : EventArgs public class InMemoryEventStore : IEventStore { private readonly Dictionary<(Guid StreamId, string PartitionKey), List> _eventsContainer; + private readonly Dictionary<(string Id, string PartitionKey), string> _itemsContainer; private readonly List> _eventAddedEventHandlers = new(); public InMemoryEventStore( - Dictionary<(Guid StreamId, string PartitionKey), List> eventsContainer + Dictionary<(Guid StreamId, string PartitionKey), List> eventsContainer, + Dictionary<(string Id, string PartitionKey), string> itemsContainer ) { _eventsContainer = eventsContainer; + _itemsContainer = itemsContainer; } public Task Initialize(CancellationToken cancellationToken = default) @@ -38,6 +41,7 @@ public void UnsubscribeFromEventAdded(Func handler) public Task DeleteAll(CancellationToken cancellationToken = default) { _eventsContainer.Clear(); + _itemsContainer.Clear(); return Task.CompletedTask; } @@ -109,7 +113,7 @@ public async Task> LoadEventsAsync(string partitionKey, DateTime? d var events = _eventsContainer .Where(x => x.Key.PartitionKey == partitionKey) .SelectMany(x => x.Value) - .Select(x => JsonSerializer.Deserialize(x, EventSerializerOptions.Options).GetEvent()) + .Select(x => JsonSerializer.Deserialize(x, EventStoreSerializerOptions.Options).GetEvent()) .Where(x => !dateFrom.HasValue || x.Timestamp >= dateFrom) .OrderBy(x => x.Timestamp) .ToList(); @@ -150,7 +154,7 @@ public async Task AppendToStreamAsync( foreach (var wrapper in wrappers) { - stream.Add(JsonSerializer.Serialize(wrapper, EventSerializerOptions.Options)); + stream.Add(JsonSerializer.Serialize(wrapper, EventStoreSerializerOptions.Options)); } if (!_eventsContainer.ContainsKey((streamId, partitionKey))) @@ -184,7 +188,7 @@ private async Task> LoadOrderedEventWrappers(Guid streamId, s foreach (var data in eventData) { - var eventWrapper = JsonSerializer.Deserialize(data, EventSerializerOptions.Options); + var eventWrapper = JsonSerializer.Deserialize(data, EventStoreSerializerOptions.Options); eventWrappers.Add(eventWrapper); } @@ -202,7 +206,7 @@ private async Task> LoadOrderedEventWrappersFromVersion(Guid foreach (var data in eventData) { - var eventWrapper = JsonSerializer.Deserialize(data, EventSerializerOptions.Options); + var eventWrapper = JsonSerializer.Deserialize(data, EventStoreSerializerOptions.Options); if (eventWrapper.StreamInfo.Version >= version) { eventWrappers.Add(eventWrapper); @@ -227,8 +231,8 @@ private static List PrepareEvents( Id = Guid.NewGuid(), //:{e.GetType().Name}", StreamInfo = new StreamInfo { Id = streamId, Version = ++expectedVersion }, EventType = e.GetType().AssemblyQualifiedName, - EventData = JsonSerializer.SerializeToElement(e, e.GetType(), EventSerializerOptions.Options), - UserInfo = JsonSerializer.SerializeToElement(eventUserInfo, eventUserInfo.GetType(), EventSerializerOptions.Options) + EventData = JsonSerializer.SerializeToElement(e, e.GetType(), EventStoreSerializerOptions.Options), + UserInfo = JsonSerializer.SerializeToElement(eventUserInfo, eventUserInfo.GetType(), EventStoreSerializerOptions.Options) } ); @@ -253,4 +257,32 @@ private static List PrepareEvents( // } #endregion + + #region Item Functionality + + public async Task UpsertItem(string id, string partitionKey, T item, CancellationToken cancellationToken = default) + { + var serializedItem = JsonSerializer.Serialize(item, EventStoreSerializerOptions.Options); + + var itemNotExists = _itemsContainer.TryAdd((id, partitionKey), serializedItem); + + if (!itemNotExists) + { + _itemsContainer[(id, partitionKey)] = serializedItem; + } + } + + public async Task LoadItem(string id, string partitionKey, CancellationToken cancellationToken = default) + { + if (_itemsContainer.TryGetValue((id, partitionKey), out string? value)) + { + return value != null + ? JsonSerializer.Deserialize(value, EventStoreSerializerOptions.Options) + : default; + } + + return default; + } + + #endregion } \ No newline at end of file diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStore.cs b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStore.cs index 7813c2b..5bc86a2 100755 --- a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStore.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStore.cs @@ -10,12 +10,14 @@ public class PostgresqlEventStore : IEventStore private const int EVENTSTORE_TABLE_SCHEMA_VERSION = 1; private readonly string _connectionString; private readonly List> _eventAddedEventHandlers = new(); - private readonly string _tableName; + private readonly string _eventsTableName; + private readonly string _itemsTableName; - public PostgresqlEventStore(string connectionString, string tableName) + public PostgresqlEventStore(string connectionString, string eventsTableName, string itemsTableName) { _connectionString = connectionString; - _tableName = tableName; + _eventsTableName = eventsTableName; + _itemsTableName = itemsTableName; } public async Task Initialize(CancellationToken cancellationToken = default) @@ -28,9 +30,34 @@ public async Task DeleteAll(CancellationToken cancellationToken = default) await using var conn = new NpgsqlConnection(_connectionString); await conn.OpenAsync(); - await using var cmd = new NpgsqlCommand($"DELETE FROM \"{_tableName}\"", conn); + await using var eventsTableCmd = new NpgsqlCommand($"DELETE FROM \"{_eventsTableName}\"", conn); + + try + { + await eventsTableCmd.ExecuteScalarAsync(cancellationToken); + } + catch (NpgsqlException ex) + { + if (ex.SqlState != PostgresErrorCodes.UndefinedTable) + { + throw; + } + } + + await using var itemsTableCmd = new NpgsqlCommand($"DELETE FROM \"{_itemsTableName}\"", conn); + + try + { + await itemsTableCmd.ExecuteScalarAsync(cancellationToken); + } + catch (NpgsqlException ex) + { + if (ex.SqlState != PostgresErrorCodes.UndefinedTable) + { + throw; + } + } - await cmd.ExecuteScalarAsync(cancellationToken); } public async Task HardDeleteAsync(Guid streamId, string partitionKey, CancellationToken cancellationToken = default) { @@ -40,7 +67,7 @@ public async Task HardDeleteAsync(Guid streamId, string partitionKey, Canc await using var transaction = await conn.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken); await using var cmd = new NpgsqlCommand( - $"DELETE FROM \"{_tableName}\"" + + $"DELETE FROM \"{_eventsTableName}\"" + $"WHERE stream_id = @streamId AND event_data->>'partitionKey' = @partitionKey", conn, transaction) @@ -92,7 +119,7 @@ public async Task LoadStreamAsync(Guid streamId, string partitionKe await using var cmd = new NpgsqlCommand( $"SELECT id, stream_id, stream_version, event_type, event_data, user_info " + - $"FROM \"{_tableName}\" " + + $"FROM \"{_eventsTableName}\" " + $"WHERE stream_id = @streamId AND event_data->>'partitionKey' = @partitionKey ORDER BY stream_version ASC", conn) { Parameters = @@ -151,7 +178,7 @@ public async Task LoadStreamAsync(Guid streamId, string partitionKe await using var cmd = new NpgsqlCommand( $"SELECT id, stream_id, stream_version, event_type, event_data, user_info, eventstore_schema_version " + - $"FROM \"{_tableName}\" WHERE stream_id = @streamId AND event_data->>'partitionKey' = @partitionKey AND stream_version >= @fromVersion", conn) + $"FROM \"{_eventsTableName}\" WHERE stream_id = @streamId AND event_data->>'partitionKey' = @partitionKey AND stream_version >= @fromVersion", conn) { Parameters = { @@ -167,7 +194,7 @@ public async Task LoadStreamAsync(Guid streamId, string partitionKe while (await reader.ReadAsync(cancellationToken)) { - var streamInfo = JsonSerializer.Deserialize(reader.GetString(1), EventSerializerOptions.Options); + var streamInfo = JsonSerializer.Deserialize(reader.GetString(1), EventStoreSerializerOptions.Options); if(streamInfo == null) { throw new Exception("Failed to deserialize stream info"); @@ -204,7 +231,7 @@ public async Task> LoadEventsAsync(string partitionKey, DateTime? d await using var cmd = new NpgsqlCommand( $"SELECT id, event_type, event_data " + - $"FROM \"{_tableName}\" " + + $"FROM \"{_eventsTableName}\" " + (!string.IsNullOrEmpty(whereClause) ? $"WHERE {whereClause} " : "") + @@ -255,7 +282,7 @@ public async Task AppendToStreamAsync(EventUserInfo eventUserInfo, Guid st await using var transaction = await conn.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken); await using var cmd = new NpgsqlCommand( - $"SELECT MAX(stream_version) FROM \"{_tableName}\" WHERE stream_id = @streamId", conn, transaction) + $"SELECT MAX(stream_version) FROM \"{_eventsTableName}\" WHERE stream_id = @streamId", conn, transaction) { Parameters = { @@ -293,7 +320,7 @@ public async Task AppendToStreamAsync(EventUserInfo eventUserInfo, Guid st foreach (var evt in events) { batchInsert.BatchCommands.Add(new NpgsqlBatchCommand($"" + - $"INSERT INTO \"{_tableName}\" " + + $"INSERT INTO \"{_eventsTableName}\" " + $"(id, stream_id, stream_version, event_type, event_data, user_info, eventstore_schema_version) " + $"VALUES " + $"(@id, @stream_id, @stream_version, @event_type, @event_data, @user_info, @eventstore_schema_version)") @@ -307,13 +334,13 @@ public async Task AppendToStreamAsync(EventUserInfo eventUserInfo, Guid st new NpgsqlParameter() { ParameterName = "event_data", - Value = JsonSerializer.Serialize(evt, evt.GetType(), EventSerializerOptions.Options), + Value = JsonSerializer.Serialize(evt, evt.GetType(), EventStoreSerializerOptions.Options), DataTypeName = "jsonb" }, new NpgsqlParameter() { ParameterName = "user_info", - Value = JsonSerializer.Serialize(eventUserInfo, eventUserInfo.GetType(), EventSerializerOptions.Options), + Value = JsonSerializer.Serialize(eventUserInfo, eventUserInfo.GetType(), EventStoreSerializerOptions.Options), DataTypeName = "jsonb" }, new NpgsqlParameter("eventstore_schema_version", EVENTSTORE_TABLE_SCHEMA_VERSION) @@ -352,20 +379,20 @@ private async Task EnsureTableExistsAsync(CancellationToken cancellationToken = await conn.OpenAsync(cancellationToken); await using var cmd = new NpgsqlCommand( - $"SELECT 1 FROM \"{_tableName}\"", conn) + $"SELECT 1 FROM \"{_eventsTableName}\"", conn) { }; try { - var tableExists = await cmd.ExecuteScalarAsync(cancellationToken); + await cmd.ExecuteScalarAsync(cancellationToken); } catch (NpgsqlException ex) { if (ex.SqlState == PostgresErrorCodes.UndefinedTable) { await using var createTableCommand = new NpgsqlCommand( - $"CREATE TABLE \"{_tableName}\" (" + + $"CREATE TABLE \"{_eventsTableName}\" (" + $"id uuid, " + $"stream_id uuid, " + $"stream_version integer, " + @@ -374,14 +401,42 @@ private async Task EnsureTableExistsAsync(CancellationToken cancellationToken = $"user_info jsonb, " + $"eventstore_schema_version int NOT NULL" + $");" + - $"CREATE INDEX \"{_tableName}_stream_id_idx\" ON \"{_tableName}\" (stream_id);" + - $"CREATE INDEX \"{_tableName}_stream_id_with_partition_key_idx\" ON \"{_tableName}\" (stream_id, ((event_data ->> 'partitionKey')::varchar(256)));" + $"CREATE INDEX \"{_eventsTableName}_stream_id_idx\" ON \"{_eventsTableName}\" (stream_id);" + + $"CREATE INDEX \"{_eventsTableName}_stream_id_with_partition_key_idx\" ON \"{_eventsTableName}\" (stream_id, ((event_data ->> 'partitionKey')::varchar(256)));" , conn); await createTableCommand.ExecuteNonQueryAsync(cancellationToken); } } + + await using var cmdItemTable = new NpgsqlCommand( + $"SELECT 1 FROM \"{_itemsTableName}\"", conn) + { + }; + + try + { + await cmdItemTable.ExecuteScalarAsync(cancellationToken); + } + catch (NpgsqlException ex) + { + if (ex.SqlState == PostgresErrorCodes.UndefinedTable) + { + await using var createTableCommand = new NpgsqlCommand( + $"CREATE TABLE \"{_itemsTableName}\" (" + + $"id varchar(100) UNIQUE NOT NULL, " + + $"partition_key varchar(100) NOT NULL, " + + $"data jsonb" + + $");" + + $"CREATE INDEX \"{_itemsTableName}_id_idx\" ON \"{_itemsTableName}\" (id);" + + $"CREATE INDEX \"{_itemsTableName}_id_with_partition_key_idx\" ON \"{_itemsTableName}\" (id, partition_key);" + + , conn); + + await createTableCommand.ExecuteNonQueryAsync(cancellationToken); + } + } } #region Snapshot Functionality @@ -402,4 +457,103 @@ private async Task EnsureTableExistsAsync(CancellationToken cancellationToken = // } #endregion + + #region Item Functionality + + public async Task UpsertItem(string id, string partitionKey, T item, CancellationToken cancellationToken = default) + { + await using var conn = new NpgsqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + + await using var cmd = new NpgsqlCommand( + $"INSERT INTO \"{_itemsTableName}\" " + + $"(id, partition_key, data) " + + $"VALUES" + + $"(@id, @partition_key, @data)" + + $"ON CONFLICT (id) " + + $"DO UPDATE " + + $"SET data = @data, partition_key = @partition_key; " + , conn + ) + { + Parameters = + { + new("id", id), + new("partition_key", partitionKey), + new NpgsqlParameter() + { + ParameterName = "data", + Value = JsonSerializer.Serialize(item, EventStoreSerializerOptions.Options), + DataTypeName = "jsonb" + } + } + }; + + try + { + int insertItemResult = await cmd.ExecuteNonQueryAsync(cancellationToken); + + if (insertItemResult == -1) + { + throw new Exception("Upsert item failed."); + } + } + catch (NpgsqlException ex) + { + if (ex.SqlState == PostgresErrorCodes.UndefinedTable) + { + throw new Exception( + "EventStore table not found, please make sure to call Initialize() on event store first.", + ex); + } + + throw; + } + } + + public async Task LoadItem(string id, string partitionKey, CancellationToken cancellationToken = default) + { + await using var conn = new NpgsqlConnection(_connectionString); + await conn.OpenAsync(cancellationToken); + + await using var cmd = new NpgsqlCommand( + $"SELECT * FROM \"{_itemsTableName}\" " + + $"WHERE id = @id AND partition_key = @partition_key LIMIT 1; " + , conn) + { + Parameters = + { + new("id", id), + new("partition_key", partitionKey) + } + }; + + try + { + await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); + + if (await reader.ReadAsync(cancellationToken)) + { + var item = JsonDocument.Parse(reader.GetString("data")).RootElement; + + return JsonSerializer.Deserialize(item, EventStoreSerializerOptions.Options); + } + + return default; + } + + catch (NpgsqlException ex) + { + if (ex.SqlState == PostgresErrorCodes.UndefinedTable) + { + throw new Exception( + "EventStore table not found, please make sure to call Initialize() on event store first.", + ex); + } + + throw; + } + } + + #endregion } \ No newline at end of file diff --git a/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/ItemTestsCosmosDb.cs b/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/ItemTestsCosmosDb.cs new file mode 100644 index 0000000..041c57e --- /dev/null +++ b/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/ItemTestsCosmosDb.cs @@ -0,0 +1,102 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore.CosmosDb; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CloudFabric.EventSourcing.Tests.CosmosDb; + +[TestClass] +public class ItemTestsCosmosDb : ItemTests +{ + private const string DatabaseName = "TestDatabase"; + private const string EventContainerName = "TestEventContainer"; + private const string ItemContainerName = "TestItemContainer"; + + private const string CosmosDbConnectionString = + "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="; + + CosmosClient _cosmosClient = null; + CosmosClientOptions _cosmosClientOptions; + + private IEventStore? _eventStore = null; + private ILogger _logger; + + public async Task SetUp() + { + var loggerFactory = new LoggerFactory(); + _logger = loggerFactory.CreateLogger(); + + JsonSerializerOptions jsonSerializerOptions = new JsonSerializerOptions() + { + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }; + CosmosDbSystemTextJsonSerializer cosmosSystemTextJsonSerializer + = new CosmosDbSystemTextJsonSerializer(jsonSerializerOptions); + + _cosmosClientOptions = new CosmosClientOptions() + { + Serializer = cosmosSystemTextJsonSerializer, + HttpClientFactory = () => + { + HttpMessageHandler httpMessageHandler = new HttpClientHandler() + { + ServerCertificateCustomValidationCallback = + HttpClientHandler.DangerousAcceptAnyServerCertificateValidator + }; + + return new HttpClient(httpMessageHandler); + }, + ConnectionMode = ConnectionMode.Gateway + }; + + _cosmosClient = new CosmosClient( + CosmosDbConnectionString, + _cosmosClientOptions + ); + + var database = await ReCreateDatabase(_cosmosClient, DatabaseName); + await database.CreateContainerIfNotExistsAsync(new ContainerProperties(ItemContainerName, "/partition_key")); + + ContainerResponse itemContainerResponce = + await _cosmosClient.GetContainer(DatabaseName, ItemContainerName).ReadContainerAsync(); + // Set the indexing mode to consistent + itemContainerResponce.Resource.IndexingPolicy.IndexingMode = IndexingMode.Consistent; + // Add an included path + itemContainerResponce.Resource.IndexingPolicy.IncludedPaths.Add(new IncludedPath { Path = "/id" }); + // Add an excluded path + itemContainerResponce.Resource.IndexingPolicy.ExcludedPaths.Add(new ExcludedPath { Path = "/data" }); + } + + private async Task ReCreateDatabase(CosmosClient cosmosClient, string databaseName) + { + await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); + var database = cosmosClient.GetDatabase(databaseName); + await database.DeleteAsync(); + await cosmosClient.CreateDatabaseIfNotExistsAsync( + databaseName, + ThroughputProperties.CreateManualThroughput(400) + ); + return cosmosClient.GetDatabase(databaseName); + } + + protected override async Task GetEventStore() + { + if (_eventStore == null) + { + await SetUp(); + + _eventStore = new CosmosDbEventStore( + _cosmosClient, + DatabaseName, + EventContainerName, + ItemContainerName + ); + await _eventStore.Initialize(); + } + + return _eventStore; + } +} diff --git a/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/OrderTestsCosmosDb.cs b/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/OrderTestsCosmosDb.cs index 51cb17c..18e632f 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/OrderTestsCosmosDb.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/OrderTestsCosmosDb.cs @@ -124,7 +124,8 @@ protected override async Task GetEventStore() _eventStore = new CosmosDbEventStore( _cosmosClient, DatabaseName, - ContainerName + ContainerName, + "NotUsedItemContainer" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/DynamicProjectionsOrderTestsElasticSearch.cs b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/DynamicProjectionsOrderTestsElasticSearch.cs index 76f67ff..33238c5 100644 --- a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/DynamicProjectionsOrderTestsElasticSearch.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/DynamicProjectionsOrderTestsElasticSearch.cs @@ -20,7 +20,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( "Host=localhost;Username=cloudfabric_eventsourcing_test;Password=cloudfabric_eventsourcing_test;Database=cloudfabric_eventsourcing_test;Maximum Pool Size=1000", - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderStringComparisonTestsElasticSearch.cs b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderStringComparisonTestsElasticSearch.cs index a90376c..13f9056 100644 --- a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderStringComparisonTestsElasticSearch.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderStringComparisonTestsElasticSearch.cs @@ -23,7 +23,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( "Host=localhost;Username=cloudfabric_eventsourcing_test;Password=cloudfabric_eventsourcing_test;Database=cloudfabric_eventsourcing_test;Maximum Pool Size=1000", - "orders_events" + "orders_events", + "orders_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderTestsElasticSearch.cs b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderTestsElasticSearch.cs index 312ff1f..95ecfc7 100644 --- a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderTestsElasticSearch.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderTestsElasticSearch.cs @@ -24,7 +24,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( "Host=localhost;Username=cloudfabric_eventsourcing_test;Password=cloudfabric_eventsourcing_test;Database=cloudfabric_eventsourcing_test;Maximum Pool Size=1000", - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/QueryStringTestsElasticSearch.cs b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/QueryStringTestsElasticSearch.cs index 1481cc0..86cedb2 100644 --- a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/QueryStringTestsElasticSearch.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/QueryStringTestsElasticSearch.cs @@ -23,7 +23,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( "Host=localhost;Username=cloudfabric_eventsourcing_test;Password=cloudfabric_eventsourcing_test;Database=cloudfabric_eventsourcing_test;Maximum Pool Size=1000", - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/DynamicProjectionsOrderTestsInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/DynamicProjectionsOrderTestsInMemory.cs index 28ac6ee..655e661 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/DynamicProjectionsOrderTestsInMemory.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/DynamicProjectionsOrderTestsInMemory.cs @@ -1,4 +1,4 @@ -using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore; using CloudFabric.EventSourcing.EventStore.InMemory; using CloudFabric.Projections; using CloudFabric.Projections.InMemory; @@ -17,7 +17,10 @@ protected override async Task GetEventStore() { if (_eventStore == null) { - _eventStore = new InMemoryEventStore(new Dictionary<(Guid, string), List>()); + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/ItemTestInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/ItemTestInMemory.cs new file mode 100644 index 0000000..ec731fc --- /dev/null +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/ItemTestInMemory.cs @@ -0,0 +1,25 @@ +using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore.InMemory; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CloudFabric.EventSourcing.Tests.InMemory; + +[TestClass] +public class ItemTestInMemory : ItemTests +{ + private InMemoryEventStore? _eventStore = null; + + protected override async Task GetEventStore() + { + if (_eventStore == null) + { + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); + await _eventStore.Initialize(); + } + + return _eventStore; + } +} diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderStringComparisonTestsInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderStringComparisonTestsInMemory.cs index 3b90340..071abc3 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderStringComparisonTestsInMemory.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderStringComparisonTestsInMemory.cs @@ -1,4 +1,4 @@ -using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore; using CloudFabric.EventSourcing.EventStore.InMemory; using CloudFabric.Projections; using CloudFabric.Projections.InMemory; @@ -17,7 +17,10 @@ protected override async Task GetEventStore() { if (_eventStore == null) { - _eventStore = new InMemoryEventStore(new Dictionary<(Guid, string), List>()); + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderTestsInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderTestsInMemory.cs index 2cf9a13..7913d83 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderTestsInMemory.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderTestsInMemory.cs @@ -17,7 +17,10 @@ protected override async Task GetEventStore() { if (_eventStore == null) { - _eventStore = new InMemoryEventStore(new Dictionary<(Guid, string), List>()); + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/QueryStringTestsInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/QueryStringTestsInMemory.cs index 3976438..9af7ecc 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/QueryStringTestsInMemory.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/QueryStringTestsInMemory.cs @@ -1,4 +1,4 @@ -using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore; using CloudFabric.EventSourcing.EventStore.InMemory; using CloudFabric.Projections; using CloudFabric.Projections.InMemory; @@ -17,7 +17,10 @@ protected override async Task GetEventStore() { if (_eventStore == null) { - _eventStore = new InMemoryEventStore(new Dictionary<(Guid, string), List>()); + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/DynamicProjectionsOrderTestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/DynamicProjectionsOrderTestsPostgresql.cs index 3510a9d..801c2ee 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/DynamicProjectionsOrderTestsPostgresql.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/DynamicProjectionsOrderTestsPostgresql.cs @@ -1,4 +1,4 @@ -using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore; using CloudFabric.EventSourcing.EventStore.Postgresql; using CloudFabric.Projections; using CloudFabric.Projections.Postgresql; @@ -19,7 +19,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( TestsConnectionStrings.CONNECTION_STRING, - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/ItemtestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/ItemtestsPostgresql.cs new file mode 100644 index 0000000..9697e1a --- /dev/null +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/ItemtestsPostgresql.cs @@ -0,0 +1,27 @@ +using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore.Postgresql; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CloudFabric.EventSourcing.Tests.Postgresql; + +[TestClass] +public class ItemtestsPostgresql : ItemTests +{ + private PostgresqlEventStore? _eventStore; + + protected override async Task GetEventStore() + { + if (_eventStore == null) + { + _eventStore = new PostgresqlEventStore( + TestsConnectionStrings.CONNECTION_STRING, + "orders_events", + "stored_items" + ); + await _eventStore.Initialize(); + } + + return _eventStore; + } + +} diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderStringComparisonTestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderStringComparisonTestsPostgresql.cs index 3dbe12a..c89411f 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderStringComparisonTestsPostgresql.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderStringComparisonTestsPostgresql.cs @@ -19,7 +19,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( TestsConnectionStrings.CONNECTION_STRING, - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderTestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderTestsPostgresql.cs index 1ea9983..2192b93 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderTestsPostgresql.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderTestsPostgresql.cs @@ -20,7 +20,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( TestsConnectionStrings.CONNECTION_STRING, - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/QueryStringTestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/QueryStringTestsPostgresql.cs index 691c4e8..1159f1e 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/QueryStringTestsPostgresql.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/QueryStringTestsPostgresql.cs @@ -19,7 +19,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( TestsConnectionStrings.CONNECTION_STRING, - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.Projections.CosmosDb/CosmosDbProjectionRepository.cs b/Implementations/CloudFabric.Projections.CosmosDb/CosmosDbProjectionRepository.cs index 718157c..094d8f8 100755 --- a/Implementations/CloudFabric.Projections.CosmosDb/CosmosDbProjectionRepository.cs +++ b/Implementations/CloudFabric.Projections.CosmosDb/CosmosDbProjectionRepository.cs @@ -131,7 +131,7 @@ string containerId _containerId = containerId; } - public Task EnsureIndex(CancellationToken cancellationToken = default) + public async Task EnsureIndex(CancellationToken cancellationToken = default) { throw new NotImplementedException(); }