Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(*): upgrade dotnet 8 + orleans 3.7.1 #56

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
</PropertyGroup>

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<!-- Shared Package Versions -->
<PropertyGroup>
<OrleansVersion>3.6.0</OrleansVersion>
<OrleansVersion>3.7.1</OrleansVersion>
<StreamUtilsVersion>11.1.0</StreamUtilsVersion>
<ConfluentKafkaVersion>2.1.0</ConfluentKafkaVersion>
<ConfluentAvroVersion>1.7.7.7</ConfluentAvroVersion>
<MicrosoftExtensionsVersion>6.0.0</MicrosoftExtensionsVersion>
<MicrosoftExtensionsVersion>8.0.*</MicrosoftExtensionsVersion>
</PropertyGroup>
</Project>
2 changes: 1 addition & 1 deletion Orleans.Streams.Kafka.E2E/Orleans.Streams.Kafka.E2E.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<IsPackable>false</IsPackable>
<LangVersion>10</LangVersion>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion Orleans.Streams.Kafka/Consumer/ConsumeResultExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ QueueProperties queueProperties

var message = serializationContext
.ExternalStreamDeserializer
.Deserialize(queueProperties, queueProperties.ExternalContractType, result);
.Deserialize(queueProperties, queueProperties.ExternalContractType, result.Message.Value);

return new KafkaBatchContainer(
StreamProviderUtils.GenerateStreamGuid(key),
Expand Down
65 changes: 22 additions & 43 deletions Orleans.Streams.Kafka/Core/KafkaAdapterFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

namespace Orleans.Streams.Kafka.Core
{
using System.Globalization;

public class KafkaAdapterFactory : IQueueAdapterFactory
{
private readonly string _name;
Expand All @@ -34,29 +32,14 @@ public class KafkaAdapterFactory : IQueueAdapterFactory
private readonly AdminClientBuilder _adminConfig;
private readonly AdminClientConfig _config;

public KafkaAdapterFactory(
string name,
KafkaStreamOptions options,
SimpleQueueCacheOptions cacheOptions,
SerializationManager serializationManager,
ILoggerFactory loggerFactory,
IGrainFactory grainFactory
) : this(name, options, cacheOptions, serializationManager, loggerFactory, grainFactory, null)
{
if (options.Topics.Any(topic => topic.IsExternal))
throw new InvalidOperationException(
"Cannot have external topic with no 'IExternalDeserializer' defined. Use 'AddJson' or 'AddAvro'"
);
}

public KafkaAdapterFactory(
string name,
KafkaStreamOptions options,
SimpleQueueCacheOptions cacheOptions,
SerializationManager serializationManager,
ILoggerFactory loggerFactory,
IGrainFactory grainFactory,
IExternalStreamDeserializer externalDeserializer
IServiceProvider services
)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
Expand All @@ -65,7 +48,7 @@ IExternalStreamDeserializer externalDeserializer
_serializationManager = serializationManager;
_loggerFactory = loggerFactory;
_grainFactory = grainFactory;
_externalDeserializer = externalDeserializer;
_externalDeserializer = services.GetServiceByName<IExternalStreamDeserializer>(name);
_logger = loggerFactory.CreateLogger<KafkaAdapterFactory>();
_adminConfig = new AdminClientBuilder(options.ToAdminProperties());

Expand Down Expand Up @@ -108,23 +91,19 @@ public static KafkaAdapterFactory Create(IServiceProvider services, string name)
{
var streamsConfig = services.GetOptionsByName<KafkaStreamOptions>(name);
var cacheOptions = services.GetOptionsByName<SimpleQueueCacheOptions>(name);
var deserializer = services.GetServiceByName<IExternalStreamDeserializer>(name);
var serializer = services.GetRequiredService<SerializationManager>();
var logger = services.GetRequiredService<ILoggerFactory>();
var grainFactory = services.GetRequiredService<IGrainFactory>();

KafkaAdapterFactory factory;
if (deserializer != null)
factory = ActivatorUtilities.CreateInstance<KafkaAdapterFactory>(
var factory = ActivatorUtilities.CreateInstance<KafkaAdapterFactory>(
services,
name,
streamsConfig,
cacheOptions,
deserializer
);
else
factory = ActivatorUtilities.CreateInstance<KafkaAdapterFactory>(
services,
name,
streamsConfig,
cacheOptions
serializer,
logger,
grainFactory,
services
);

return factory;
Expand All @@ -139,7 +118,7 @@ private IEnumerable<QueueProperties> GetQueuesProperties()
var currentMetaTopics = meta.Topics.ToList();

var props = new List<QueueProperties>();
var autoProps = new List<(QueueProperties props, short replicationFactor, ulong? retentionPeriodInMs )>();
var autoProps = new List<(QueueProperties props, short replicationFactor, ulong? retentionPeriodInMs)>();

foreach (var topic in _options.Topics)
{
Expand Down Expand Up @@ -195,21 +174,21 @@ private static Task CreateAutoTopics(IAdminClient admin, IEnumerable<(QueuePrope
var tuple = queues.First();

var topicSpecification = new TopicSpecification
{
Name = queues.Key,
NumPartitions = queues.Count(),
ReplicationFactor = tuple.replicationFactor
};
{
Name = queues.Key,
NumPartitions = queues.Count(),
ReplicationFactor = tuple.replicationFactor
};

if (tuple.retentionPeriodInMs.HasValue)
{
topicSpecification.Configs = new Dictionary<string, string>()
{
{
"retention.ms",
tuple.retentionPeriodInMs.ToString()
}
};
{
{
"retention.ms",
tuple.retentionPeriodInMs.ToString()
}
};
}

result.Add(topicSpecification);
Expand Down
2 changes: 1 addition & 1 deletion Orleans.Streams.Kafka/Orleans.Streams.Kafka.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<Description>Orleans streaming provider for Kafka.</Description>
<PackageTags>orleans kafka streams providers streamprovider confluent</PackageTags>
<LangVersion>10</LangVersion>
Expand Down
2 changes: 1 addition & 1 deletion Samples/ConfluentSample/ConfluentSample.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion Samples/TestClient/TestClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<LangVersion>latest</LangVersion>
<IsPackable>false</IsPackable>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion Samples/TestGrains/TestGrains.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Library</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>

Expand Down
2 changes: 1 addition & 1 deletion Samples/TestSilo/TestSilo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<LangVersion>latest</LangVersion>
<IsPackable>false</IsPackable>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "orleans.streams.kafka",
"version": "6.2.1",
"version": "6.3.0",
"description": "Orleans Kafka Stream Provider",
"solution": "Orleans.Streams.Kafka-build.sln",
"dependencies": {},
Expand Down
Loading