Skip to content

Commit

Permalink
Trendyol#29 and Trendyol#30 change the error catching mechanism to to…
Browse files Browse the repository at this point in the history
…pic-partition-based and apply topic-based parallel programming to optimize program running time
  • Loading branch information
furkan.kavraz committed May 17, 2024
1 parent f28b0da commit bba2d28
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 43 deletions.
1 change: 1 addition & 0 deletions src/Services/Implementations/ConfigurationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public ConfigurationService(IConfiguration configuration)
public int? MessageTimeoutMs => GetValue<int?>("ProducerMessageTimeoutMs");
public int? RequestTimeoutMs => GetValue<int?>("ProducerRequestTimeoutMs");
public int? MessageMaxBytes => GetValue<int?>("ProducerMessageMaxBytes");
/// <summary>
/// Upper bound on how many error topics are processed concurrently, read from the
/// "MaxLevelParallelism" setting.
/// </summary>
/// <remarks>
/// Falls back to 1 when the setting is missing, zero, or negative. The value is used
/// as the initial count of a <c>SemaphoreSlim</c> by the retry job: a count of 0 would
/// make the first <c>WaitAsync()</c> wait forever, and a negative count makes the
/// <c>SemaphoreSlim</c> constructor throw <c>ArgumentOutOfRangeException</c>.
/// </remarks>
public int MaxLevelParallelism =>
    GetValue<int?>("MaxLevelParallelism") is int configured && configured > 0 ? configured : 1;

private string GetValueOrThrowInvalidConfigException(string configName)
{
Expand Down
110 changes: 67 additions & 43 deletions src/Services/Implementations/KafkaRetryJobService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaRetry.Job.Services.Interfaces;
Expand All @@ -26,43 +28,71 @@ public KafkaRetryJobService(IKafkaService kafkaService,
public async Task MoveMessages()
{
_logService.LogApplicationStarted();

using var assignedConsumer = _kafkaService.BuildKafkaConsumer();

var adminClient = _kafkaService.BuildAdminClient();
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120));
adminClient.Dispose();

var errorTopicPartitionsWithLag = GetErrorTopicInfosFromCluster(assignedConsumer, metadata);
var errorTopics = errorTopicPartitionsWithLag.Select(p => p.Item1.Topic).Distinct().ToList();
var assignedConsumerPool = new ThreadLocal<IConsumer<string, string>>(() => _kafkaService.BuildKafkaConsumer());
var producerPool = new ThreadLocal<IProducer<string, string>>(() => _kafkaService.BuildKafkaProducer());

var errorTopicsWithLag = GetErrorTopicInfosFromCluster(assignedConsumerPool.Value, metadata);
var errorTopics = errorTopicsWithLag.Keys.ToList();

_logService.LogMatchingErrorTopics(errorTopics);

var consumerCommitStrategy= _kafkaService.GetConsumerCommitStrategy();

var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition;
if (messageConsumeLimit <= 0)
{
_logService.LogMessageConsumeLimitIsZero();
return;
}

var maxDegreeOfParallelism = _configuration.MaxLevelParallelism;
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
var tasks = new List<Task>();

foreach (var (_, topicPartitionsWithLag) in errorTopicsWithLag)
{
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
await MoveMessagesForTopic(topicPartitionsWithLag, assignedConsumerPool, producerPool,
consumerCommitStrategy);
semaphore.Release();
}));
}

using var producer = _kafkaService.BuildKafkaProducer();

var utcNow = DateTime.UtcNow;
Task.WaitAll(tasks.ToArray());
assignedConsumerPool.Value.Dispose();

_logService.LogApplicationIsClosing();
}

var consumerCommitStrategy= _kafkaService.GetConsumerCommitStrategy();
private async Task MoveMessagesForTopic(
List<(TopicPartition, long)> topicPartitionsWithLag,
ThreadLocal<IConsumer<string, string>> assignedConsumerPool,
ThreadLocal<IProducer<string, string>> producerPool,
Action<IConsumer<string,string>, ConsumeResult<string,string>> consumerCommitStrategy
) {
var assignedConsumer = assignedConsumerPool.Value;
var producer = producerPool.Value;

try
foreach (var (topicPartition, lag) in topicPartitionsWithLag)
{
var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition;
if (messageConsumeLimit <= 0)
var errorTopic = topicPartition.Topic;
if (lag <= 0)
{
_logService.LogMessageConsumeLimitIsZero();
return;
continue;
}

foreach (var (topicPartition, lag) in errorTopicPartitionsWithLag)
try
{
if (lag <= 0)
{
continue;
}

var messageConsumeLimitForTopicPartition = messageConsumeLimit;
var messageConsumeLimitForTopicPartition = _configuration.MessageConsumeLimitPerTopicPartition;
_logService.LogStartOfSubscribingTopicPartition(topicPartition);

var errorTopic = topicPartition.Topic;
var currentLag = lag;

assignedConsumer.Assign(topicPartition);
Expand All @@ -78,15 +108,7 @@ public async Task MoveMessages()

currentLag -= 1;
messageConsumeLimitForTopicPartition -= 1;

var resultDate = result.Message.Timestamp.UtcDateTime;

if (utcNow < resultDate)
{
_logService.LogNewMessageArrived(utcNow);
break;
}


result.Message.Timestamp = new Timestamp(DateTime.UtcNow);

var retryTopic = GetRetryTopicName(result, errorTopic);
Expand All @@ -97,21 +119,18 @@ public async Task MoveMessages()

consumerCommitStrategy.Invoke(assignedConsumer, result);
}

assignedConsumer.Unassign();

_logService.LogEndOfSubscribingTopicPartition(topicPartition);
}

assignedConsumer.Unassign();

}
catch (Exception e)
{
_logService.LogError(e);
assignedConsumer.Unassign();
throw;
catch (Exception e)
{
_logService.LogError(e);
assignedConsumer.Unassign();
throw;
}
}

_logService.LogApplicationIsClosing();
}

private string GetRetryTopicName(ConsumeResult<string,string> result , string errorTopic )
Expand All @@ -122,7 +141,7 @@ private string GetRetryTopicName(ConsumeResult<string,string> result , string er
errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix);
}

private List<(TopicPartition, long)> GetErrorTopicInfosFromCluster(IConsumer<string, string> assignedConsumer, Metadata metadata)
private IDictionary<string, List<(TopicPartition, long)>> GetErrorTopicInfosFromCluster(IConsumer<string, string> assignedConsumer, Metadata metadata)
{
_logService.LogFetchingErrorTopicInfoStarted();

Expand All @@ -142,7 +161,12 @@ private string GetRetryTopicName(ConsumeResult<string,string> result , string er
var watermark = assignedConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(5));
var lag = tpo.Offset >= 0 ? watermark.High - tpo.Offset : watermark.High - watermark.Low;
return (tpo.TopicPartition, lag);
}).ToList();
})
.GroupBy(t => t.Item1.Topic)
.ToImmutableDictionary(
t => t.Key,
t => t.ToList()
);

_logService.LogFetchingErrorTopicInfoFinished();

Expand Down

0 comments on commit bba2d28

Please sign in to comment.