Skip to content

Commit

Permalink
#60: added KNetCompactedReplicator as an option (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers authored Sep 27, 2023
1 parent 6c01278 commit 615d8f2
Show file tree
Hide file tree
Showing 21 changed files with 582 additions and 123 deletions.
6 changes: 6 additions & 0 deletions src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,10 @@ public static int NumPartitions(this IEntityType entityType, KafkaOptionsExtensi
var numPartitions = options.DefaultNumPartitions;
return numPartitions;
}

public static int? ConsumerInstances(this IEntityType entityType, KafkaOptionsExtension options)
{
var consumerInstances = options.DefaultConsumerInstances;
return consumerInstances;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ public interface IKafkaSingletonOptions : ISingletonOptions

bool ProducerByEntity { get; }

bool UseCompactedReplicator { get; }

bool UsePersistentStorage { get; }

int DefaultNumPartitions { get; }

int? DefaultConsumerInstances { get; }

int DefaultReplicationFactor { get; }

ProducerConfigBuilder? ProducerConfigBuilder { get; }
Expand Down
28 changes: 26 additions & 2 deletions src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@

using Java.Lang;
using Java.Util;
using MASES.JCOBridge.C2JBridge;
using MASES.KNet.Common;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
using Org.Apache.Kafka.Clients.Consumer;
using Org.Apache.Kafka.Clients.Producer;
using Org.Apache.Kafka.Streams;
using System.Globalization;
using System.Text;

namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;

Expand All @@ -39,8 +37,10 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private string? _applicationId;
private string? _bootstrapServers;
private bool _producerByEntity = false;
private bool _useCompactedReplicator = false;
private bool _usePersistentStorage = false;
private int _defaultNumPartitions = 1;
private int? _defaultConsumerInstances = null;
private short _defaultReplicationFactor = 1;
private ProducerConfigBuilder? _producerConfigBuilder;
private StreamsConfigBuilder? _streamsConfigBuilder;
Expand All @@ -61,8 +61,10 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_applicationId = copyFrom._applicationId;
_bootstrapServers = copyFrom._bootstrapServers;
_producerByEntity = copyFrom._producerByEntity;
_useCompactedReplicator = copyFrom._useCompactedReplicator;
_usePersistentStorage = copyFrom._usePersistentStorage;
_defaultNumPartitions = copyFrom._defaultNumPartitions;
_defaultConsumerInstances = copyFrom._defaultConsumerInstances;
_defaultReplicationFactor = copyFrom._defaultReplicationFactor;
_producerConfigBuilder = ProducerConfigBuilder.CreateFrom(copyFrom._producerConfigBuilder);
_streamsConfigBuilder = StreamsConfigBuilder.CreateFrom(copyFrom._streamsConfigBuilder);
Expand All @@ -85,10 +87,14 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)

public virtual bool ProducerByEntity => _producerByEntity;

public virtual bool UseCompactedReplicator => _useCompactedReplicator;

public virtual bool UsePersistentStorage => _usePersistentStorage;

public virtual int DefaultNumPartitions => _defaultNumPartitions;

public virtual int? DefaultConsumerInstances => _defaultConsumerInstances;

public virtual short DefaultReplicationFactor => _defaultReplicationFactor;

public virtual ProducerConfigBuilder ProducerConfigBuilder => _producerConfigBuilder!;
Expand Down Expand Up @@ -142,6 +148,15 @@ public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity
return clone;
}

public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = false)
{
var clone = Clone();

clone._useCompactedReplicator = useCompactedReplicator;

return clone;
}

public virtual KafkaOptionsExtension WithUsePersistentStorage(bool usePersistentStorage = false)
{
var clone = Clone();
Expand All @@ -160,6 +175,15 @@ public virtual KafkaOptionsExtension WithDefaultNumPartitions(int defaultNumPart
return clone;
}

public virtual KafkaOptionsExtension WithDefaultConsumerInstances(int? defaultConsumerInstances = null)
{
var clone = Clone();

clone._defaultConsumerInstances = defaultConsumerInstances;

return clone;
}

public virtual KafkaOptionsExtension WithDefaultReplicationFactor(short defaultReplicationFactor = 1)
{
var clone = Clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ public virtual void Initialize(IDbContextOptions options)
ApplicationId = kafkaOptions.ApplicationId;
BootstrapServers = kafkaOptions.BootstrapServers;
ProducerByEntity = kafkaOptions.ProducerByEntity;
UseCompactedReplicator = kafkaOptions.UseCompactedReplicator;
UsePersistentStorage = kafkaOptions.UsePersistentStorage;
DefaultNumPartitions = kafkaOptions.DefaultNumPartitions;
DefaultConsumerInstances = kafkaOptions.DefaultConsumerInstances;
DefaultReplicationFactor = kafkaOptions.DefaultReplicationFactor;
ProducerConfigBuilder = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfigBuilder);
StreamsConfigBuilder = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfigBuilder);
Expand Down Expand Up @@ -68,10 +70,14 @@ public virtual void Validate(IDbContextOptions options)

