Skip to content

Commit 312dda9

Browse files
mgravellvandyvilla
andauthored
Sharded proposal (#2886)
* Support sharded pubsub * Support sharded pubsub * fix api * fix enum * fix api * proposed refactor to single-field implementation * RedisDatabase needs to use SPUBLISH/PUBLISH correctly * support ssub on CLIENT response * add to features; fixup shipped etc --------- Co-authored-by: xli <xli@roblox.com>
1 parent 0172e03 commit 312dda9

File tree

12 files changed

+154
-56
lines changed

12 files changed

+154
-56
lines changed

src/StackExchange.Redis/ClientInfo.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,15 @@ public sealed class ClientInfo
129129
public string? Name { get; private set; }
130130

131131
/// <summary>
132-
/// Number of pattern matching subscriptions.
132+
/// Number of pattern-matching subscriptions.
133133
/// </summary>
134134
public int PatternSubscriptionCount { get; private set; }
135135

136+
/// <summary>
137+
/// Number of sharded subscriptions.
138+
/// </summary>
139+
public int ShardedSubscriptionCount { get; private set; }
140+
136141
/// <summary>
137142
/// The port of the client.
138143
/// </summary>
@@ -236,6 +241,7 @@ internal static bool TryParse(string? input, [NotNullWhen(true)] out ClientInfo[
236241
case "name": client.Name = value; break;
237242
case "sub": client.SubscriptionCount = Format.ParseInt32(value); break;
238243
case "psub": client.PatternSubscriptionCount = Format.ParseInt32(value); break;
244+
case "ssub": client.ShardedSubscriptionCount = Format.ParseInt32(value); break;
239245
case "multi": client.TransactionCommandLength = Format.ParseInt32(value); break;
240246
case "cmd": client.LastCommand = value; break;
241247
case "flags":

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,10 +1670,10 @@ private void MatchResult(in RawResult result)
16701670
// invoke the handlers
16711671
RedisChannel channel;
16721672
if (items[0].IsEqual(message)) {
1673-
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false);
1673+
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
16741674
Trace("MESSAGE: " + channel);
16751675
} else {
1676-
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: true);
1676+
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
16771677
Trace("SMESSAGE: " + channel);
16781678
}
16791679
if (!channel.IsNull)
@@ -1696,19 +1696,22 @@ private void MatchResult(in RawResult result)
16961696
{
16971697
_readStatus = ReadStatus.PubSubPMessage;
16981698

1699-
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false);
1699+
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1700+
17001701
Trace("PMESSAGE: " + channel);
17011702
if (!channel.IsNull)
17021703
{
17031704
if (TryGetPubSubPayload(items[3], out var payload))
17041705
{
1705-
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false);
1706+
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1707+
17061708
_readStatus = ReadStatus.InvokePubSub;
17071709
muxer.OnMessage(sub, channel, payload);
17081710
}
17091711
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
17101712
{
1711-
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false);
1713+
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1714+
17121715
_readStatus = ReadStatus.InvokePubSub;
17131716
muxer.OnMessage(sub, channel, payloads);
17141717
}

src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1896,4 +1896,8 @@ virtual StackExchange.Redis.RedisResult.Length.get -> int
18961896
virtual StackExchange.Redis.RedisResult.this[int index].get -> StackExchange.Redis.RedisResult!
18971897
StackExchange.Redis.ConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
18981898
StackExchange.Redis.IConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
1899+
StackExchange.Redis.RedisFeatures.ShardedPubSub.get -> bool
1900+
static StackExchange.Redis.RedisChannel.Sharded(byte[]? value) -> StackExchange.Redis.RedisChannel
1901+
static StackExchange.Redis.RedisChannel.Sharded(string! value) -> StackExchange.Redis.RedisChannel
1902+
StackExchange.Redis.ClientInfo.ShardedSubscriptionCount.get -> int
18991903
StackExchange.Redis.ConfigurationOptions.SetUserPfxCertificate(string! userCertificatePath, string? password = null) -> void

