Skip to content

Commit

Permalink
Update documentation and usage of ConsumerConfigBuilder (#84)
Browse files Browse the repository at this point in the history
* Hyperlink fix

* Missing file update

* Describe and use ConsumerConfigBuilder

* Name simplification
  • Loading branch information
masesdevelopers authored Oct 11, 2023
1 parent f0e7d32 commit e0b408e
Show file tree
Hide file tree
Showing 16 changed files with 174 additions and 83 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDU
## Summary

* [Roadmap](src/documentation/articles/roadmap.md)
* [Actual state](src/documentation/articles/actualstate.md)
* [Current state](src/documentation/articles/currentstate.md)
* [KEFCore usage](src/documentation/articles/usage.md)

## Runtime engine
Expand Down
6 changes: 0 additions & 6 deletions src/documentation/articles/actualstate.md

This file was deleted.

6 changes: 6 additions & 0 deletions src/documentation/articles/currentstate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# KEFCore: development state

The latest release implementes these features:

* [x] A working provider based on Apache Kafka Streams
* [x] The provider can use KNetCompactedReplicator
37 changes: 32 additions & 5 deletions src/documentation/articles/kafkadbcontext.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,43 @@
- **DefaultConsumerInstances**: the consumer instances to be allocated when UseCompactedReplicator is **true**
- **UsePersistentStorage**: set to **true** to use a persintent storage between multiple application startup
- **UseCompactedReplicator**: Use `KNetCompactedReplicator` instead of Apache Kafka Streams to manage data to or from topics
- **ProducerConfigBuilder**: parameters to use for Producer
- **StreamsConfigBuilder**: parameters to use for Apche Kafka Streams application
- **TopicConfigBuilder**: parameters to use on topic creation for each entity
- **ConsumerConfig**: parameters to use for Producer
- **ProducerConfig**: parameters to use for Producer
- **StreamsConfig**: parameters to use for Apche Kafka Streams application
- **TopicConfig**: parameters to use on topic creation for each entity

## How to use `KafkaDbContext` class

The most simple example of usage can be found in [KEFCore usage](usage.md). By default, `KafkaDbContext` automatically manages `OnConfiguring` method of `DbContext`:
- checks for mandatory opions like **BootstrapServers** and **DbName**
- setup the options to use an Apache Kafka cluster
- `KafkaDbContext` checks the mandatory options like **BootstrapServers** and **DbName**
- `KafkaDbContext` setup the options needed to use an Apache Kafka cluster:
- default `ConsumerConfig` can be overridden using **ConsumerConfig** property of `KafkaDbContext`
- default `ProducerConfig` can be overridden using **ProducerConfig** property of `KafkaDbContext`
- default `StreamsConfig` can be overridden using **StreamsConfig** property of `KafkaDbContext`
- default `TopicConfig` can be overridden using **TopicConfig** property of `KafkaDbContext`


### Default **ConsumerConfig**

Over the Apache Kafka defaults it applies:

- EnableAutoCommit is **true**
- AutoOffsetReset set to **EARLIEST**
- AllowAutoCreateTopics set to **false**

### Default **ProducerConfig**

Does not change anything than the Apache Kafka defaults

### Default **ConsumerConfig**

Does not change anything than the Apache Kafka defaults

### Default **TopicConfig**

Over the Apache Kafka defaults it applies:

- DeleteRetentionMs set to 100 ms
- MinCleanableDirtyRatio set to 0.01
- SegmentMs set to 100 ms
- RetentionBytes set to 1073741824 bytes (1 Gb)
6 changes: 3 additions & 3 deletions src/documentation/articles/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

The roadmap can be synthetized in the following points:

* Create a first working provider based on InMemory provider
* Extends the first provider with new features able to create Apache Kafka Streams topology to retrieve information
* Use KNetCompactedReplicator beside Apache Kafka Streams
* [x] Create a first working provider starting from the code of InMemory provider
* [ ] Extends the first provider with new features able to create Apache Kafka Streams topology to retrieve information
* [x] Use KNetCompactedReplicator beside Apache Kafka Streams
4 changes: 2 additions & 2 deletions src/documentation/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ Have a look at the following resources:
---
## Summary

* [Roadmap](src/net/Documentation/articles/roadmap.md)
* [Actual state](src/net/Documentation/articles/actualstate.md)
* [Roadmap](articles/roadmap.md)
* [Current state](articles/currentstate.md)
* [KEFCore usage](articles/usage.md)

---
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#nullable enable

using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
using MASES.KNet.Streams;

Expand Down Expand Up @@ -51,9 +52,11 @@ public interface IKafkaSingletonOptions : ISingletonOptions

int DefaultReplicationFactor { get; }

ProducerConfigBuilder? ProducerConfigBuilder { get; }
ConsumerConfigBuilder? ConsumerConfig { get; }

StreamsConfigBuilder? StreamsConfigBuilder { get; }
ProducerConfigBuilder? ProducerConfig { get; }

TopicConfigBuilder? TopicConfigBuilder { get; }
StreamsConfigBuilder? StreamsConfig { get; }

TopicConfigBuilder? TopicConfig { get; }
}
68 changes: 41 additions & 27 deletions src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Java.Lang;
using Java.Util;
using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
using Org.Apache.Kafka.Clients.Consumer;
Expand All @@ -47,6 +48,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private int _defaultNumPartitions = 1;
private int? _defaultConsumerInstances = null;
private short _defaultReplicationFactor = 1;
private ConsumerConfigBuilder? _consumerConfigBuilder;
private ProducerConfigBuilder? _producerConfigBuilder;
private StreamsConfigBuilder? _streamsConfigBuilder;
private TopicConfigBuilder? _topicConfigBuilder;
Expand All @@ -71,6 +73,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_defaultNumPartitions = copyFrom._defaultNumPartitions;
_defaultConsumerInstances = copyFrom._defaultConsumerInstances;
_defaultReplicationFactor = copyFrom._defaultReplicationFactor;
_consumerConfigBuilder = ConsumerConfigBuilder.CreateFrom(copyFrom._consumerConfigBuilder);
_producerConfigBuilder = ProducerConfigBuilder.CreateFrom(copyFrom._producerConfigBuilder);
_streamsConfigBuilder = StreamsConfigBuilder.CreateFrom(copyFrom._streamsConfigBuilder);
_topicConfigBuilder = TopicConfigBuilder.CreateFrom(copyFrom._topicConfigBuilder);
Expand Down Expand Up @@ -102,11 +105,13 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)

public virtual short DefaultReplicationFactor => _defaultReplicationFactor;

public virtual ProducerConfigBuilder ProducerConfigBuilder => _producerConfigBuilder!;
public virtual ConsumerConfigBuilder ConsumerConfig => _consumerConfigBuilder!;

public virtual StreamsConfigBuilder StreamsConfigBuilder => _streamsConfigBuilder!;
public virtual ProducerConfigBuilder ProducerConfig => _producerConfigBuilder!;

public virtual TopicConfigBuilder TopicConfigBuilder => _topicConfigBuilder!;
public virtual StreamsConfigBuilder StreamsConfig => _streamsConfigBuilder!;

public virtual TopicConfigBuilder TopicConfig => _topicConfigBuilder!;

public virtual KafkaOptionsExtension WithUseNameMatching(bool useNameMatching = true)
{
Expand Down Expand Up @@ -198,6 +203,15 @@ public virtual KafkaOptionsExtension WithDefaultReplicationFactor(short defaultR
return clone;
}

public virtual KafkaOptionsExtension WithConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder)
{
var clone = Clone();

clone._consumerConfigBuilder = ConsumerConfigBuilder.CreateFrom(consumerConfigBuilder);

return clone;
}

public virtual KafkaOptionsExtension WithProducerConfig(ProducerConfigBuilder producerConfigBuilder)
{
var clone = Clone();
Expand Down Expand Up @@ -233,54 +247,54 @@ public virtual Properties StreamsOptions(IEntityType entityType)
public virtual Properties StreamsOptions(string applicationId)
{
Properties props = _streamsConfigBuilder ?? new();
if (props.ContainsKey(StreamsConfig.APPLICATION_ID_CONFIG))
if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.APPLICATION_ID_CONFIG))
{
props.Remove(StreamsConfig.APPLICATION_ID_CONFIG);
props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.APPLICATION_ID_CONFIG);
}
props.Put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
if (props.ContainsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
props.Put(Org.Apache.Kafka.Streams.StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
{
props.Remove(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
}
props.Put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
if (props.ContainsKey(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG))
props.Put(Org.Apache.Kafka.Streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG))
{
props.Remove(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
}
props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
if (props.ContainsKey(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG))
props.Put(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
if (props.ContainsKey(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG))
{
props.Remove(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG);
props.Remove(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG);
}
props.Put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
if (props.ContainsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
props.Put(Org.Apache.Kafka.Streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
if (props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
{
props.Remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
props.Remove(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
}
props.Put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

return props;
}

public virtual Properties ProducerOptions()
{
Properties props = _producerConfigBuilder ?? new();
if (props.ContainsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
if (props.ContainsKey(Org.Apache.Kafka.Clients.Producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
{
props.Remove(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
props.Remove(Org.Apache.Kafka.Clients.Producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
if (!props.ContainsKey(ProducerConfig.ACKS_CONFIG))
props.Put(Org.Apache.Kafka.Clients.Producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers);
if (!props.ContainsKey(Org.Apache.Kafka.Clients.Producer.ProducerConfig.ACKS_CONFIG))
{
props.Put(ProducerConfig.ACKS_CONFIG, "all");
props.Put(Org.Apache.Kafka.Clients.Producer.ProducerConfig.ACKS_CONFIG, "all");
}
if (!props.ContainsKey(ProducerConfig.RETRIES_CONFIG))
if (!props.ContainsKey(Org.Apache.Kafka.Clients.Producer.ProducerConfig.RETRIES_CONFIG))
{
props.Put(ProducerConfig.RETRIES_CONFIG, 0);
props.Put(Org.Apache.Kafka.Clients.Producer.ProducerConfig.RETRIES_CONFIG, 0);
}
if (!props.ContainsKey(ProducerConfig.LINGER_MS_CONFIG))
if (!props.ContainsKey(Org.Apache.Kafka.Clients.Producer.ProducerConfig.LINGER_MS_CONFIG))
{
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.Put(Org.Apache.Kafka.Clients.Producer.ProducerConfig.LINGER_MS_CONFIG, 1);
}
//if (props.ContainsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
//{
Expand Down
16 changes: 10 additions & 6 deletions src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
using MASES.KNet.Streams;

Expand Down Expand Up @@ -45,9 +46,10 @@ public virtual void Initialize(IDbContextOptions options)
DefaultNumPartitions = kafkaOptions.DefaultNumPartitions;
DefaultConsumerInstances = kafkaOptions.DefaultConsumerInstances;
DefaultReplicationFactor = kafkaOptions.DefaultReplicationFactor;
ProducerConfigBuilder = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfigBuilder);
StreamsConfigBuilder = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfigBuilder);
TopicConfigBuilder = TopicConfigBuilder.CreateFrom(kafkaOptions.TopicConfigBuilder);
ConsumerConfig = ConsumerConfigBuilder.CreateFrom(kafkaOptions.ConsumerConfig);
ProducerConfig = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfig);
StreamsConfig = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfig);
TopicConfig = TopicConfigBuilder.CreateFrom(kafkaOptions.TopicConfig);
}
}

Expand Down Expand Up @@ -85,9 +87,11 @@ public virtual void Validate(IDbContextOptions options)

public virtual int DefaultReplicationFactor { get; private set; }

public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; private set; }
public virtual ConsumerConfigBuilder? ConsumerConfig { get; private set; }

public virtual StreamsConfigBuilder? StreamsConfigBuilder { get; private set; }
public virtual ProducerConfigBuilder? ProducerConfig { get; private set; }

public virtual TopicConfigBuilder? TopicConfigBuilder { get; private set; }
public virtual StreamsConfigBuilder? StreamsConfig { get; private set; }

public virtual TopicConfigBuilder? TopicConfig { get; private set; }
}
46 changes: 40 additions & 6 deletions src/net/KEFCore/Infrastructure/KafkaDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
using MASES.KNet.Streams;

Expand Down Expand Up @@ -105,6 +106,32 @@ public static bool EnableKEFCoreTracing
}
}

/// <summary>
/// The default <see cref="ConsumerConfig"/> configuration
/// </summary>
/// <returns>The default <see cref="ConsumerConfig"/> configuration.</returns>
public static ConsumerConfigBuilder DefaultConsumerConfig => ConsumerConfigBuilder.Create().WithEnableAutoCommit(true)
.WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST)
.WithAllowAutoCreateTopics(false);
/// <summary>
/// The default <see cref="ProducerConfig"/> configuration
/// </summary>
/// <returns>The default <see cref="ProducerConfig"/> configuration.</returns>
public static ProducerConfigBuilder DefaultProducerConfig => ProducerConfigBuilder.Create();
/// <summary>
/// The default <see cref="StreamsConfig"/> configuration
/// </summary>
/// <returns>The default <see cref="StreamsConfig"/> configuration.</returns>
public static StreamsConfigBuilder DefaultStreamsConfig => StreamsConfigBuilder.Create();
/// <summary>
/// The default <see cref="TopicConfig"/> configuration
/// </summary>
/// <returns>The default <see cref="TopicConfig"/> configuration.</returns>
public static TopicConfigBuilder DefaultTopicConfig => TopicConfigBuilder.Create().WithDeleteRetentionMs(100)
.WithMinCleanableDirtyRatio(0.01)
.WithSegmentMs(100)
.WithRetentionBytes(1073741824);

/// <inheritdoc cref="DbContext.DbContext()"/>
public KafkaDbContext()
{
Expand Down Expand Up @@ -153,17 +180,21 @@ public KafkaDbContext(DbContextOptions options) : base(options)
/// </summary>
public virtual bool UseCompactedReplicator { get; set; } = false;
/// <summary>
/// The optional <see cref="ConsumerConfigBuilder"/> used when <see cref="UseCompactedReplicator"/> is <see langword="true"/>
/// </summary>
public virtual ConsumerConfigBuilder? ConsumerConfig { get; set; }
/// <summary>
/// The optional <see cref="ProducerConfigBuilder"/>
/// </summary>
public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; set; }
public virtual ProducerConfigBuilder? ProducerConfig { get; set; }
/// <summary>
/// The optional <see cref="StreamsConfigBuilder"/>
/// The optional <see cref="StreamsConfig"/> used when <see cref="UseCompactedReplicator"/> is <see langword="false"/>
/// </summary>
public virtual StreamsConfigBuilder? StreamsConfigBuilder { get; set; }
public virtual StreamsConfigBuilder? StreamsConfig { get; set; }
/// <summary>
/// The optional <see cref="TopicConfigBuilder"/>
/// The optional <see cref="TopicConfigBuilder"/> used when topics shall be created
/// </summary>
public virtual TopicConfigBuilder? TopicConfigBuilder { get; set; }
public virtual TopicConfigBuilder? TopicConfig { get; set; }
/// <inheritdoc cref="DbContext.OnConfiguring(DbContextOptionsBuilder)"/>
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
Expand All @@ -172,7 +203,10 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)

optionsBuilder.UseKafkaCluster(ApplicationId, DbName, BootstrapServers, (o) =>
{
o.StreamsConfig(StreamsConfigBuilder ?? o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions);
o.ConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig);
o.ProducerConfig(ProducerConfig ?? DefaultProducerConfig);
o.StreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
o.TopicConfig(TopicConfig ?? DefaultTopicConfig);
o.WithUsePersistentStorage(UsePersistentStorage);
//o.WithProducerByEntity(UseProducerByEntity);
o.WithCompactedReplicator(UseCompactedReplicator);
Expand Down
Loading

0 comments on commit e0b408e

Please sign in to comment.