Skip to content

Commit

Permalink
Changed LiveStreamAggregation implementation to set SkipSchemaGenerat…
Browse files Browse the repository at this point in the history
…ion property for DocumentMapping instead of fully removing it from storage.

Updated implementation for schema generation to skip such mappings.
  • Loading branch information
oskardudycz committed Jul 21, 2023
1 parent 074ee67 commit 0a7a351
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Marten;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Internal.CodeGeneration;
using Shouldly;
using Xunit;

Expand All @@ -13,7 +14,7 @@ namespace EventSourcingTests.Projections.CodeGeneration;
public class ProjectionCodeGenerationTests
{
[Fact]
public void Snapshot_GeneratesCodeFile()
public void Snapshot_GeneratesCodeFiles()
{
var options = new StoreOptions();
options.Connection("Dummy");
Expand All @@ -28,11 +29,15 @@ public void Snapshot_GeneratesCodeFile()
store.Events.As<ICodeFileCollection>().BuildFiles()
.OfType<SingleStreamProjection<Something>>()
.ShouldHaveSingleItem();
}

options.BuildFiles()
.OfType<DocumentProviderBuilder>()
.Where(e => e.ProviderName == typeof(Something).ToSuffixedTypeName("Provider"))
.ShouldHaveSingleItem();
}

[Fact]
public void LiveStreamAggregation_GeneratesCodeFile()
public void LiveStreamAggregation_GeneratesCodeFiles()
{
var options = new StoreOptions();
options.Connection("Dummy");
Expand All @@ -47,18 +52,91 @@ public void LiveStreamAggregation_GeneratesCodeFile()
store.Events.As<ICodeFileCollection>().BuildFiles()
.OfType<SingleStreamProjection<Something>>()
.ShouldHaveSingleItem();

options.BuildFiles()
.OfType<DocumentProviderBuilder>()
.Where(e => e.ProviderName == typeof(Something).ToSuffixedTypeName("Provider"))
.ShouldHaveSingleItem();
}

[Fact]
public void SingleStreamProjection_GeneratesCodeFiles()
{
var options = new StoreOptions();
options.Connection("Dummy");

// Given
options.Projections.Add<SomethingElseSingleStreamProjection>(ProjectionLifecycle.Inline);

// When
var store = new DocumentStore(options);

// Then
store.Events.As<ICodeFileCollection>().BuildFiles()
.OfType<SingleStreamProjection<SomethingElse>>()
.ShouldHaveSingleItem();

options.BuildFiles()
.OfType<DocumentProviderBuilder>()
.Where(e => e.ProviderName == typeof(SomethingElse).ToSuffixedTypeName("Provider"))
.ShouldHaveSingleItem();
}

[Fact]
public void MultiStreamProjection_GeneratesCodeFiles()
{
var options = new StoreOptions();
options.Connection("Dummy");

// Given
options.Projections.Add<SomethingElseMultiStreamProjection>(ProjectionLifecycle.Inline);

// When
var store = new DocumentStore(options);

// Then
store.Events.As<ICodeFileCollection>().BuildFiles()
.OfType<MultiStreamProjection<SomethingElse, Guid>>()
.ShouldHaveSingleItem();

options.BuildFiles()
.OfType<DocumentProviderBuilder>()
.Where(e => e.ProviderName == typeof(SomethingElse).ToSuffixedTypeName("Provider"))
.ShouldHaveSingleItem();
}

public record SomethingHappened(Guid SomethingId, string SomethingSomething);
public record SomethingHasHappened(Guid SomethingId, string SomethingSomething);

public record SomethingDifferentHappened(Guid SomethingId, string SomethingSomething);
public record SomethingElseHasHappened(Guid SomethingId, string SomethingSomething);

public record Something(Guid Id, string SomethingSomething)
{
public static Something Create(SomethingHappened @event) =>
public static Something Create(SomethingHasHappened @event) =>
new Something(@event.SomethingId, @event.SomethingSomething);

public Something Apply(SomethingHappened @event) =>
public Something Apply(SomethingElseHasHappened @event) =>
this with { SomethingSomething = @event.SomethingSomething };
}

