diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 185a679f4..00c63d0ff 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -9,6 +9,7 @@ Current package versions: ## Unreleased - Fix [#2951](https://github.com/StackExchange/StackExchange.Redis/issues/2951) - sentinel reconnection failure ([#2956 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2956)) +- Mitigate [#2955](https://github.com/StackExchange/StackExchange.Redis/issues/2955) (unbalanced pub/sub routing) ([#2958 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2958)) ## 2.9.17 diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index 5973bd55b..a3c19ab93 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -866,7 +866,8 @@ protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, i public override string CommandAndKey => Command + " " + Channel; - public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(Channel); + public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) + => Channel.IsKeyRouted ? serverSelectionStrategy.HashSlot(Channel) : ServerSelectionStrategy.NoSlot; } internal abstract class CommandKeyBase : Message diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 10044dc9b..0abb20043 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -2051,3 +2051,4 @@ StackExchange.Redis.IServer.ExecuteAsync(int? database, string! command, System. [SER001]static StackExchange.Redis.VectorSetAddRequest.Member(StackExchange.Redis.RedisValue element, System.ReadOnlyMemory values, string? attributesJson = null) -> StackExchange.Redis.VectorSetAddRequest! [SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByMember(StackExchange.Redis.RedisValue member) -> StackExchange.Redis.VectorSetSimilaritySearchRequest! [SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByVector(System.ReadOnlyMemory vector) -> StackExchange.Redis.VectorSetSimilaritySearchRequest! +StackExchange.Redis.RedisChannel.WithKeyRouting() -> StackExchange.Redis.RedisChannel diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index 2e0d7fc6c..d4289f3c6 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -18,10 +18,20 @@ internal enum RedisChannelOptions None = 0, Pattern = 1 << 0, Sharded = 1 << 1, + KeyRouted = 1 << 2, } + // we don't consider Routed for equality - it's an implementation detail, not a fundamental feature + private const RedisChannelOptions EqualityMask = ~RedisChannelOptions.KeyRouted; + internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH; + /// + /// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios, + /// or to scenarios using . + /// + internal bool IsKeyRouted => (Options & RedisChannelOptions.KeyRouted) != 0; + /// /// Indicates whether the channel-name is either null or a zero-length value. /// @@ -51,24 +61,44 @@ public static bool UseImplicitAutoPattern private static PatternMode s_DefaultPatternMode = PatternMode.Auto; /// - /// Creates a new that does not act as a wildcard subscription. + /// Creates a new that does not act as a wildcard subscription. In cluster + /// environments, this channel will be freely routed to any applicable server - different client nodes + /// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with + /// very few channels. In non-cluster environments, routing is not a consideration. + /// + public static RedisChannel Literal(string value) => new(value, RedisChannelOptions.None); + + /// + /// Creates a new that does not act as a wildcard subscription. In cluster + /// environments, this channel will be freely routed to any applicable server - different client nodes + /// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with + /// very few channels. In non-cluster environments, routing is not a consideration. /// - public static RedisChannel Literal(string value) => new RedisChannel(value, PatternMode.Literal); + public static RedisChannel Literal(byte[] value) => new(value, RedisChannelOptions.None); /// - /// Creates a new that does not act as a wildcard subscription. + /// In cluster environments, this channel will be routed using similar rules to , which is suitable + /// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not + /// a consideration. /// - public static RedisChannel Literal(byte[] value) => new RedisChannel(value, PatternMode.Literal); + /// Note that channels from Sharded are always routed. + public RedisChannel WithKeyRouting() => new(Value, Options | RedisChannelOptions.KeyRouted); /// - /// Creates a new that acts as a wildcard subscription. + /// Creates a new that acts as a wildcard subscription. In cluster + /// environments, this channel will be freely routed to any applicable server - different client nodes + /// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with + /// very few channels. In non-cluster environments, routing is not a consideration. /// - public static RedisChannel Pattern(string value) => new RedisChannel(value, PatternMode.Pattern); + public static RedisChannel Pattern(string value) => new(value, RedisChannelOptions.Pattern); /// - /// Creates a new that acts as a wildcard subscription. + /// Creates a new that acts as a wildcard subscription. In cluster + /// environments, this channel will be freely routed to any applicable server - different client nodes + /// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with + /// very few channels. In non-cluster environments, routing is not a consideration. /// - public static RedisChannel Pattern(byte[] value) => new RedisChannel(value, PatternMode.Pattern); + public static RedisChannel Pattern(byte[] value) => new(value, RedisChannelOptions.Pattern); /// /// Create a new redis channel from a buffer, explicitly controlling the pattern mode. @@ -84,21 +114,32 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt /// /// The string name of the channel to create. /// The mode for name matching. + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode) { } /// - /// Create a new redis channel from a buffer, representing a sharded channel. + /// Create a new redis channel from a buffer, representing a sharded channel. In cluster + /// environments, this channel will be routed using similar rules to , which is suitable + /// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not + /// a consideration. /// /// The name of the channel to create. - public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded); + /// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions + /// using sharded channels must also be published with sharded channels (and vice versa). + public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); /// - /// Create a new redis channel from a string, representing a sharded channel. + /// Create a new redis channel from a string, representing a sharded channel. In cluster + /// environments, this channel will be routed using similar rules to , which is suitable + /// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not + /// a consideration. /// /// The string name of the channel to create. - public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded); + /// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions + /// using sharded channels must also be published with sharded channels (and vice versa). + public static RedisChannel Sharded(string value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); internal RedisChannel(byte[]? value, RedisChannelOptions options) { @@ -106,6 +147,12 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options) Options = options; } + internal RedisChannel(string? value, RedisChannelOptions options) + { + Value = value is null ? null : Encoding.UTF8.GetBytes(value); + Options = options; + } + private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch { PatternMode.Auto => value != null && Array.IndexOf(value, (byte)'*') >= 0, @@ -155,7 +202,8 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options) /// The first to compare. /// The second to compare. public static bool operator ==(RedisChannel x, RedisChannel y) => - x.Options == y.Options && RedisValue.Equals(x.Value, y.Value); + (x.Options & EqualityMask) == (y.Options & EqualityMask) + && RedisValue.Equals(x.Value, y.Value); /// /// Indicate whether two channel names are equal. @@ -163,7 +211,8 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options) /// The first to compare. /// The second to compare. public static bool operator ==(string x, RedisChannel y) => - RedisValue.Equals(x == null ? null : Encoding.UTF8.GetBytes(x), y.Value); + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract + RedisValue.Equals(x is null ? null : Encoding.UTF8.GetBytes(x), y.Value); /// /// Indicate whether two channel names are equal. @@ -178,7 +227,8 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options) /// The first to compare. /// The second to compare. public static bool operator ==(RedisChannel x, string y) => - RedisValue.Equals(x.Value, y == null ? null : Encoding.UTF8.GetBytes(y)); + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract + RedisValue.Equals(x.Value, y is null ? null : Encoding.UTF8.GetBytes(y)); /// /// Indicate whether two channel names are equal. @@ -203,10 +253,11 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options) /// Indicate whether two channel names are equal. /// /// The to compare to. - public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value); + public bool Equals(RedisChannel other) => (Options & EqualityMask) == (other.Options & EqualityMask) + && RedisValue.Equals(Value, other.Value); /// - public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options; + public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)(Options & EqualityMask); /// /// Obtains a string representation of the channel name. @@ -266,23 +317,21 @@ public enum PatternMode [Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)] public static implicit operator RedisChannel(string key) { - if (key == null) return default; + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract + if (key is null) return default; return new RedisChannel(Encoding.UTF8.GetBytes(key), s_DefaultPatternMode); } /// - /// Create a channel name from a . + /// Create a channel name from a byte[]. /// /// The byte array to get a channel from. [Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)] public static implicit operator RedisChannel(byte[]? key) - { - if (key == null) return default; - return new RedisChannel(key, s_DefaultPatternMode); - } + => key is null ? default : new RedisChannel(key, s_DefaultPatternMode); /// - /// Obtain the channel name as a . + /// Obtain the channel name as a byte[]. /// /// The channel to get a byte[] from. public static implicit operator byte[]?(RedisChannel key) => key.Value; @@ -294,7 +343,7 @@ public static implicit operator RedisChannel(byte[]? key) public static implicit operator string?(RedisChannel key) { var arr = key.Value; - if (arr == null) + if (arr is null) { return null; } @@ -303,9 +352,7 @@ public static implicit operator RedisChannel(byte[]? key) return Encoding.UTF8.GetString(arr); } catch (Exception e) when // Only catch exception throwed by Encoding.UTF8.GetString - (e is DecoderFallbackException - || e is ArgumentException - || e is ArgumentNullException) + (e is DecoderFallbackException or ArgumentException or ArgumentNullException) { return BitConverter.ToString(arr); } @@ -316,8 +363,12 @@ public static implicit operator RedisChannel(byte[]? key) // giving due consideration to the default pattern mode (UseImplicitAutoPattern) // (since we don't ship them, we don't need them in release) [Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)] + // ReSharper disable once UnusedMember.Local + // ReSharper disable once UnusedParameter.Local private RedisChannel(string value) => throw new NotSupportedException(); [Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)] + // ReSharper disable once UnusedMember.Local + // ReSharper disable once UnusedParameter.Local private RedisChannel(byte[]? value) => throw new NotSupportedException(); #endif } diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 349864a1b..bcda4146b 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -1871,14 +1871,16 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); - return ExecuteSync(msg, ResultProcessor.Int64); + // if we're actively subscribed: send via that connection (otherwise, follow normal rules) + return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); - return ExecuteAsync(msg, ResultProcessor.Int64); + // if we're actively subscribed: send via that connection (otherwise, follow normal rules) + return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } public RedisResult Execute(string command, params object[] args) diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index b641baf05..8ff9610b0 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -182,18 +182,16 @@ public Subscription(CommandFlags flags) /// internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall) { - var isPattern = channel.IsPattern; - var isSharded = channel.IsSharded; - var command = action switch + var command = action switch // note that the Routed flag doesn't impact the message here - just the routing { - SubscriptionAction.Subscribe => channel.Options switch + SubscriptionAction.Subscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch { RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE, RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE, RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE, _ => Unknown(action, channel.Options), }, - SubscriptionAction.Unsubscribe => channel.Options switch + SubscriptionAction.Unsubscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch { RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE, RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE, @@ -384,14 +382,16 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags { ThrowIfNull(channel); var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); - return ExecuteSync(msg, ResultProcessor.Int64); + // if we're actively subscribed: send via that connection (otherwise, follow normal rules) + return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); - return ExecuteAsync(msg, ResultProcessor.Int64); + // if we're actively subscribed: send via that connection (otherwise, follow normal rules) + return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } void ISubscriber.Subscribe(RedisChannel channel, Action handler, CommandFlags flags) diff --git a/src/StackExchange.Redis/ServerSelectionStrategy.cs b/src/StackExchange.Redis/ServerSelectionStrategy.cs index 44241d373..4084b4c33 100644 --- a/src/StackExchange.Redis/ServerSelectionStrategy.cs +++ b/src/StackExchange.Redis/ServerSelectionStrategy.cs @@ -47,7 +47,13 @@ internal sealed class ServerSelectionStrategy }; private readonly ConnectionMultiplexer multiplexer; - private int anyStartOffset; + private int anyStartOffset = SharedRandom.Next(); // initialize to a random value so routing isn't uniform + + #if NET6_0_OR_GREATER + private static Random SharedRandom => Random.Shared; + #else + private static Random SharedRandom { get; } = new(); + #endif private ServerEndPoint[]? map; diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index 6fa963b8a..8146dc9be 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -743,16 +743,40 @@ public async Task ConnectIncludesSubscriber() } [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task ClusterPubSub(bool sharded) + [InlineData(true, false)] + [InlineData(true, true)] + [InlineData(false, false)] + [InlineData(false, true)] + public async Task ClusterPubSub(bool sharded, bool withKeyRouting) { var guid = Guid.NewGuid().ToString(); var channel = sharded ? RedisChannel.Sharded(guid) : RedisChannel.Literal(guid); + if (withKeyRouting) + { + channel = channel.WithKeyRouting(); + } await using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false, require: sharded ? RedisFeatures.v7_0_0_rc1 : RedisFeatures.v2_0_0); Assert.True(conn.IsConnected); var pubsub = conn.GetSubscriber(); + HashSet eps = []; + for (int i = 0; i < 10; i++) + { + var ep = Format.ToString(await pubsub.IdentifyEndpointAsync(channel)); + Log($"Channel {channel} => {ep}"); + eps.Add(ep); + } + + if (sharded | withKeyRouting) + { + Assert.Single(eps); + } + else + { + // if not routed: we should have at least two different endpoints + Assert.True(eps.Count > 1); + } + List<(RedisChannel, RedisValue)> received = []; var queue = await pubsub.SubscribeAsync(channel); _ = Task.Run(async () => @@ -766,16 +790,28 @@ public async Task ClusterPubSub(bool sharded) } } }); - + var subscribedEp = Format.ToString(pubsub.SubscribedEndpoint(channel)); + Log($"Subscribed to {subscribedEp}"); + Assert.NotNull(subscribedEp); + if (sharded | withKeyRouting) + { + Assert.Equal(eps.Single(), subscribedEp); + } var db = conn.GetDatabase(); await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) await db.PingAsync(); for (int i = 0; i < 10; i++) { - // check we get a hit - Assert.Equal(1, await db.PublishAsync(channel, i.ToString())); + // publish + var receivers = await db.PublishAsync(channel, i.ToString()); + + // check we get a hit (we are the only subscriber, and because we prefer to + // use our own subscribed connection: we can reliably expect to see this hit) + Log($"Published {i} to {receivers} receiver(s) against the receiving server."); + Assert.Equal(1, receivers); } - await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + + await Task.Delay(250); // let the sub settle (this isn't needed on RESP3, note) await db.PingAsync(); await pubsub.UnsubscribeAsync(channel); @@ -792,6 +828,8 @@ public async Task ClusterPubSub(bool sharded) var pair = snap[i]; Log("element {0}: {1}/{2}", i, pair.Channel, pair.Value); } + // even if not routed: we can expect the *order* to be correct, since there's + // only one publisher (us), and we prefer to publish via our own subscription for (int i = 0; i < 10; i++) { var pair = snap[i];