public virtual bool ProducerByEntity { get; private set; }

public virtual bool UseCompactedReplicator { get; private set; }

public virtual bool UsePersistentStorage { get; private set; }

public virtual int DefaultNumPartitions { get; private set; }

public virtual int? DefaultConsumerInstances { get; private set; }

public virtual int DefaultReplicationFactor { get; private set; }

public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; private set; }
Expand Down
55 changes: 41 additions & 14 deletions src/net/KEFCore/Infrastructure/KafkaDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Refer to LICENSE for more information.
*/

using MASES.KNet;
using MASES.KNet.Common;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
Expand All @@ -30,41 +31,66 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure;
/// </summary>
public class KafkaDbContext : DbContext
{
/// <inheritdoc cref="DbContext.DbContext()"/>
public KafkaDbContext()
{

}
/// <inheritdoc cref="DbContext.DbContext(DbContextOptions)"/>
public KafkaDbContext(DbContextOptions options) : base(options)
{

}

/// <summary>
/// The bootstrap servers of the Apache Kafka cluster
/// </summary>
public string? BootstrapServers { get; set; }
public virtual string? BootstrapServers { get; set; }
/// <summary>
/// The application id
/// </summary>
public string ApplicationId { get; set; } = Guid.NewGuid().ToString();
public virtual string ApplicationId { get; set; } = Guid.NewGuid().ToString();
/// <summary>
/// Database name
/// </summary>
public string? DbName { get; set; }
public virtual string? DbName { get; set; }
/// <summary>
/// Database number of partitions
/// </summary>
public int DefaultNumPartitions { get; set; } = 10;
public virtual int DefaultNumPartitions { get; set; } = 10;
/// <summary>
/// Database replication factor
/// </summary>
public short DefaultReplicationFactor { get; set; } = 1;
public virtual short DefaultReplicationFactor { get; set; } = 1;
/// <summary>
/// Database consumr instances used in conjunction with <see cref="UseCompactedReplicator"/>
/// </summary>
public virtual int? DefaultConsumerInstances { get; set; } = null;
/// <summary>
/// Use persistent storage
/// </summary>
public bool UsePersistentStorage { get; set; } = false;
public virtual bool UsePersistentStorage { get; set; } = false;
/// <summary>
/// Use a producer for each Entity
/// </summary>
public bool UseProducerByEntity { get; set; } = false;

public ProducerConfigBuilder? ProducerConfigBuilder { get; set; }

public StreamsConfigBuilder? StreamsConfigBuilder { get; set; }

public TopicConfigBuilder? TopicConfigBuilder { get; set; }

/// <summary>
/// Use <see cref="MASES.KNet.Replicator.KNetCompactedReplicator{TKey, TValue}"/> instead of Apache Kafka Streams
/// </summary>
public virtual bool UseCompactedReplicator { get; set; } = false;
/// <summary>
/// The optional <see cref="ProducerConfigBuilder"/>
/// </summary>
public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; set; }
/// <summary>
/// The optional <see cref="StreamsConfigBuilder"/>
/// </summary>
public virtual StreamsConfigBuilder? StreamsConfigBuilder { get; set; }
/// <summary>
/// The optional <see cref="TopicConfigBuilder"/>
/// </summary>
public virtual TopicConfigBuilder? TopicConfigBuilder { get; set; }
/// <inheritdoc cref="DbContext.OnConfiguring(DbContextOptionsBuilder)"/>
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
if (BootstrapServers == null)
Expand All @@ -76,9 +102,10 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)

optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) =>
{
o.StreamsConfig(StreamsConfigBuilder??o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions);
o.StreamsConfig(StreamsConfigBuilder ?? o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions);
o.WithUsePersistentStorage(UsePersistentStorage);
o.WithProducerByEntity(UseProducerByEntity);
o.WithCompactedReplicator(UseCompactedReplicator);
o.WithDefaultReplicationFactor(DefaultReplicationFactor);
});
}
Expand Down
42 changes: 42 additions & 0 deletions src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerBy
return this;
}

/// <summary>
/// Enables use of <see cref="MASES.KNet.Replicator.KNetCompactedReplicator{TKey, TValue}"/>
/// </summary>
/// <remarks>
/// See <see href="https://aka.ms/efcore-docs-dbcontext-options">Using DbContextOptions</see>, and
/// <see href="https://github.com/masesgroup/KEFCore">The EF Core Kafka database provider</see> for more information and examples.
/// </remarks>
/// <param name="useCompactedReplicator">If <see langword="true" /> then <see cref="MASES.KNet.Replicator.KNetCompactedReplicator{TKey, TValue}"/> will be used instead of Apache Kafka Streams.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder WithCompactedReplicator(bool useCompactedReplicator = false)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();

