Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cached ISerDesSelector instances across application, bump version to 2.3.0 #240

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/net/Common/Common.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Owners>MASES s.r.l.</Owners>
<Authors>MASES s.r.l.</Authors>
<Company>MASES s.r.l.</Company>
<Version>2.2.0.0</Version>
<Version>2.3.0.0</Version>
<TargetFrameworks>net6.0;net7.0;net8.0</TargetFrameworks>
<LangVersion>latest</LangVersion>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
Expand Down
35 changes: 20 additions & 15 deletions src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure;
using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
using MASES.KNet.Serialization;
using System.Collections.Concurrent;

namespace MASES.EntityFrameworkCore.KNet;

Expand Down Expand Up @@ -144,38 +145,42 @@ public static Type ValueContainerType(this IKafkaSingletonOptions options, IEnti
public static Type JVMKeyType(this IKafkaSingletonOptions options, IEntityType entityType)
{
return typeof(byte[]);

var keySerDesType = SerDesSelectorTypeForKey(options, entityType);
ISerDes serDes = Activator.CreateInstance(keySerDesType) as ISerDes;
if (serDes == null) throw new InvalidOperationException($"{keySerDesType} is not a valid {nameof(ISerDes)}");
return serDes.JVMType;
}
/// <summary>
/// Create the ValueContainer <see cref="Type"/>
/// </summary>
public static Type JVMValueContainerType(this IKafkaSingletonOptions options, IEntityType entityType)
{
if (options.UseByteBufferDataTransfer) return typeof(Java.Nio.ByteBuffer);
var selector = SerDesSelectorForValue(options, entityType);
if (options.UseByteBufferDataTransfer && selector!= null && selector.ByteBufferSerDes != null) return typeof(Java.Nio.ByteBuffer);
return typeof(byte[]);
}

static ConcurrentDictionary<(Type?, IEntityType), ISerDesSelector> _keySerDesSelctors = new();

var valueSerDesType = SerDesSelectorTypeForValue(options, entityType);
ISerDes serDes = Activator.CreateInstance(valueSerDesType) as ISerDes;
if (serDes == null) throw new InvalidOperationException($"{valueSerDesType} is not a valid {nameof(ISerDes)}");
return serDes.JVMType;
}
/// <summary>
/// Creates a serializer <see cref="Type"/> for keys
/// </summary>
public static Type SerDesSelectorTypeForKey(this IKafkaSingletonOptions options, IEntityType entityType)
public static ISerDesSelector SerDesSelectorForKey(this IKafkaSingletonOptions options, IEntityType entityType)
{
return options.KeySerDesSelectorType?.MakeGenericType(KeyType(options, entityType))!;
return _keySerDesSelctors.GetOrAdd((options.KeySerDesSelectorType, entityType), (o) =>
{
var selector = o.Item1?.MakeGenericType(KeyType(options, o.Item2))!;
return (ISerDesSelector)Activator.CreateInstance(selector);
});
}

static ConcurrentDictionary<(Type?, IEntityType), ISerDesSelector> _valueContainerSerDesSelctors = new();

/// <summary>
/// Creates a serialzier <see cref="Type"/> for values
/// </summary>
public static Type SerDesSelectorTypeForValue(this IKafkaSingletonOptions options, IEntityType entityType)
public static ISerDesSelector SerDesSelectorForValue(this IKafkaSingletonOptions options, IEntityType entityType)
{
return options.ValueSerDesSelectorType?.MakeGenericType(ValueContainerType(options, entityType))!;
return _valueContainerSerDesSelctors.GetOrAdd((options.ValueSerDesSelectorType, entityType), (o) =>
{
var selector = o.Item1?.MakeGenericType(ValueContainerType(options, o.Item2))!;
return (ISerDesSelector)Activator.CreateInstance(selector);
});
}
}
19 changes: 9 additions & 10 deletions src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension, IKafkaSingleton
private Action<EntityTypeChanged>? _onChangeEvent = null;
private DbContextOptionsExtensionInfo? _info;

static readonly Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader;
internal static Java.Lang.ClassLoader SystemClassLoader => _loader;
/// <summary>
/// Initializer
/// </summary>
Expand Down Expand Up @@ -360,15 +358,16 @@ public virtual StreamsConfigBuilder StreamsOptions(IEntityType entityType)

builder.ApplicationId = ApplicationId;
builder.BootstrapServers = BootstrapServers;

string baSerdesName = Class.ClassNameOf<Org.Apache.Kafka.Common.Serialization.Serdes.ByteArraySerde>();
string bbSerdesName = Class.ClassNameOf<MASES.KNet.Serialization.Serdes.ByteBufferSerde>();

builder.DefaultKeySerdeClass = this.JVMKeyType(entityType) == typeof(byte[]) ? Class.ForName(baSerdesName, true, SystemClassLoader)
: Class.ForName(bbSerdesName, true, SystemClassLoader);
builder.DefaultValueSerdeClass = this.JVMValueContainerType(entityType) == typeof(byte[]) ? Class.ForName(baSerdesName, true, SystemClassLoader)
: Class.ForName(bbSerdesName, true, SystemClassLoader);
builder.DSLStoreSuppliersClass = UsePersistentStorage ? Class.ForName(Class.ClassNameOf<BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers>(), true, SystemClassLoader)
: Class.ForName(Class.ClassNameOf<BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers>(), true, SystemClassLoader);
builder.DefaultKeySerdeClass = this.JVMKeyType(entityType) == typeof(byte[]) ? Class.ForName(baSerdesName, true, Class.SystemClassLoader)
: Class.ForName(bbSerdesName, true, Class.SystemClassLoader);
builder.DefaultValueSerdeClass = this.JVMValueContainerType(entityType) == typeof(byte[]) ? Class.ForName(baSerdesName, true, Class.SystemClassLoader)
: Class.ForName(bbSerdesName, true, Class.SystemClassLoader);
builder.DSLStoreSuppliersClass = UsePersistentStorage ? Class.ForName(Class.ClassNameOf<BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers>(), true, Class.SystemClassLoader)
: Class.ForName(Class.ClassNameOf<BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers>(), true, Class.SystemClassLoader);

//if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.APPLICATION_ID_CONFIG))
//{
Expand Down Expand Up @@ -419,12 +418,12 @@ public virtual Properties StreamsOptions(string applicationId)
{
props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
}
props.Put(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
props.Put(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, Class.SystemClassLoader));
if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG))
{
props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG);
}
props.Put(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
props.Put(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, Class.SystemClassLoader));
if (props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
{
props.Remove(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
Expand Down
4 changes: 2 additions & 2 deletions src/net/KEFCore/Infrastructure/KafkaDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ public KafkaDbContext(DbContextOptions options) : base(options)
/// Use KNet version of Apache Kafka Streams instead of standard Apache Kafka Streams
/// </summary>
public virtual bool UseKNetStreams { get; set; } = true;

/// <summary>
/// The optional <see cref="ConsumerConfigBuilder"/> used when <see cref="UseCompactedReplicator"/> is <see langword="true"/>
/// </summary>
Expand Down Expand Up @@ -264,7 +263,8 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
o.WithStreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
o.WithTopicConfig(TopicConfig ?? DefaultTopicConfig);
o.WithUsePersistentStorage(UsePersistentStorage);
o.WithUseEnumeratorWithPrefetch(UseEnumeratorWithPrefetch);
o.WithUseEnumeratorWithPrefetch(UseEnumeratorWithPrefetch);
o.WithUseByteBufferDataTransfer(UseByteBufferDataTransfer);
o.WithUseDeletePolicyForTopic(UseDeletePolicyForTopic);
o.WithCompactedReplicator(UseCompactedReplicator);
o.WithUseKNetStreams(UseKNetStreams);
Expand Down
15 changes: 8 additions & 7 deletions src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public class EntityTypeProducer<TKey, TValueContainer, TJVMKey, TJVMValueContainer, TKeySerDesSelectorType, TValueContainerSerDesSelectorType> : IEntityTypeProducer
public class EntityTypeProducer<TKey, TValueContainer, TJVMKey, TJVMValueContainer> : IEntityTypeProducer
where TKey : notnull
where TValueContainer : class, IValueContainer<TKey>
where TKeySerDesSelectorType : class, ISerDesSelector<TKey>, new()
where TValueContainerSerDesSelectorType : class, ISerDesSelector<TValueContainer>, new()
{
private readonly ConstructorInfo TValueContainerConstructor;
private readonly bool _useCompactedReplicator;
Expand Down Expand Up @@ -191,11 +189,14 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
var tTValueContainer = typeof(TValueContainer);
TValueContainerConstructor = tTValueContainer.GetConstructors().Single(ci => ci.GetParameters().Length == 2);

_keySerdes = new TKeySerDesSelectorType().NewSerDes<TJVMKey>();
_valueSerdes = new TValueContainerSerDesSelectorType().NewSerDes<TJVMValueContainer>();
var keySelector = _cluster.Options.SerDesSelectorForKey(_entityType) as ISerDesSelector<TKey>;
var valueSelector = _cluster.Options.SerDesSelectorForValue(_entityType) as ISerDesSelector<TValueContainer>;

if (_keySerdes == null) throw new InvalidOperationException($"{typeof(TKeySerDesSelectorType)} does not returned a {typeof(ISerDes<TKey, TJVMKey>)}");
if (_valueSerdes == null) throw new InvalidOperationException($"{typeof(TValueContainerSerDesSelectorType)} does not returned a {typeof(ISerDes<TValueContainer, TJVMValueContainer>)}");
_keySerdes = keySelector?.NewSerDes<TJVMKey>();
_valueSerdes = valueSelector?.NewSerDes<TJVMValueContainer>();

if (_keySerdes == null) throw new InvalidOperationException($"SerDsSelector for key does not returned a {typeof(ISerDes<TKey, TJVMKey>)}");
if (_valueSerdes == null) throw new InvalidOperationException($"SerDsSelector for value does not returned a {typeof(ISerDes<TValueContainer, TJVMValueContainer>)}");

if (_useCompactedReplicator)
{
Expand Down
12 changes: 4 additions & 8 deletions src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ public class EntityTypeProducers
/// <summary>
/// Allocates a new <see cref="IEntityTypeProducer"/>
/// </summary>
public static IEntityTypeProducer Create<TKey, TValueContainer, TJVMKey, TJVMValueContainer, TKeySerDesSelectorType, TValueContainerSerDesSelectorType>(IEntityType entityType, IKafkaCluster cluster)
public static IEntityTypeProducer Create<TKey, TValueContainer, TJVMKey, TJVMValueContainer>(IEntityType entityType, IKafkaCluster cluster)
where TKey : notnull
where TValueContainer : class, IValueContainer<TKey>
where TKeySerDesSelectorType : class, ISerDesSelector<TKey>, new()
where TValueContainerSerDesSelectorType : class, ISerDesSelector<TValueContainer>, new()
{
return _producers.GetOrAdd(entityType, _ => CreateProducerLocal<TKey, TValueContainer, TJVMKey, TJVMValueContainer, TKeySerDesSelectorType, TValueContainerSerDesSelectorType>(entityType, cluster));
return _producers.GetOrAdd(entityType, _ => CreateProducerLocal<TKey, TValueContainer, TJVMKey, TJVMValueContainer>(entityType, cluster));
}
/// <summary>
/// Dispose a previously allocated <see cref="IEntityTypeProducer"/>
Expand All @@ -55,10 +53,8 @@ public static void Dispose(IEntityTypeProducer producer)
producer.Dispose();
}

static IEntityTypeProducer CreateProducerLocal<TKey, TValueContainer, TJVMKey, TJVMValueContainer, TKeySerDesSelectorType, TValueContainerSerDesSelectorType>(IEntityType entityType, IKafkaCluster cluster)
static IEntityTypeProducer CreateProducerLocal<TKey, TValueContainer, TJVMKey, TJVMValueContainer>(IEntityType entityType, IKafkaCluster cluster)
where TKey : notnull
where TValueContainer : class, IValueContainer<TKey>
where TKeySerDesSelectorType : class, ISerDesSelector<TKey>, new()
where TValueContainerSerDesSelectorType : class, ISerDesSelector<TValueContainer>, new()
=> new EntityTypeProducer<TKey, TValueContainer, TJVMKey, TJVMValueContainer, TKeySerDesSelectorType, TValueContainerSerDesSelectorType>(entityType, cluster);
=> new EntityTypeProducer<TKey, TValueContainer, TJVMKey, TJVMValueContainer>(entityType, cluster);
}
6 changes: 2 additions & 4 deletions src/net/KEFCore/Storage/Internal/KafkaTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public class KafkaTable<TKey, TValueContainer, TJVMKey, TJVMValueContainer, TKeySerDesSelectorType, TValueContainerSerDesSelectorType> : IKafkaTable
public class KafkaTable<TKey, TValueContainer, TJVMKey, TJVMValueContainer> : IKafkaTable
where TKey : notnull
where TValueContainer : class, IValueContainer<TKey>
where TKeySerDesSelectorType : class, ISerDesSelector<TKey>, new()
where TValueContainerSerDesSelectorType : class, ISerDesSelector<TValueContainer>, new()
{
private readonly IPrincipalKeyValueFactory<TKey> _keyValueFactory;
private readonly bool _sensitiveLoggingEnabled;
Expand All @@ -63,7 +61,7 @@ public KafkaTable(
Cluster = cluster;
EntityType = entityType;
_tableAssociatedTopicName = Cluster.CreateTable(entityType);
_producer = EntityTypeProducers.Create<TKey, TValueContainer, TJVMKey, TJVMValueContainer, TKeySerDesSelectorType, TValueContainerSerDesSelectorType>(entityType, Cluster);
_producer = EntityTypeProducers.Create<TKey, TValueContainer, TJVMKey, TJVMValueContainer>(entityType, Cluster);
_keyValueFactory = entityType.FindPrimaryKey()!.GetPrincipalKeyValueFactory<TKey>();
_sensitiveLoggingEnabled = sensitiveLoggingEnabled;
_rows = new Dictionary<TKey, object?[]>(_keyValueFactory.EqualityComparer);
Expand Down
10 changes: 3 additions & 7 deletions src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,14 @@ private Func<IKafkaTable> CreateTable(IKafkaCluster cluster, IEntityType entityT
.MakeGenericMethod(entityType.FindPrimaryKey()!.GetKeyType(),
_options.ValueContainerType(entityType),
_options.JVMKeyType(entityType),
_options.JVMValueContainerType(entityType),
_options.SerDesSelectorTypeForKey(entityType),
_options.SerDesSelectorTypeForValue(entityType))
_options.JVMValueContainerType(entityType))
.Invoke(null, new object?[] { cluster, entityType, _sensitiveLoggingEnabled })!;

private static Func<IKafkaTable> CreateFactory<TKey, TValueContainer, TJVMKey, TJVMValueContainer, TKeySerDesSelectorType, TValueContainerSerDesSelectorType>(
private static Func<IKafkaTable> CreateFactory<TKey, TValueContainer, TJVMKey, TJVMValueContainer>(
IKafkaCluster cluster,
IEntityType entityType,
bool sensitiveLoggingEnabled)
where TKey : notnull
where TValueContainer : class, IValueContainer<TKey>
where TKeySerDesSelectorType : class, ISerDesSelector<TKey>, new()
where TValueContainerSerDesSelectorType : class, ISerDesSelector<TValueContainer>, new()
=> () => new KafkaTable<TKey, TValueContainer, TJVMKey, TJVMValueContainer, TKeySerDesSelectorType, TValueContainerSerDesSelectorType>(cluster, entityType, sensitiveLoggingEnabled);
=> () => new KafkaTable<TKey, TValueContainer, TJVMKey, TJVMValueContainer>(cluster, entityType, sensitiveLoggingEnabled);
}
2 changes: 1 addition & 1 deletion src/net/templates/templates/kefcoreApp/kefcoreApp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
</ItemGroup>
<ItemGroup Condition="!Exists('..\..\..\KEFCore\KEFCore.csproj')">
<!--Outside GitHub repo-->
<PackageReference Include="MASES.EntityFrameworkCore.KNet" Version="2.2.0" IncludeAssets="All" PrivateAssets="None" />
<PackageReference Include="MASES.EntityFrameworkCore.KNet" Version="2.3.0" IncludeAssets="All" PrivateAssets="None" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
</ItemGroup>
<ItemGroup Condition="!Exists('..\..\..\KEFCore\KEFCore.csproj')">
<!--Outside GitHub repo-->
<PackageReference Include="MASES.EntityFrameworkCore.KNet" Version="2.2.0" IncludeAssets="All" PrivateAssets="None" />
<PackageReference Include="MASES.EntityFrameworkCore.KNet" Version="2.3.0" IncludeAssets="All" PrivateAssets="None" />
</ItemGroup>
</Project>
Loading