src/StackExchange.Redis/RawResult.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,20 +161,21 @@ public bool MoveNext()
161161
}
162162
public ReadOnlySequence<byte> Current { get; private set; }
163163
}
164-
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode, bool isSharded)
164+
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.RedisChannelOptions options)
165165
{
166166
switch (Resp2TypeBulkString)
167167
{
168168
case ResultType.SimpleString:
169169
case ResultType.BulkString:
170170
if (channelPrefix == null)
171171
{
172-
return isSharded ? new RedisChannel(GetBlob(), true) : new RedisChannel(GetBlob(), mode);
172+
return new RedisChannel(GetBlob(), options);
173173
}
174174
if (StartsWith(channelPrefix))
175175
{
176176
byte[] copy = Payload.Slice(channelPrefix.Length).ToArray();
177-
return isSharded ? new RedisChannel(copy, true) : new RedisChannel(copy, mode);
177+
178+
return new RedisChannel(copy, options);
178179
}
179180
return default;
180181
default:

src/StackExchange.Redis/RedisChannel.cs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,18 @@ namespace StackExchange.Redis
99
public readonly struct RedisChannel : IEquatable<RedisChannel>
1010
{
1111
internal readonly byte[]? Value;
12-
internal readonly bool _isPatternBased;
13-
internal readonly bool _isSharded;
12+
13+
internal readonly RedisChannelOptions Options;
14+
15+
[Flags]
16+
internal enum RedisChannelOptions
17+
{
18+
None = 0,
19+
Pattern = 1 << 0,
20+
Sharded = 1 << 1,
21+
}
22+
23+
internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;
1424

1525
/// <summary>
1626
/// Indicates whether the channel-name is either null or a zero-length value.
@@ -20,7 +30,12 @@ namespace StackExchange.Redis
2030
/// <summary>
2131
/// Indicates whether this channel represents a wildcard pattern (see <c>PSUBSCRIBE</c>).
2232
/// </summary>
23-
public bool IsPattern => _isPatternBased;
33+
public bool IsPattern => (Options & RedisChannelOptions.Pattern) != 0;
34+
35+
/// <summary>
36+
/// Indicates whether this channel represents a shard channel (see <c>SSUBSCRIBE</c>)
37+
/// </summary>
38+
public bool IsSharded => (Options & RedisChannelOptions.Sharded) != 0;
2439

2540
/// <summary>
2641
/// Indicates whether this channel represents a shard channel (see <c>SSUBSCRIBE</c>)
@@ -65,34 +80,31 @@ public static bool UseImplicitAutoPattern
6580
/// </summary>
6681
/// <param name="value">The name of the channel to create.</param>
6782
/// <param name="mode">The mode for name matching.</param>
68-
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode), false) { }
83+
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) { }
6984

7085
/// <summary>
7186
/// Create a new redis channel from a string, explicitly controlling the pattern mode.
7287
/// </summary>
7388
/// <param name="value">The string name of the channel to create.</param>
7489
/// <param name="mode">The mode for name matching.</param>
75-
public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { }
90+
public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode) { }
7691

7792
/// <summary>
78-
/// Create a new redis channel from a buffer, explicitly controlling the sharding mode.
93+
/// Create a new redis channel from a buffer, representing a sharded channel.
7994
/// </summary>
8095
/// <param name="value">The name of the channel to create.</param>
81-
/// <param name="isSharded">Whether the channel is sharded.</param>
82-
public RedisChannel(byte[]? value, bool isSharded) : this(value, false, isSharded) {}
96+
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);
8397

8498
/// <summary>
85-
/// Create a new redis channel from a string, explicitly controlling the sharding mode.
99+
/// Create a new redis channel from a string, representing a sharded channel.
86100
/// </summary>
87101
/// <param name="value">The string name of the channel to create.</param>
88-
/// <param name="isSharded">Whether the channel is sharded.</param>
89-
public RedisChannel(string value, bool isSharded) : this(value == null ? null : Encoding.UTF8.GetBytes(value), isSharded) {}
102+
public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);
90103

91-
private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded)
104+
internal RedisChannel(byte[]? value, RedisChannelOptions options)
92105
{
93106
Value = value;
94-
_isPatternBased = isPatternBased;
95-
_isSharded = isSharded;
107+
Options = options;
96108
}
97109

98110
private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
@@ -144,7 +156,7 @@ private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded)
144156
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
145157
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
146158
public static bool operator ==(RedisChannel x, RedisChannel y) =>
147-
x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value) && x._isSharded == y._isSharded;
159+
x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);
148160

149161
/// <summary>
150162
/// Indicate whether two channel names are equal.
@@ -192,10 +204,10 @@ private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded)
192204
/// Indicate whether two channel names are equal.
193205
/// </summary>
194206
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
195-
public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value) && _isSharded == other._isSharded;
207+
public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);
196208

197209
/// <inheritdoc/>
198-
public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0) + (_isSharded ? 2 : 0);
210+
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;
199211

200212
/// <summary>
201213
/// Obtains a string representation of the channel name.
@@ -224,7 +236,7 @@ internal RedisChannel Clone()
224236
return this;
225237
}
226238
var copy = (byte[])Value.Clone(); // defensive array copy
227-
return new RedisChannel(copy, _isPatternBased);
239+
return new RedisChannel(copy, Options);
228240
}
229241

230242
/// <summary>

