Commit: Management of buffered data exchange based on serializer defined in configuration (#235)
* Management of buffered data exchange based on serializer used in configuration

* #235 (comment), #235 (comment): Review based on latest options available in KNet

* Update EntityExtractor

* Added external project for model to perform entity extraction

* Update documentation
masesdevelopers authored May 27, 2024
1 parent 4e6086c commit 7d4e6b5
Showing 42 changed files with 1,406 additions and 1,167 deletions.
7 changes: 4 additions & 3 deletions src/documentation/articles/kafkadbcontext.md
@@ -8,8 +8,8 @@ _description: Describe what is and how use KafkaDbContext class from Entity Fram
`KafkaDbContext` is a special class which helps to define the `DbContext` and use [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/) provider for [Apache Kafka](https://kafka.apache.org/):
- `KafkaDbContext` inherits from `DbContext`: to define the model, and/or creating the database, see [getting started](https://docs.microsoft.com/ef/core/get-started/) in the docs and [KEFCore usage](usage.md)
- `KafkaDbContext` defines the following properties:
- - **KeySerializationType**: the .NET type to be used to allocate an external serializer for Apache Kafka record key
- - **ValueSerializationType**: the .NET type to be used to allocate an external serializer for Apache Kafka record value
+ - **KeySerDesSelectorType**: the .NET type to be used to allocate an external serializer for Apache Kafka record key
+ - **ValueSerDesSelectorType**: the .NET type to be used to allocate an external serializer for Apache Kafka record value
- **ValueContainerType**: the .NET type to be used to allocate an external container class for Apache Kafka record value
- **UseNameMatching**: set to **false** to avoid Entity matching based on Name
- **BootstrapServers**: the server hosting the broker of Apache Kafka
@@ -19,10 +19,11 @@ _description: Describe what is and how use KafkaDbContext class from Entity Fram
- **DefaultReplicationFactor**: the replication factor to use when data are stored in Apache Kafka
- **DefaultConsumerInstances**: the consumer instances to be allocated when UseCompactedReplicator is **true**
- **UsePersistentStorage**: set to **true** to use a persistent storage between multiple application startup
- - **UseEnumeratorWithPrefetch**: set to **true** to prefer enumerator instances able to do a prefetch on data speeding up execution, used if **UseKNetStreams** is **true** and **UseCompactedReplicator** is **false**
+ - **UseByteBufferDataTransfer**: set to **true** to prefer `Java.Nio.ByteBuffer` data exchange in serializer instances
 - **UseDeletePolicyForTopic**: set to **true** to enable [delete cleanup policy](https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy)
 - **UseCompactedReplicator**: use `KNetCompactedReplicator` instead of Apache Kafka Streams to manage data to or from topics
 - **UseKNetStreams**: use the KNet version of Apache Kafka Streams instead of standard Apache Kafka Streams; used if **UseCompactedReplicator** is **false**
+ - **UseEnumeratorWithPrefetch**: set to **true** to prefer enumerator instances that prefetch data, speeding up execution; used if **UseKNetStreams** is **true** and **UseCompactedReplicator** is **false**
- **ConsumerConfig**: parameters to use for Consumer
- **ProducerConfig**: parameters to use for Producer
- **StreamsConfig**: parameters to use for Apache Kafka Streams application
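The option list above maps directly onto properties set on a `KafkaDbContext` subclass. A minimal sketch, assuming the `MASES.EntityFrameworkCore.KNet.Infrastructure` namespace and using an invented context name and broker address:

```csharp
// Sketch only: the context name, namespace, and broker address are
// illustrative assumptions; the property names come from the list above.
using MASES.EntityFrameworkCore.KNet.Infrastructure;

public class BloggingContext : KafkaDbContext
{
    public BloggingContext()
    {
        BootstrapServers = "localhost:9092";  // server hosting the Apache Kafka broker
        UseCompactedReplicator = true;        // KNetCompactedReplicator instead of Kafka Streams
        UseByteBufferDataTransfer = true;     // prefer Java.Nio.ByteBuffer exchange in serializers
        UsePersistentStorage = false;         // no persistent storage across application startups
    }
}
```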
2 changes: 1 addition & 1 deletion src/documentation/articles/serialization.md
@@ -510,7 +510,7 @@ The extension converted this schema into code to speed up the execution of seriali
### How to use Protobuf

`KafkaDbContext` contains three properties that can be used to override the default types:
- - **KeySerializationType**: set this value to `ProtobufKEFCoreSerDes..Key.BinaryRaw<>` or use `ProtobufKEFCoreSerDes.DefaultKeySerialization`, the type automatically manages simple or complex Primary Key
+ - **KeySerializationType**: set this value to `ProtobufKEFCoreSerDes.Key.BinaryRaw<>` or use `ProtobufKEFCoreSerDes.DefaultKeySerialization`, the type automatically manages simple or complex Primary Key
- **ValueSerializationType**: set this value to `ProtobufKEFCoreSerDes.ValueContainer.BinaryRaw<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainerSerialization`
- **ValueContainerType**: set this value to `ProtobufValueContainerRaw<>` or use `ProtobufKEFCoreSerDes.DefaultValueContainer`

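The three overrides above can be combined in the context constructor. A hedged sketch using the default selector types named in the list (the context name is invented):

```csharp
// Sketch only: BloggingContext is an invented name; the three properties and
// the ProtobufKEFCoreSerDes defaults are the ones listed above.
public class BloggingContext : KafkaDbContext
{
    public BloggingContext()
    {
        KeySerializationType = ProtobufKEFCoreSerDes.DefaultKeySerialization;
        ValueSerializationType = ProtobufKEFCoreSerDes.DefaultValueContainerSerialization;
        ValueContainerType = ProtobufKEFCoreSerDes.DefaultValueContainer;
    }
}
```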
@@ -23,7 +23,7 @@
</ItemGroup>

<ItemGroup>
-    <PackageReference Include="MASES.KNet.Serialization.Avro" Version="2.7.1" />
+    <PackageReference Include="MASES.KNet.Serialization.Avro" Version="2.7.2" />
</ItemGroup>

<ItemGroup>
1,034 changes: 603 additions & 431 deletions src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions src/net/KEFCore.SerDes.Avro/AvroValueContainer.cs
@@ -21,6 +21,7 @@
#nullable enable

 using Avro;
+using MASES.KNet.Serialization;

namespace MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage;

@@ -44,7 +45,7 @@ public partial class AvroValueContainer<TKey> : AvroValueContainer, IValueContai
public AvroValueContainer(IEntityType tName, object[] rData)
{
EntityName = tName.Name;
-        ClrType = tName.ClrType.FullName!;
+        ClrType = tName.ClrType?.ToAssemblyQualified()!;
Data = new List<PropertyDataRecord>();
foreach (var item in tName.GetProperties())
{
@@ -53,7 +54,7 @@ public AvroValueContainer(IEntityType tName, object[] rData)
{
PropertyIndex = index,
PropertyName = item.Name,
-                ClrType = item.ClrType?.FullName,
+                ClrType = item.ClrType?.ToAssemblyQualified(),
Value = rData[index]
};
Data.Add(pRecord);
12 changes: 10 additions & 2 deletions src/net/KEFCore.SerDes.Protobuf/Generated/GenericValue.cs
@@ -772,7 +772,11 @@ public void MergeFrom(pb::CodedInputStream input) {
#else
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
@@ -846,7 +850,11 @@ public void MergeFrom(pb::CodedInputStream input) {
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
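The guard inserted throughout these generated parsers relies on the protobuf wire format: every field tag is encoded as `(field_number << 3) | wire_type`, so the low three bits give the wire type, and value 4 (the end-group marker) must abort parsing of the current message. A small self-contained illustration, separate from the generated code:

```csharp
using System;

public class EndGroupTagDemo
{
    // A protobuf tag packs the field number and wire type into one varint:
    // tag = (fieldNumber << 3) | wireType. Wire type 4 is the end-group marker.
    public static uint MakeTag(int fieldNumber, int wireType) =>
        (uint)((fieldNumber << 3) | wireType);

    // The same check the generated parsers perform before the switch.
    public static bool IsEndGroup(uint tag) => (tag & 7) == 4;

    public static void Main()
    {
        Console.WriteLine(IsEndGroup(MakeTag(1, 4)));  // end-group tag: parser returns early
        Console.WriteLine(IsEndGroup(MakeTag(1, 0)));  // varint field: parsing continues
    }
}
```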
24 changes: 20 additions & 4 deletions src/net/KEFCore.SerDes.Protobuf/Generated/KeyContainer.cs
@@ -190,7 +190,11 @@ public void MergeFrom(pb::CodedInputStream input) {
#else
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
@@ -209,7 +213,11 @@
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
@@ -386,7 +394,11 @@ public void MergeFrom(pb::CodedInputStream input) {
#else
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
@@ -408,7 +420,11 @@
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
24 changes: 20 additions & 4 deletions src/net/KEFCore.SerDes.Protobuf/Generated/ValueContainer.cs
@@ -294,7 +294,11 @@ public void MergeFrom(pb::CodedInputStream input) {
#else
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
@@ -328,7 +332,11 @@ void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
@@ -564,7 +572,11 @@ public void MergeFrom(pb::CodedInputStream input) {
#else
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
@@ -591,7 +603,11 @@
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
-        switch(tag) {
+        if ((tag & 7) == 4) {
+          // Abort on any end group tag.
+          return;
+        }
+        switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
@@ -42,8 +42,8 @@

<ItemGroup>
<ProjectReference Include="..\KEFCore.SerDes\KEFCore.SerDes.csproj" />
-    <PackageReference Include="Google.Protobuf" Version="3.26.1" />
-    <PackageReference Include="Google.Protobuf.Tools" Version="3.26.1">
+    <PackageReference Include="Google.Protobuf" Version="3.27.0" />
+    <PackageReference Include="Google.Protobuf.Tools" Version="3.27.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
