Skip to content

Commit

Permalink
Added change event handler to receive notifications when something ch…
Browse files Browse the repository at this point in the history
…ange on an IEntityType, it works only if Compacted Replicator is in use (#120)

* Added change event handler to receive notifications when something change on an IEntityType, it works only if Compacted Replicator is in use

* Added template for events

* Update template identity
  • Loading branch information
masesdevelopers authored Oct 20, 2023
1 parent 480f785 commit f749390
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/documentation/articles/kafkadbcontext.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- **ProducerConfig**: parameters to use for Producer
- **StreamsConfig**: parameters to use for Apche Kafka Streams application
- **TopicConfig**: parameters to use on topic creation for each entity
- **OnChangeEvent**: handler to receive change events from back-end

## How to use `KafkaDbContext` class

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ public interface IKafkaSingletonOptions : ISingletonOptions
StreamsConfigBuilder? StreamsConfig { get; }

TopicConfigBuilder? TopicConfig { get; }

Action<IEntityType, bool, object> OnChangeEvent { get; }
}
13 changes: 13 additions & 0 deletions src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private ProducerConfigBuilder? _producerConfigBuilder;
private StreamsConfigBuilder? _streamsConfigBuilder;
private TopicConfigBuilder? _topicConfigBuilder;
private Action<IEntityType, bool, object>? _onChangeEvent = null;
private DbContextOptionsExtensionInfo? _info;

static Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader;
Expand Down Expand Up @@ -83,6 +84,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_producerConfigBuilder = ProducerConfigBuilder.CreateFrom(copyFrom._producerConfigBuilder);
_streamsConfigBuilder = StreamsConfigBuilder.CreateFrom(copyFrom._streamsConfigBuilder);
_topicConfigBuilder = TopicConfigBuilder.CreateFrom(copyFrom._topicConfigBuilder);
_onChangeEvent = copyFrom._onChangeEvent;
}

public virtual DbContextOptionsExtensionInfo Info => _info ??= new ExtensionInfo(this);
Expand Down Expand Up @@ -125,6 +127,8 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)

public virtual TopicConfigBuilder TopicConfig => _topicConfigBuilder!;

public virtual Action<IEntityType, bool, object> OnChangeEvent => _onChangeEvent!;

public virtual KafkaOptionsExtension WithKeySerializationType(Type serializationType)
{
if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\"");
Expand Down Expand Up @@ -284,6 +288,15 @@ public virtual KafkaOptionsExtension WithTopicConfig(TopicConfigBuilder topicCon
return clone;
}

public virtual KafkaOptionsExtension WithOnChangeEvent(Action<IEntityType, bool, object> onChangeEvent)
{
var clone = Clone();

clone._onChangeEvent = onChangeEvent;

return clone;
}

public virtual Properties StreamsOptions(IEntityType entityType)
{
return StreamsOptions(entityType.ApplicationIdForTable(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public virtual void Initialize(IDbContextOptions options)
ProducerConfig = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfig);
StreamsConfig = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfig);
TopicConfig = TopicConfigBuilder.CreateFrom(kafkaOptions.TopicConfig);
OnChangeEvent = kafkaOptions.OnChangeEvent;
}
}

Expand Down Expand Up @@ -103,4 +104,6 @@ public virtual void Validate(IDbContextOptions options)
public virtual StreamsConfigBuilder? StreamsConfig { get; private set; }

public virtual TopicConfigBuilder? TopicConfig { get; private set; }

public virtual Action<IEntityType, bool, object> OnChangeEvent { get; private set; }
}
15 changes: 11 additions & 4 deletions src/net/KEFCore/Infrastructure/KafkaDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ public KafkaDbContext(DbContextOptions options) : base(options)
/// The optional <see cref="TopicConfigBuilder"/> used when topics shall be created
/// </summary>
public virtual TopicConfigBuilder? TopicConfig { get; set; }
/// <summary>
/// The optional handler to be used to receive notification when the back-end triggers a data change.
/// </summary>
/// <remarks>Works if <see cref="UseCompactedReplicator"/> is <see langword="true"/></remarks>
public virtual Action<IEntityType, bool, object>? OnChangeEvent { get; set; } = null;

