Skip to content

Commit d4b05ca

Browse files
authored
Channel routing: revert non-S routing to random, with new API to opt into routed (#2958)
* mitigate #2955 - by default: use round-robin (not channel-routing) for "non-sharded" pub/sub - add new API for channel-routed literals/wildcards - when publishing, if we're also subscribed: use that connection - randomize where the round-robin starts, to better randomize startup behaviour * release notes * prefer a single WithKeyRouting API
1 parent f4f66be commit d4b05ca

File tree

8 files changed

+146
-46
lines changed

8 files changed

+146
-46
lines changed

docs/ReleaseNotes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Current package versions:
99
## Unreleased
1010

1111
- Fix [#2951](https://github.com/StackExchange/StackExchange.Redis/issues/2951) - sentinel reconnection failure ([#2956 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2956))
12+
- Mitigate [#2955](https://github.com/StackExchange/StackExchange.Redis/issues/2955) (unbalanced pub/sub routing) ([#2958 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2958))
1213

1314
## 2.9.17
1415

src/StackExchange.Redis/Message.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -866,7 +866,8 @@ protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, i
866866

867867
public override string CommandAndKey => Command + " " + Channel;
868868

869-
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(Channel);
869+
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
870+
=> Channel.IsKeyRouted ? serverSelectionStrategy.HashSlot(Channel) : ServerSelectionStrategy.NoSlot;
870871
}
871872

872873
internal abstract class CommandKeyBase : Message

src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,3 +2051,4 @@ StackExchange.Redis.IServer.ExecuteAsync(int? database, string! command, System.
20512051
[SER001]static StackExchange.Redis.VectorSetAddRequest.Member(StackExchange.Redis.RedisValue element, System.ReadOnlyMemory<float> values, string? attributesJson = null) -> StackExchange.Redis.VectorSetAddRequest!
20522052
[SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByMember(StackExchange.Redis.RedisValue member) -> StackExchange.Redis.VectorSetSimilaritySearchRequest!
20532053
[SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByVector(System.ReadOnlyMemory<float> vector) -> StackExchange.Redis.VectorSetSimilaritySearchRequest!
2054+
StackExchange.Redis.RedisChannel.WithKeyRouting() -> StackExchange.Redis.RedisChannel

src/StackExchange.Redis/RedisChannel.cs

Lines changed: 79 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,20 @@ internal enum RedisChannelOptions
1818
None = 0,
1919
Pattern = 1 << 0,
2020
Sharded = 1 << 1,
21+
KeyRouted = 1 << 2,
2122
}
2223

24+
// we don't consider Routed for equality - it's an implementation detail, not a fundamental feature
25+
private const RedisChannelOptions EqualityMask = ~RedisChannelOptions.KeyRouted;
26+
2327
internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;
2428

29+
/// <summary>
30+
/// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios,
31+
/// or to scenarios using <see cref="RedisChannel.WithKeyRouting" />.
32+
/// </summary>
33+
internal bool IsKeyRouted => (Options & RedisChannelOptions.KeyRouted) != 0;
34+
2535
/// <summary>
2636
/// Indicates whether the channel-name is either null or a zero-length value.
2737
/// </summary>
@@ -51,24 +61,44 @@ public static bool UseImplicitAutoPattern
5161
private static PatternMode s_DefaultPatternMode = PatternMode.Auto;
5262

5363
/// <summary>
54-
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription.
64+
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
65+
/// environments, this channel will be freely routed to any applicable server - different client nodes
66+
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
67+
/// very few channels. In non-cluster environments, routing is not a consideration.
68+
/// </summary>
69+
public static RedisChannel Literal(string value) => new(value, RedisChannelOptions.None);
70+
71+
/// <summary>
72+
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
73+
/// environments, this channel will be freely routed to any applicable server - different client nodes
74+
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
75+
/// very few channels. In non-cluster environments, routing is not a consideration.
5576
/// </summary>
56-
public static RedisChannel Literal(string value) => new RedisChannel(value, PatternMode.Literal);
77+
public static RedisChannel Literal(byte[] value) => new(value, RedisChannelOptions.None);
5778

5879
/// <summary>
59-
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription.
80+
/// In cluster environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
81+
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
82+
/// a consideration.
6083
/// </summary>
61-
public static RedisChannel Literal(byte[] value) => new RedisChannel(value, PatternMode.Literal);
84+
/// <remarks>Note that channels from <c>Sharded</c> are always routed.</remarks>
85+
public RedisChannel WithKeyRouting() => new(Value, Options | RedisChannelOptions.KeyRouted);
6286

6387
/// <summary>
64-
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription.
88+
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
89+
/// environments, this channel will be freely routed to any applicable server - different client nodes
90+
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
91+
/// very few channels. In non-cluster environments, routing is not a consideration.
6592
/// </summary>
66-
public static RedisChannel Pattern(string value) => new RedisChannel(value, PatternMode.Pattern);
93+
public static RedisChannel Pattern(string value) => new(value, RedisChannelOptions.Pattern);
6794

6895
/// <summary>
69-
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription.
96+
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
97+
/// environments, this channel will be freely routed to any applicable server - different client nodes
98+
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
99+
/// very few channels. In non-cluster environments, routing is not a consideration.
70100
/// </summary>
71-
public static RedisChannel Pattern(byte[] value) => new RedisChannel(value, PatternMode.Pattern);
101+
public static RedisChannel Pattern(byte[] value) => new(value, RedisChannelOptions.Pattern);
72102

73103
/// <summary>
74104
/// Create a new redis channel from a buffer, explicitly controlling the pattern mode.
@@ -84,28 +114,45 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt
84114
/// </summary>
85115
/// <param name="value">The string name of the channel to create.</param>
86116
/// <param name="mode">The mode for name matching.</param>
117+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
87118
public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
88119
{
89120
}
90121

91122
/// <summary>
92-
/// Create a new redis channel from a buffer, representing a sharded channel.
123+
/// Create a new redis channel from a buffer, representing a sharded channel. In cluster
124+
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
125+
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
126+
/// a consideration.
93127
/// </summary>
94128
/// <param name="value">The name of the channel to create.</param>
95-
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);
129+
/// <remarks>Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
130+
/// using sharded channels must also be published with sharded channels (and vice versa).</remarks>
131+
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
96132

97133
/// <summary>
98-
/// Create a new redis channel from a string, representing a sharded channel.
134+
/// Create a new redis channel from a string, representing a sharded channel. In cluster
135+
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
136+
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
137+
/// a consideration.
99138
/// </summary>
100139
/// <param name="value">The string name of the channel to create.</param>
101-
public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);
140+
/// <remarks>Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
141+
/// using sharded channels must also be published with sharded channels (and vice versa).</remarks>
142+
public static RedisChannel Sharded(string value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
102143

103144
internal RedisChannel(byte[]? value, RedisChannelOptions options)
104145
{
105146
Value = value;
106147
Options = options;
107148
}
108149

150+
internal RedisChannel(string? value, RedisChannelOptions options)
151+
{
152+
Value = value is null ? null : Encoding.UTF8.GetBytes(value);
153+
Options = options;
154+
}
155+
109156
private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
110157
{
111158
PatternMode.Auto => value != null && Array.IndexOf(value, (byte)'*') >= 0,
@@ -155,15 +202,17 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
155202
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
156203
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
157204
public static bool operator ==(RedisChannel x, RedisChannel y) =>
158-
x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);
205+
(x.Options & EqualityMask) == (y.Options & EqualityMask)
206+
&& RedisValue.Equals(x.Value, y.Value);
159207

160208
/// <summary>
161209
/// Indicate whether two channel names are equal.
162210
/// </summary>
163211
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
164212
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
165213
public static bool operator ==(string x, RedisChannel y) =>
166-
RedisValue.Equals(x == null ? null : Encoding.UTF8.GetBytes(x), y.Value);
214+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
215+
RedisValue.Equals(x is null ? null : Encoding.UTF8.GetBytes(x), y.Value);
167216

168217
/// <summary>
169218
/// Indicate whether two channel names are equal.
@@ -178,7 +227,8 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
178227
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
179228
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
180229
public static bool operator ==(RedisChannel x, string y) =>
181-
RedisValue.Equals(x.Value, y == null ? null : Encoding.UTF8.GetBytes(y));
230+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
231+
RedisValue.Equals(x.Value, y is null ? null : Encoding.UTF8.GetBytes(y));
182232

183233
/// <summary>
184234
/// Indicate whether two channel names are equal.
@@ -203,10 +253,11 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
203253
/// Indicate whether two channel names are equal.
204254
/// </summary>
205255
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
206-
public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);
256+
public bool Equals(RedisChannel other) => (Options & EqualityMask) == (other.Options & EqualityMask)
257+
&& RedisValue.Equals(Value, other.Value);
207258

208259
/// <inheritdoc/>
209-
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;
260+
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)(Options & EqualityMask);
210261

211262
/// <summary>
212263
/// Obtains a string representation of the channel name.
@@ -266,23 +317,21 @@ public enum PatternMode
266317
[Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)]
267318
public static implicit operator RedisChannel(string key)
268319
{
269-
if (key == null) return default;
320+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
321+
if (key is null) return default;
270322
return new RedisChannel(Encoding.UTF8.GetBytes(key), s_DefaultPatternMode);
271323
}
272324

