From 6b31273918fbbb94a16c329e50b58b98c302ad99 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Fri, 15 Dec 2023 19:25:31 +0300 Subject: [PATCH 1/9] #25 retrieve the retry topic name from the header of the message --- src/Services/Implementations/ConfigurationService.cs | 1 + src/Services/Implementations/KafkaRetryJobService.cs | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index c31f955..43a59b4 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -18,6 +18,7 @@ public ConfigurationService(IConfiguration configuration) public string TopicRegex => GetValueOrThrowInvalidConfigException("TopicRegex"); public string ErrorSuffix => GetValueOrThrowInvalidConfigException("ErrorSuffix"); public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix"); + public string RetryTopicNameInHeader => GetValue("RetryTopicNameInHeader"); public string GroupId => GetValueOrThrowInvalidConfigException("GroupId"); public string SaslUsername => GetValue("SaslUsername"); public string SaslPassword => GetValue("SaslPassword"); diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 1d46ec4..780a448 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -72,8 +72,7 @@ public async Task MoveMessages() result.Message.Timestamp = new Timestamp(DateTime.UtcNow); - var retryTopic = - errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix); + var retryTopic = GetRetryTopicName(result, errorTopic); _logService.LogProducingMessage(result, errorTopic, retryTopic); @@ -96,6 +95,14 @@ public async Task MoveMessages() _logService.LogApplicationIsClosing(); } + + private string GetRetryTopicName(ConsumeResult result , string errorTopic ) + { + return !string.IsNullOrEmpty(_configuration.RetryTopicNameInHeader) && + result.Message.Headers.TryGetLastBytes(_configuration.RetryTopicNameInHeader, out var retryTopicInHeader) ? + System.Text.Encoding.UTF8.GetString(retryTopicInHeader) : + errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix); + } private List GetErrorTopicsFromCluster(Metadata metadata) { From dc31902414fadbf850ade9e2c1814f7559d955d7 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Fri, 15 Dec 2023 19:26:36 +0300 Subject: [PATCH 2/9] #24 retrieve topic partition lags before consuming any message --- .../Implementations/KafkaRetryJobService.cs | 109 +++++++++++------- src/Services/Implementations/LogService.cs | 19 ++- src/Services/Interfaces/ILogService.cs | 5 +- 3 files changed, 86 insertions(+), 47 deletions(-) diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 780a448..af32b76 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -31,8 +31,10 @@ public async Task MoveMessages() var adminClient = _kafkaService.BuildAdminClient(); var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120)); adminClient.Dispose(); - var errorTopics = GetErrorTopicsFromCluster(metadata); - + + var errorTopicPartitionsWithLag = GetErrorTopicInfosFromCluster(assignedConsumer, metadata); + var errorTopics = errorTopicPartitionsWithLag.Select(p => p.Item1.Topic).Distinct().ToList(); + _logService.LogMatchingErrorTopics(errorTopics); using var producer = _kafkaService.BuildKafkaProducer(); @@ -41,50 +43,56 @@ public async Task MoveMessages() try { - foreach (var errorTopic in errorTopics) + foreach (var (topicPartition, lag) in errorTopicPartitionsWithLag) { - _logService.LogConsumerSubscribingTopic(errorTopic); - - var topicPartitions = metadata.Topics.First(x => x.Topic == errorTopic).Partitions; - - for (var partition = 0; partition < topicPartitions.Count; partition++) + if (lag <= 0) + { + continue; + } + + _logService.LogStartOfSubscribingTopicPartition(topicPartition); + + var errorTopic = topicPartition.Topic; + var currentLag = lag; + + assignedConsumer.Assign(topicPartition); + + while (currentLag > 0) { - var topicPartition = new TopicPartition(errorTopic, partition); - assignedConsumer.Assign(topicPartition); + var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3)); - while (true) + if (result is null) { - var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3)); + break; + } - if (result is null) - { - _logService.LogEndOfPartition(topicPartition); - break; - } + currentLag -= 1; - var resultDate = result.Message.Timestamp.UtcDateTime; + var resultDate = result.Message.Timestamp.UtcDateTime; - if (utcNow < resultDate) - { - _logService.LogNewMessageArrived(utcNow); - break; - } + if (utcNow < resultDate) + { + _logService.LogNewMessageArrived(utcNow); + break; + } - result.Message.Timestamp = new Timestamp(DateTime.UtcNow); + result.Message.Timestamp = new Timestamp(DateTime.UtcNow); - var retryTopic = GetRetryTopicName(result, errorTopic); + var retryTopic = GetRetryTopicName(result, errorTopic); - _logService.LogProducingMessage(result, errorTopic, retryTopic); + _logService.LogProducingMessage(result, errorTopic, retryTopic); - await producer.ProduceAsync(retryTopic, result.Message); + await producer.ProduceAsync(retryTopic, result.Message); - assignedConsumer.StoreOffset(result); - assignedConsumer.Commit(); - } + assignedConsumer.StoreOffset(result); + assignedConsumer.Commit(); } - - assignedConsumer.Unassign(); + + _logService.LogEndOfSubscribingTopicPartition(topicPartition); } + + assignedConsumer.Unassign(); + } catch (Exception e) { @@ -95,27 +103,40 @@ public async Task MoveMessages() _logService.LogApplicationIsClosing(); } - + private string GetRetryTopicName(ConsumeResult result , string errorTopic ) { return !string.IsNullOrEmpty(_configuration.RetryTopicNameInHeader) && - result.Message.Headers.TryGetLastBytes(_configuration.RetryTopicNameInHeader, out var retryTopicInHeader) ? - System.Text.Encoding.UTF8.GetString(retryTopicInHeader) : - errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix); + result.Message.Headers.TryGetLastBytes(_configuration.RetryTopicNameInHeader, out var retryTopicInHeader) ? + System.Text.Encoding.UTF8.GetString(retryTopicInHeader) : + errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix); } - - private List GetErrorTopicsFromCluster(Metadata metadata) + + private List<(TopicPartition, long)> GetErrorTopicInfosFromCluster(IConsumer assignedConsumer, Metadata metadata) { + _logService.LogFetchingErrorTopicInfoStarted(); + var topicRegex = _configuration.TopicRegex; - var clusterTopics = metadata.Topics.Select(t => t.Topic).ToList(); var errorTopicRegex = new Regex(topicRegex); - var errorTopics = clusterTopics - .Where(t => errorTopicRegex.IsMatch(t)) - .Where(t => t.EndsWith(_configuration.ErrorSuffix)) - .ToList(); + var topicPartitionMetadata = metadata.Topics + .Where(t => errorTopicRegex.IsMatch(t.Topic)) + .SelectMany(topic => + topic.Partitions.Select(partition => + new TopicPartition(topic.Topic, partition.PartitionId))) + .ToArray(); + + var topicsWithFoundOffsets = assignedConsumer.Committed(topicPartitionMetadata, TimeSpan.FromSeconds(10)); + + var topicPartitionInfos = topicsWithFoundOffsets.Select(tpo => { + var watermark = assignedConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(5)); + var lag = tpo.Offset >= 0 ? watermark.High - tpo.Offset : 0; + return (tpo.TopicPartition, lag); + }).ToList(); - return errorTopics; + _logService.LogFetchingErrorTopicInfoFinished(); + + return topicPartitionInfos; } } } \ No newline at end of file diff --git a/src/Services/Implementations/LogService.cs b/src/Services/Implementations/LogService.cs index 882bee9..3597036 100644 --- a/src/Services/Implementations/LogService.cs +++ b/src/Services/Implementations/LogService.cs @@ -59,9 +59,14 @@ public void LogConsumerIsNotAssigned() _logger.LogInformation("Consumer is not assigned to any partition"); } - public void LogConsumerSubscribingTopic(string topic) + public void LogStartOfSubscribingTopicPartition(TopicPartition topicPartition) { - _logger.LogInformation($"Consumer is subscribing to topic: {topic}"); + _logger.LogInformation($"Start of partition: {topicPartition.Partition}, topic: {topicPartition.Topic}."); + } + + public void LogEndOfSubscribingTopicPartition(TopicPartition topicPartition) + { + _logger.LogInformation($"End of partition: {topicPartition.Partition}, topic: {topicPartition.Topic}."); } public void LogLastCommittedOffset(TopicPartitionOffset tpo) @@ -79,5 +84,15 @@ public void LogApplicationIsClosing() { _logger.LogInformation("Application is ending, closing consumer and producer"); } + + public void LogFetchingErrorTopicInfoStarted() + { + _logger.LogInformation("Fetching error topic info is started"); + } + + public void LogFetchingErrorTopicInfoFinished() + { + _logger.LogInformation("Fetching error topic info is finished"); + } } } \ No newline at end of file diff --git a/src/Services/Interfaces/ILogService.cs b/src/Services/Interfaces/ILogService.cs index eae6c97..5fa8cc0 100644 --- a/src/Services/Interfaces/ILogService.cs +++ b/src/Services/Interfaces/ILogService.cs @@ -13,9 +13,12 @@ public interface ILogService void LogProducingMessage(ConsumeResult result, string errorTopic, string retryTopic); void LogAssignedPartitions(string partitions); void LogConsumerIsNotAssigned(); - void LogConsumerSubscribingTopic(string topic); + void LogStartOfSubscribingTopicPartition(TopicPartition topicPartition); + void LogEndOfSubscribingTopicPartition(TopicPartition topicPartition); void LogLastCommittedOffset(TopicPartitionOffset tpo); void LogNewMessageArrived(DateTime utcNow); void LogApplicationIsClosing(); + void LogFetchingErrorTopicInfoStarted(); + void LogFetchingErrorTopicInfoFinished(); } } \ No newline at end of file From 5f307b951e9d57bd6083a8af15761ee39e1247d0 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Fri, 15 Dec 2023 19:29:33 +0300 Subject: [PATCH 3/9] #18 add MessageConsumeLimit parameter to restrict the message amount --- .../Implementations/ConfigurationService.cs | 2 ++ .../Implementations/KafkaRetryJobService.cs | 13 ++++++++++++- src/Services/Implementations/LogService.cs | 5 +++++ src/Services/Interfaces/ILogService.cs | 1 + 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index 43a59b4..b8afec6 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -1,3 +1,4 @@ +using System; using Confluent.Kafka; using KafkaRetry.Job.Exceptions; using KafkaRetry.Job.Helpers; @@ -19,6 +20,7 @@ public ConfigurationService(IConfiguration configuration) public string ErrorSuffix => GetValueOrThrowInvalidConfigException("ErrorSuffix"); public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix"); public string RetryTopicNameInHeader => GetValue("RetryTopicNameInHeader"); + public long MessageConsumeLimit => GetValue("MessageConsumeLimit") ?? Int64.MaxValue; public string GroupId => GetValueOrThrowInvalidConfigException("GroupId"); public string SaslUsername => GetValue("SaslUsername"); public string SaslPassword => GetValue("SaslPassword"); diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index af32b76..482b2df 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -43,8 +43,18 @@ public async Task MoveMessages() try { + var messageConsumeLimit = _configuration.MessageConsumeLimit; + if (messageConsumeLimit <= 0) + { + _logService.LogMessageConsumeLimitIsZero(); + } + foreach (var (topicPartition, lag) in errorTopicPartitionsWithLag) { + if (messageConsumeLimit <= 0) + { + break; + } if (lag <= 0) { continue; @@ -57,7 +67,7 @@ public async Task MoveMessages() assignedConsumer.Assign(topicPartition); - while (currentLag > 0) + while (currentLag > 0 && messageConsumeLimit > 0) { var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3)); @@ -67,6 +77,7 @@ public async Task MoveMessages() } currentLag -= 1; + messageConsumeLimit -= 1; var resultDate = result.Message.Timestamp.UtcDateTime; diff --git a/src/Services/Implementations/LogService.cs b/src/Services/Implementations/LogService.cs index 3597036..f06ff39 100644 --- a/src/Services/Implementations/LogService.cs +++ b/src/Services/Implementations/LogService.cs @@ -94,5 +94,10 @@ public void LogFetchingErrorTopicInfoFinished() { _logger.LogInformation("Fetching error topic info is finished"); } + + public void LogMessageConsumeLimitIsZero() + { + _logger.LogError("Parameter MessageConsumeLimit cannot be zero"); + } } } \ No newline at end of file diff --git a/src/Services/Interfaces/ILogService.cs b/src/Services/Interfaces/ILogService.cs index 5fa8cc0..7bcc93a 100644 --- a/src/Services/Interfaces/ILogService.cs +++ b/src/Services/Interfaces/ILogService.cs @@ -20,5 +20,6 @@ public interface ILogService void LogApplicationIsClosing(); void LogFetchingErrorTopicInfoStarted(); void LogFetchingErrorTopicInfoFinished(); + public void LogMessageConsumeLimitIsZero(); } } \ No newline at end of file From 9ebb5b1ae64fd940945075071af5b66d0cdb8a56 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Fri, 15 Dec 2023 19:30:35 +0300 Subject: [PATCH 4/9] #27 add EnableAutoCommit flag to enable auto commit strategy --- .../Implementations/ConfigurationService.cs | 1 + .../Implementations/KafkaRetryJobService.cs | 5 +++-- src/Services/Implementations/KafkaService.cs | 15 +++++++++++++++ src/Services/Interfaces/IKafkaService.cs | 2 ++ 4 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index b8afec6..9088178 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -21,6 +21,7 @@ public ConfigurationService(IConfiguration configuration) public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix"); public string RetryTopicNameInHeader => GetValue("RetryTopicNameInHeader"); public long MessageConsumeLimit => GetValue("MessageConsumeLimit") ?? Int64.MaxValue; + public bool EnableAutoCommit => GetValue("EnableAutoCommit") ?? false; public string GroupId => GetValueOrThrowInvalidConfigException("GroupId"); public string SaslUsername => GetValue("SaslUsername"); public string SaslPassword => GetValue("SaslPassword"); diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 482b2df..82d12d5 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -41,6 +41,8 @@ public async Task MoveMessages() var utcNow = DateTime.UtcNow; + var consumerCommitStrategy= _kafkaService.GetConsumerCommitStrategy(); + try { var messageConsumeLimit = _configuration.MessageConsumeLimit; @@ -95,8 +97,7 @@ public async Task MoveMessages() await producer.ProduceAsync(retryTopic, result.Message); - assignedConsumer.StoreOffset(result); - assignedConsumer.Commit(); + consumerCommitStrategy.Invoke(assignedConsumer, result); } _logService.LogEndOfSubscribingTopicPartition(topicPartition); diff --git a/src/Services/Implementations/KafkaService.cs b/src/Services/Implementations/KafkaService.cs index d71196c..2b655eb 100644 --- a/src/Services/Implementations/KafkaService.cs +++ b/src/Services/Implementations/KafkaService.cs @@ -1,3 +1,4 @@ +using System; using Confluent.Kafka; using KafkaRetry.Job.Services.Interfaces; @@ -86,5 +87,19 @@ private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string grou EnableAutoOffsetStore = false }; } + + public Action, ConsumeResult> GetConsumerCommitStrategy() + { + return _configuration.EnableAutoCommit ? + (assignedConsumer, result) => + { + assignedConsumer.StoreOffset(result); + } : + (assignedConsumer, result) => + { + assignedConsumer.StoreOffset(result); + assignedConsumer.Commit(); + }; + } } } \ No newline at end of file diff --git a/src/Services/Interfaces/IKafkaService.cs b/src/Services/Interfaces/IKafkaService.cs index 6cd959b..46f9c1a 100644 --- a/src/Services/Interfaces/IKafkaService.cs +++ b/src/Services/Interfaces/IKafkaService.cs @@ -1,3 +1,4 @@ +using System; using Confluent.Kafka; namespace KafkaRetry.Job.Services.Interfaces @@ -7,5 +8,6 @@ public interface IKafkaService IConsumer BuildKafkaConsumer(); IProducer BuildKafkaProducer(); IAdminClient BuildAdminClient(); + public Action, ConsumeResult> GetConsumerCommitStrategy(); } } \ No newline at end of file From 8104ca3ddd3c01fc717965dd5ec9bcedb53dc208 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Tue, 26 Dec 2023 11:57:50 +0300 Subject: [PATCH 5/9] #27 fixes the config files of Kafka ConsumerConfig and adds configs to the read me file --- README.md | 4 ++++ src/Services/Implementations/ConfigurationService.cs | 1 + src/Services/Implementations/KafkaService.cs | 4 ++-- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c57f28f..4fab6f2 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,10 @@ Here is the explanation of environment variables. Please note that the config pr - Error Suffix: The suffix of error topics - Retry Suffix: The suffix of retry topics - GroupId: GroupId for Retry Job Consumer +- RetryTopicNameInHeader: Retry topic name that will be presented in the header of each message to transfer them corresponding retry topic +- MessageConsumeLimit: Limit the total number of messages that can be consumed +- EnableAutoCommit: Enable/disable auto commit config +- EnableAutoOffsetStore: Enable/disable auto offset store config ## Getting Started diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index 9088178..a78f24c 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -22,6 +22,7 @@ public ConfigurationService(IConfiguration configuration) public string RetryTopicNameInHeader => GetValue("RetryTopicNameInHeader"); public long MessageConsumeLimit => GetValue("MessageConsumeLimit") ?? Int64.MaxValue; public bool EnableAutoCommit => GetValue("EnableAutoCommit") ?? false; + public bool EnableAutoOffsetStore => GetValue("EnableAutoOffsetStore") ?? false; public string GroupId => GetValueOrThrowInvalidConfigException("GroupId"); public string SaslUsername => GetValue("SaslUsername"); public string SaslPassword => GetValue("SaslPassword"); diff --git a/src/Services/Implementations/KafkaService.cs b/src/Services/Implementations/KafkaService.cs index 2b655eb..ae11fe5 100644 --- a/src/Services/Implementations/KafkaService.cs +++ b/src/Services/Implementations/KafkaService.cs @@ -77,14 +77,14 @@ private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string grou BootstrapServers = bootstrapServers, AutoOffsetReset = AutoOffsetReset.Earliest, GroupId = groupId, - EnableAutoCommit = false, + EnableAutoCommit = _configuration.EnableAutoCommit, SaslUsername = _configuration.SaslUsername ?? string.Empty, SaslPassword = _configuration.SaslPassword ?? string.Empty, SslCaLocation = _configuration.SslCaLocation ?? string.Empty, SaslMechanism = _configuration.SaslMechanism, SecurityProtocol = _configuration.SecurityProtocol, SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty, - EnableAutoOffsetStore = false + EnableAutoOffsetStore = _configuration.EnableAutoOffsetStore, }; } From 68c41fdf282a5cdcb7db5563ea3420b5627ae423 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Tue, 26 Dec 2023 12:06:24 +0300 Subject: [PATCH 6/9] #25 upgrade version --- .github/workflows/publish.yml | 2 +- .gitlab-ci.yml | 2 +- src/Services/Implementations/KafkaRetryJobService.cs | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 1636f1e..e85d11d 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -19,7 +19,7 @@ jobs: name: Build & push Docker image with: image: kafka-retry-job - tags: 1.9.0, latest + tags: 1.10.0, latest registry: ghcr.io username: ${{ secrets.GHCR_USERNAME }} password: ${{ secrets.GHCR_TOKEN }} diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 22dafaa..110abe6 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,6 +1,6 @@ variables: - VERSION: "1.9.0" + VERSION: "1.10.0" DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 82d12d5..4e38320 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -49,6 +49,7 @@ public async Task MoveMessages() if (messageConsumeLimit <= 0) { _logService.LogMessageConsumeLimitIsZero(); + return; } foreach (var (topicPartition, lag) in errorTopicPartitionsWithLag) From f76d152bcb9362c0b667c0d86de2bafb5a14985e Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Tue, 26 Dec 2023 12:29:02 +0300 Subject: [PATCH 7/9] adds a changelog file --- CHANGELOG.MD | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 CHANGELOG.MD diff --git a/CHANGELOG.MD b/CHANGELOG.MD new file mode 100644 index 0000000..11c3f5d --- /dev/null +++ b/CHANGELOG.MD @@ -0,0 +1,14 @@ +# Changelog + +## [1.10.0](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2023-12) + +**Closed issues:** + +- [\#18](https://github.com/Trendyol/kafka-retry-job/issues/18) moves messages with limit given from config +- [\#24](https://github.com/Trendyol/kafka-retry-job/issues/24) check if error topic is empty without assigning and consuming message +- [\#25](https://github.com/Trendyol/kafka-retry-job/issues/25) get retry topic name from header of error message +- [\#27](https://github.com/Trendyol/kafka-retry-job/issues/27) get auto-commit , manuel commit options from config + +**Merged pull requests:** + +- Pull Request for the issues #18 #24 #25 #27 [\#28](https://github.com/Trendyol/kafka-retry-job/pull/28) ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz)) \ No newline at end of file From 6af022af0f3678b4603d438258933eeaab553f23 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Tue, 26 Dec 2023 15:53:19 +0300 Subject: [PATCH 8/9] #24 adds subscription to the previously unsubscribed topic partitions --- src/Services/Implementations/KafkaRetryJobService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 4e38320..b048c5b 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -143,7 +143,7 @@ private string GetRetryTopicName(ConsumeResult result , string er var topicPartitionInfos = topicsWithFoundOffsets.Select(tpo => { var watermark = assignedConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(5)); - var lag = tpo.Offset >= 0 ? watermark.High - tpo.Offset : 0; + var lag = tpo.Offset >= 0 ? watermark.High - tpo.Offset : watermark.High - watermark.Low; return (tpo.TopicPartition, lag); }).ToList(); From 676e8e0e37ed31bd2abcede2134f43a7689366f2 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Wed, 27 Dec 2023 10:05:03 +0300 Subject: [PATCH 9/9] #18 change the message limit from general to specific for each topic partition --- CHANGELOG.MD | 2 +- README.md | 2 +- src/Services/Implementations/ConfigurationService.cs | 2 +- src/Services/Implementations/KafkaRetryJobService.cs | 11 ++++------- 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 11c3f5d..931ce73 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -4,7 +4,7 @@ **Closed issues:** -- [\#18](https://github.com/Trendyol/kafka-retry-job/issues/18) moves messages with limit given from config +- [\#18](https://github.com/Trendyol/kafka-retry-job/issues/18) moves messages with limit for each topic partition given from config - [\#24](https://github.com/Trendyol/kafka-retry-job/issues/24) check if error topic is empty without assigning and consuming message - [\#25](https://github.com/Trendyol/kafka-retry-job/issues/25) get retry topic name from header of error message - [\#27](https://github.com/Trendyol/kafka-retry-job/issues/27) get auto-commit , manuel commit options from config diff --git a/README.md b/README.md index 4fab6f2..57537d1 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Here is the explanation of environment variables. Please note that the config pr - Retry Suffix: The suffix of retry topics - GroupId: GroupId for Retry Job Consumer - RetryTopicNameInHeader: Retry topic name that will be presented in the header of each message to transfer them corresponding retry topic -- MessageConsumeLimit: Limit the total number of messages that can be consumed +- MessageConsumeLimitPerTopicPartition: Limit the total number of messages that can be consumed for a topic partition - EnableAutoCommit: Enable/disable auto commit config - EnableAutoOffsetStore: Enable/disable auto offset store config diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index a78f24c..c17de48 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -20,7 +20,7 @@ public ConfigurationService(IConfiguration configuration) public string ErrorSuffix => GetValueOrThrowInvalidConfigException("ErrorSuffix"); public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix"); public string RetryTopicNameInHeader => GetValue("RetryTopicNameInHeader"); - public long MessageConsumeLimit => GetValue("MessageConsumeLimit") ?? Int64.MaxValue; + public long MessageConsumeLimitPerTopicPartition => GetValue("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue; public bool EnableAutoCommit => GetValue("EnableAutoCommit") ?? false; public bool EnableAutoOffsetStore => GetValue("EnableAutoOffsetStore") ?? false; public string GroupId => GetValueOrThrowInvalidConfigException("GroupId"); diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index b048c5b..101f6fb 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -45,7 +45,7 @@ public async Task MoveMessages() try { - var messageConsumeLimit = _configuration.MessageConsumeLimit; + var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition; if (messageConsumeLimit <= 0) { _logService.LogMessageConsumeLimitIsZero(); @@ -54,15 +54,12 @@ public async Task MoveMessages() foreach (var (topicPartition, lag) in errorTopicPartitionsWithLag) { - if (messageConsumeLimit <= 0) - { - break; - } if (lag <= 0) { continue; } + var messageConsumeLimitForTopicPartition = messageConsumeLimit; _logService.LogStartOfSubscribingTopicPartition(topicPartition); var errorTopic = topicPartition.Topic; @@ -70,7 +67,7 @@ public async Task MoveMessages() assignedConsumer.Assign(topicPartition); - while (currentLag > 0 && messageConsumeLimit > 0) + while (currentLag > 0 && messageConsumeLimitForTopicPartition > 0) { var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3)); @@ -80,7 +77,7 @@ public async Task MoveMessages() } currentLag -= 1; - messageConsumeLimit -= 1; + messageConsumeLimitForTopicPartition -= 1; var resultDate = result.Message.Timestamp.UtcDateTime;