/// <inheritdoc cref="DbContext.OnConfiguring(DbContextOptionsBuilder)"/>
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
Expand All @@ -227,17 +233,18 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)

optionsBuilder.UseKafkaCluster(ApplicationId, DbName, BootstrapServers, (o) =>
{
o.ConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig);
o.ProducerConfig(ProducerConfig ?? DefaultProducerConfig);
o.StreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
o.TopicConfig(TopicConfig ?? DefaultTopicConfig);
o.WithConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig);
o.WithProducerConfig(ProducerConfig ?? DefaultProducerConfig);
o.WithStreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
o.WithTopicConfig(TopicConfig ?? DefaultTopicConfig);
o.WithUsePersistentStorage(UsePersistentStorage);
o.WithUseDeletePolicyForTopic(UseDeletePolicyForTopic);
o.WithCompactedReplicator(UseCompactedReplicator);
o.WithDefaultReplicationFactor(DefaultReplicationFactor);
if (KeySerializationType != null) o.WithKeySerializationType(KeySerializationType);
if (ValueSerializationType != null) o.WithValueSerializationType(ValueSerializationType);
if (ValueContainerType != null) o.WithValueContainerType(ValueContainerType);
if (OnChangeEvent != null) o.WithOnChangeEvent(OnChangeEvent);
});
}
}
29 changes: 25 additions & 4 deletions src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public virtual KafkaDbContextOptionsBuilder WithDefaultReplicationFactor(short d
/// </remarks>
/// <param name="consumerConfigBuilder">The <see cref="ConsumerConfigBuilder"/> where options are stored.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder ConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder)
public virtual KafkaDbContextOptionsBuilder WithConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();
Expand All @@ -313,7 +313,7 @@ public virtual KafkaDbContextOptionsBuilder ConsumerConfig(ConsumerConfigBuilder
/// </remarks>
/// <param name="producerConfigBuilder">The <see cref="ProducerConfigBuilder"/> where options are stored.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder ProducerConfig(ProducerConfigBuilder producerConfigBuilder)
public virtual KafkaDbContextOptionsBuilder WithProducerConfig(ProducerConfigBuilder producerConfigBuilder)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();
Expand All @@ -334,7 +334,7 @@ public virtual KafkaDbContextOptionsBuilder ProducerConfig(ProducerConfigBuilder
/// </remarks>
/// <param name="streamsConfigBuilder">The <see cref="StreamsConfigBuilder"/> where options are stored.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder StreamsConfig(StreamsConfigBuilder streamsConfigBuilder)
public virtual KafkaDbContextOptionsBuilder WithStreamsConfig(StreamsConfigBuilder streamsConfigBuilder)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();
Expand All @@ -355,7 +355,7 @@ public virtual KafkaDbContextOptionsBuilder StreamsConfig(StreamsConfigBuilder s
/// </remarks>
/// <param name="topicConfig">The <see cref="TopicConfigBuilder"/> where options are stored.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder TopicConfig(TopicConfigBuilder topicConfig)
public virtual KafkaDbContextOptionsBuilder WithTopicConfig(TopicConfigBuilder topicConfig)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();
Expand All @@ -367,6 +367,27 @@ public virtual KafkaDbContextOptionsBuilder TopicConfig(TopicConfigBuilder topic
return this;
}

/// <summary>
/// Set the optional handler to be used to receive notification when the back-end triggers a data change. Works if <see cref="WithCompactedReplicator(bool)"/> is invoked with <see langword="true"/>
/// </summary>
/// <remarks>
/// See <see href="https://aka.ms/efcore-docs-dbcontext-options">Using DbContextOptions</see>, and
/// <see href="https://github.com/masesgroup/KEFCore">The EF Core Kafka database provider</see> for more information and examples.
/// </remarks>
/// <param name="onChangeEvent">The <see cref="Action{IEntityType, Boolean, Object}"/> will be used to report change event.</param>
/// <returns>The same builder instance so that multiple calls can be chained.</returns>
public virtual KafkaDbContextOptionsBuilder WithOnChangeEvent(Action<IEntityType, bool, object> onChangeEvent)
{
var extension = OptionsBuilder.Options.FindExtension<KafkaOptionsExtension>()
?? new KafkaOptionsExtension();

extension = extension.WithOnChangeEvent(onChangeEvent);

((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);

return this;
}

#region Hidden System.Object members

/// <summary>
Expand Down
24 changes: 23 additions & 1 deletion src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class EntityTypeProducer<TKey, TValueContainer, TKeySerializer, TValueSer
private readonly IKafkaStreamsBaseRetriever _streamData;
private readonly IKNetSerDes<TKey> _keySerdes;
private readonly IKNetSerDes<TValueContainer> _valueSerdes;
private readonly Action<IEntityType, bool, object>? _onChangeEvent;

#region KNetCompactedReplicatorEnumerable
class KNetCompactedReplicatorEnumerable : IEnumerable<ValueBuffer>
Expand Down Expand Up @@ -188,6 +189,7 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
_entityType = entityType;
_cluster = cluster;
_useCompactedReplicator = _cluster.Options.UseCompactedReplicator;
_onChangeEvent = _cluster.Options.OnChangeEvent;

var tTValueContainer = typeof(TValueContainer);
TValueContainerConstructor = tTValueContainer.GetConstructors().Single(ci => ci.GetParameters().Length == 2);
Expand All @@ -211,6 +213,11 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
KeySerDes = _keySerdes,
ValueSerDes = _valueSerdes,
};
if (_onChangeEvent != null)
{
_kafkaCompactedReplicator.OnRemoteUpdate += KafkaCompactedReplicator_OnRemoteUpdate;
_kafkaCompactedReplicator.OnRemoteRemove += KafkaCompactedReplicator_OnRemoteRemove;
}
#if DEBUG_PERFORMANCE
Stopwatch sw = Stopwatch.StartNew();
#endif
Expand Down Expand Up @@ -258,8 +265,13 @@ public IEnumerable<Future<RecordMetadata>> Commit(IEnumerable<IKafkaRowBag> reco

public void Dispose()
{
if (_useCompactedReplicator)
if (_kafkaCompactedReplicator != null)
{
if (_onChangeEvent != null)
{
_kafkaCompactedReplicator.OnRemoteUpdate -= KafkaCompactedReplicator_OnRemoteUpdate;
_kafkaCompactedReplicator.OnRemoteRemove -= KafkaCompactedReplicator_OnRemoteRemove;
}
_kafkaCompactedReplicator?.Dispose();
}
else
Expand All @@ -278,4 +290,14 @@ public IEnumerable<ValueBuffer> ValueBuffers
return new KNetCompactedReplicatorEnumerable(_entityType, _kafkaCompactedReplicator);
}
}

private void KafkaCompactedReplicator_OnRemoteUpdate(IKNetCompactedReplicator<TKey, TValueContainer> arg1, KeyValuePair<TKey, TValueContainer> arg2)
{
_onChangeEvent?.Invoke(_entityType, false, arg2.Key);
}

private void KafkaCompactedReplicator_OnRemoteRemove(IKNetCompactedReplicator<TKey, TValueContainer> arg1, KeyValuePair<TKey, TValueContainer> arg2)
{
_onChangeEvent?.Invoke(_entityType, true, arg2.Key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"$schema": "http://json.schemastore.org/template",
"author": "MASES s.r.l.",
"classifications": [ "Common", "Library" ],
"identity": "MASES.EntityFrameworkCore.KNet.Templates",
"identity": "MASES.EntityFrameworkCore.KNet.Templates.Simple",
"name": "Executable template: Simple Console for EntityFrameworkCore provider for Apache Kafka project",
"shortName": "kefcoreApp",
"tags": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"$schema": "http://json.schemastore.org/template",
"author": "MASES s.r.l.",
"classifications": [ "Common", "Library" ],
"identity": "MASES.EntityFrameworkCore.KNet.Templates.AppWithEvents",
"name": "Executable template: Simple Console with events for EntityFrameworkCore provider for Apache Kafka project",
"shortName": "kefcoreAppWithEvents",
"tags": {
"language": "C#",
"type": "project"
}
}
124 changes: 124 additions & 0 deletions src/net/templates/templates/kefcoreAppWithEvents/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;
using System;
using System.Collections.Generic;
using System.Linq;

namespace MASES.EntityFrameworkCore.KNet.Templates
{
partial class Program
{
static void OnEvent(IEntityType entity, bool state, object key)
{
Console.WriteLine($"Entity {entity.Name} has {(state ? "removed" : "added/updated")} the key {key}");
}

static void Main(string[] args)
{
BloggingContext context = null;
try
{
context = new BloggingContext()
{
BootstrapServers = "KAFKA-BROKER:9092",
ApplicationId = "MyApplicationId",
DbName = "MyDB",
OnChangeEvent = OnEvent
};
// cleanup topics on Broker
context.Database.EnsureDeleted();
context.Database.EnsureCreated();

// prefill data
for (int i = 0; i < 1000; i++)
{
context.Add(new Blog
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
{
new Post()
{
Title = "title",
Content = i.ToString()
}
},
Rating = i,
});
}
// save data
context.SaveChanges();

// make some queries
var selector = (from op in context.Blogs
join pg in context.Posts on op.BlogId equals pg.BlogId
where pg.BlogId == op.BlogId
select new { pg, op });
var pageObject = selector.FirstOrDefault();

var post = context.Posts.Single(b => b.BlogId == 2);

post = context.Posts.Single(b => b.BlogId == 1);

var all = context.Posts.All((o) => true);

var value = context.Blogs.AsQueryable().ToQueryString();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
finally
{
context?.Dispose();
}
}
}

public class BloggingContext : KafkaDbContext
{
// uncomment for persistent storage
// public override bool UsePersistentStorage { get; set; } = true;

// uncomment to disable compacted replicator
//public override bool UseCompactedReplicator { get; set; } = false;

public DbSet<Blog> Blogs { get; set; }
public DbSet<Post> Posts { get; set; }

/// uncomment for model builder
//protected override void OnModelCreating(ModelBuilder modelBuilder)
//{
// modelBuilder.Entity<Blog>().HasKey(c => new { c.BlogId, c.Rating });
//}
}

public class Blog
{
public int BlogId { get; set; }
public string Url { get; set; }
public int Rating { get; set; }
public List<Post> Posts { get; set; }

public override string ToString()
{
return $"BlogId: {BlogId} Url: {Url} Rating: {Rating}";
}
}

public class Post
{
public int PostId { get; set; }
public string Title { get; set; }
public string Content { get; set; }

public int BlogId { get; set; }
public Blog Blog { get; set; }

public override string ToString()
{
return $"PostId: {PostId} Title: {Title} Content: {Content} BlogId: {BlogId}";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<LangVersion>latest</LangVersion>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup Condition="Exists('..\..\..\KEFCore\KEFCore.csproj')">
<!--Within GitHub repo: used for test purpose-->
<ProjectReference Include="..\..\..\KEFCore\KEFCore.csproj" />
</ItemGroup>
<ItemGroup Condition="!Exists('..\..\..\KEFCore\KEFCore.csproj')">
<!--Outside GitHub repo-->
<PackageReference Include="MASES.EntityFrameworkCore.KNet" Version="0.10.1" IncludeAssets="All" PrivateAssets="None" />
</ItemGroup>
</Project>
Loading

0 comments on commit f749390

Please sign in to comment.