// =====================================================================
// NOTE(review): SOURCE is a unified git diff collapsed onto a few
// physical lines; generic type arguments (e.g. <string, string>) were
// stripped by extraction. Below is the corrected post-patch code for
// the changed members, reformatted, with the concrete defects fixed:
//   * Task.WaitAll -> await Task.WhenAll (no blocking in async code)
//   * semaphore.Release() moved into finally (no slot leak on failure)
//   * all thread-local consumers AND producers are disposed
//   * MaxLevelParallelism <= 0 no longer crashes SemaphoreSlim
// Sections whose bodies the diff did not show are marked TODO.
// =====================================================================

// --- src/Services/Implementations/ConfigurationService.cs ------------

// Maximum number of error topics processed concurrently; defaults to 1
// (sequential) when the "MaxLevelParallelism" key is absent.
// NOTE(review): generic arguments of GetValue were stripped in
// extraction — restore to match the sibling int? properties.
public int MaxLevelParallelism => GetValue("MaxLevelParallelism") ?? 1;

// --- src/Services/Implementations/KafkaRetryJobService.cs ------------

/// <summary>
/// Entry point: discovers error topics with consumer lag and moves their
/// messages to the matching retry topics, processing up to
/// <c>MaxLevelParallelism</c> topics concurrently.
/// </summary>
public async Task MoveMessages()
{
    _logService.LogApplicationStarted();

    var adminClient = _kafkaService.BuildAdminClient();
    var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120));
    adminClient.Dispose();

    // FIX: trackAllValues lets us dispose every thread-local client at the
    // end. The patch disposed only the current thread's consumer and never
    // disposed any producer, leaking librdkafka handles per worker thread.
    var consumerPool = new ThreadLocal<IConsumer<string, string>>(
        () => _kafkaService.BuildKafkaConsumer(), trackAllValues: true);
    var producerPool = new ThreadLocal<IProducer<string, string>>(
        () => _kafkaService.BuildKafkaProducer(), trackAllValues: true);

    try
    {
        var errorTopicsWithLag = GetErrorTopicInfosFromCluster(consumerPool.Value, metadata);
        var errorTopics = errorTopicsWithLag.Keys.ToList();

        _logService.LogMatchingErrorTopics(errorTopics);

        var consumerCommitStrategy = _kafkaService.GetConsumerCommitStrategy();

        var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition;
        if (messageConsumeLimit <= 0)
        {
            _logService.LogMessageConsumeLimitIsZero();
            return;
        }

        // FIX: SemaphoreSlim(initialCount) throws for values < 0, and 0 would
        // deadlock the first WaitAsync; clamp misconfiguration to sequential.
        var maxDegreeOfParallelism = Math.Max(1, _configuration.MaxLevelParallelism);
        using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
        var tasks = new List<Task>();

        foreach (var (_, topicPartitionsWithLag) in errorTopicsWithLag)
        {
            await semaphore.WaitAsync();
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    await MoveMessagesForTopic(topicPartitionsWithLag,
                        consumerPool, producerPool, consumerCommitStrategy);
                }
                finally
                {
                    // FIX: the patch released outside any finally, so a failing
                    // topic leaked its slot and stalled the remaining waits.
                    semaphore.Release();
                }
            }));
        }

        // FIX: the patch used Task.WaitAll, blocking a thread inside an async
        // method and surfacing failures as AggregateException.
        await Task.WhenAll(tasks);
    }
    finally
    {
        // Dispose every per-thread client, not just the caller's.
        foreach (var consumer in consumerPool.Values)
        {
            consumer.Dispose();
        }
        foreach (var producer in producerPool.Values)
        {
            producer.Dispose();
        }
        consumerPool.Dispose();
        producerPool.Dispose();
    }

    _logService.LogApplicationIsClosing();
}

/// <summary>
/// Drains all lagging partitions of one error topic: re-stamps each
/// consumed message with the current UTC time and produces it to the
/// corresponding retry topic, committing via the configured strategy.
/// </summary>
/// <param name="topicPartitionsWithLag">Partitions of a single error topic paired with their lag.</param>
/// <param name="consumerPool">Per-thread consumer pool (one consumer per worker thread).</param>
/// <param name="producerPool">Per-thread producer pool.</param>
/// <param name="consumerCommitStrategy">Commit callback invoked after each successful produce.</param>
private async Task MoveMessagesForTopic(
    List<(TopicPartition, long)> topicPartitionsWithLag,
    ThreadLocal<IConsumer<string, string>> consumerPool,
    ThreadLocal<IProducer<string, string>> producerPool,
    Action<IConsumer<string, string>, ConsumeResult<string, string>> consumerCommitStrategy)
{
    var assignedConsumer = consumerPool.Value;
    var producer = producerPool.Value;

    foreach (var (topicPartition, lag) in topicPartitionsWithLag)
    {
        var errorTopic = topicPartition.Topic;
        if (lag <= 0)
        {
            continue;
        }

        try
        {
            var messageConsumeLimitForTopicPartition = _configuration.MessageConsumeLimitPerTopicPartition;
            _logService.LogStartOfSubscribingTopicPartition(topicPartition);

            var currentLag = lag;
            assignedConsumer.Assign(topicPartition);

            // NOTE(review): the diff hid the top of this loop (the Consume call
            // and its guards) — TODO confirm against the full file before merge.
            while (currentLag > 0 && messageConsumeLimitForTopicPartition > 0)
            {
                var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3));
                if (result is null)
                {
                    break;
                }

                currentLag -= 1;
                messageConsumeLimitForTopicPartition -= 1;

                // Patch intent: re-stamp instead of stopping on newer messages
                // (the old `utcNow < resultDate` break was removed deliberately).
                result.Message.Timestamp = new Timestamp(DateTime.UtcNow);

                var retryTopic = GetRetryTopicName(result, errorTopic);
                await producer.ProduceAsync(retryTopic, result.Message);

                consumerCommitStrategy.Invoke(assignedConsumer, result);
            }

            assignedConsumer.Unassign();
            _logService.LogEndOfSubscribingTopicPartition(topicPartition);
        }
        catch (Exception e)
        {
            _logService.LogError(e);
            // Unassign so the pooled consumer stays reusable by this thread.
            assignedConsumer.Unassign();
            throw;
        }
    }
}

/// <summary>
/// Returns, keyed by error topic, its partitions paired with consumer lag
/// (committed offset vs. high watermark; the full retained span of the
/// partition when no offset was committed).
/// </summary>
private IDictionary<string, List<(TopicPartition, long)>> GetErrorTopicInfosFromCluster(
    IConsumer<string, string> assignedConsumer, Metadata metadata)
{
    _logService.LogFetchingErrorTopicInfoStarted();

    // NOTE(review): the diff omitted the unchanged middle of this method
    // (matching error topics from `metadata` and fetching the committed
    // offsets enumerated below as `offsets`). TODO: splice this tail onto
    // that unchanged prefix when applying.
    var topicPartitionInfos = offsets
        .Select(tpo =>
        {
            var watermark = assignedConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(5));
            // Committed offset present -> lag up to the high watermark;
            // otherwise count the whole partition span as lag.
            var lag = tpo.Offset >= 0
                ? watermark.High - tpo.Offset
                : watermark.High - watermark.Low;
            return (tpo.TopicPartition, lag);
        })
        .GroupBy(t => t.Item1.Topic)
        .ToImmutableDictionary(
            g => g.Key,
            g => g.ToList());

    _logService.LogFetchingErrorTopicInfoFinished();

    return topicPartitionInfos;
}