src/StackExchange.Redis/RedisDatabase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,14 +1575,14 @@ public Task<LCSMatchResult> StringLongestCommonSubsequenceWithMatchesAsync(Redis
15751575
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
15761576
{
15771577
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
1578-
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
1578+
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
15791579
return ExecuteSync(msg, ResultProcessor.Int64);
15801580
}
15811581

15821582
public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
15831583
{
15841584
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
1585-
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
1585+
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
15861586
return ExecuteAsync(msg, ResultProcessor.Int64);
15871587
}
15881588

src/StackExchange.Redis/RedisFeatures.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@ public RedisFeatures(Version version)
186186
/// </summary>
187187
public bool SetVaradicAddRemove => Version.IsAtLeast(v2_4_0);
188188

189+
/// <summary>
190+
/// Are <see href="https://redis.io/commands/ssubscribe/">SSUBSCRIBE</see> and <see href="https://redis.io/commands/spublish/">SPUBLISH</see> available?
191+
/// </summary>
192+
public bool ShardedPubSub => Version.IsAtLeast(v7_0_0_rc1);
193+
189194
/// <summary>
190195
/// Are <see href="https://redis.io/commands/zpopmin/">ZPOPMIN</see> and <see href="https://redis.io/commands/zpopmax/">ZPOPMAX</see> available?
191196
/// </summary>

src/StackExchange.Redis/RedisSubscriber.cs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -182,19 +182,25 @@ public Subscription(CommandFlags flags)
182182
/// </summary>
183183
internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall)
184184
{
185-
var isPattern = channel._isPatternBased;
186-
var isSharded = channel._isSharded;
185+
var isPattern = channel.IsPattern;
186+
var isSharded = channel.IsSharded;
187187
var command = action switch
188188
{
189-
SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE,
190-
SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE,
191-
192-
SubscriptionAction.Subscribe when isSharded => RedisCommand.SSUBSCRIBE,
193-
SubscriptionAction.Unsubscribe when isSharded => RedisCommand.SUNSUBSCRIBE,
194-
195-
SubscriptionAction.Subscribe when !isPattern && !isSharded => RedisCommand.SUBSCRIBE,
196-
SubscriptionAction.Unsubscribe when !isPattern && !isSharded => RedisCommand.UNSUBSCRIBE,
197-
_ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"),
189+
SubscriptionAction.Subscribe => channel.Options switch
190+
{
191+
RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE,
192+
RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE,
193+
RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE,
194+
_ => Unknown(action, channel.Options),
195+
},
196+
SubscriptionAction.Unsubscribe => channel.Options switch
197+
{
198+
RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE,
199+
RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE,
200+
RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SUNSUBSCRIBE,
201+
_ => Unknown(action, channel.Options),
202+
},
203+
_ => Unknown(action, channel.Options),
198204
};
199205

200206
// TODO: Consider flags here - we need to pass Fire and Forget, but don't want to intermingle Primary/Replica
@@ -207,6 +213,9 @@ internal Message GetMessage(RedisChannel channel, SubscriptionAction action, Com
207213
return msg;
208214
}
209215

216+
private RedisCommand Unknown(SubscriptionAction action, RedisChannel.RedisChannelOptions options)
217+
=> throw new ArgumentException($"Unable to determine pub/sub operation for '{action}' against '{options}'");
218+
210219
public void Add(Action<RedisChannel, RedisValue>? handler, ChannelMessageQueue? queue)
211220
{
212221
if (handler != null)
@@ -374,14 +383,14 @@ private static void ThrowIfNull(in RedisChannel channel)
374383
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
375384
{
376385
ThrowIfNull(channel);
377-
var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
386+
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
378387
return ExecuteSync(msg, ResultProcessor.Int64);
379388
}
380389

381390
public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
382391
{
383392
ThrowIfNull(channel);
384-
var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
393+
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
385394
return ExecuteAsync(msg, ResultProcessor.Int64);
386395
}
387396

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public static readonly ResultProcessor<long?>
7373
public static readonly ResultProcessor<PersistResult[]> PersistResultArray = new PersistResultArrayProcessor();
7474

7575
public static readonly ResultProcessor<RedisChannel[]>
76-
RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.PatternMode.Literal);
76+
RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions.None);
7777

7878
public static readonly ResultProcessor<RedisKey>
7979
RedisKey = new RedisKeyProcessor();
@@ -1504,20 +1504,20 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
15041504

15051505
private sealed class RedisChannelArrayProcessor : ResultProcessor<RedisChannel[]>
15061506
{
1507-
private readonly RedisChannel.PatternMode mode;
1508-
public RedisChannelArrayProcessor(RedisChannel.PatternMode mode)
1507+
private readonly RedisChannel.RedisChannelOptions options;
1508+
public RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions options)
15091509
{
1510-
this.mode = mode;
1510+
this.options = options;
15111511
}
15121512

15131513
private readonly struct ChannelState // I would use a value-tuple here, but that is binding hell
15141514
{
15151515
public readonly byte[]? Prefix;
1516-
public readonly RedisChannel.PatternMode Mode;
1517-
public ChannelState(byte[]? prefix, RedisChannel.PatternMode mode)
1516+
public readonly RedisChannel.RedisChannelOptions Options;
1517+
public ChannelState(byte[]? prefix, RedisChannel.RedisChannelOptions options)
15181518
{
15191519
Prefix = prefix;
1520-
Mode = mode;
1520+
Options = options;
15211521
}
15221522
}
15231523
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
@@ -1526,8 +1526,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
15261526
{
15271527
case ResultType.Array:
15281528
var final = result.ToArray(
1529-
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false),
1530-
new ChannelState(connection.ChannelPrefix, mode))!;
1529+
(in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Options),
1530+
new ChannelState(connection.ChannelPrefix, options))!;
15311531

15321532
SetResult(message, final);
15331533
return true;

0 commit comments

Comments
 (0)