Skip to content

Commit

Permalink
fix: fix IndexOutOfRangeException on registering offset actions
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Sep 28, 2021
1 parent 941cd54 commit 228664d
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static class DictionaryExtensions
/// <typeparam name="TKey">The key type</typeparam>
/// <typeparam name="TValue">The value type</typeparam>
/// <returns></returns>
public static TValue GetOrAdd<TKey, TValue>(
public static TValue SafeGetOrAdd<TKey, TValue>(
this IDictionary<TKey, TValue> dictionary,
TKey key,
Func<TKey, TValue> addFactory)
Expand Down
7 changes: 4 additions & 3 deletions src/KafkaFlow/Consumers/OffsetManager.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
namespace KafkaFlow.Consumers
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

internal class OffsetManager : IOffsetManager, IDisposable
{
private readonly Dictionary<TopicPartitionOffset, List<Action>> onProcessedActions = new();
private readonly ConcurrentDictionary<TopicPartitionOffset, ConcurrentBag<Action>> onProcessedActions = new();

private readonly IOffsetCommitter committer;
private readonly Dictionary<(string, int), PartitionOffsets> partitionsOffsets;
Expand Down Expand Up @@ -46,7 +47,7 @@ public void MarkAsProcessed(TopicPartitionOffset offset)
public void OnOffsetProcessed(TopicPartitionOffset offset, Action action)
{
this.onProcessedActions
.GetOrAdd(offset, _ => new List<Action>())
.SafeGetOrAdd(offset, _ => new())
.Add(action);
}

Expand All @@ -72,7 +73,7 @@ private void ExecuteOffsetActions(TopicPartitionOffset offset)
return;
}

this.onProcessedActions.Remove(offset);
this.onProcessedActions.TryRemove(offset, out _);

foreach (var action in actions)
{
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow/MiddlewareExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private IMessageMiddleware GetConsumerOrProducerInstance(
int index,
MiddlewareConfiguration configuration)
{
return this.consumerOrProducerMiddlewares.GetOrAdd(
return this.consumerOrProducerMiddlewares.SafeGetOrAdd(
index,
_ => CreateInstance(dependencyResolver, configuration));
}
Expand All @@ -109,7 +109,7 @@ private IMessageMiddleware GetWorkerInstance(
IMessageContext context,
MiddlewareConfiguration configuration)
{
return this.workersMiddlewares.GetOrAdd(
return this.workersMiddlewares.SafeGetOrAdd(
(index, context.ConsumerContext?.WorkerId ?? 0),
_ => CreateInstance(dependencyResolver, configuration));
}
Expand Down

0 comments on commit 228664d

Please sign in to comment.