diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 07c092a..091ebe6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,7 +7,7 @@ on: branches: [ "main" ] env: - PACKAGE_VERSION: 0.1.0 + PACKAGE_VERSION: 0.1.4 jobs: diff --git a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.CosmosDb/Extensions/ServiceCollectionExtensions.cs b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.CosmosDb/Extensions/ServiceCollectionExtensions.cs index 5387798..39f708d 100644 --- a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.CosmosDb/Extensions/ServiceCollectionExtensions.cs +++ b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.CosmosDb/Extensions/ServiceCollectionExtensions.cs @@ -15,7 +15,8 @@ public static IEventSourcingBuilder AddCosmosDbEventStore( string connectionString, CosmosClientOptions cosmosClientOptions, string databaseId, - string containerId, + string eventsContainerId, + string itemsContainerId, CosmosClient leaseClient, string leaseDatabaseId, string leaseContainerId, @@ -28,13 +29,13 @@ string processorName var cosmosClient = new CosmosClient(connectionString, cosmosClientOptions); - var eventStore = new CosmosDbEventStore(cosmosClient, databaseId, containerId); + var eventStore = new CosmosDbEventStore(cosmosClient, databaseId, eventsContainerId, itemsContainerId); eventStore.Initialize().Wait(); var eventStoreObserver = new CosmosDbEventStoreChangeFeedObserver( cosmosClient, databaseId, - containerId, + eventsContainerId, leaseClient, leaseDatabaseId, leaseContainerId, @@ -55,10 +56,11 @@ public static IEventSourcingBuilder AddCosmosDbEventStore( this IServiceCollection services, CosmosClient client, string databaseId, - string containerId + string eventsContainerId, + string itemsContainerId ) { - var eventStore = new CosmosDbEventStore(client, databaseId, containerId); + var eventStore = new CosmosDbEventStore(client, databaseId, eventsContainerId, itemsContainerId); eventStore.Initialize().Wait(); return new EventSourcingBuilder diff --git a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.InMemory/Extensions/ServiceCollectionExtensions.cs b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.InMemory/Extensions/ServiceCollectionExtensions.cs index a14934b..2eca959 100644 --- a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.InMemory/Extensions/ServiceCollectionExtensions.cs +++ b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.InMemory/Extensions/ServiceCollectionExtensions.cs @@ -1,4 +1,6 @@ using CloudFabric.EventSourcing.EventStore; +using System.Runtime.CompilerServices; +using CloudFabric.EventSourcing.Domain; using CloudFabric.EventSourcing.EventStore.InMemory; using CloudFabric.Projections; using CloudFabric.Projections.InMemory; @@ -12,7 +14,8 @@ public static class ServiceCollectionExtensions { public static IEventSourcingBuilder AddInMemoryEventStore( this IServiceCollection services, - Dictionary<(Guid, string), List> eventsContainer + Dictionary<(Guid, string), List> eventsContainer, + Dictionary<(string, string), string> itemsContainer ) { var builder = new EventSourcingBuilder @@ -23,7 +26,7 @@ public static IEventSourcingBuilder AddInMemoryEventStore( services.AddScoped( (sp) => { - var eventStore = new InMemoryEventStore(eventsContainer); + var eventStore = new InMemoryEventStore(eventsContainer, itemsContainer); eventStore.Initialize().Wait(); // add events observer for projections @@ -58,6 +61,14 @@ public static IEventSourcingBuilder AddInMemoryEventStore( return builder; } + public static IEventSourcingBuilder AddInMemoryEventStore(this IServiceCollection services) + { + return services.AddInMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); + } + public static IEventSourcingBuilder AddRepository(this IEventSourcingBuilder builder) where TRepo : class { diff --git a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.Postgresql/Extensions/ServiceCollectionExtensions.cs b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.Postgresql/Extensions/ServiceCollectionExtensions.cs index 93c7305..4886bf3 100644 --- a/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.Postgresql/Extensions/ServiceCollectionExtensions.cs +++ b/CloudFabric.EventSourcing.AspNet/CloudFabric.EventSourcing.AspNet.Postgresql/Extensions/ServiceCollectionExtensions.cs @@ -1,4 +1,6 @@ using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.Domain; +using CloudFabric.EventSourcing.EventStore.Enums; using CloudFabric.EventSourcing.EventStore.Postgresql; using CloudFabric.Projections; using CloudFabric.Projections.Postgresql; @@ -20,10 +22,13 @@ public static class ServiceCollectionExtensions public static IEventSourcingBuilder AddPostgresqlEventStore( this IServiceCollection services, string eventsConnectionString, - string tableName + string eventsTableName, + string itemsTableName ) { - services.AddPostgresqlEventStore((sp) => new PostgresqlEventStoreStaticConnectionInformationProvider(eventsConnectionString, tableName)); + services.AddPostgresqlEventStore((sp) => + new PostgresqlEventStoreStaticConnectionInformationProvider(eventsConnectionString, eventsTableName, itemsTableName) + ); return new EventSourcingBuilder { @@ -103,6 +108,22 @@ Func conne return builder; } + /// + /// This extension overload initialize event store with default item event store table name. + /// + public static IEventSourcingBuilder AddPostgresqlEventStore( + this IServiceCollection services, + string eventsConnectionString, + string eventsTableName + ) + { + return services.AddPostgresqlEventStore( + eventsConnectionString, + eventsTableName, + string.Concat(eventsTableName, ItemsEventStoreNameSuffix.TableNameSuffix) + ); + } + public static IEventSourcingBuilder AddRepository(this IEventSourcingBuilder builder) where TRepo : class { @@ -152,7 +173,9 @@ public static IEventSourcingBuilder AddProjectionsRebuildProcessor(this IEventSo { var connectionInformationProvider = sp.GetRequiredService(); var connectionInformation = connectionInformationProvider.GetConnectionInformation(connectionId); - var eventStore = new PostgresqlEventStore(connectionInformation.ConnectionString, connectionInformation.TableName); + var eventStore = new PostgresqlEventStore( + connectionInformation.ConnectionString, connectionInformation.TableName, connectionInformation.ItemsTableName + ); var eventObserver = new PostgresqlEventStoreEventObserver( (PostgresqlEventStore)eventStore, diff --git a/CloudFabric.EventSourcing.EventStore/Enums/ItemsEventStoreNameSuffix.cs b/CloudFabric.EventSourcing.EventStore/Enums/ItemsEventStoreNameSuffix.cs new file mode 100644 index 0000000..3d88648 --- /dev/null +++ b/CloudFabric.EventSourcing.EventStore/Enums/ItemsEventStoreNameSuffix.cs @@ -0,0 +1,6 @@ +namespace CloudFabric.EventSourcing.EventStore.Enums; + +public static class ItemsEventStoreNameSuffix +{ + public const string TableNameSuffix = "-item"; +} diff --git a/CloudFabric.EventSourcing.EventStore/EventSerializerOptions.cs b/CloudFabric.EventSourcing.EventStore/EventStoreSerializerOptions.cs similarity index 84% rename from CloudFabric.EventSourcing.EventStore/EventSerializerOptions.cs rename to CloudFabric.EventSourcing.EventStore/EventStoreSerializerOptions.cs index 89fd0fa..db82753 100644 --- a/CloudFabric.EventSourcing.EventStore/EventSerializerOptions.cs +++ b/CloudFabric.EventSourcing.EventStore/EventStoreSerializerOptions.cs @@ -2,7 +2,7 @@ namespace CloudFabric.EventSourcing.EventStore; -public static class EventSerializerOptions +public static class EventStoreSerializerOptions { public static JsonSerializerOptions Options { diff --git a/CloudFabric.EventSourcing.EventStore/IEventStore.cs b/CloudFabric.EventSourcing.EventStore/IEventStore.cs index 7496942..0880916 100755 --- a/CloudFabric.EventSourcing.EventStore/IEventStore.cs +++ b/CloudFabric.EventSourcing.EventStore/IEventStore.cs @@ -32,4 +32,8 @@ Task AppendToStreamAsync( Task DeleteAll(CancellationToken cancellationToken = default); Task HardDeleteAsync(Guid streamId, string partitionKey, CancellationToken cancellationToken = default); + + Task UpsertItem(string id, string partitionKey, T item, CancellationToken cancellationToken = default); + + Task LoadItem(string id, string partitionKey, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/CloudFabric.EventSourcing.EventStore/Persistence/EventWrapper.cs b/CloudFabric.EventSourcing.EventStore/Persistence/EventWrapper.cs index f60f56a..cae49c5 100755 --- a/CloudFabric.EventSourcing.EventStore/Persistence/EventWrapper.cs +++ b/CloudFabric.EventSourcing.EventStore/Persistence/EventWrapper.cs @@ -36,7 +36,7 @@ public IEvent GetEvent() throw new Exception("Couldn't find event type. Make sure it's in correct namespace."); } - var e = (IEvent?)EventData.Deserialize(eventType, EventSerializerOptions.Options); + var e = (IEvent?)EventData.Deserialize(eventType, EventStoreSerializerOptions.Options); if (e == null) { diff --git a/CloudFabric.EventSourcing.EventStore/Persistence/ItemWrapper.cs b/CloudFabric.EventSourcing.EventStore/Persistence/ItemWrapper.cs new file mode 100644 index 0000000..60c38ab --- /dev/null +++ b/CloudFabric.EventSourcing.EventStore/Persistence/ItemWrapper.cs @@ -0,0 +1,15 @@ +using System.Text.Json.Serialization; + +namespace CloudFabric.EventSourcing.EventStore.Persistence; + +public class ItemWrapper +{ + [JsonPropertyName("id")] + public string? Id { get; set; } + + [JsonPropertyName("partition_key")] + public string? PartitionKey { get; set; } + + [JsonPropertyName("data")] + public string? ItemData { get; set; } +} diff --git a/CloudFabric.EventSourcing.Tests/Domain/TestItem.cs b/CloudFabric.EventSourcing.Tests/Domain/TestItem.cs new file mode 100644 index 0000000..1c9173a --- /dev/null +++ b/CloudFabric.EventSourcing.Tests/Domain/TestItem.cs @@ -0,0 +1,19 @@ +namespace CloudFabric.EventSourcing.Tests.Domain; + +public class TestItem +{ + public Guid Id { get; set; } + + public string Name { get; set; } + + public Dictionary Properties { get; set; } +} + +public class TestNestedItemClass +{ + public Guid Id { get; set; } = Guid.NewGuid(); + + public string Name { get; set; } = nameof(Name).ToLowerInvariant(); + + public DateTime date { get; } = DateTime.MinValue; +} \ No newline at end of file diff --git a/CloudFabric.EventSourcing.Tests/ItemTests.cs b/CloudFabric.EventSourcing.Tests/ItemTests.cs new file mode 100644 index 0000000..5d20dca --- /dev/null +++ b/CloudFabric.EventSourcing.Tests/ItemTests.cs @@ -0,0 +1,102 @@ +using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.Tests.Domain; +using FluentAssertions; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CloudFabric.EventSourcing.Tests; +public abstract class ItemTests +{ + protected abstract Task GetEventStore(); + private IEventStore _eventStore; + + [TestInitialize] + public async Task Initialize() + { + _eventStore = await GetEventStore(); + } + + [TestCleanup] + public async Task Cleanup() + { + await _eventStore.DeleteAll(); + } + + [TestMethod] + public async Task SaveItem() + { + var item = new TestItem + { + Id = Guid.NewGuid(), + Name = "Item1", + Properties = new Dictionary + { + { Guid.NewGuid().ToString(), new TestNestedItemClass() }, + { Guid.NewGuid().ToString(), new TestNestedItemClass() } + } + }; + + Func action = async () => await _eventStore.UpsertItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}", item); + await action.Should().NotThrowAsync(); + } + + [TestMethod] + public async Task LoadItem() + { + var item = new TestItem + { + Id = Guid.NewGuid(), + Name = "Item1", + Properties = new Dictionary + { + { Guid.NewGuid().ToString(), new TestNestedItemClass() } + } + }; + + await _eventStore.UpsertItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}", item); + + var loadedItem = await _eventStore.LoadItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}"); + + loadedItem.Id.Should().Be(item.Id); + loadedItem.Name.Should().Be(item.Name); + loadedItem.Properties.Keys.Should().BeEquivalentTo(item.Properties.Keys); + loadedItem.Properties.Values.Should().BeEquivalentTo(item.Properties.Values); + } + + [TestMethod] + public async Task LoadItem_NullIfNotFound() + { + var loadedItem = await _eventStore.LoadItem($"{Guid.NewGuid()}", $"{Guid.NewGuid()}"); + + loadedItem.Should().BeNull(); + } + + [TestMethod] + public async Task UpdateItem() + { + var item = new TestItem + { + Id = Guid.NewGuid(), + Name = "Item1", + Properties = new Dictionary + { + { Guid.NewGuid().ToString(), new TestNestedItemClass() }, + { Guid.NewGuid().ToString(), new TestNestedItemClass() } + } + }; + + await _eventStore.UpsertItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}", item); + + string propertyGuid = Guid.NewGuid().ToString(); + + item.Properties = new() + { + { propertyGuid, new TestNestedItemClass() } + }; + + Func action = async () => await _eventStore.UpsertItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}", item); + await action.Should().NotThrowAsync(); + + var updatedItem = await _eventStore.LoadItem($"{item.Id}{item.Name}", $"{item.Id}{item.Name}"); + updatedItem.Properties.ContainsKey(propertyGuid).Should().BeTrue(); + } +} diff --git a/CloudFabric.Projections/Queries/FilterConnectorQueryStringExtensions.cs b/CloudFabric.Projections/Queries/FilterConnectorQueryStringExtensions.cs index 51896f2..3b2ec69 100644 --- a/CloudFabric.Projections/Queries/FilterConnectorQueryStringExtensions.cs +++ b/CloudFabric.Projections/Queries/FilterConnectorQueryStringExtensions.cs @@ -4,12 +4,12 @@ public static class FilterConnectorQueryStringExtensions { public static string Serialize(this FilterConnector filterConnector) { - return $"{filterConnector.Logic}+{filterConnector.Filter.Serialize()}"; + return $"{filterConnector.Logic}{ProjectionQueryQueryStringExtensions.FILTER_LOGIC_JOIN_CHARACTER}{filterConnector.Filter.Serialize()}"; } public static FilterConnector Deserialize(string serialized) { - var separator = "+"; + var separator = $"{ProjectionQueryQueryStringExtensions.FILTER_LOGIC_JOIN_CHARACTER}"; var logicEnd = serialized.IndexOf(separator, StringComparison.Ordinal); diff --git a/CloudFabric.Projections/Queries/FilterQueryStringExtensions.cs b/CloudFabric.Projections/Queries/FilterQueryStringExtensions.cs index 390b4d2..48e2388 100644 --- a/CloudFabric.Projections/Queries/FilterQueryStringExtensions.cs +++ b/CloudFabric.Projections/Queries/FilterQueryStringExtensions.cs @@ -64,20 +64,27 @@ public static string Serialize(this Filter filter, bool useShortcuts = true) if (filter.Filters != null && filter.Filters.Count > 0) { - filtersSerialized = string.Join(".", filter.Filters.Select(f => f.Serialize())); + filtersSerialized = string.Join( + ProjectionQueryQueryStringExtensions.FILTER_NESTED_FILTERS_JOIN_CHARACTER, + filter.Filters.Select(f => f.Serialize()) + ); } + const char PROPS_JOIN = ProjectionQueryQueryStringExtensions.FILTER_PROPERTIES_JOIN_CHARACTER; + return $"{(string.IsNullOrEmpty(filter.PropertyName) ? "*" : SanitizeValue(filter.PropertyName))}" + - $"|{(string.IsNullOrEmpty(filter.Operator) ? "*" : filter.Operator)}" + - $"|{System.Net.WebUtility.UrlEncode(valueSerialized)}" + - $"|{(filter.Visible ? 'T' : 'F')}" + - $"|{System.Net.WebUtility.UrlEncode(filter.Tag)}" + - $"|{filtersSerialized}"; + $"{PROPS_JOIN}{(string.IsNullOrEmpty(filter.Operator) ? "*" : filter.Operator)}" + + $"{PROPS_JOIN}{System.Net.WebUtility.UrlEncode(valueSerialized)}" + + $"{PROPS_JOIN}{(filter.Visible.ToString().ToLower())}" + + $"{PROPS_JOIN}{System.Net.WebUtility.UrlEncode(filter.Tag)}" + + $"{PROPS_JOIN}{filtersSerialized}"; } public static Filter Deserialize(string f) { - if (f.IndexOf("|", StringComparison.Ordinal) < 0) + const char PROPS_JOIN = ProjectionQueryQueryStringExtensions.FILTER_PROPERTIES_JOIN_CHARACTER; + + if (f.IndexOf(PROPS_JOIN, StringComparison.Ordinal) < 0) { if (FacetFilterShortcuts.ContainsKey(f)) { @@ -85,19 +92,19 @@ public static Filter Deserialize(string f) } } - var propertyNameEnd = f.IndexOf("|", StringComparison.Ordinal); + var propertyNameEnd = f.IndexOf($"{PROPS_JOIN}", StringComparison.Ordinal); var propertyName = DesanitizeValue(f.Substring(0, propertyNameEnd)); - var operatorEnd = f.IndexOf("|", propertyNameEnd + 1, StringComparison.Ordinal); + var operatorEnd = f.IndexOf($"{PROPS_JOIN}", propertyNameEnd + 1, StringComparison.Ordinal); var operatorValue = f.Substring(propertyNameEnd + 1, operatorEnd - propertyNameEnd - 1); - var valueEnd = f.IndexOf("|", operatorEnd + 1, StringComparison.Ordinal); + var valueEnd = f.IndexOf($"{PROPS_JOIN}", operatorEnd + 1, StringComparison.Ordinal); var value = f.Substring(operatorEnd + 1, valueEnd - operatorEnd - 1); - var visibleEnd = f.IndexOf("|", valueEnd + 1, StringComparison.Ordinal); - var visible = f.Substring(valueEnd + 1, visibleEnd - valueEnd - 1) == "T"; + var visibleEnd = f.IndexOf($"{PROPS_JOIN}", valueEnd + 1, StringComparison.Ordinal); + var visible = f.Substring(valueEnd + 1, visibleEnd - valueEnd - 1) == "true"; - var tagEnd = f.IndexOf("|", visibleEnd + 1, StringComparison.Ordinal); + var tagEnd = f.IndexOf($"{PROPS_JOIN}", visibleEnd + 1, StringComparison.Ordinal); var tag = f.Substring(visibleEnd + 1, tagEnd - visibleEnd - 1); tag = System.Net.WebUtility.UrlDecode(tag); @@ -123,7 +130,9 @@ public static Filter Deserialize(string f) var filters = new List(); - var filtersSerializedList = f.Substring(tagEnd + 1).Split('.'); + var filtersSerializedList = f.Substring(tagEnd + 1) + .Split(ProjectionQueryQueryStringExtensions.FILTER_NESTED_FILTERS_JOIN_CHARACTER); + if (filtersSerializedList.Length > 0) { filters = filtersSerializedList diff --git a/CloudFabric.Projections/Queries/ProjectionQueryQueryStringExtensions.cs b/CloudFabric.Projections/Queries/ProjectionQueryQueryStringExtensions.cs index 4bf6b6a..a041337 100644 --- a/CloudFabric.Projections/Queries/ProjectionQueryQueryStringExtensions.cs +++ b/CloudFabric.Projections/Queries/ProjectionQueryQueryStringExtensions.cs @@ -2,6 +2,33 @@ namespace CloudFabric.Projections.Queries; public static class ProjectionQueryQueryStringExtensions { + /// + /// Top-level filters are joined using this character + /// + public const char FILTERS_JOIN_CHARACTER = '!'; + + /// + /// Individual filter properties are joined using this character. + /// Example: my_boolean_property|eq|true + /// + public const char FILTER_PROPERTIES_JOIN_CHARACTER = '|'; + + /// + /// Character used to join filter connector and the filter it's attached to. + /// + /// Example: AND+my_boolean_property|eq|true + /// + public const char FILTER_LOGIC_JOIN_CHARACTER = '$'; + + /// + /// Character used to join array of nested filters. + /// + /// Example: my_boolean_property|eq|true|and$my_int_property|gt|100000000.or$my_string_property|eq|'yo' + /// ^- main property filter ^- start of nester filter array ^- second filter is joined with . + /// (joined using | like all props) + /// + public const char FILTER_NESTED_FILTERS_JOIN_CHARACTER = '.'; + public static string SerializeToQueryString( this ProjectionQuery projectionQuery, string? searchText = null, @@ -39,7 +66,7 @@ public static string SerializeFiltersToQueryString(this ProjectionQuery projecti if (filtersSerialized.Count > 0) { - return "sv1_" + string.Join("!", filtersSerialized); + return "sv1_" + string.Join(FILTERS_JOIN_CHARACTER, filtersSerialized); } else { @@ -55,7 +82,7 @@ public static void DeserializeFiltersQueryString(this ProjectionQuery projection } var searchVersionPlaceholder = "sv"; - var version = ""; + var version = "1"; if (filters.IndexOf(searchVersionPlaceholder) == 0) { @@ -63,14 +90,15 @@ public static void DeserializeFiltersQueryString(this ProjectionQuery projection var versionLength = end - searchVersionPlaceholder.Length; version = filters.Substring(searchVersionPlaceholder.Length, versionLength); + + // remove version from filters string + filters = filters.Substring(filters.IndexOf("_") + 1); } switch (version) { case "1": - // remove version from filters string - filters = filters.Substring(filters.IndexOf("_") + 1); - var filtersList = filters.Split('!').Where(f => f.Length > 0).ToList(); + var filtersList = filters.Split(FILTERS_JOIN_CHARACTER).Where(f => f.Length > 0).ToList(); if (filtersList.Count > 0) { diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStore.cs b/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStore.cs index 27b3b56..424ee06 100755 --- a/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStore.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStore.cs @@ -10,30 +10,35 @@ namespace CloudFabric.EventSourcing.EventStore.CosmosDb; public class CosmosDbEventStore : IEventStore { private readonly CosmosClient _client; - private readonly string _containerId; + private readonly string _eventsContainerId; + private readonly string _itemsContainerId; private readonly string _databaseId; public CosmosDbEventStore( string connectionString, CosmosClientOptions cosmosClientOptions, string databaseId, - string containerId + string eventsContainerId, + string itemsContainerId ) { _client = new CosmosClient(connectionString, cosmosClientOptions); _databaseId = databaseId; - _containerId = containerId; + _eventsContainerId = eventsContainerId; + _itemsContainerId = itemsContainerId; } public CosmosDbEventStore( CosmosClient client, string databaseId, - string containerId + string eventsContainerId, + string itemsContainerId ) { _client = client; _databaseId = databaseId; - _containerId = containerId; + _eventsContainerId = eventsContainerId; + _itemsContainerId = itemsContainerId; } public Task Initialize(CancellationToken cancellationToken = default) @@ -43,7 +48,7 @@ public Task Initialize(CancellationToken cancellationToken = default) public async Task GetStatistics(CancellationToken cancellationToken = default) { - Container eventContainer = _client.GetContainer(_databaseId, _containerId); + Container eventContainer = _client.GetContainer(_databaseId, _eventsContainerId); return await GetStatistics(eventContainer); } @@ -100,18 +105,45 @@ public async Task DeleteAll(CancellationToken cancellationToken = default) return; } - var container = _client.GetContainer(_databaseId, _containerId); + var container = _client.GetContainer(_databaseId, _eventsContainerId); if (container != null) { await container.DeleteContainerAsync(cancellationToken: cancellationToken); } + var eventsContainer = _client.GetContainer(_databaseId, _eventsContainerId); + + try + { + await eventsContainer.DeleteContainerAsync(cancellationToken: cancellationToken); + } + catch (CosmosException ex) + { + if (ex.StatusCode != System.Net.HttpStatusCode.NotFound) + { + throw; + } + } + + var itemsContainer = _client.GetContainer(_databaseId, _itemsContainerId); + + try + { + await itemsContainer.DeleteContainerAsync(cancellationToken: cancellationToken); + } + catch (CosmosException ex) + { + if (ex.StatusCode != System.Net.HttpStatusCode.NotFound) + { + throw; + } + } } public async Task HardDeleteAsync(Guid streamId, string partitionKey, CancellationToken cancellationToken = default) { - var container = _client.GetContainer(_databaseId, _containerId); + var container = _client.GetContainer(_databaseId, _eventsContainerId); - var sqlQueryText = $"SELECT * FROM {_containerId} e" + " WHERE e.stream.id = @streamId"; + var sqlQueryText = $"SELECT * FROM {_eventsContainerId} e" + " WHERE e.stream.id = @streamId"; QueryDefinition queryDefinition = new QueryDefinition(sqlQueryText) .WithParameter("@streamId", streamId); @@ -171,9 +203,9 @@ await Parallel.ForEachAsync(batches, async (batch, cancellationToken) => public async Task LoadStreamAsyncOrThrowNotFound(Guid streamId, string partitionKey, CancellationToken cancellationToken = default) { - Container container = _client.GetContainer(_databaseId, _containerId); + Container container = _client.GetContainer(_databaseId, _eventsContainerId); - var sqlQueryText = $"SELECT * FROM {_containerId} e" + " WHERE e.stream.id = @streamId " + " ORDER BY e.stream.version"; + var sqlQueryText = $"SELECT * FROM {_eventsContainerId} e" + " WHERE e.stream.id = @streamId " + " ORDER BY e.stream.version"; QueryDefinition queryDefinition = new QueryDefinition(sqlQueryText) .WithParameter("@streamId", streamId); @@ -206,9 +238,9 @@ public async Task LoadStreamAsyncOrThrowNotFound(Guid streamId, str public async Task LoadStreamAsync(Guid streamId, string partitionKey, CancellationToken cancellationToken = default) { - Container container = _client.GetContainer(_databaseId, _containerId); + Container container = _client.GetContainer(_databaseId, _eventsContainerId); - var sqlQueryText = $"SELECT * FROM {_containerId} e" + " WHERE e.stream.id = @streamId " + " ORDER BY e.stream.version"; + var sqlQueryText = $"SELECT * FROM {_eventsContainerId} e" + " WHERE e.stream.id = @streamId " + " ORDER BY e.stream.version"; QueryDefinition queryDefinition = new QueryDefinition(sqlQueryText) .WithParameter("@streamId", streamId); @@ -236,9 +268,9 @@ public async Task LoadStreamAsync(Guid streamId, string partitionKe public async Task LoadStreamAsync(Guid streamId, string partitionKey, int fromVersion, CancellationToken cancellationToken = default) { - Container container = _client.GetContainer(_databaseId, _containerId); + Container container = _client.GetContainer(_databaseId, _eventsContainerId); - var sqlQueryText = $"SELECT * FROM {_containerId} e" + + var sqlQueryText = $"SELECT * FROM {_eventsContainerId} e" + " WHERE e.stream.id = @streamId AND e.stream.version >= @fromVersion" + " ORDER BY e.stream.version"; @@ -279,7 +311,7 @@ public async Task AppendToStreamAsync( throw new ArgumentException("Partition keys for all events in the stream must be the same"); } - Container container = _client.GetContainer(_databaseId, _containerId); + Container container = _client.GetContainer(_databaseId, _eventsContainerId); PartitionKey cosmosPartitionKey = new PartitionKey(events.First().PartitionKey); @@ -313,21 +345,22 @@ IEnumerable events Version = ++expectedVersion }, EventType = e.GetType().AssemblyQualifiedName, - EventData = JsonSerializer.SerializeToElement(e, e.GetType(), EventSerializerOptions.Options), - UserInfo = JsonSerializer.SerializeToElement(eventUserInfo, eventUserInfo.GetType(), EventSerializerOptions.Options) + EventData = JsonSerializer.SerializeToElement(e, e.GetType(), EventStoreSerializerOptions.Options), + UserInfo = JsonSerializer.SerializeToElement(eventUserInfo, eventUserInfo.GetType(), EventStoreSerializerOptions.Options) } ); - return JsonSerializer.Serialize(items, EventSerializerOptions.Options); + return JsonSerializer.Serialize(items, EventStoreSerializerOptions.Options); } + public async Task> LoadEventsAsync( string partitionKey, DateTime? dateFrom, int chunkSize = 250, CancellationToken cancellationToken = default ) { - Container eventContainer = _client.GetContainer(_databaseId, _containerId); + Container eventContainer = _client.GetContainer(_databaseId, _eventsContainerId); DateTime endTime = DateTime.UtcNow; @@ -366,4 +399,63 @@ public async Task> LoadEventsAsync( return results; } + + #region Item Functionality + + public async Task UpsertItem(string id, string partitionKey, T item, CancellationToken cancellationToken = default) + { + Container container = _client.GetContainer(_databaseId, _itemsContainerId); + + PartitionKey cosmosPartitionKey = new PartitionKey(partitionKey); + + var response = await container.UpsertItemAsync( + new ItemWrapper + { + Id = id, + PartitionKey = partitionKey, + ItemData = JsonSerializer.Serialize(item, EventStoreSerializerOptions.Options) + }, + cosmosPartitionKey, + null, + cancellationToken + ); + + if (response.StatusCode != System.Net.HttpStatusCode.OK && response.StatusCode != System.Net.HttpStatusCode.Created) + { + throw new Exception($"Cosmos Db returned status {response.StatusCode}."); + } + } + + public async Task LoadItem(string id, string partitionKey, CancellationToken cancellationToken = default) + { + Container container = _client.GetContainer(_databaseId, _itemsContainerId); + + PartitionKey cosmosPartitionKey = new PartitionKey(partitionKey); + + var sqlQueryText = $"SELECT * FROM {_itemsContainerId} i" + " WHERE i.id = @id OFFSET 0 LIMIT 1"; + + QueryDefinition queryDefinition = new QueryDefinition(sqlQueryText) + .WithParameter("@id", id); + + FeedIterator feedIterator = container.GetItemQueryIterator( + queryDefinition, + requestOptions: new QueryRequestOptions { PartitionKey = cosmosPartitionKey } + ); + + while (feedIterator.HasMoreResults) + { + FeedResponse response = await feedIterator.ReadNextAsync(cancellationToken); + + if (response.Count == 0) + { + return default; + } + + return JsonSerializer.Deserialize(response.First().ItemData, EventStoreSerializerOptions.Options); + } + + return default; + } + + #endregion } \ No newline at end of file diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStoreChangeFeedObserver.cs b/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStoreChangeFeedObserver.cs index 20b72ec..0927f65 100755 --- a/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStoreChangeFeedObserver.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.CosmosDb/CosmosDbEventStoreChangeFeedObserver.cs @@ -44,7 +44,7 @@ public CosmosDbEventStoreChangeFeedObserver( string leaseContainerId, string processorName, ILogger logger - ): base(new CosmosDbEventStore(eventsClient, eventsContainerId, eventsContainerId), logger) + ): base(new CosmosDbEventStore(eventsClient, eventsDatabaseId, eventsContainerId, eventsContainerId), logger) { _eventsClient = eventsClient; _eventsDatabaseId = eventsDatabaseId; diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.InMemory/InMemoryEventStore.cs b/Implementations/CloudFabric.EventSourcing.EventStore.InMemory/InMemoryEventStore.cs index 1673abb..8b978cd 100755 --- a/Implementations/CloudFabric.EventSourcing.EventStore.InMemory/InMemoryEventStore.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.InMemory/InMemoryEventStore.cs @@ -11,13 +11,16 @@ public class EventAddedEventArgs : EventArgs public class InMemoryEventStore : IEventStore { private readonly Dictionary<(Guid StreamId, string PartitionKey), List> _eventsContainer; + private readonly Dictionary<(string Id, string PartitionKey), string> _itemsContainer; private readonly List> _eventAddedEventHandlers = new(); public InMemoryEventStore( - Dictionary<(Guid StreamId, string PartitionKey), List> eventsContainer + Dictionary<(Guid StreamId, string PartitionKey), List> eventsContainer, + Dictionary<(string Id, string PartitionKey), string> itemsContainer ) { _eventsContainer = eventsContainer; + _itemsContainer = itemsContainer; } public Task Initialize(CancellationToken cancellationToken = default) @@ -35,9 +38,35 @@ public void UnsubscribeFromEventAdded(Func handler) _eventAddedEventHandlers.Remove(handler); } + + public async Task GetStatistics(CancellationToken cancellationToken = default) + { + var stats = new EventStoreStatistics(); + + stats.TotalEventsCount = _eventsContainer.Count; + + var eventsOrderedByTimestamp = _eventsContainer + .SelectMany(x => x.Value) + .Select(x => JsonSerializer.Deserialize(x, EventStoreSerializerOptions.Options).GetEvent()) + .OrderBy(x => x.Timestamp) + .ToList(); + + if (eventsOrderedByTimestamp.Count > 0) + { + stats.FirstEventCreatedAt = eventsOrderedByTimestamp.First().Timestamp; + } + if (eventsOrderedByTimestamp.Count > 0) + { + stats.LastEventCreatedAt = eventsOrderedByTimestamp.Last().Timestamp; + } + + return stats; + } + public Task DeleteAll(CancellationToken cancellationToken = default) { _eventsContainer.Clear(); + _itemsContainer.Clear(); return Task.CompletedTask; } @@ -121,7 +150,7 @@ public async Task> LoadEventsAsync( var events = eventsContainer .SelectMany(x => x.Value) - .Select(x => JsonSerializer.Deserialize(x, EventSerializerOptions.Options).GetEvent()) + .Select(x => JsonSerializer.Deserialize(x, EventStoreSerializerOptions.Options).GetEvent()) .Where(x => !dateFrom.HasValue || x.Timestamp > dateFrom) .OrderBy(x => x.Timestamp) .Take(limit) @@ -163,7 +192,7 @@ public async Task AppendToStreamAsync( foreach (var wrapper in wrappers) { - stream.Add(JsonSerializer.Serialize(wrapper, EventSerializerOptions.Options)); + stream.Add(JsonSerializer.Serialize(wrapper, EventStoreSerializerOptions.Options)); } if (!_eventsContainer.ContainsKey((streamId, partitionKey))) @@ -197,7 +226,7 @@ private async Task> LoadOrderedEventWrappers(Guid streamId, s foreach (var data in eventData) { - var eventWrapper = JsonSerializer.Deserialize(data, EventSerializerOptions.Options); + var eventWrapper = JsonSerializer.Deserialize(data, EventStoreSerializerOptions.Options); eventWrappers.Add(eventWrapper); } @@ -215,7 +244,7 @@ private async Task> LoadOrderedEventWrappersFromVersion(Guid foreach (var data in eventData) { - var eventWrapper = JsonSerializer.Deserialize(data, EventSerializerOptions.Options); + var eventWrapper = JsonSerializer.Deserialize(data, EventStoreSerializerOptions.Options); if (eventWrapper.StreamInfo.Version >= version) { eventWrappers.Add(eventWrapper); @@ -240,8 +269,8 @@ private static List PrepareEvents( Id = Guid.NewGuid(), //:{e.GetType().Name}", StreamInfo = new StreamInfo { Id = streamId, Version = ++expectedVersion }, EventType = e.GetType().AssemblyQualifiedName, - EventData = JsonSerializer.SerializeToElement(e, e.GetType(), EventSerializerOptions.Options), - UserInfo = JsonSerializer.SerializeToElement(eventUserInfo, eventUserInfo.GetType(), EventSerializerOptions.Options) + EventData = JsonSerializer.SerializeToElement(e, e.GetType(), EventStoreSerializerOptions.Options), + UserInfo = JsonSerializer.SerializeToElement(eventUserInfo, eventUserInfo.GetType(), EventStoreSerializerOptions.Options) } ); @@ -266,28 +295,33 @@ private static List PrepareEvents( // } #endregion - - public async Task GetStatistics(CancellationToken cancellationToken = default) + + #region Item Functionality + + public async Task UpsertItem(string id, string partitionKey, T item, CancellationToken cancellationToken = default) { - var stats = new EventStoreStatistics(); - - stats.TotalEventsCount = _eventsContainer.Count; + var serializedItem = JsonSerializer.Serialize(item, EventStoreSerializerOptions.Options); - var eventsOrderedByTimestamp = _eventsContainer - .SelectMany(x => x.Value) - .Select(x => JsonSerializer.Deserialize(x, EventSerializerOptions.Options).GetEvent()) - .OrderBy(x => x.Timestamp) - .ToList(); + var itemNotExists = _itemsContainer.TryAdd((id, partitionKey), serializedItem); - if (eventsOrderedByTimestamp.Count > 0) + if (!itemNotExists) { - stats.FirstEventCreatedAt = eventsOrderedByTimestamp.First().Timestamp; + _itemsContainer[(id, partitionKey)] = serializedItem; } - if (eventsOrderedByTimestamp.Count > 0) + } + + public async Task LoadItem(string id, string partitionKey, CancellationToken cancellationToken = default) + { + if (_itemsContainer.TryGetValue((id, partitionKey), out string? value)) { - stats.LastEventCreatedAt = eventsOrderedByTimestamp.Last().Timestamp; + return value != null + ? JsonSerializer.Deserialize(value, EventStoreSerializerOptions.Options) + : default; } - return stats; + return default; } + + #endregion + } \ No newline at end of file diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStore.cs b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStore.cs index fd5a720..639235a 100755 --- a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStore.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStore.cs @@ -29,12 +29,13 @@ private PostgresqlEventStoreConnectionInformation ConnectionInformation } } - public PostgresqlEventStore(string connectionString, string tableName) + public PostgresqlEventStore(string connectionString, string eventsTableName, string itemsTableName) { _connectionInformation = new PostgresqlEventStoreConnectionInformation() { ConnectionString = connectionString, - TableName = tableName + TableName = eventsTableName, + ItemsTableName = itemsTableName }; } @@ -48,6 +49,55 @@ public async Task Initialize(CancellationToken cancellationToken = default) await EnsureTableExistsAsync(cancellationToken); } + + public async Task GetStatistics(CancellationToken cancellationToken = default) + { + var connectionInformation = ConnectionInformation; + var stats = new EventStoreStatistics(); + + await using var conn = new NpgsqlConnection(connectionInformation.ConnectionString); + await conn.OpenAsync(); + + await using var countCmd = new NpgsqlCommand( + $"SELECT COUNT(*) FROM \"{connectionInformation.TableName}\"", conn + ); + await using var firstEventTimestampCmd = new NpgsqlCommand( + $"SELECT to_timestamp_utc(event_data->>'timestamp') " + + $"FROM \"{connectionInformation.TableName}\" " + + $"ORDER BY to_timestamp_utc(event_data->>'timestamp') ASC " + + $"LIMIT 1", conn + ); + await using var lastEventTimestampCmd = new NpgsqlCommand( + $"SELECT to_timestamp_utc(event_data->>'timestamp') " + + $"FROM \"{connectionInformation.TableName}\" " + + $"ORDER BY to_timestamp_utc(event_data->>'timestamp') DESC " + + $"LIMIT 1", conn + ); + + var count = await countCmd.ExecuteScalarAsync(cancellationToken); + + if (count != null) + { + stats.TotalEventsCount = (long)count; + } + + var firstEventDateTime = await firstEventTimestampCmd.ExecuteScalarAsync(cancellationToken); + + if (firstEventDateTime != null) + { + stats.FirstEventCreatedAt = (DateTime)firstEventDateTime; + } + + var lastEventDateTime = await lastEventTimestampCmd.ExecuteScalarAsync(cancellationToken); + + if (lastEventDateTime != null) + { + stats.LastEventCreatedAt = (DateTime)lastEventDateTime; + } + + return stats; + } + public async Task DeleteAll(CancellationToken cancellationToken = default) { var connectionInformation = ConnectionInformation; @@ -55,9 +105,34 @@ public async Task DeleteAll(CancellationToken cancellationToken = default) await using var conn = new NpgsqlConnection(connectionInformation.ConnectionString); await conn.OpenAsync(); - await using var cmd = new NpgsqlCommand($"DELETE FROM \"{connectionInformation.TableName}\"", conn); + await using var eventsTableCmd = new NpgsqlCommand($"DELETE FROM \"{connectionInformation.TableName}\"", conn); + + try + { + await eventsTableCmd.ExecuteScalarAsync(cancellationToken); + } + catch (NpgsqlException ex) + { + if (ex.SqlState != PostgresErrorCodes.UndefinedTable) + { + throw; + } + } + + await using var itemsTableCmd = new NpgsqlCommand($"DELETE FROM \"{connectionInformation.ItemsTableName}\"", conn); + + try + { + await itemsTableCmd.ExecuteScalarAsync(cancellationToken); + } + catch (NpgsqlException ex) + { + if (ex.SqlState != PostgresErrorCodes.UndefinedTable) + { + throw; + } + } - await cmd.ExecuteScalarAsync(cancellationToken); } public async Task HardDeleteAsync(Guid streamId, string partitionKey, CancellationToken cancellationToken = default) { @@ -201,7 +276,7 @@ public async Task LoadStreamAsync(Guid streamId, string partitionKe while (await reader.ReadAsync(cancellationToken)) { - var streamInfo = JsonSerializer.Deserialize(reader.GetString(1), EventSerializerOptions.Options); + var streamInfo = JsonSerializer.Deserialize(reader.GetString(1), EventStoreSerializerOptions.Options); if(streamInfo == null) { throw new Exception("Failed to deserialize stream info"); @@ -352,13 +427,13 @@ public async Task AppendToStreamAsync(EventUserInfo eventUserInfo, Guid st new NpgsqlParameter() { ParameterName = "event_data", - Value = JsonSerializer.Serialize(evt, evt.GetType(), EventSerializerOptions.Options), + Value = JsonSerializer.Serialize(evt, evt.GetType(), EventStoreSerializerOptions.Options), DataTypeName = "jsonb" }, new NpgsqlParameter() { ParameterName = "user_info", - Value = JsonSerializer.Serialize(eventUserInfo, eventUserInfo.GetType(), EventSerializerOptions.Options), + Value = JsonSerializer.Serialize(eventUserInfo, eventUserInfo.GetType(), EventStoreSerializerOptions.Options), DataTypeName = "jsonb" }, new NpgsqlParameter("eventstore_schema_version", EVENTSTORE_TABLE_SCHEMA_VERSION) @@ -405,7 +480,7 @@ private async Task EnsureTableExistsAsync(CancellationToken cancellationToken = try { - var tableExists = await cmd.ExecuteScalarAsync(cancellationToken); + await cmd.ExecuteScalarAsync(cancellationToken); } catch (NpgsqlException ex) { @@ -439,6 +514,33 @@ private async Task EnsureTableExistsAsync(CancellationToken cancellationToken = await createTableCommand.ExecuteNonQueryAsync(cancellationToken); } } + + await using var cmdItemTable = new NpgsqlCommand( + $"SELECT 1 FROM \"{connectionInformation.ItemsTableName}\"", conn) + { + }; + + try + { + await cmdItemTable.ExecuteScalarAsync(cancellationToken); + } + catch (NpgsqlException ex) + { + if (ex.SqlState == PostgresErrorCodes.UndefinedTable) + { + await using var createTableCommand = new NpgsqlCommand( + $"CREATE TABLE \"{connectionInformation.ItemsTableName}\" (" + + $"id varchar(100) UNIQUE NOT NULL, " + + $"partition_key varchar(100) NOT NULL, " + + $"data jsonb" + + $");" + + $"CREATE INDEX \"{connectionInformation.ItemsTableName}_id_idx\" ON \"{connectionInformation.ItemsTableName}\" (id);" + + $"CREATE INDEX \"{connectionInformation.ItemsTableName}_id_with_partition_key_idx\" ON \"{connectionInformation.ItemsTableName}\" (id, partition_key);" + , conn); + + await createTableCommand.ExecuteNonQueryAsync(cancellationToken); + } + } } #region Snapshot Functionality @@ -460,51 +562,106 @@ private async Task EnsureTableExistsAsync(CancellationToken cancellationToken = #endregion - public async Task GetStatistics(CancellationToken cancellationToken = default) + #region Item Functionality + + public async Task UpsertItem(string id, string partitionKey, T item, CancellationToken cancellationToken = default) { var connectionInformation = ConnectionInformation; - var stats = new EventStoreStatistics(); - + await using var conn = new NpgsqlConnection(connectionInformation.ConnectionString); - await conn.OpenAsync(); + await conn.OpenAsync(cancellationToken); - await using var countCmd = new NpgsqlCommand( - $"SELECT COUNT(*) FROM \"{connectionInformation.TableName}\"", conn - ); - await using var firstEventTimestampCmd = new NpgsqlCommand( - $"SELECT to_timestamp_utc(event_data->>'timestamp') " + - $"FROM \"{connectionInformation.TableName}\" " + - $"ORDER BY to_timestamp_utc(event_data->>'timestamp') ASC " + - $"LIMIT 1", conn - ); - await using var lastEventTimestampCmd = new NpgsqlCommand( - $"SELECT to_timestamp_utc(event_data->>'timestamp') " + - $"FROM \"{connectionInformation.TableName}\" " + - $"ORDER BY to_timestamp_utc(event_data->>'timestamp') DESC " + - $"LIMIT 1", conn - ); - - var count = await countCmd.ExecuteScalarAsync(cancellationToken); + await using var cmd = new NpgsqlCommand( + $"INSERT INTO \"{connectionInformation.ItemsTableName}\" " + + $"(id, partition_key, data) " + + $"VALUES" + + $"(@id, @partition_key, @data)" + + $"ON CONFLICT (id) " + + $"DO UPDATE " + + $"SET data = @data, partition_key = @partition_key; " + , conn + ) + { + Parameters = + { + new("id", id), + new("partition_key", partitionKey), + new NpgsqlParameter() + { + ParameterName = "data", + Value = JsonSerializer.Serialize(item, EventStoreSerializerOptions.Options), + DataTypeName = "jsonb" + } + } + }; - if (count != null) + try { - stats.TotalEventsCount = (long)count; - } - - var firstEventDateTime = await firstEventTimestampCmd.ExecuteScalarAsync(cancellationToken); + int insertItemResult = await cmd.ExecuteNonQueryAsync(cancellationToken); - if (firstEventDateTime != null) + if (insertItemResult == -1) + { + throw new Exception("Upsert item failed."); + } + } + catch (NpgsqlException ex) { - stats.FirstEventCreatedAt = (DateTime)firstEventDateTime; + if (ex.SqlState == PostgresErrorCodes.UndefinedTable) + { + throw new Exception( + "EventStore table not found, please make sure to call Initialize() on event store first.", + ex); + } + + throw; } + } + + public async Task LoadItem(string id, string partitionKey, CancellationToken cancellationToken = default) + { + var connectionInformation = ConnectionInformation; - var lastEventDateTime = await lastEventTimestampCmd.ExecuteScalarAsync(cancellationToken); + await using var conn = new NpgsqlConnection(connectionInformation.ConnectionString); + await conn.OpenAsync(cancellationToken); - if (lastEventDateTime != null) + await using var cmd = new NpgsqlCommand( + $"SELECT * FROM \"{connectionInformation.ItemsTableName}\" " + + $"WHERE id = @id AND partition_key = @partition_key LIMIT 1; " + , conn) { - stats.LastEventCreatedAt = (DateTime)lastEventDateTime; + Parameters = + { + new("id", id), + new("partition_key", partitionKey) + } + }; + + try + { + await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); + + if (await reader.ReadAsync(cancellationToken)) + { + var item = JsonDocument.Parse(reader.GetString("data")).RootElement; + + return JsonSerializer.Deserialize(item, EventStoreSerializerOptions.Options); + } + + return default; } - return stats; + catch (NpgsqlException ex) + { + if (ex.SqlState == PostgresErrorCodes.UndefinedTable) + { + throw new Exception( + "EventStore table not found, please make sure to call Initialize() on event store first.", + ex); + } + + throw; + } } + + #endregion } \ No newline at end of file diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreConnectionInformation.cs b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreConnectionInformation.cs index 8606f28..422a769 100644 --- a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreConnectionInformation.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreConnectionInformation.cs @@ -8,4 +8,6 @@ public class PostgresqlEventStoreConnectionInformation: EventStoreConnectionInfo /// Table where all events will be stored. /// public string TableName { get; set; } + + public string ItemsTableName { get; set; } } diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreFactory.cs b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreFactory.cs index 7e79a3c..4751bb8 100644 --- a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreFactory.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreFactory.cs @@ -12,7 +12,8 @@ public IEventStore CreateEventStore( return new PostgresqlEventStore( connectionInformation.ConnectionString, - connectionInformation.TableName + connectionInformation.TableName, + connectionInformation.ItemsTableName ); } } diff --git a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreStaticConnectionInformationProvider.cs b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreStaticConnectionInformationProvider.cs index 1dc9c22..64b13b1 100644 --- a/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreStaticConnectionInformationProvider.cs +++ b/Implementations/CloudFabric.EventSourcing.EventStore.Postgresql/PostgresqlEventStoreStaticConnectionInformationProvider.cs @@ -5,15 +5,18 @@ namespace CloudFabric.EventSourcing.EventStore.Postgresql; public class PostgresqlEventStoreStaticConnectionInformationProvider : IPostgresqlEventStoreConnectionInformationProvider { - private readonly string _tableName; + private readonly string _eventsTableName; + private readonly string _itemsTableName; private readonly NpgsqlConnectionStringBuilder _connectionStringBuilder; private readonly NpgsqlConnectionStringBuilder _connectionStringBuilderWithoutPasswords; public PostgresqlEventStoreStaticConnectionInformationProvider( string connectionString, - string tableName + string eventsTableName, + string itemsTableName ) { - _tableName = tableName; + _eventsTableName = eventsTableName; + _itemsTableName = itemsTableName; _connectionStringBuilder = new NpgsqlConnectionStringBuilder(connectionString); _connectionStringBuilderWithoutPasswords = new NpgsqlConnectionStringBuilder(connectionString); @@ -27,9 +30,10 @@ public PostgresqlEventStoreConnectionInformation GetConnectionInformation(string // we don't care about connection id because it's a static provider - connection string will always be the same. return new PostgresqlEventStoreConnectionInformation() { - ConnectionId = $"{_connectionStringBuilderWithoutPasswords}-{_tableName}", + ConnectionId = $"{_connectionStringBuilderWithoutPasswords}-{_eventsTableName}", ConnectionString = _connectionStringBuilder.ToString(), - TableName = _tableName + TableName = _eventsTableName, + ItemsTableName = _itemsTableName }; } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/ItemTestsCosmosDb.cs b/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/ItemTestsCosmosDb.cs new file mode 100644 index 0000000..041c57e --- /dev/null +++ b/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/ItemTestsCosmosDb.cs @@ -0,0 +1,102 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore.CosmosDb; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CloudFabric.EventSourcing.Tests.CosmosDb; + +[TestClass] +public class ItemTestsCosmosDb : ItemTests +{ + private const string DatabaseName = "TestDatabase"; + private const string EventContainerName = "TestEventContainer"; + private const string ItemContainerName = "TestItemContainer"; + + private const string CosmosDbConnectionString = + "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="; + + CosmosClient _cosmosClient = null; + CosmosClientOptions _cosmosClientOptions; + + private IEventStore? _eventStore = null; + private ILogger _logger; + + public async Task SetUp() + { + var loggerFactory = new LoggerFactory(); + _logger = loggerFactory.CreateLogger(); + + JsonSerializerOptions jsonSerializerOptions = new JsonSerializerOptions() + { + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }; + CosmosDbSystemTextJsonSerializer cosmosSystemTextJsonSerializer + = new CosmosDbSystemTextJsonSerializer(jsonSerializerOptions); + + _cosmosClientOptions = new CosmosClientOptions() + { + Serializer = cosmosSystemTextJsonSerializer, + HttpClientFactory = () => + { + HttpMessageHandler httpMessageHandler = new HttpClientHandler() + { + ServerCertificateCustomValidationCallback = + HttpClientHandler.DangerousAcceptAnyServerCertificateValidator + }; + + return new HttpClient(httpMessageHandler); + }, + ConnectionMode = ConnectionMode.Gateway + }; + + _cosmosClient = new CosmosClient( + CosmosDbConnectionString, + _cosmosClientOptions + ); + + var database = await ReCreateDatabase(_cosmosClient, DatabaseName); + await database.CreateContainerIfNotExistsAsync(new ContainerProperties(ItemContainerName, "/partition_key")); + + ContainerResponse itemContainerResponce = + await _cosmosClient.GetContainer(DatabaseName, ItemContainerName).ReadContainerAsync(); + // Set the indexing mode to consistent + itemContainerResponce.Resource.IndexingPolicy.IndexingMode = IndexingMode.Consistent; + // Add an included path + itemContainerResponce.Resource.IndexingPolicy.IncludedPaths.Add(new IncludedPath { Path = "/id" }); + // Add an excluded path + itemContainerResponce.Resource.IndexingPolicy.ExcludedPaths.Add(new ExcludedPath { Path = "/data" }); + } + + private async Task ReCreateDatabase(CosmosClient cosmosClient, string databaseName) + { + await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); + var database = cosmosClient.GetDatabase(databaseName); + await database.DeleteAsync(); + await cosmosClient.CreateDatabaseIfNotExistsAsync( + databaseName, + ThroughputProperties.CreateManualThroughput(400) + ); + return cosmosClient.GetDatabase(databaseName); + } + + protected override async Task GetEventStore() + { + if (_eventStore == null) + { + await SetUp(); + + _eventStore = new CosmosDbEventStore( + _cosmosClient, + DatabaseName, + EventContainerName, + ItemContainerName + ); + await _eventStore.Initialize(); + } + + return _eventStore; + } +} diff --git a/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/OrderTestsCosmosDb.cs b/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/OrderTestsCosmosDb.cs index b680139..1106a41 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/OrderTestsCosmosDb.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.CosmosDb/OrderTestsCosmosDb.cs @@ -126,7 +126,8 @@ await leaseDatabase.CreateContainerIfNotExistsAsync( _eventStore = new CosmosDbEventStore( _cosmosClient, DatabaseName, - ContainerName + ContainerName, + "NotUsedItemContainer" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/DynamicProjectionsOrderTestsElasticSearch.cs b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/DynamicProjectionsOrderTestsElasticSearch.cs index 8ba723a..40f7b20 100644 --- a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/DynamicProjectionsOrderTestsElasticSearch.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/DynamicProjectionsOrderTestsElasticSearch.cs @@ -21,7 +21,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( "Host=localhost;Username=cloudfabric_eventsourcing_test;Password=cloudfabric_eventsourcing_test;Database=cloudfabric_eventsourcing_test;Maximum Pool Size=1000", - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderStringComparisonTestsElasticSearch.cs b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderStringComparisonTestsElasticSearch.cs index 009b92b..d7e7686 100644 --- a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderStringComparisonTestsElasticSearch.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderStringComparisonTestsElasticSearch.cs @@ -24,7 +24,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( "Host=localhost;Username=cloudfabric_eventsourcing_test;Password=cloudfabric_eventsourcing_test;Database=cloudfabric_eventsourcing_test;Maximum Pool Size=1000", - "orders_events" + "orders_events", + "orders_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderTestsElasticSearch.cs b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderTestsElasticSearch.cs index c08b444..ee5eda4 100644 --- a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderTestsElasticSearch.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/OrderTestsElasticSearch.cs @@ -25,7 +25,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( "Host=localhost;Username=cloudfabric_eventsourcing_test;Password=cloudfabric_eventsourcing_test;Database=cloudfabric_eventsourcing_test;Maximum Pool Size=1000", - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/QueryStringTestsElasticSearch.cs b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/QueryStringTestsElasticSearch.cs index 4a7f481..b7c6cdf 100644 --- a/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/QueryStringTestsElasticSearch.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.ElasticSearch/QueryStringTestsElasticSearch.cs @@ -24,7 +24,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( "Host=localhost;Username=cloudfabric_eventsourcing_test;Password=cloudfabric_eventsourcing_test;Database=cloudfabric_eventsourcing_test;Maximum Pool Size=1000", - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/DynamicProjectionsOrderTestsInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/DynamicProjectionsOrderTestsInMemory.cs index 0438bd1..5db0146 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/DynamicProjectionsOrderTestsInMemory.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/DynamicProjectionsOrderTestsInMemory.cs @@ -1,4 +1,4 @@ -using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore; using CloudFabric.EventSourcing.EventStore.InMemory; using CloudFabric.Projections; using CloudFabric.Projections.InMemory; @@ -18,7 +18,10 @@ protected override async Task GetEventStore() { if (_eventStore == null) { - _eventStore = new InMemoryEventStore(new Dictionary<(Guid, string), List>()); + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/ItemTestInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/ItemTestInMemory.cs new file mode 100644 index 0000000..ec731fc --- /dev/null +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/ItemTestInMemory.cs @@ -0,0 +1,25 @@ +using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore.InMemory; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CloudFabric.EventSourcing.Tests.InMemory; + +[TestClass] +public class ItemTestInMemory : ItemTests +{ + private InMemoryEventStore? _eventStore = null; + + protected override async Task GetEventStore() + { + if (_eventStore == null) + { + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); + await _eventStore.Initialize(); + } + + return _eventStore; + } +} diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderStringComparisonTestsInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderStringComparisonTestsInMemory.cs index f6be93d..d51bc7f 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderStringComparisonTestsInMemory.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderStringComparisonTestsInMemory.cs @@ -1,4 +1,4 @@ -using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore; using CloudFabric.EventSourcing.EventStore.InMemory; using CloudFabric.Projections; using CloudFabric.Projections.InMemory; @@ -18,7 +18,10 @@ protected override async Task GetEventStore() { if (_eventStore == null) { - _eventStore = new InMemoryEventStore(new Dictionary<(Guid, string), List>()); + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderTestsInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderTestsInMemory.cs index e25ba68..d983dc6 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderTestsInMemory.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/OrderTestsInMemory.cs @@ -18,7 +18,10 @@ protected override async Task GetEventStore() { if (_eventStore == null) { - _eventStore = new InMemoryEventStore(new Dictionary<(Guid, string), List>()); + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/QueryStringTestsInMemory.cs b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/QueryStringTestsInMemory.cs index 63c1a9c..e0ff072 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.InMemory/QueryStringTestsInMemory.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.InMemory/QueryStringTestsInMemory.cs @@ -1,4 +1,4 @@ -using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore; using CloudFabric.EventSourcing.EventStore.InMemory; using CloudFabric.Projections; using CloudFabric.Projections.InMemory; @@ -18,7 +18,10 @@ protected override async Task GetEventStore() { if (_eventStore == null) { - _eventStore = new InMemoryEventStore(new Dictionary<(Guid, string), List>()); + _eventStore = new InMemoryEventStore( + new Dictionary<(Guid, string), List>(), + new Dictionary<(string, string), string>() + ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/DynamicProjectionsOrderTestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/DynamicProjectionsOrderTestsPostgresql.cs index 1dc073e..a91f48a 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/DynamicProjectionsOrderTestsPostgresql.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/DynamicProjectionsOrderTestsPostgresql.cs @@ -1,4 +1,4 @@ -using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore; using CloudFabric.EventSourcing.EventStore.Postgresql; using CloudFabric.Projections; using CloudFabric.Projections.Postgresql; @@ -20,7 +20,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( TestsConnectionStrings.CONNECTION_STRING, - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/ItemtestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/ItemtestsPostgresql.cs new file mode 100644 index 0000000..9697e1a --- /dev/null +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/ItemtestsPostgresql.cs @@ -0,0 +1,27 @@ +using CloudFabric.EventSourcing.EventStore; +using CloudFabric.EventSourcing.EventStore.Postgresql; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CloudFabric.EventSourcing.Tests.Postgresql; + +[TestClass] +public class ItemtestsPostgresql : ItemTests +{ + private PostgresqlEventStore? _eventStore; + + protected override async Task GetEventStore() + { + if (_eventStore == null) + { + _eventStore = new PostgresqlEventStore( + TestsConnectionStrings.CONNECTION_STRING, + "orders_events", + "stored_items" + ); + await _eventStore.Initialize(); + } + + return _eventStore; + } + +} diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderStringComparisonTestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderStringComparisonTestsPostgresql.cs index 29fa120..082aed2 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderStringComparisonTestsPostgresql.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderStringComparisonTestsPostgresql.cs @@ -20,7 +20,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( TestsConnectionStrings.CONNECTION_STRING, - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderTestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderTestsPostgresql.cs index f27f044..c66882b 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderTestsPostgresql.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/OrderTestsPostgresql.cs @@ -21,7 +21,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( TestsConnectionStrings.CONNECTION_STRING, - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/QueryStringTestsPostgresql.cs b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/QueryStringTestsPostgresql.cs index 62dc01c..4b6fb2d 100755 --- a/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/QueryStringTestsPostgresql.cs +++ b/Implementations/CloudFabric.EventSourcing.Tests.Postgresql/QueryStringTestsPostgresql.cs @@ -20,7 +20,8 @@ protected override async Task GetEventStore() { _eventStore = new PostgresqlEventStore( TestsConnectionStrings.CONNECTION_STRING, - "orders_events" + "orders_events", + "stored_items" ); await _eventStore.Initialize(); } diff --git a/Implementations/CloudFabric.Projections.CosmosDb/CosmosDbProjectionRepository.cs b/Implementations/CloudFabric.Projections.CosmosDb/CosmosDbProjectionRepository.cs index 117c9a3..fda269a 100755 --- a/Implementations/CloudFabric.Projections.CosmosDb/CosmosDbProjectionRepository.cs +++ b/Implementations/CloudFabric.Projections.CosmosDb/CosmosDbProjectionRepository.cs @@ -126,7 +126,7 @@ ProjectionDocumentSchema projectionDocumentSchema _containerId = containerId; _projectionDocumentSchema = projectionDocumentSchema; } - + protected override Task CreateIndex(string indexName, ProjectionDocumentSchema projectionDocumentSchema) { throw new NotImplementedException(); @@ -581,4 +581,4 @@ private string SerializePropertyName(string propertyName) return newDictionary; } -} \ No newline at end of file +} diff --git a/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs b/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs index 93bb13a..1c2a191 100644 --- a/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs +++ b/Implementations/CloudFabric.Projections.ElasticSearch/Helpers/ElasticSearchFilterFactory.cs @@ -146,6 +146,11 @@ private static string ConstructOneConditionFilter(Queries.Filter filter) var filterOperator = ""; var filterValue = filter.Value?.ToString()?.EscapeElasticUnsupportedCharacters(); + if (filter.Value is bool) + { + filterValue = filterValue.ToLower(); + } + switch (filter.Operator) { case FilterOperator.ArrayContains: