Merge pull request #28 from ahmetfurkankavraz/main
Pull Request for the issues #18 #24 #25 #27
MehmetFiratKomurcu authored Dec 28, 2023
2 parents 2cb6c12 + 676e8e0 commit 7913141
Showing 10 changed files with 149 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
@@ -19,7 +19,7 @@ jobs:
name: Build & push Docker image
with:
image: kafka-retry-job
- tags: 1.9.0, latest
+ tags: 1.10.0, latest
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
@@ -1,6 +1,6 @@

variables:
VERSION: "1.9.0"
VERSION: "1.10.0"
DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION
DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH

14 changes: 14 additions & 0 deletions CHANGELOG.MD
@@ -0,0 +1,14 @@
# Changelog

## [1.10.0](https://github.com/Trendyol/kafka-retry-job/tree/1.10.0) (2023-12)

**Closed issues:**

- [\#18](https://github.com/Trendyol/kafka-retry-job/issues/18) moves messages with limit for each topic partition given from config
- [\#24](https://github.com/Trendyol/kafka-retry-job/issues/24) check if error topic is empty without assigning and consuming message
- [\#25](https://github.com/Trendyol/kafka-retry-job/issues/25) get retry topic name from header of error message
- [\#27](https://github.com/Trendyol/kafka-retry-job/issues/27) get auto-commit, manual commit options from config

**Merged pull requests:**

- Pull Request for the issues #18 #24 #25 #27 [\#28](https://github.com/Trendyol/kafka-retry-job/pull/28) ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz))
4 changes: 4 additions & 0 deletions README.md
@@ -26,6 +26,10 @@ Here is the explanation of environment variables. Please note that the config pr
- Error Suffix: The suffix of error topics
- Retry Suffix: The suffix of retry topics
- GroupId: GroupId for Retry Job Consumer
- RetryTopicNameInHeader: Name of the message header that carries the target retry topic; when present on an error message, the message is transferred to the topic named in that header (see the configuration sketch below)
- MessageConsumeLimitPerTopicPartition: Limits the total number of messages that can be consumed from each topic partition
- EnableAutoCommit: Enable/disable auto commit config
- EnableAutoOffsetStore: Enable/disable auto offset store config

## Getting Started

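The four settings added above can be made concrete with a minimal configuration sketch. The key names come from the README; the in-memory provider and every value here are illustrative assumptions, not the project's required setup.

```csharp
// Minimal sketch, assuming .NET's in-memory configuration provider.
// Key names match the README; all values are illustrative assumptions.
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string>
    {
        ["RetryTopicNameInHeader"] = "RetryTopicName",     // header that names the target retry topic
        ["MessageConsumeLimitPerTopicPartition"] = "1000", // consume at most 1000 messages per partition
        ["EnableAutoCommit"] = "false",                    // commit offsets explicitly
        ["EnableAutoOffsetStore"] = "false"                // store offsets manually after a successful produce
    })
    .Build();
```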
5 changes: 5 additions & 0 deletions src/Services/Implementations/ConfigurationService.cs
@@ -1,3 +1,4 @@
using System;
using Confluent.Kafka;
using KafkaRetry.Job.Exceptions;
using KafkaRetry.Job.Helpers;
@@ -18,6 +19,10 @@ public ConfigurationService(IConfiguration configuration)
public string TopicRegex => GetValueOrThrowInvalidConfigException("TopicRegex");
public string ErrorSuffix => GetValueOrThrowInvalidConfigException("ErrorSuffix");
public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix");
public string RetryTopicNameInHeader => GetValue<string>("RetryTopicNameInHeader");
public long MessageConsumeLimitPerTopicPartition => GetValue<long?>("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue;
public bool EnableAutoCommit => GetValue<bool?>("EnableAutoCommit") ?? false;
public bool EnableAutoOffsetStore => GetValue<bool?>("EnableAutoOffsetStore") ?? false;
public string GroupId => GetValueOrThrowInvalidConfigException("GroupId");
public string SaslUsername => GetValue<string>("SaslUsername");
public string SaslPassword => GetValue<string>("SaslPassword");
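The three optional settings above are read as nullables, so an absent key falls back to a default: an unset consume limit behaves as unlimited (Int64.MaxValue), and both commit flags default to false. A minimal sketch of that fallback pattern, assuming the standard Microsoft.Extensions.Configuration.Binder extensions rather than the project's own GetValue&lt;T&gt; helper:

```csharp
// Sketch of the fallback defaults above; assumes the standard binder,
// not the project's GetValue<T> helper (same semantics either way).
using Microsoft.Extensions.Configuration;

static (long Limit, bool AutoCommit, bool AutoStore) ReadRetryJobOptions(IConfiguration configuration)
{
    // Absent keys bind to null, so each option falls back to its documented default.
    var limit = configuration.GetValue<long?>("MessageConsumeLimitPerTopicPartition") ?? long.MaxValue;
    var autoCommit = configuration.GetValue<bool?>("EnableAutoCommit") ?? false;
    var autoStore = configuration.GetValue<bool?>("EnableAutoOffsetStore") ?? false;
    return (limit, autoCommit, autoStore);
}
```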
118 changes: 78 additions & 40 deletions src/Services/Implementations/KafkaRetryJobService.cs
@@ -31,61 +31,78 @@ public async Task MoveMessages()
 var adminClient = _kafkaService.BuildAdminClient();
 var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120));
 adminClient.Dispose();
-var errorTopics = GetErrorTopicsFromCluster(metadata);
+
+var errorTopicPartitionsWithLag = GetErrorTopicInfosFromCluster(assignedConsumer, metadata);
+var errorTopics = errorTopicPartitionsWithLag.Select(p => p.Item1.Topic).Distinct().ToList();
 
 _logService.LogMatchingErrorTopics(errorTopics);
 
 using var producer = _kafkaService.BuildKafkaProducer();
 
 var utcNow = DateTime.UtcNow;
 
+var consumerCommitStrategy = _kafkaService.GetConsumerCommitStrategy();
+
 try
 {
-    foreach (var errorTopic in errorTopics)
+    var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition;
+    if (messageConsumeLimit <= 0)
     {
-        _logService.LogConsumerSubscribingTopic(errorTopic);
-
-        var topicPartitions = metadata.Topics.First(x => x.Topic == errorTopic).Partitions;
-
-        for (var partition = 0; partition < topicPartitions.Count; partition++)
+        _logService.LogMessageConsumeLimitIsZero();
+        return;
+    }
+
+    foreach (var (topicPartition, lag) in errorTopicPartitionsWithLag)
+    {
+        if (lag <= 0)
         {
-            var topicPartition = new TopicPartition(errorTopic, partition);
-            assignedConsumer.Assign(topicPartition);
+            continue;
+        }
+
+        var messageConsumeLimitForTopicPartition = messageConsumeLimit;
+        _logService.LogStartOfSubscribingTopicPartition(topicPartition);
+
+        var errorTopic = topicPartition.Topic;
+        var currentLag = lag;
+
+        assignedConsumer.Assign(topicPartition);
+
+        while (currentLag > 0 && messageConsumeLimitForTopicPartition > 0)
+        {
+            var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3));
 
-            while (true)
+            if (result is null)
             {
-                var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3));
+                break;
+            }
 
-                if (result is null)
-                {
-                    _logService.LogEndOfPartition(topicPartition);
-                    break;
-                }
+            currentLag -= 1;
+            messageConsumeLimitForTopicPartition -= 1;
 
-                var resultDate = result.Message.Timestamp.UtcDateTime;
+            var resultDate = result.Message.Timestamp.UtcDateTime;
 
-                if (utcNow < resultDate)
-                {
-                    _logService.LogNewMessageArrived(utcNow);
-                    break;
-                }
+            if (utcNow < resultDate)
+            {
+                _logService.LogNewMessageArrived(utcNow);
+                break;
+            }
 
-                result.Message.Timestamp = new Timestamp(DateTime.UtcNow);
+            result.Message.Timestamp = new Timestamp(DateTime.UtcNow);
 
-                var retryTopic =
-                    errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix);
+            var retryTopic = GetRetryTopicName(result, errorTopic);
 
-                _logService.LogProducingMessage(result, errorTopic, retryTopic);
+            _logService.LogProducingMessage(result, errorTopic, retryTopic);
 
-                await producer.ProduceAsync(retryTopic, result.Message);
+            await producer.ProduceAsync(retryTopic, result.Message);
 
-                assignedConsumer.StoreOffset(result);
-                assignedConsumer.Commit();
-            }
+            consumerCommitStrategy.Invoke(assignedConsumer, result);
        }
 
        assignedConsumer.Unassign();
+        _logService.LogEndOfSubscribingTopicPartition(topicPartition);
    }
+
+    assignedConsumer.Unassign();
+
 }
 catch (Exception e)
 {
@@ -97,18 +114,39 @@ public async Task MoveMessages()
_logService.LogApplicationIsClosing();
}

-private List<string> GetErrorTopicsFromCluster(Metadata metadata)
+private string GetRetryTopicName(ConsumeResult<string, string> result, string errorTopic)
+{
+    return !string.IsNullOrEmpty(_configuration.RetryTopicNameInHeader) &&
+           result.Message.Headers.TryGetLastBytes(_configuration.RetryTopicNameInHeader, out var retryTopicInHeader) ?
+        System.Text.Encoding.UTF8.GetString(retryTopicInHeader) :
+        errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix);
+}
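GetRetryTopicName prefers a per-message header over the suffix convention: when RetryTopicNameInHeader is configured and the consumed message carries that header, its UTF-8 value names the retry topic; otherwise ErrorSuffix is swapped for RetrySuffix. A producer-side sketch of stamping that header — the header key, topic names, and payload are assumptions for illustration:

```csharp
// Hypothetical producer that routes a failed message to a custom retry
// topic. The header key must equal the job's RetryTopicNameInHeader
// setting; all names and values here are assumptions.
using System.Text;
using Confluent.Kafka;

using var producer = new ProducerBuilder<string, string>(
    new ProducerConfig { BootstrapServers = "localhost:9092" }).Build();

var message = new Message<string, string>
{
    Key = "order-42",
    Value = "{\"reason\":\"downstream timeout\"}",
    Headers = new Headers
    {
        // Read by GetRetryTopicName via Headers.TryGetLastBytes.
        { "RetryTopicName", Encoding.UTF8.GetBytes("orders.custom.retry") }
    }
};

await producer.ProduceAsync("orders.error", message);
```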

+private List<(TopicPartition, long)> GetErrorTopicInfosFromCluster(IConsumer<string, string> assignedConsumer, Metadata metadata)
 {
+    _logService.LogFetchingErrorTopicInfoStarted();
+
     var topicRegex = _configuration.TopicRegex;
-    var clusterTopics = metadata.Topics.Select(t => t.Topic).ToList();
     var errorTopicRegex = new Regex(topicRegex);
 
-    var errorTopics = clusterTopics
-        .Where(t => errorTopicRegex.IsMatch(t))
-        .Where(t => t.EndsWith(_configuration.ErrorSuffix))
-        .ToList();
+    var topicPartitionMetadata = metadata.Topics
+        .Where(t => errorTopicRegex.IsMatch(t.Topic))
+        .SelectMany(topic =>
+            topic.Partitions.Select(partition =>
+                new TopicPartition(topic.Topic, partition.PartitionId)))
+        .ToArray();
+
+    var topicsWithFoundOffsets = assignedConsumer.Committed(topicPartitionMetadata, TimeSpan.FromSeconds(10));
+
+    var topicPartitionInfos = topicsWithFoundOffsets.Select<TopicPartitionOffset, (TopicPartition, long)>(tpo =>
+    {
+        var watermark = assignedConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(5));
+        var lag = tpo.Offset >= 0 ? watermark.High - tpo.Offset : watermark.High - watermark.Low;
+        return (tpo.TopicPartition, lag);
+    }).ToList();
 
-    return errorTopics;
+    _logService.LogFetchingErrorTopicInfoFinished();
+
+    return topicPartitionInfos;
 }
}
}
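The lag computed above decides how much each error partition contributes before the job starts consuming: with a committed offset, lag is the high watermark minus that offset; with no commit yet (a negative sentinel offset), it is the whole retained range, high minus low. A small worked sketch of just that rule, with illustrative numbers:

```csharp
// Worked example of the lag rule; numbers are illustrative.
using System;

static long ComputeLag(long committedOffset, long low, long high) =>
    committedOffset >= 0 ? high - committedOffset : high - low;

Console.WriteLine(ComputeLag(40, 10, 100));    // 60: resume from the committed offset
Console.WriteLine(ComputeLag(-1001, 10, 100)); // 90: no commit yet, count the whole range
```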
19 changes: 17 additions & 2 deletions src/Services/Implementations/KafkaService.cs
@@ -1,3 +1,4 @@
using System;
using Confluent.Kafka;
using KafkaRetry.Job.Services.Interfaces;

@@ -76,15 +77,29 @@ private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string grou
BootstrapServers = bootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = groupId,
- EnableAutoCommit = false,
+ EnableAutoCommit = _configuration.EnableAutoCommit,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty,
- EnableAutoOffsetStore = false
+ EnableAutoOffsetStore = _configuration.EnableAutoOffsetStore,
};
}

+public Action<IConsumer<string, string>, ConsumeResult<string, string>> GetConsumerCommitStrategy()
+{
+    return _configuration.EnableAutoCommit ?
+        (assignedConsumer, result) =>
+        {
+            assignedConsumer.StoreOffset(result);
+        } :
+        (assignedConsumer, result) =>
+        {
+            assignedConsumer.StoreOffset(result);
+            assignedConsumer.Commit();
+        };
+}
}
}
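GetConsumerCommitStrategy hands MoveMessages one delegate per run: with EnableAutoCommit on, the delegate only stores the offset and leaves committing to the client's periodic auto-commit; with it off, the delegate stores and then synchronously commits after every produced message. A usage sketch — the harness method and its parameters are hypothetical:

```csharp
// Hypothetical harness showing how the strategy is meant to be applied.
using System;
using Confluent.Kafka;
using KafkaRetry.Job.Services.Interfaces;

static void CommitAfterProduce(
    IKafkaService kafkaService,
    IConsumer<string, string> consumer,
    ConsumeResult<string, string> result)
{
    // Resolve the delegate once, then invoke it after each successful produce.
    Action<IConsumer<string, string>, ConsumeResult<string, string>> commit =
        kafkaService.GetConsumerCommitStrategy();

    commit.Invoke(consumer, result); // stores the offset; also commits when auto-commit is off
}
```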
24 changes: 22 additions & 2 deletions src/Services/Implementations/LogService.cs
@@ -59,9 +59,14 @@ public void LogConsumerIsNotAssigned()
_logger.LogInformation("Consumer is not assigned to any partition");
}

-public void LogConsumerSubscribingTopic(string topic)
+public void LogStartOfSubscribingTopicPartition(TopicPartition topicPartition)
 {
-    _logger.LogInformation($"Consumer is subscribing to topic: {topic}");
+    _logger.LogInformation($"Start of partition: {topicPartition.Partition}, topic: {topicPartition.Topic}.");
 }

+public void LogEndOfSubscribingTopicPartition(TopicPartition topicPartition)
+{
+    _logger.LogInformation($"End of partition: {topicPartition.Partition}, topic: {topicPartition.Topic}.");
+}

public void LogLastCommittedOffset(TopicPartitionOffset tpo)
@@ -79,5 +84,20 @@ public void LogApplicationIsClosing()
{
_logger.LogInformation("Application is ending, closing consumer and producer");
}

+public void LogFetchingErrorTopicInfoStarted()
+{
+    _logger.LogInformation("Fetching error topic info is started");
+}
+
+public void LogFetchingErrorTopicInfoFinished()
+{
+    _logger.LogInformation("Fetching error topic info is finished");
+}
+
+public void LogMessageConsumeLimitIsZero()
+{
+    _logger.LogError("Parameter MessageConsumeLimit cannot be zero");
+}
}
}
2 changes: 2 additions & 0 deletions src/Services/Interfaces/IKafkaService.cs
@@ -1,3 +1,4 @@
using System;
using Confluent.Kafka;

namespace KafkaRetry.Job.Services.Interfaces
@@ -7,5 +8,6 @@ public interface IKafkaService
IConsumer<string, string> BuildKafkaConsumer();
IProducer<string, string> BuildKafkaProducer();
IAdminClient BuildAdminClient();
public Action<IConsumer<string, string>, ConsumeResult<string, string>> GetConsumerCommitStrategy();
}
}
6 changes: 5 additions & 1 deletion src/Services/Interfaces/ILogService.cs
@@ -13,9 +13,13 @@ public interface ILogService
void LogProducingMessage(ConsumeResult<string, string> result, string errorTopic, string retryTopic);
void LogAssignedPartitions(string partitions);
void LogConsumerIsNotAssigned();
-void LogConsumerSubscribingTopic(string topic);
+void LogStartOfSubscribingTopicPartition(TopicPartition topicPartition);
+void LogEndOfSubscribingTopicPartition(TopicPartition topicPartition);
void LogLastCommittedOffset(TopicPartitionOffset tpo);
void LogNewMessageArrived(DateTime utcNow);
void LogApplicationIsClosing();
+void LogFetchingErrorTopicInfoStarted();
+void LogFetchingErrorTopicInfoFinished();
+public void LogMessageConsumeLimitIsZero();
}
}
