diff --git a/Directory.Build.props b/Directory.Build.props index 225deed..eaf11c6 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -21,15 +21,15 @@ - net6.0 + net8.0 - 3.6.0 + 3.7.1 11.1.0 2.1.0 1.7.7.7 - 6.0.0 + 8.0.* \ No newline at end of file diff --git a/Orleans.Streams.Kafka.E2E/Orleans.Streams.Kafka.E2E.csproj b/Orleans.Streams.Kafka.E2E/Orleans.Streams.Kafka.E2E.csproj index ae77df7..9251f44 100644 --- a/Orleans.Streams.Kafka.E2E/Orleans.Streams.Kafka.E2E.csproj +++ b/Orleans.Streams.Kafka.E2E/Orleans.Streams.Kafka.E2E.csproj @@ -1,7 +1,7 @@  - net6.0 + net8.0 false 10 diff --git a/Orleans.Streams.Kafka/Consumer/ConsumeResultExtensions.cs b/Orleans.Streams.Kafka/Consumer/ConsumeResultExtensions.cs index 0a04b6f..853a205 100644 --- a/Orleans.Streams.Kafka/Consumer/ConsumeResultExtensions.cs +++ b/Orleans.Streams.Kafka/Consumer/ConsumeResultExtensions.cs @@ -24,7 +24,7 @@ QueueProperties queueProperties var message = serializationContext .ExternalStreamDeserializer - .Deserialize(queueProperties, queueProperties.ExternalContractType, result); + .Deserialize(queueProperties, queueProperties.ExternalContractType, result.Message.Value); return new KafkaBatchContainer( StreamProviderUtils.GenerateStreamGuid(key), diff --git a/Orleans.Streams.Kafka/Core/KafkaAdapterFactory.cs b/Orleans.Streams.Kafka/Core/KafkaAdapterFactory.cs index 9ebcab3..ba1fccf 100644 --- a/Orleans.Streams.Kafka/Core/KafkaAdapterFactory.cs +++ b/Orleans.Streams.Kafka/Core/KafkaAdapterFactory.cs @@ -17,8 +17,6 @@ namespace Orleans.Streams.Kafka.Core { - using System.Globalization; - public class KafkaAdapterFactory : IQueueAdapterFactory { private readonly string _name; @@ -34,21 +32,6 @@ public class KafkaAdapterFactory : IQueueAdapterFactory private readonly AdminClientBuilder _adminConfig; private readonly AdminClientConfig _config; - public KafkaAdapterFactory( - string name, - KafkaStreamOptions options, - SimpleQueueCacheOptions cacheOptions, - SerializationManager serializationManager, - ILoggerFactory loggerFactory, - IGrainFactory grainFactory - ) : this(name, options, cacheOptions, serializationManager, loggerFactory, grainFactory, null) - { - if (options.Topics.Any(topic => topic.IsExternal)) - throw new InvalidOperationException( - "Cannot have external topic with no 'IExternalDeserializer' defined. Use 'AddJson' or 'AddAvro'" - ); - } - public KafkaAdapterFactory( string name, KafkaStreamOptions options, @@ -56,7 +39,7 @@ public KafkaAdapterFactory( SerializationManager serializationManager, ILoggerFactory loggerFactory, IGrainFactory grainFactory, - IExternalStreamDeserializer externalDeserializer + IServiceProvider services ) { _options = options ?? throw new ArgumentNullException(nameof(options)); @@ -65,7 +48,7 @@ IExternalStreamDeserializer externalDeserializer _serializationManager = serializationManager; _loggerFactory = loggerFactory; _grainFactory = grainFactory; - _externalDeserializer = externalDeserializer; + _externalDeserializer = services.GetServiceByName(name); _logger = loggerFactory.CreateLogger(); _adminConfig = new AdminClientBuilder(options.ToAdminProperties()); @@ -108,23 +91,19 @@ public static KafkaAdapterFactory Create(IServiceProvider services, string name) { var streamsConfig = services.GetOptionsByName(name); var cacheOptions = services.GetOptionsByName(name); - var deserializer = services.GetServiceByName(name); + var serializer = services.GetRequiredService(); + var logger = services.GetRequiredService(); + var grainFactory = services.GetRequiredService(); - KafkaAdapterFactory factory; - if (deserializer != null) - factory = ActivatorUtilities.CreateInstance( + var factory = ActivatorUtilities.CreateInstance( services, name, streamsConfig, cacheOptions, - deserializer - ); - else - factory = ActivatorUtilities.CreateInstance( - services, - name, - streamsConfig, - cacheOptions + serializer, + logger, + grainFactory, + services ); return factory; @@ -139,7 +118,7 @@ private IEnumerable GetQueuesProperties() var currentMetaTopics = meta.Topics.ToList(); var props = new List(); - var autoProps = new List<(QueueProperties props, short replicationFactor, ulong? retentionPeriodInMs )>(); + var autoProps = new List<(QueueProperties props, short replicationFactor, ulong? retentionPeriodInMs)>(); foreach (var topic in _options.Topics) { @@ -195,21 +174,21 @@ private static Task CreateAutoTopics(IAdminClient admin, IEnumerable<(QueuePrope var tuple = queues.First(); var topicSpecification = new TopicSpecification - { - Name = queues.Key, - NumPartitions = queues.Count(), - ReplicationFactor = tuple.replicationFactor - }; + { + Name = queues.Key, + NumPartitions = queues.Count(), + ReplicationFactor = tuple.replicationFactor + }; if (tuple.retentionPeriodInMs.HasValue) { topicSpecification.Configs = new Dictionary() - { - { - "retention.ms", - tuple.retentionPeriodInMs.ToString() - } - }; + { + { + "retention.ms", + tuple.retentionPeriodInMs.ToString() + } + }; } result.Add(topicSpecification); diff --git a/Orleans.Streams.Kafka/Orleans.Streams.Kafka.csproj b/Orleans.Streams.Kafka/Orleans.Streams.Kafka.csproj index 64f4ec1..cc883cf 100644 --- a/Orleans.Streams.Kafka/Orleans.Streams.Kafka.csproj +++ b/Orleans.Streams.Kafka/Orleans.Streams.Kafka.csproj @@ -1,6 +1,6 @@  - net6.0 + net8.0 Orleans streaming provider for Kafka. orleans kafka streams providers streamprovider confluent 10 diff --git a/Samples/ConfluentSample/ConfluentSample.csproj b/Samples/ConfluentSample/ConfluentSample.csproj index 8cb8ebe..8a68ca8 100644 --- a/Samples/ConfluentSample/ConfluentSample.csproj +++ b/Samples/ConfluentSample/ConfluentSample.csproj @@ -1,7 +1,7 @@  Exe - net6.0 + net8.0 latest diff --git a/Samples/TestClient/TestClient.csproj b/Samples/TestClient/TestClient.csproj index 18318f9..d55f2ab 100644 --- a/Samples/TestClient/TestClient.csproj +++ b/Samples/TestClient/TestClient.csproj @@ -2,7 +2,7 @@ Exe - net6.0 + net8.0 latest false diff --git a/Samples/TestGrains/TestGrains.csproj b/Samples/TestGrains/TestGrains.csproj index 1c325c8..313352b 100644 --- a/Samples/TestGrains/TestGrains.csproj +++ b/Samples/TestGrains/TestGrains.csproj @@ -2,7 +2,7 @@ Library - net6.0 + net8.0 false diff --git a/Samples/TestSilo/TestSilo.csproj b/Samples/TestSilo/TestSilo.csproj index 0b504dd..2ad2492 100644 --- a/Samples/TestSilo/TestSilo.csproj +++ b/Samples/TestSilo/TestSilo.csproj @@ -2,7 +2,7 @@ Exe - net6.0 + net8.0 latest false diff --git a/package.json b/package.json index 6f04060..7fa9d59 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "orleans.streams.kafka", - "version": "6.2.1", + "version": "6.3.0", "description": "Orleans Kafka Stream Provider", "solution": "Orleans.Streams.Kafka-build.sln", "dependencies": {},