public record SomethingElse(Guid Id, string SomethingSomething)
{
public static SomethingElse Create(SomethingHasHappened @event) =>
new SomethingElse(@event.SomethingId, @event.SomethingSomething);

public SomethingElse Apply(SomethingElseHasHappened @event) =>
this with { SomethingSomething = @event.SomethingSomething };
}

public class SomethingElseSingleStreamProjection: SingleStreamProjection<SomethingElse>
{
}

public class SomethingElseMultiStreamProjection: MultiStreamProjection<SomethingElse, Guid>
{
public SomethingElseMultiStreamProjection()
{
Identity<SomethingHasHappened>(e => e.SomethingId);
Identity<SomethingHasHappened>(e => e.SomethingId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ protected virtual void specialAssertValid() { }

internal override IEnumerable<string> ValidateConfiguration(StoreOptions options)
{
// Need to use an isolated DocumentMapping for live aggregations to prevent
// Marten from building empty tables for the aggregate type
var mapping = Lifecycle == ProjectionLifecycle.Live
? new DocumentMapping(typeof(T), options)
: options.Storage.FindMapping(typeof(T)).Root.As<DocumentMapping>();
var mapping = options.Storage.FindMapping(typeof(T)).Root.As<DocumentMapping>();

foreach (var p in validateDocumentIdentity(options, mapping)) yield return p;

Expand Down
2 changes: 1 addition & 1 deletion src/Marten/Events/Projections/ProjectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public MartenRegistry.DocumentMappingExpression<T> LiveStreamAggregation<T>(
var expression = singleStreamProjection<T>(ProjectionLifecycle.Live, asyncConfiguration);

// Hack to address https://github.com/JasperFx/marten/issues/2610
_options.Storage.RemoveBuilderFor<T>();
_options.Storage.MappingFor(typeof(T)).SkipSchemaGeneration = true;

return expression;
}
Expand Down
9 changes: 4 additions & 5 deletions src/Marten/Schema/DocumentMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public class DocumentMapping: FieldMapping, IDocumentMapping, IDocumentType
private string _alias;
private string _databaseSchemaName;


private HiloSettings _hiloSettings;
private MemberInfo _idMember;

Expand Down Expand Up @@ -113,6 +112,9 @@ public HiloSettings HiloSettings
}
}

// TODO: This should be smarter, maybe nullable option for Schema or some other base type
internal bool SkipSchemaGeneration { get; set; }

public MemberInfo IdMember
{
get => _idMember;
Expand Down Expand Up @@ -148,7 +150,6 @@ public MemberInfo IdMember

public DocumentMetadataCollection Metadata { get; }


public bool UseOptimisticConcurrency { get; set; }

public IList<IndexDefinition> Indexes { get; } = new List<IndexDefinition>();
Expand Down Expand Up @@ -191,6 +192,7 @@ public string Alias
public bool StructuralTyped { get; set; }

public string DdlTemplate { get; set; }

IReadOnlyHiloSettings IDocumentType.HiloSettings { get; }

public TenancyStyle TenancyStyle { get; set; } = TenancyStyle.Single;
Expand All @@ -199,7 +201,6 @@ public string Alias

public DuplicatedField[] DuplicatedFields => fields().OfType<DuplicatedField>().ToArray();


public bool IsHierarchy()
{
return SubClasses.Any() || DocumentType.GetTypeInfo().IsAbstract || DocumentType.GetTypeInfo().IsInterface;
Expand All @@ -210,7 +211,6 @@ public IEnumerable<DocumentIndex> IndexesFor(string column)
return Indexes.OfType<DocumentIndex>().Where(x => x.Columns.Contains(column));
}


public string AliasFor(Type subclassType)
{
if (subclassType == DocumentType)
Expand Down Expand Up @@ -330,7 +330,6 @@ private static PropertyInfo[] GetProperties(Type type)
.OrderByDescending(x => x.DeclaringType == type).ToArray();
}


public DocumentIndex AddGinIndexToData()
{
var index = AddIndex("data");
Expand Down
4 changes: 2 additions & 2 deletions src/Marten/Storage/MartenDatabase.DocumentCleaner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public async Task DeleteDocumentsByTypeAsync(Type documentType, CancellationToke

public void DeleteDocumentsExcept(params Type[] documentTypes)
{
var documentMappings = _options.Storage.AllDocumentMappings.Where(x => !documentTypes.Contains(x.DocumentType));
var documentMappings = _options.Storage.DocumentMappingsWithSchema.Where(x => !documentTypes.Contains(x.DocumentType));
foreach (var mapping in documentMappings)
{
var storage = Providers.StorageFor(mapping.DocumentType);
Expand All @@ -93,7 +93,7 @@ public void DeleteDocumentsExcept(params Type[] documentTypes)

public async Task DeleteDocumentsExceptAsync(CancellationToken ct, params Type[] documentTypes)
{
var documentMappings = _options.Storage.AllDocumentMappings.Where(x => !documentTypes.Contains(x.DocumentType));
var documentMappings = _options.Storage.DocumentMappingsWithSchema.Where(x => !documentTypes.Contains(x.DocumentType));
foreach (var mapping in documentMappings)
{
var storage = Providers.StorageFor(mapping.DocumentType);
Expand Down
20 changes: 6 additions & 14 deletions src/Marten/Storage/StorageFeatures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ internal StorageFeatures(StoreOptions options)
internal IEnumerable<DocumentMapping> AllDocumentMappings =>
_documentMappings.Value.Enumerate().Select(x => x.Value);

internal IEnumerable<DocumentMapping> DocumentMappingsWithSchema =>
_documentMappings.Value.Enumerate().Where(x => !x.Value.SkipSchemaGeneration).Select(x => x.Value);

void IFeatureSchema.WritePermissions(Migrator rules, TextWriter writer)
{
// Nothing
Expand Down Expand Up @@ -173,7 +176,6 @@ internal DocumentMapping MappingFor(Type documentType)
{
if (!_documentMappings.Value.TryFind(documentType, out var value))
{
var buildingList = new List<Type>();
value = Build(documentType, _options);
_documentMappings.Swap(d => d.AddOrUpdate(documentType, value));
}
Expand Down Expand Up @@ -271,7 +273,7 @@ internal void PostProcessConfiguration()

_mappings.Swap(d => d.AddOrUpdate(typeof(IEvent), new EventQueryMapping(_options)));

foreach (var mapping in _documentMappings.Value.Enumerate().Select(x => x.Value))
foreach (var mapping in DocumentMappingsWithSchema)
{
foreach (var subClass in mapping.SubClasses)
{
Expand All @@ -290,8 +292,7 @@ internal IEnumerable<IFeatureSchema> AllActiveFeatures(IMartenDatabase database)
MappingFor(typeof(DeadLetterEvent)).DatabaseSchemaName = _options.Events.DatabaseSchemaName;
}

var mappings = _documentMappings.Value
.Enumerate().Select(x => x.Value)
var mappings = DocumentMappingsWithSchema
.OrderBy(x => x.DocumentType.Name)
.TopologicalSort(m => m.ReferencedTypes()
.Select(MappingFor));
Expand Down Expand Up @@ -321,7 +322,7 @@ internal IEnumerable<IFeatureSchema> AllActiveFeatures(IMartenDatabase database)

internal bool SequenceIsRequired()
{
return _documentMappings.Value.Enumerate().Select(x => x.Value).Any(x => x.IdStrategy.RequiresSequences);
return DocumentMappingsWithSchema.Any(x => x.IdStrategy.RequiresSequences);
}

internal IEnumerable<Type> GetTypeDependencies(Type type)
Expand Down Expand Up @@ -384,13 +385,4 @@ internal void IncludeDocumentMappingBuilders(StorageFeatures includedStorage)
}
}
}

internal void RemoveBuilderFor<T>()
{
_builders.Swap(d => d.Remove(typeof(T)));
_documentMappings.Swap(d => d.Remove(typeof(T)));
_features.Swap(d => d.Remove(typeof(T)));
_mappings.Swap(d => d.Remove(typeof(T)));

}
}

0 comments on commit 0a7a351

Please sign in to comment.