273325
/// <summary>
274-
/// Create a channel name from a <see cref="T:byte[]"/>.
326+
/// Create a channel name from a <c>byte[]</c>.
275327
/// </summary>
276328
/// <param name="key">The byte array to get a channel from.</param>
277329
[Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)]
278330
public static implicit operator RedisChannel(byte[]? key)
279-
{
280-
if (key == null) return default;
281-
return new RedisChannel(key, s_DefaultPatternMode);
282-
}
331+
=> key is null ? default : new RedisChannel(key, s_DefaultPatternMode);
283332

284333
/// <summary>
285-
/// Obtain the channel name as a <see cref="T:byte[]"/>.
334+
/// Obtain the channel name as a <c>byte[]</c>.
286335
/// </summary>
287336
/// <param name="key">The channel to get a byte[] from.</param>
288337
public static implicit operator byte[]?(RedisChannel key) => key.Value;
@@ -294,7 +343,7 @@ public static implicit operator RedisChannel(byte[]? key)
294343
public static implicit operator string?(RedisChannel key)
295344
{
296345
var arr = key.Value;
297-
if (arr == null)
346+
if (arr is null)
298347
{
299348
return null;
300349
}
@@ -303,9 +352,7 @@ public static implicit operator RedisChannel(byte[]? key)
303352
return Encoding.UTF8.GetString(arr);
304353
}
305354
catch (Exception e) when // Only catch exception throwed by Encoding.UTF8.GetString
306-
(e is DecoderFallbackException
307-
|| e is ArgumentException
308-
|| e is ArgumentNullException)
355+
(e is DecoderFallbackException or ArgumentException or ArgumentNullException)
309356
{
310357
return BitConverter.ToString(arr);
311358
}
@@ -316,8 +363,12 @@ public static implicit operator RedisChannel(byte[]? key)
316363
// giving due consideration to the default pattern mode (UseImplicitAutoPattern)
317364
// (since we don't ship them, we don't need them in release)
318365
[Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)]
366+
// ReSharper disable once UnusedMember.Local
367+
// ReSharper disable once UnusedParameter.Local
319368
private RedisChannel(string value) => throw new NotSupportedException();
320369
[Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)]
370+
// ReSharper disable once UnusedMember.Local
371+
// ReSharper disable once UnusedParameter.Local
321372
private RedisChannel(byte[]? value) => throw new NotSupportedException();
322373
#endif
323374
}

