Skip to content

Commit

Permalink
feat(*): upgrade dotnet 8 + orleans 3.7.1 (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
claylaut authored Jan 30, 2024
1 parent 0957864 commit 81d55f3
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 54 deletions.
6 changes: 3 additions & 3 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
</PropertyGroup>

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<!-- Shared Package Versions -->
<PropertyGroup>
<OrleansVersion>3.6.0</OrleansVersion>
<OrleansVersion>3.7.1</OrleansVersion>
<StreamUtilsVersion>11.1.0</StreamUtilsVersion>
<ConfluentKafkaVersion>2.1.0</ConfluentKafkaVersion>
<ConfluentAvroVersion>1.7.7.7</ConfluentAvroVersion>
<MicrosoftExtensionsVersion>6.0.0</MicrosoftExtensionsVersion>
<MicrosoftExtensionsVersion>8.0.*</MicrosoftExtensionsVersion>
</PropertyGroup>
</Project>
2 changes: 1 addition & 1 deletion Orleans.Streams.Kafka.E2E/Orleans.Streams.Kafka.E2E.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<IsPackable>false</IsPackable>
<LangVersion>10</LangVersion>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion Orleans.Streams.Kafka/Consumer/ConsumeResultExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ QueueProperties queueProperties

var message = serializationContext
.ExternalStreamDeserializer
.Deserialize(queueProperties, queueProperties.ExternalContractType, result);
.Deserialize(queueProperties, queueProperties.ExternalContractType, result.Message.Value);

return new KafkaBatchContainer(
StreamProviderUtils.GenerateStreamGuid(key),
Expand Down
65 changes: 22 additions & 43 deletions Orleans.Streams.Kafka/Core/KafkaAdapterFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

namespace Orleans.Streams.Kafka.Core
{
using System.Globalization;

public class KafkaAdapterFactory : IQueueAdapterFactory
{
private readonly string _name;
Expand All @@ -34,29 +32,14 @@ public class KafkaAdapterFactory : IQueueAdapterFactory
private readonly AdminClientBuilder _adminConfig;
private readonly AdminClientConfig _config;

public KafkaAdapterFactory(
string name,
KafkaStreamOptions options,
SimpleQueueCacheOptions cacheOptions,
SerializationManager serializationManager,
ILoggerFactory loggerFactory,
IGrainFactory grainFactory
) : this(name, options, cacheOptions, serializationManager, loggerFactory, grainFactory, null)
{
if (options.Topics.Any(topic => topic.IsExternal))
throw new InvalidOperationException(
"Cannot have external topic with no 'IExternalDeserializer' defined. Use 'AddJson' or 'AddAvro'"
);
}

public KafkaAdapterFactory(
string name,
KafkaStreamOptions options,
SimpleQueueCacheOptions cacheOptions,
SerializationManager serializationManager,
ILoggerFactory loggerFactory,
IGrainFactory grainFactory,
IExternalStreamDeserializer externalDeserializer
IServiceProvider services
)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
Expand All @@ -65,7 +48,7 @@ IExternalStreamDeserializer externalDeserializer
_serializationManager = serializationManager;
_loggerFactory = loggerFactory;
_grainFactory = grainFactory;
_externalDeserializer = externalDeserializer;
_externalDeserializer = services.GetServiceByName<IExternalStreamDeserializer>(name);
_logger = loggerFactory.CreateLogger<KafkaAdapterFactory>();
_adminConfig = new AdminClientBuilder(options.ToAdminProperties());

Expand Down Expand Up @@ -108,23 +91,19 @@ public static KafkaAdapterFactory Create(IServiceProvider services, string name)
{
var streamsConfig = services.GetOptionsByName<KafkaStreamOptions>(name);
var cacheOptions = services.GetOptionsByName<SimpleQueueCacheOptions>(name);
var deserializer = services.GetServiceByName<IExternalStreamDeserializer>(name);
var serializer = services.GetRequiredService<SerializationManager>();
var logger = services.GetRequiredService<ILoggerFactory>();
var grainFactory = services.GetRequiredService<IGrainFactory>();

KafkaAdapterFactory factory;
if (deserializer != null)
factory = ActivatorUtilities.CreateInstance<KafkaAdapterFactory>(
var factory = ActivatorUtilities.CreateInstance<KafkaAdapterFactory>(
services,
name,
streamsConfig,
cacheOptions,
deserializer
);
else
factory = ActivatorUtilities.CreateInstance<KafkaAdapterFactory>(
services,
name,
streamsConfig,
cacheOptions
serializer,
logger,
grainFactory,
services
);

return factory;
Expand All @@ -139,7 +118,7 @@ private IEnumerable<QueueProperties> GetQueuesProperties()
var currentMetaTopics = meta.Topics.ToList();

var props = new List<QueueProperties>();
var autoProps = new List<(QueueProperties props, short replicationFactor, ulong? retentionPeriodInMs )>();
var autoProps = new List<(QueueProperties props, short replicationFactor, ulong? retentionPeriodInMs)>();

foreach (var topic in _options.Topics)
{
Expand Down Expand Up @@ -195,21 +174,21 @@ private static Task CreateAutoTopics(IAdminClient admin, IEnumerable<(QueuePrope
var tuple = queues.First();

var topicSpecification = new TopicSpecification
{
Name = queues.Key,
NumPartitions = queues.Count(),
ReplicationFactor = tuple.replicationFactor
};
{
Name = queues.Key,
NumPartitions = queues.Count(),
ReplicationFactor = tuple.replicationFactor
};

if (tuple.retentionPeriodInMs.HasValue)
{
topicSpecification.Configs = new Dictionary<string, string>()
{
{
"retention.ms",
tuple.retentionPeriodInMs.ToString()
}
};
{
{
"retention.ms",
tuple.retentionPeriodInMs.ToString()
}
};
}

result.Add(topicSpecification);
Expand Down
2 changes: 1 addition & 1 deletion Orleans.Streams.Kafka/Orleans.Streams.Kafka.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<Description>Orleans streaming provider for Kafka.</Description>
<PackageTags>orleans kafka streams providers streamprovider confluent</PackageTags>
<LangVersion>10</LangVersion>
Expand Down
2 changes: 1 addition & 1 deletion Samples/ConfluentSample/ConfluentSample.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion Samples/TestClient/TestClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<LangVersion>latest</LangVersion>
<IsPackable>false</IsPackable>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion Samples/TestGrains/TestGrains.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Library</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>

Expand Down
2 changes: 1 addition & 1 deletion Samples/TestSilo/TestSilo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<LangVersion>latest</LangVersion>
<IsPackable>false</IsPackable>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "orleans.streams.kafka",
"version": "6.2.1",
"version": "6.3.0",
"description": "Orleans Kafka Stream Provider",
"solution": "Orleans.Streams.Kafka-build.sln",
"dependencies": {},
Expand Down

0 comments on commit 81d55f3

Please sign in to comment.