Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MemoryTelemetryStorage concurrency issue #286

Merged

Conversation

SonicGD
Copy link
Contributor

@SonicGD SonicGD commented Jul 14, 2022

Description

Hello. We are getting these exceptions in out monitoring service:

Error executing consumer: {"Message":redacted}
ArgumentException: The index is equal to or greater than the length of the array, or the number of elements in the dictionary is greater than the available space from index to the end of the destination array.

  at System.Collections.Concurrent.ConcurrentDictionary`2.System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<TKey,TValue>>.CopyTo(KeyValuePair`2[] array, Int32 index)
   at System.Collections.Generic.List`1..ctor(IEnumerable`1 collection)
   at System.Linq.Enumerable.ToList[TSource](IEnumerable`1 source)
   at KafkaFlow.Admin.MemoryTelemetryStorage.CleanExpiredItems()
   at KafkaFlow.Admin.MemoryTelemetryStorage.TryCleanItems()
   at KafkaFlow.Admin.MemoryTelemetryStorage.Put(ConsumerTelemetryMetric telemetryMetric)
   at KafkaFlow.Admin.Handlers.ConsumerTelemetryMetricHandler.Handle(IMessageContext context, ConsumerTelemetryMetric message)
   at KafkaFlow.TypedHandler.TypedHandlerMiddleware.<>c__DisplayClass3_0.<Invoke>b__0(Type handler)
   at System.Linq.Enumerable.WhereSelectListIterator`2.MoveNext()
   at System.Threading.Tasks.Task.WhenAll(IEnumerable`1 tasks)
   at KafkaFlow.TypedHandler.TypedHandlerMiddleware.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at KafkaFlow.SerializerConsumerMiddleware.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at KafkaFlow.Consumers.ConsumerWorker.<<StartAsync>b__14_0>d.MoveNext()

The problem is call to ToList() on ConcurrentDictionary when it's updating. It can be reproduced by this code:

using System.Collections.Concurrent;

var dictionary = new ConcurrentDictionary<Guid, DateTimeOffset>();

var cts = new CancellationTokenSource();
UpdateAsync(dictionary, cts.Token);
UpdateAsync(dictionary, cts.Token);
UpdateAsync(dictionary, cts.Token);
UpdateAsync(dictionary, cts.Token);
UpdateAsync(dictionary, cts.Token);
UpdateAsync(dictionary, cts.Token);
UpdateAsync(dictionary, cts.Token);
UpdateAsync(dictionary, cts.Token);

while (!cts.Token.IsCancellationRequested)
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    foreach (var pair in dictionary.ToList())
    {
        dictionary.TryRemove(pair.Key, out _);
    }
}

async Task UpdateAsync(ConcurrentDictionary<Guid, DateTimeOffset> data, CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        data[Guid.NewGuid()] = DateTimeOffset.UtcNow;
        await Task.Delay(TimeSpan.FromMilliseconds(50));
    }
}

After some time it will throw

Unhandled exception. System.ArgumentException: The index is equal to or greater than the length of the array, or the number of elements in the dictionary is greater than the available space from index to the end of the destination array.
   at System.Collections.Concurrent.ConcurrentDictionary`2.System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<TKey,TValue>>.CopyTo(KeyValuePair`2[] array, Int32 index)
   at System.Collections.Generic.List`1..ctor(IEnumerable`1 collection)
   at System.Linq.Enumerable.ToList[TSource](IEnumerable`1 source)
   at Program.<Main>$(String[] args) in D:\Projects\experiments\TestDictionary\TestDictionary\Program.cs:line 20
   at Program.<Main>(String[] args)

It can be fixed by locking access to dictionary or by replacing ToList() call with using .Keys to iterate on existing items.

How Has This Been Tested?

We use MemoryTelemetryStorage with this fix in our service for several days. Exceptions are gone.

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have added tests to cover my changes
  • I have made corresponding changes to the documentation

@SonicGD SonicGD marked this pull request as ready for review July 14, 2022 11:16
@SonicGD SonicGD force-pushed the telemetry-storage-concurrency-issue-fix branch from 9b90861 to 6258083 Compare July 14, 2022 11:19
@filipeesch filipeesch merged commit 8f793c2 into Farfetch:master Jul 14, 2022
@SonicGD
Copy link
Contributor Author

SonicGD commented Jul 14, 2022

Thanks for fast review and release!

@filipeesch
Copy link
Contributor

We are having a problem with the nuget package publishing. We are trying to solve ASAP :)

@SonicGD
Copy link
Contributor Author

SonicGD commented Jul 18, 2022

Hello. Any news on nuget package?)

@filipeesch
Copy link
Contributor

Not yet, sorry.

@filipeesch
Copy link
Contributor

Hi @SonicGD , the problem was solved and the release was launched.

@SonicGD
Copy link
Contributor Author

SonicGD commented Jul 21, 2022

Hi @SonicGD , the problem was solved and the release was launched.

Thanks! Updating now =)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

2 participants