src/StackExchange.Redis/RedisDatabase.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1871,14 +1871,16 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
18711871
{
18721872
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
18731873
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
1874-
return ExecuteSync(msg, ResultProcessor.Int64);
1874+
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
1875+
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
18751876
}
18761877

18771878
public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
18781879
{
18791880
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
18801881
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
1881-
return ExecuteAsync(msg, ResultProcessor.Int64);
1882+
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
1883+
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
18821884
}
18831885

18841886
public RedisResult Execute(string command, params object[] args)

src/StackExchange.Redis/RedisSubscriber.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,18 +182,16 @@ public Subscription(CommandFlags flags)
182182
/// </summary>
183183
internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall)
184184
{
185-
var isPattern = channel.IsPattern;
186-
var isSharded = channel.IsSharded;
187-
var command = action switch
185+
var command = action switch // note that the Routed flag doesn't impact the message here - just the routing
188186
{
189-
SubscriptionAction.Subscribe => channel.Options switch
187+
SubscriptionAction.Subscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch
190188
{
191189
RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE,
192190
RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE,
193191
RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE,
194192
_ => Unknown(action, channel.Options),
195193
},
196-
SubscriptionAction.Unsubscribe => channel.Options switch
194+
SubscriptionAction.Unsubscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch
197195
{
198196
RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE,
199197
RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE,
@@ -384,14 +382,16 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
384382
{
385383
ThrowIfNull(channel);
386384
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
387-
return ExecuteSync(msg, ResultProcessor.Int64);
385+
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
386+
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
388387
}
389388

390389
public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
391390
{
392391
ThrowIfNull(channel);
393392
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
394-
return ExecuteAsync(msg, ResultProcessor.Int64);
393+
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
394+
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
395395
}
396396

397397
void ISubscriber.Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags)

0 commit comments

Comments
 (0)