extension = extension.WithCompactedReplicator(useCompactedReplicator);

((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);

return this;
}

/// <summary>
/// Enables use of persistent storage, otherwise a <see cref="Materialized"/> storage will be in-memory
/// </summary>
Expand Down Expand Up @@ -156,6 +177,27 @@ public virtual KafkaDbContextOptionsBuilder WithDefaultNumPartitions(int default
return this;
}

/// <summary>
/// Defines the default number of consumer instances to be used in conjunction with <see cref="WithCompactedReplicator(bool)"/>
/// </summary>
/// <remarks>
/// See <see href="https://aka.ms/efcore-docs-dbcontext-options">Using DbContextOptions</see>, and
/// <see href="https://github.com/masesgroup/KEFCore">The EF Core Kafka database provider</see> for more information and examples.
/// </remarks>
/// <param name="defaultConsumerInstances">The default number of consumer instances to be used in conjunction with <see cref="WithCompactedReplicator(bool)"/></param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder WithDefaultConsumerInstances(int? defaultConsumerInstances = null)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();

extension = extension.WithDefaultConsumerInstances(defaultConsumerInstances);

((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);

return this;
}

/// <summary>
/// Defines the default replication factor to use when a new topic is created
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions src/net/KEFCore/KEFCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<PackageReadmeFile>README.md</PackageReadmeFile>
<Nullable>enable</Nullable>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<RunAnalyzersDuringLiveAnalysis>False</RunAnalyzersDuringLiveAnalysis>
<RunAnalyzersDuringLiveAnalysis>True</RunAnalyzersDuringLiveAnalysis>
<RunAnalyzersDuringBuild>False</RunAnalyzersDuringBuild>
</PropertyGroup>

Expand Down Expand Up @@ -66,10 +66,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MASES.KNet" Version="2.0.2">
<PackageReference Include="MASES.KNet" Version="2.1.0">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.9" PrivateAssets="none" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.11" PrivateAssets="none" />
</ItemGroup>
</Project>
8 changes: 8 additions & 0 deletions src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@ namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal
{
public interface IKafkaSerdesEntityType
{
string Serialize(params object?[]? args);

string Serialize(Headers headers, params object?[]? args);

string Serialize<TKey>(TKey key);

string Serialize<TKey>(Headers headers, TKey key);

object[] Deserialize(string arg);

object[] Deserialize(Headers headers, string arg);

TKey Deserialize<TKey>(string arg);

TKey Deserialize<TKey>(Headers headers, string arg);

object[] ConvertData(object[]? input);
Expand Down
12 changes: 12 additions & 0 deletions src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,28 @@ public KafkaSerdesEntityType(IEntityType type)
_properties = _type.GetProperties().ToArray();
}

public object[] Deserialize(string arg)
{
var des = GetFullType(arg);
return ConvertData(des!.data);
}

public object[] Deserialize(Headers headers, string arg)
{
var des = GetFullType(arg);
return ConvertData(des!.data);
}

public TKey Deserialize<TKey>(string arg) => System.Text.Json.JsonSerializer.Deserialize<TKey>(arg)!;

public TKey Deserialize<TKey>(Headers headers, string arg) => System.Text.Json.JsonSerializer.Deserialize<TKey>(arg)!;

public string Serialize(params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!));

public string Serialize(Headers headers, params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!));

public string Serialize<TKey>(TKey key) => System.Text.Json.JsonSerializer.Serialize(key);

public string Serialize<TKey>(Headers headers, TKey key) => System.Text.Json.JsonSerializer.Serialize(key);

public static KafkaSerdesEntityTypeData? GetFullType(string arg) => System.Text.Json.JsonSerializer.Deserialize<KafkaSerdesEntityTypeData>(arg);
Expand Down
6 changes: 5 additions & 1 deletion src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
using MASES.KNet;
using MASES.KNet.Producer;
using MASES.KNet.Replicator;
using Org.Apache.Kafka.Clients.Producer;

namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;

public interface IKafkaCluster
public interface IKafkaCluster :IDisposable
{
bool EnsureDeleted(
IUpdateAdapterFactory updateAdapterFactory,
Expand All @@ -46,6 +48,8 @@ bool EnsureConnected(

IKafkaSerdesEntityType CreateSerdes(IEntityType entityType);

IKNetCompactedReplicator<string, string> CreateCompactedReplicator(IEntityType entityType);

IProducer<string, string> CreateProducer(IEntityType entityType);

IEnumerable<ValueBuffer> GetData(IEntityType entityType);
Expand Down
2 changes: 1 addition & 1 deletion src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;

public interface IKafkaDatabase : IDatabase
public interface IKafkaDatabase : IDatabase, IDisposable
{
IKafkaCluster Cluster { get; }

Expand Down
Loading

0 comments on commit 615d8f2

Please sign in to comment.