Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConnectionMultiplexer.Subscription is not Thread-safe #2763

Closed
Chuck-EP opened this issue Jul 18, 2024 · 0 comments
Closed

ConnectionMultiplexer.Subscription is not Thread-safe #2763

Chuck-EP opened this issue Jul 18, 2024 · 0 comments

Comments

@Chuck-EP
Copy link
Contributor

Chuck-EP commented Jul 18, 2024

Version: 2.8.0 (Latest Stable)

Problem

The documentation states that ConnectionMultiplexer is fully thread-safe, but its Subscription internal class is not, causing problems in some operations such as Subscribe, SubscribeAsync, Unsubscribe, UnsubscribeAsync when called with a handler.

The issue stems from the Add(...) and Remove(...) functions, while delegates are immutable and cannot be corrupted, the += and -= operations are not atomic and not thread-safe. When a delegate updates itself, it creates a copy of its callback list, modifies it and then update its internal reference, if called from multiple threads some callbacks may be incorrectly lost or kept.

Example

I've made a very simple example that only calls Subscribe & Unsubscribe in a multi-threaded way and uses reflection to inspect Subscription._handlers to show that it incorrectly lose or keep callbacks that it should not.

(Please note that I've also tested both Async version of the same functions and had similar results)

using System.Reflection;
using StackExchange.Redis;

public static class Program
{
    public static async Task Main(string[] args)
    {
        ConnectionMultiplexer pubsub = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
        ISubscriber subscriber = pubsub.GetSubscriber();

        const int COUNT = 1000;
        const string CHANNEL = "CHANNEL:TEST";

        void Handler(RedisChannel channel, RedisValue value) 
        {
            Console.WriteLine(value);
        };
        
        List<Action> subscribes = new List<Action>(COUNT);
        for (int i = 0; i < COUNT; i++)
            subscribes.Add(() => subscriber.Subscribe(CHANNEL, Handler));
        Parallel.ForEach(subscribes, (action) => { action(); });

        PrintInternalSubscribers(subscriber, CHANNEL, COUNT);

        List<Action> unsubscribes = new List<Action>(COUNT);
        for (int i = 0; i < COUNT; i++)
            unsubscribes.Add(() => subscriber.Unsubscribe(CHANNEL, Handler));
        Parallel.ForEach(unsubscribes, (action) => { action(); });

        PrintInternalSubscribers(subscriber, CHANNEL, 0);
    }

    private static void PrintInternalSubscribers(ISubscriber subscriber, string channel, int expected)
    {
        MethodInfo methodInfo = subscriber.Multiplexer.GetType().GetMethod("TryGetSubscription", BindingFlags.NonPublic | BindingFlags.Instance);
        object[] parameters = new object[] { RedisChannel.Literal(channel) , null };
        methodInfo.Invoke(subscriber.Multiplexer, parameters);
        object subscription = parameters[1];
        int count = 0;
        if(subscription != null)
        {
            FieldInfo fieldInfo = subscription.GetType().GetField("_handlers", BindingFlags.NonPublic | BindingFlags.Instance);
            Action<RedisChannel, RedisValue> handlers = fieldInfo.GetValue(subscription) as Action<RedisChannel, RedisValue>;
            count = handlers != null ? handlers.GetInvocationList().Length : 0;
        }
        Console.WriteLine($"InvocationList Actual: {count} Expected: {expected}");
    }
}

Here's the output of a typical run:
image

Here's the output of the same program after changing Parallel.ForEach for a regular foreach making it single threaded:
image

Fix ?

A simple solution could be to add a lock around those operations. Jon Skeet has a very good article about delegate & thread-safety

I've created a PR #2769 that does exactly this and has unit tests.

Related

The following github issues are most likely related: #2115 #2458

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant