From 0172e03a17af9b9b6fac7246b378244c6f9ef783 Mon Sep 17 00:00:00 2001 From: vandyvilla Date: Tue, 20 May 2025 02:19:08 -0700 Subject: [PATCH 01/12] Support sharded pubsub commands (#2498) * Support sharded pubsub * Support sharded pubsub * fix api * fix enum * fix api --------- Co-authored-by: Marc Gravell --- docs/Timeouts.md | 2 +- src/StackExchange.Redis/CommandMap.cs | 6 ++-- src/StackExchange.Redis/Enums/RedisCommand.cs | 6 ++++ src/StackExchange.Redis/Message.cs | 3 ++ src/StackExchange.Redis/PhysicalConnection.cs | 25 +++++++++----- .../PublicAPI/PublicAPI.Shipped.txt | 3 ++ src/StackExchange.Redis/RawResult.cs | 6 ++-- src/StackExchange.Redis/RedisChannel.cs | 33 +++++++++++++++---- src/StackExchange.Redis/RedisSubscriber.cs | 13 +++++--- src/StackExchange.Redis/ResultProcessor.cs | 6 ++-- src/StackExchange.Redis/ServerEndPoint.cs | 4 +++ .../RedisRequest.cs | 2 +- 12 files changed, 80 insertions(+), 29 deletions(-) diff --git a/docs/Timeouts.md b/docs/Timeouts.md index 1c4ac3756..ea9830041 100644 --- a/docs/Timeouts.md +++ b/docs/Timeouts.md @@ -88,7 +88,7 @@ By default Redis Timeout exception(s) includes useful information, which can hel |qs | Queue-Awaiting-Response : {int}|There are x operations currently awaiting replies from redis server.| |aw | Active-Writer: {bool}|| |bw | Backlog-Writer: {enum} | Possible values are Inactive, Started, CheckingForWork, CheckingForTimeout, RecordingTimeout, WritingMessage, Flushing, MarkingInactive, RecordingWriteFailure, RecordingFault, SettingIdle, SpinningDown, Faulted| -|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA| +|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubSMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA| |ws | Write-State: {enum}| Possible values are Initializing, Idle, Writing, Flushing, Flushed, NA| |in | Inbound-Bytes : {long}|there are x bytes waiting to be read from the input stream from redis| |in-pipe | Inbound-Pipe-Bytes: {long}|Bytes waiting to be read| diff --git a/src/StackExchange.Redis/CommandMap.cs b/src/StackExchange.Redis/CommandMap.cs index d1e125bb3..02baf3801 100644 --- a/src/StackExchange.Redis/CommandMap.cs +++ b/src/StackExchange.Redis/CommandMap.cs @@ -31,7 +31,7 @@ public sealed class CommandMap RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither! - RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, + RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE, RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH, @@ -57,7 +57,9 @@ public sealed class CommandMap RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither! - RedisCommand.PSUBSCRIBE, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, + RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE, + + RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH, RedisCommand.SCRIPT, diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs index a4647d7eb..3909be4c2 100644 --- a/src/StackExchange.Redis/Enums/RedisCommand.cs +++ b/src/StackExchange.Redis/Enums/RedisCommand.cs @@ -181,6 +181,7 @@ internal enum RedisCommand SORT, SORT_RO, SPOP, + SPUBLISH, SRANDMEMBER, SREM, STRLEN, @@ -188,6 +189,8 @@ internal enum RedisCommand SUNION, SUNIONSTORE, SSCAN, + SSUBSCRIBE, + SUNSUBSCRIBE, SWAPDB, SYNC, @@ -447,10 +450,13 @@ internal static bool IsPrimaryOnly(this RedisCommand command) case RedisCommand.SMEMBERS: case RedisCommand.SMISMEMBER: case RedisCommand.SORT_RO: + case RedisCommand.SPUBLISH: case RedisCommand.SRANDMEMBER: + case RedisCommand.SSUBSCRIBE: case RedisCommand.STRLEN: case RedisCommand.SUBSCRIBE: case RedisCommand.SUNION: + case RedisCommand.SUNSUBSCRIBE: case RedisCommand.SSCAN: case RedisCommand.SYNC: case RedisCommand.TIME: diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index b89a6b946..fd75585a5 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -569,6 +569,9 @@ internal static bool RequiresDatabase(RedisCommand command) case RedisCommand.SLAVEOF: case RedisCommand.SLOWLOG: case RedisCommand.SUBSCRIBE: + case RedisCommand.SPUBLISH: + case RedisCommand.SSUBSCRIBE: + case RedisCommand.SUNSUBSCRIBE: case RedisCommand.SWAPDB: case RedisCommand.SYNC: case RedisCommand.TIME: diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 51fac7c3d..5e0dbbf60 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable private const int DefaultRedisDatabaseCount = 16; - private static readonly CommandBytes message = "message", pmessage = "pmessage"; + private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage"; private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select( i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray(); @@ -1644,9 +1644,9 @@ private void MatchResult(in RawResult result) // out of band message does not match to a queued message var items = result.GetItems(); - if (items.Length >= 3 && items[0].IsEqual(message)) + if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage))) { - _readStatus = ReadStatus.PubSubMessage; + _readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage; // special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry) var configChanged = muxer.ConfigurationChangedChannel; @@ -1668,8 +1668,14 @@ private void MatchResult(in RawResult result) } // invoke the handlers - var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal); - Trace("MESSAGE: " + channel); + RedisChannel channel; + if (items[0].IsEqual(message)) { + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false); + Trace("MESSAGE: " + channel); + } else { + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: true); + Trace("SMESSAGE: " + channel); + } if (!channel.IsNull) { if (TryGetPubSubPayload(items[2], out var payload)) @@ -1690,19 +1696,19 @@ private void MatchResult(in RawResult result) { _readStatus = ReadStatus.PubSubPMessage; - var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal); + var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false); Trace("PMESSAGE: " + channel); if (!channel.IsNull) { if (TryGetPubSubPayload(items[3], out var payload)) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern); + var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false); _readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payload); } else if (TryGetMultiPubSubPayload(items[3], out var payloads)) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern); + var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false); _readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payloads); } @@ -1710,7 +1716,7 @@ private void MatchResult(in RawResult result) return; // AND STOP PROCESSING! } - // if it didn't look like "[p]message", then we still need to process the pending queue + // if it didn't look like "[p|s]message", then we still need to process the pending queue } Trace("Matching result..."); @@ -2110,6 +2116,7 @@ internal enum ReadStatus MatchResult, PubSubMessage, PubSubPMessage, + PubSubSMessage, Reconfigure, InvokePubSub, ResponseSequenceCheck, // high-integrity mode only diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 8263defd3..3b42e204e 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -1309,12 +1309,15 @@ StackExchange.Redis.RedisChannel StackExchange.Redis.RedisChannel.Equals(StackExchange.Redis.RedisChannel other) -> bool StackExchange.Redis.RedisChannel.IsNullOrEmpty.get -> bool StackExchange.Redis.RedisChannel.IsPattern.get -> bool +StackExchange.Redis.RedisChannel.IsSharded.get -> bool StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.PatternMode.Auto = 0 -> StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.PatternMode.Literal = 1 -> StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.PatternMode.Pattern = 2 -> StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.RedisChannel() -> void +StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, bool isSharded) -> void StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void +StackExchange.Redis.RedisChannel.RedisChannel(string! value, bool isSharded) -> void StackExchange.Redis.RedisChannel.RedisChannel(string! value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void StackExchange.Redis.RedisCommandException StackExchange.Redis.RedisCommandException.RedisCommandException(string! message) -> void diff --git a/src/StackExchange.Redis/RawResult.cs b/src/StackExchange.Redis/RawResult.cs index 300503f57..dd3ce9920 100644 --- a/src/StackExchange.Redis/RawResult.cs +++ b/src/StackExchange.Redis/RawResult.cs @@ -161,7 +161,7 @@ public bool MoveNext() } public ReadOnlySequence Current { get; private set; } } - internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode) + internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode, bool isSharded) { switch (Resp2TypeBulkString) { @@ -169,12 +169,12 @@ internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.Pattern case ResultType.BulkString: if (channelPrefix == null) { - return new RedisChannel(GetBlob(), mode); + return isSharded ? new RedisChannel(GetBlob(), true) : new RedisChannel(GetBlob(), mode); } if (StartsWith(channelPrefix)) { byte[] copy = Payload.Slice(channelPrefix.Length).ToArray(); - return new RedisChannel(copy, mode); + return isSharded ? new RedisChannel(copy, true) : new RedisChannel(copy, mode); } return default; default: diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index 561cce21f..9c0cadbf4 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -10,6 +10,7 @@ namespace StackExchange.Redis { internal readonly byte[]? Value; internal readonly bool _isPatternBased; + internal readonly bool _isSharded; /// /// Indicates whether the channel-name is either null or a zero-length value. @@ -21,6 +22,11 @@ namespace StackExchange.Redis /// public bool IsPattern => _isPatternBased; + /// + /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE) + /// + public bool IsSharded => _isSharded; + internal bool IsNull => Value == null; /// @@ -59,7 +65,7 @@ public static bool UseImplicitAutoPattern /// /// The name of the channel to create. /// The mode for name matching. - public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { } + public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode), false) { } /// /// Create a new redis channel from a string, explicitly controlling the pattern mode. @@ -68,10 +74,25 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt /// The mode for name matching. public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { } - private RedisChannel(byte[]? value, bool isPatternBased) + /// + /// Create a new redis channel from a buffer, explicitly controlling the sharding mode. + /// + /// The name of the channel to create. + /// Whether the channel is sharded. + public RedisChannel(byte[]? value, bool isSharded) : this(value, false, isSharded) {} + + /// + /// Create a new redis channel from a string, explicitly controlling the sharding mode. + /// + /// The string name of the channel to create. + /// Whether the channel is sharded. + public RedisChannel(string value, bool isSharded) : this(value == null ? null : Encoding.UTF8.GetBytes(value), isSharded) {} + + private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded) { Value = value; _isPatternBased = isPatternBased; + _isSharded = isSharded; } private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch @@ -123,7 +144,7 @@ private RedisChannel(byte[]? value, bool isPatternBased) /// The first to compare. /// The second to compare. public static bool operator ==(RedisChannel x, RedisChannel y) => - x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value); + x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value) && x._isSharded == y._isSharded; /// /// Indicate whether two channel names are equal. @@ -171,10 +192,10 @@ private RedisChannel(byte[]? value, bool isPatternBased) /// Indicate whether two channel names are equal. /// /// The to compare to. - public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value); + public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value) && _isSharded == other._isSharded; /// - public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0); + public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0) + (_isSharded ? 2 : 0); /// /// Obtains a string representation of the channel name. @@ -286,4 +307,4 @@ public static implicit operator RedisChannel(byte[]? key) private RedisChannel(byte[]? value) => throw new NotSupportedException(); #endif } -} +} \ No newline at end of file diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index ee28f4c56..2b2076e03 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -183,13 +183,17 @@ public Subscription(CommandFlags flags) internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall) { var isPattern = channel._isPatternBased; + var isSharded = channel._isSharded; var command = action switch { SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE, SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE, - SubscriptionAction.Subscribe when !isPattern => RedisCommand.SUBSCRIBE, - SubscriptionAction.Unsubscribe when !isPattern => RedisCommand.UNSUBSCRIBE, + SubscriptionAction.Subscribe when isSharded => RedisCommand.SSUBSCRIBE, + SubscriptionAction.Unsubscribe when isSharded => RedisCommand.SUNSUBSCRIBE, + + SubscriptionAction.Subscribe when !isPattern && !isSharded => RedisCommand.SUBSCRIBE, + SubscriptionAction.Unsubscribe when !isPattern && !isSharded => RedisCommand.UNSUBSCRIBE, _ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"), }; @@ -370,14 +374,14 @@ private static void ThrowIfNull(in RedisChannel channel) public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); return ExecuteSync(msg, ResultProcessor.Int64); } public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); return ExecuteAsync(msg, ResultProcessor.Int64); } @@ -515,6 +519,7 @@ private bool UnregisterSubscription(in RedisChannel channel, Action connection.BridgeCouldBeNull?.ServerEndPoint, - _ => null, + RedisCommand.SUBSCRIBE or RedisCommand.SSUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint, + _ => null }; Subscription?.SetCurrentServer(newServer); return true; @@ -1526,7 +1526,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes { case ResultType.Array: var final = result.ToArray( - (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode), + (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false), new ChannelState(connection.ChannelPrefix, mode))!; SetResult(message, final); diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index 8b099afd2..c9d8414ba 100644 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -260,6 +260,8 @@ public void Dispose() case RedisCommand.UNSUBSCRIBE: case RedisCommand.PSUBSCRIBE: case RedisCommand.PUNSUBSCRIBE: + case RedisCommand.SSUBSCRIBE: + case RedisCommand.SUNSUBSCRIBE: message.SetForSubscriptionBridge(); break; } @@ -278,6 +280,8 @@ public void Dispose() case RedisCommand.UNSUBSCRIBE: case RedisCommand.PSUBSCRIBE: case RedisCommand.PUNSUBSCRIBE: + case RedisCommand.SSUBSCRIBE: + case RedisCommand.SUNSUBSCRIBE: if (!KnowOrAssumeResp3()) { return subscription ?? (create ? subscription = CreateBridge(ConnectionType.Subscription, null) : null); diff --git a/toys/StackExchange.Redis.Server/RedisRequest.cs b/toys/StackExchange.Redis.Server/RedisRequest.cs index 54102815c..283076905 100644 --- a/toys/StackExchange.Redis.Server/RedisRequest.cs +++ b/toys/StackExchange.Redis.Server/RedisRequest.cs @@ -46,7 +46,7 @@ public int GetInt32(int index) public RedisKey GetKey(int index) => _inner[index].AsRedisKey(); public RedisChannel GetChannel(int index, RedisChannel.PatternMode mode) - => _inner[index].AsRedisChannel(null, mode); + => _inner[index].AsRedisChannel(null, mode, false); internal bool TryGetCommandBytes(int i, out CommandBytes command) { From 312dda997be57165b0135bd1daec735971cb0ffd Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Tue, 20 May 2025 10:27:11 +0100 Subject: [PATCH 02/12] Sharded proposal (#2886) * Support sharded pubsub * Support sharded pubsub * fix api * fix enum * fix api * proposed refactor to single-field implementation * RedisDatabase needs to use SPUBLISH/PUBLISH correctly * support ssub on CLIENT response * add to features; fixup shipped etc --------- Co-authored-by: xli --- src/StackExchange.Redis/ClientInfo.cs | 8 ++- src/StackExchange.Redis/PhysicalConnection.cs | 13 ++-- .../PublicAPI/PublicAPI.Shipped.txt | 4 ++ src/StackExchange.Redis/RawResult.cs | 7 ++- src/StackExchange.Redis/RedisChannel.cs | 48 ++++++++------ src/StackExchange.Redis/RedisDatabase.cs | 4 +- src/StackExchange.Redis/RedisFeatures.cs | 5 ++ src/StackExchange.Redis/RedisSubscriber.cs | 35 +++++++---- src/StackExchange.Redis/ResultProcessor.cs | 18 +++--- .../StackExchange.Redis.Tests/ClusterTests.cs | 62 ++++++++++++++++++- .../RedisRequest.cs | 4 +- .../StackExchange.Redis.Server/RedisServer.cs | 2 +- 12 files changed, 154 insertions(+), 56 deletions(-) diff --git a/src/StackExchange.Redis/ClientInfo.cs b/src/StackExchange.Redis/ClientInfo.cs index f04058495..c5ce0d0bf 100644 --- a/src/StackExchange.Redis/ClientInfo.cs +++ b/src/StackExchange.Redis/ClientInfo.cs @@ -129,10 +129,15 @@ public sealed class ClientInfo public string? Name { get; private set; } /// - /// Number of pattern matching subscriptions. + /// Number of pattern-matching subscriptions. /// public int PatternSubscriptionCount { get; private set; } + /// + /// Number of sharded subscriptions. + /// + public int ShardedSubscriptionCount { get; private set; } + /// /// The port of the client. /// @@ -236,6 +241,7 @@ internal static bool TryParse(string? input, [NotNullWhen(true)] out ClientInfo[ case "name": client.Name = value; break; case "sub": client.SubscriptionCount = Format.ParseInt32(value); break; case "psub": client.PatternSubscriptionCount = Format.ParseInt32(value); break; + case "ssub": client.ShardedSubscriptionCount = Format.ParseInt32(value); break; case "multi": client.TransactionCommandLength = Format.ParseInt32(value); break; case "cmd": client.LastCommand = value; break; case "flags": diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 5e0dbbf60..954738910 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -1670,10 +1670,10 @@ private void MatchResult(in RawResult result) // invoke the handlers RedisChannel channel; if (items[0].IsEqual(message)) { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false); + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); Trace("MESSAGE: " + channel); } else { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: true); + channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); Trace("SMESSAGE: " + channel); } if (!channel.IsNull) @@ -1696,19 +1696,22 @@ private void MatchResult(in RawResult result) { _readStatus = ReadStatus.PubSubPMessage; - var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal, isSharded: false); + var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); + Trace("PMESSAGE: " + channel); if (!channel.IsNull) { if (TryGetPubSubPayload(items[3], out var payload)) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false); + var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); + _readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payload); } else if (TryGetMultiPubSubPayload(items[3], out var payloads)) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern, isSharded: false); + var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); + _readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payloads); } diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 3b42e204e..a797b641a 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -1896,4 +1896,8 @@ virtual StackExchange.Redis.RedisResult.Length.get -> int virtual StackExchange.Redis.RedisResult.this[int index].get -> StackExchange.Redis.RedisResult! StackExchange.Redis.ConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void StackExchange.Redis.IConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void +StackExchange.Redis.RedisFeatures.ShardedPubSub.get -> bool +static StackExchange.Redis.RedisChannel.Sharded(byte[]? value) -> StackExchange.Redis.RedisChannel +static StackExchange.Redis.RedisChannel.Sharded(string! value) -> StackExchange.Redis.RedisChannel +StackExchange.Redis.ClientInfo.ShardedSubscriptionCount.get -> int StackExchange.Redis.ConfigurationOptions.SetUserPfxCertificate(string! userCertificatePath, string? password = null) -> void diff --git a/src/StackExchange.Redis/RawResult.cs b/src/StackExchange.Redis/RawResult.cs index dd3ce9920..55c44652b 100644 --- a/src/StackExchange.Redis/RawResult.cs +++ b/src/StackExchange.Redis/RawResult.cs @@ -161,7 +161,7 @@ public bool MoveNext() } public ReadOnlySequence Current { get; private set; } } - internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode, bool isSharded) + internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.RedisChannelOptions options) { switch (Resp2TypeBulkString) { @@ -169,12 +169,13 @@ internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.Pattern case ResultType.BulkString: if (channelPrefix == null) { - return isSharded ? new RedisChannel(GetBlob(), true) : new RedisChannel(GetBlob(), mode); + return new RedisChannel(GetBlob(), options); } if (StartsWith(channelPrefix)) { byte[] copy = Payload.Slice(channelPrefix.Length).ToArray(); - return isSharded ? new RedisChannel(copy, true) : new RedisChannel(copy, mode); + + return new RedisChannel(copy, options); } return default; default: diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index 9c0cadbf4..d93651604 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -9,8 +9,18 @@ namespace StackExchange.Redis public readonly struct RedisChannel : IEquatable { internal readonly byte[]? Value; - internal readonly bool _isPatternBased; - internal readonly bool _isSharded; + + internal readonly RedisChannelOptions Options; + + [Flags] + internal enum RedisChannelOptions + { + None = 0, + Pattern = 1 << 0, + Sharded = 1 << 1, + } + + internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH; /// /// Indicates whether the channel-name is either null or a zero-length value. @@ -20,7 +30,12 @@ namespace StackExchange.Redis /// /// Indicates whether this channel represents a wildcard pattern (see PSUBSCRIBE). /// - public bool IsPattern => _isPatternBased; + public bool IsPattern => (Options & RedisChannelOptions.Pattern) != 0; + + /// + /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE) + /// + public bool IsSharded => (Options & RedisChannelOptions.Sharded) != 0; /// /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE) @@ -65,34 +80,31 @@ public static bool UseImplicitAutoPattern /// /// The name of the channel to create. /// The mode for name matching. - public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode), false) { } + public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) { } /// /// Create a new redis channel from a string, explicitly controlling the pattern mode. /// /// The string name of the channel to create. /// The mode for name matching. - public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { } + public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode) { } /// - /// Create a new redis channel from a buffer, explicitly controlling the sharding mode. + /// Create a new redis channel from a buffer, representing a sharded channel. /// /// The name of the channel to create. - /// Whether the channel is sharded. - public RedisChannel(byte[]? value, bool isSharded) : this(value, false, isSharded) {} + public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded); /// - /// Create a new redis channel from a string, explicitly controlling the sharding mode. + /// Create a new redis channel from a string, representing a sharded channel. /// /// The string name of the channel to create. - /// Whether the channel is sharded. - public RedisChannel(string value, bool isSharded) : this(value == null ? null : Encoding.UTF8.GetBytes(value), isSharded) {} + public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded); - private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded) + internal RedisChannel(byte[]? value, RedisChannelOptions options) { Value = value; - _isPatternBased = isPatternBased; - _isSharded = isSharded; + Options = options; } private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch @@ -144,7 +156,7 @@ private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded) /// The first to compare. /// The second to compare. public static bool operator ==(RedisChannel x, RedisChannel y) => - x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value) && x._isSharded == y._isSharded; + x.Options == y.Options && RedisValue.Equals(x.Value, y.Value); /// /// Indicate whether two channel names are equal. @@ -192,10 +204,10 @@ private RedisChannel(byte[]? value, bool isPatternBased, bool isSharded) /// Indicate whether two channel names are equal. /// /// The to compare to. - public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value) && _isSharded == other._isSharded; + public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value); /// - public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0) + (_isSharded ? 2 : 0); + public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options; /// /// Obtains a string representation of the channel name. @@ -224,7 +236,7 @@ internal RedisChannel Clone() return this; } var copy = (byte[])Value.Clone(); // defensive array copy - return new RedisChannel(copy, _isPatternBased); + return new RedisChannel(copy, Options); } /// diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 7468bdb64..716176662 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -1575,14 +1575,14 @@ public Task StringLongestCommonSubsequenceWithMatchesAsync(Redis public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); - var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); return ExecuteSync(msg, ResultProcessor.Int64); } public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); - var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); return ExecuteAsync(msg, ResultProcessor.Int64); } diff --git a/src/StackExchange.Redis/RedisFeatures.cs b/src/StackExchange.Redis/RedisFeatures.cs index 225516433..faba07e68 100644 --- a/src/StackExchange.Redis/RedisFeatures.cs +++ b/src/StackExchange.Redis/RedisFeatures.cs @@ -186,6 +186,11 @@ public RedisFeatures(Version version) /// public bool SetVaradicAddRemove => Version.IsAtLeast(v2_4_0); + /// + /// Are SSUBSCRIBE and SPUBLISH available? + /// + public bool ShardedPubSub => Version.IsAtLeast(v7_0_0_rc1); + /// /// Are ZPOPMIN and ZPOPMAX available? /// diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index 2b2076e03..b641baf05 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -182,19 +182,25 @@ public Subscription(CommandFlags flags) /// internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall) { - var isPattern = channel._isPatternBased; - var isSharded = channel._isSharded; + var isPattern = channel.IsPattern; + var isSharded = channel.IsSharded; var command = action switch { - SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE, - SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE, - - SubscriptionAction.Subscribe when isSharded => RedisCommand.SSUBSCRIBE, - SubscriptionAction.Unsubscribe when isSharded => RedisCommand.SUNSUBSCRIBE, - - SubscriptionAction.Subscribe when !isPattern && !isSharded => RedisCommand.SUBSCRIBE, - SubscriptionAction.Unsubscribe when !isPattern && !isSharded => RedisCommand.UNSUBSCRIBE, - _ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"), + SubscriptionAction.Subscribe => channel.Options switch + { + RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE, + RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE, + RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE, + _ => Unknown(action, channel.Options), + }, + SubscriptionAction.Unsubscribe => channel.Options switch + { + RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE, + RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE, + RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SUNSUBSCRIBE, + _ => Unknown(action, channel.Options), + }, + _ => Unknown(action, channel.Options), }; // TODO: Consider flags here - we need to pass Fire and Forget, but don't want to intermingle Primary/Replica @@ -207,6 +213,9 @@ internal Message GetMessage(RedisChannel channel, SubscriptionAction action, Com return msg; } + private RedisCommand Unknown(SubscriptionAction action, RedisChannel.RedisChannelOptions options) + => throw new ArgumentException($"Unable to determine pub/sub operation for '{action}' against '{options}'"); + public void Add(Action? handler, ChannelMessageQueue? queue) { if (handler != null) @@ -374,14 +383,14 @@ private static void ThrowIfNull(in RedisChannel channel) public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); return ExecuteSync(msg, ResultProcessor.Int64); } public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = channel.IsSharded ? Message.Create(-1, flags, RedisCommand.SPUBLISH, channel, message) : Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message); + var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); return ExecuteAsync(msg, ResultProcessor.Int64); } diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 88e4a00ef..15619a4bb 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -73,7 +73,7 @@ public static readonly ResultProcessor public static readonly ResultProcessor PersistResultArray = new PersistResultArrayProcessor(); public static readonly ResultProcessor - RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.PatternMode.Literal); + RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions.None); public static readonly ResultProcessor RedisKey = new RedisKeyProcessor(); @@ -1504,20 +1504,20 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes private sealed class RedisChannelArrayProcessor : ResultProcessor { - private readonly RedisChannel.PatternMode mode; - public RedisChannelArrayProcessor(RedisChannel.PatternMode mode) + private readonly RedisChannel.RedisChannelOptions options; + public RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions options) { - this.mode = mode; + this.options = options; } private readonly struct ChannelState // I would use a value-tuple here, but that is binding hell { public readonly byte[]? Prefix; - public readonly RedisChannel.PatternMode Mode; - public ChannelState(byte[]? prefix, RedisChannel.PatternMode mode) + public readonly RedisChannel.RedisChannelOptions Options; + public ChannelState(byte[]? prefix, RedisChannel.RedisChannelOptions options) { Prefix = prefix; - Mode = mode; + Options = options; } } protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) @@ -1526,8 +1526,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes { case ResultType.Array: var final = result.ToArray( - (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode, isSharded: false), - new ChannelState(connection.ChannelPrefix, mode))!; + (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Options), + new ChannelState(connection.ChannelPrefix, options))!; SetResult(message, final); return true; diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index 742ce51bb..34890a89a 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -1,11 +1,11 @@ -using System; +using StackExchange.Redis.Profiling; +using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; -using StackExchange.Redis.Profiling; using Xunit; using Xunit.Abstractions; @@ -746,4 +746,62 @@ public void ConnectIncludesSubscriber() Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.SubscriptionConnectionState); } } + + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ClusterPubSub(bool sharded) + { + var guid = Guid.NewGuid().ToString(); + var channel = sharded ? RedisChannel.Sharded(guid) : RedisChannel.Literal(guid); + using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false); + Assert.True(conn.IsConnected); + + var pubsub = conn.GetSubscriber(); + List<(RedisChannel, RedisValue)> received = new(); + var queue = await pubsub.SubscribeAsync(channel); + _ = Task.Run(async () => + { // use queue API to have control over order + await foreach (var item in queue) + { + lock (received) + { + received.Add((item.Channel, item.Message)); + } + } + }); + + var db = conn.GetDatabase(); + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + await db.PingAsync(); + for (int i = 0; i < 10; i++) + { + // check we get a hit + Assert.Equal(1, await db.PublishAsync(channel, i.ToString())); + } + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + await db.PingAsync(); + await pubsub.UnsubscribeAsync(channel); + + (RedisChannel Channel, RedisValue Value)[] snap; + lock (received) + { + snap = received.ToArray(); // in case of concurrency + } + Log("items received: {0}", snap.Length); + Assert.Equal(10, snap.Length); + // separate log and validate loop here simplifies debugging (ask me how I know!) + for (int i = 0; i < 10; i++) + { + var pair = snap[i]; + Log("element {0}: {1}/{2}", i, pair.Channel, pair.Value); + } + for (int i = 0; i < 10; i++) + { + var pair = snap[i]; + Assert.Equal(channel, pair.Channel); + Assert.Equal(i, pair.Value); + } + } } diff --git a/toys/StackExchange.Redis.Server/RedisRequest.cs b/toys/StackExchange.Redis.Server/RedisRequest.cs index 283076905..36d133bab 100644 --- a/toys/StackExchange.Redis.Server/RedisRequest.cs +++ b/toys/StackExchange.Redis.Server/RedisRequest.cs @@ -45,8 +45,8 @@ public int GetInt32(int index) public RedisKey GetKey(int index) => _inner[index].AsRedisKey(); - public RedisChannel GetChannel(int index, RedisChannel.PatternMode mode) - => _inner[index].AsRedisChannel(null, mode, false); + internal RedisChannel GetChannel(int index, RedisChannel.RedisChannelOptions options) + => _inner[index].AsRedisChannel(null, options); internal bool TryGetCommandBytes(int i, out CommandBytes command) { diff --git a/toys/StackExchange.Redis.Server/RedisServer.cs b/toys/StackExchange.Redis.Server/RedisServer.cs index 63efbfd1b..52728fd44 100644 --- a/toys/StackExchange.Redis.Server/RedisServer.cs +++ b/toys/StackExchange.Redis.Server/RedisServer.cs @@ -479,7 +479,7 @@ private TypedRedisValue SubscribeImpl(RedisClient client, RedisRequest request) int index = 0; request.TryGetCommandBytes(0, out var cmd); var cmdString = TypedRedisValue.BulkString(cmd.ToArray()); - var mode = cmd[0] == (byte)'p' ? RedisChannel.PatternMode.Pattern : RedisChannel.PatternMode.Literal; + var mode = cmd[0] == (byte)'p' ? RedisChannel.RedisChannelOptions.Pattern : RedisChannel.RedisChannelOptions.None; for (int i = 1; i < request.Count; i++) { var channel = request.GetChannel(i, mode); From 98714d48caddce8e32c3deb483f8474b9416e7cc Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Tue, 20 May 2025 10:34:35 +0100 Subject: [PATCH 03/12] fix initial build errors --- src/StackExchange.Redis/CommandMap.cs | 4 +-- src/StackExchange.Redis/PhysicalConnection.cs | 9 ++++-- .../PublicAPI/PublicAPI.Shipped.txt | 2 -- src/StackExchange.Redis/RedisChannel.cs | 19 ++++++------ src/StackExchange.Redis/ResultProcessor.cs | 30 +++++++++---------- .../StackExchange.Redis.Tests/ClusterTests.cs | 12 ++++---- 6 files changed, 39 insertions(+), 37 deletions(-) diff --git a/src/StackExchange.Redis/CommandMap.cs b/src/StackExchange.Redis/CommandMap.cs index 02baf3801..31974cab9 100644 --- a/src/StackExchange.Redis/CommandMap.cs +++ b/src/StackExchange.Redis/CommandMap.cs @@ -31,7 +31,7 @@ public sealed class CommandMap RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither! - RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE, + RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE, RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH, @@ -57,7 +57,7 @@ public sealed class CommandMap RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither! - RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE, + RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE, RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH, diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 954738910..d9f03c4de 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -384,7 +384,7 @@ public void RecordConnectionFailed( bool isInitialConnect = false, IDuplexPipe? connectingPipe = null) { - bool weAskedForThis = false; + bool weAskedForThis; Exception? outerException = innerException; IdentifyFailureType(innerException, ref failureType); var bridge = BridgeCouldBeNull; @@ -1669,10 +1669,13 @@ private void MatchResult(in RawResult result) // invoke the handlers RedisChannel channel; - if (items[0].IsEqual(message)) { + if (items[0].IsEqual(message)) + { channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); Trace("MESSAGE: " + channel); - } else { + } + else + { channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); Trace("SMESSAGE: " + channel); } diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index a797b641a..683332070 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -1315,9 +1315,7 @@ StackExchange.Redis.RedisChannel.PatternMode.Auto = 0 -> StackExchange.Redis.Red StackExchange.Redis.RedisChannel.PatternMode.Literal = 1 -> StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.PatternMode.Pattern = 2 -> StackExchange.Redis.RedisChannel.PatternMode StackExchange.Redis.RedisChannel.RedisChannel() -> void -StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, bool isSharded) -> void StackExchange.Redis.RedisChannel.RedisChannel(byte[]? value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void -StackExchange.Redis.RedisChannel.RedisChannel(string! value, bool isSharded) -> void StackExchange.Redis.RedisChannel.RedisChannel(string! value, StackExchange.Redis.RedisChannel.PatternMode mode) -> void StackExchange.Redis.RedisCommandException StackExchange.Redis.RedisCommandException.RedisCommandException(string! message) -> void diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index d93651604..f6debd1eb 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -9,7 +9,7 @@ namespace StackExchange.Redis public readonly struct RedisChannel : IEquatable { internal readonly byte[]? Value; - + internal readonly RedisChannelOptions Options; [Flags] @@ -33,15 +33,10 @@ internal enum RedisChannelOptions public bool IsPattern => (Options & RedisChannelOptions.Pattern) != 0; /// - /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE) + /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE). /// public bool IsSharded => (Options & RedisChannelOptions.Sharded) != 0; - /// - /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE) - /// - public bool IsSharded => _isSharded; - internal bool IsNull => Value == null; /// @@ -80,14 +75,18 @@ public static bool UseImplicitAutoPattern /// /// The name of the channel to create. /// The mode for name matching. - public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) { } + public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) + { + } /// /// Create a new redis channel from a string, explicitly controlling the pattern mode. /// /// The string name of the channel to create. /// The mode for name matching. - public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode) { } + public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode) + { + } /// /// Create a new redis channel from a buffer, representing a sharded channel. @@ -319,4 +318,4 @@ public static implicit operator RedisChannel(byte[]? key) private RedisChannel(byte[]? value) => throw new NotSupportedException(); #endif } -} \ No newline at end of file +} diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 15619a4bb..47974b278 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -354,8 +354,7 @@ public bool TryParse(in RawResult result, out TimeSpan? expiry) switch (result.Resp2TypeBulkString) { case ResultType.Integer: - long time; - if (result.TryGetInt64(out time)) + if (result.TryGetInt64(out long time)) { if (time < 0) { @@ -470,7 +469,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes var newServer = message.Command switch { RedisCommand.SUBSCRIBE or RedisCommand.SSUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint, - _ => null + _ => null, }; Subscription?.SetCurrentServer(newServer); return true; @@ -1253,8 +1252,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes switch (result.Resp2TypeBulkString) { case ResultType.Integer: - long i64; - if (result.TryGetInt64(out i64)) + if (result.TryGetInt64(out long i64)) { SetResult(message, i64); return true; @@ -1262,8 +1260,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes break; case ResultType.SimpleString: case ResultType.BulkString: - double val; - if (result.TryGetDouble(out val)) + if (result.TryGetDouble(out double val)) { SetResult(message, val); return true; @@ -1366,8 +1363,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes case ResultType.Integer: case ResultType.SimpleString: case ResultType.BulkString: - long i64; - if (result.TryGetInt64(out i64)) + if (result.TryGetInt64(out long i64)) { SetResult(message, i64); return true; @@ -1423,8 +1419,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes SetResult(message, null); return true; } - double val; - if (result.TryGetDouble(out val)) + if (result.TryGetDouble(out double val)) { SetResult(message, val); return true; @@ -1449,8 +1444,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes SetResult(message, null); return true; } - long i64; - if (result.TryGetInt64(out i64)) + if (result.TryGetInt64(out long i64)) { SetResult(message, i64); return true; @@ -2167,7 +2161,10 @@ private sealed class RedisStreamInterleavedProcessor : ValuePairInterleavedProce protected override bool AllowJaggedPairs => false; // we only use this on a flattened map public static readonly RedisStreamInterleavedProcessor Instance = new(); - private RedisStreamInterleavedProcessor() { } + private RedisStreamInterleavedProcessor() + { + } + protected override RedisStream Parse(in RawResult first, in RawResult second, object? state) => new(key: first.AsRedisKey(), entries: ((MultiStreamProcessor)state!).ParseRedisStreamEntries(second)); } @@ -2549,7 +2546,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes internal class StreamNameValueEntryProcessor : ValuePairInterleavedProcessorBase { public static readonly StreamNameValueEntryProcessor Instance = new(); - private StreamNameValueEntryProcessor() { } + private StreamNameValueEntryProcessor() + { + } + protected override NameValueEntry Parse(in RawResult first, in RawResult second, object? state) => new NameValueEntry(first.AsRedisValue(), second.AsRedisValue()); } diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index 34890a89a..a51f91c05 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -1,11 +1,11 @@ -using StackExchange.Redis.Profiling; -using System; +using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; +using StackExchange.Redis.Profiling; using Xunit; using Xunit.Abstractions; @@ -15,7 +15,9 @@ namespace StackExchange.Redis.Tests; [Collection(SharedConnectionFixture.Key)] public class ClusterTests : TestBase { - public ClusterTests(ITestOutputHelper output, SharedConnectionFixture fixture) : base(output, fixture) { } + public ClusterTests(ITestOutputHelper output, SharedConnectionFixture fixture) : base(output, fixture) + { + } protected override string GetConfiguration() => TestConfig.Current.ClusterServersAndPorts + ",connectTimeout=10000"; @@ -747,7 +749,6 @@ public void ConnectIncludesSubscriber() } } - [Theory] [InlineData(true)] [InlineData(false)] @@ -762,7 +763,8 @@ public async Task ClusterPubSub(bool sharded) List<(RedisChannel, RedisValue)> received = new(); var queue = await pubsub.SubscribeAsync(channel); _ = Task.Run(async () => - { // use queue API to have control over order + { + // use queue API to have control over order await foreach (var item in queue) { lock (received) From af32c94e8082f265a4592aa608425e4ce972842b Mon Sep 17 00:00:00 2001 From: atakavci Date: Fri, 30 May 2025 16:05:47 +0300 Subject: [PATCH 04/12] fix sharded pubsub reconnects --- src/StackExchange.Redis/ServerEndPoint.cs | 1 + .../StackExchange.Redis.Tests/ClusterTests.cs | 54 +++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index c9d8414ba..1e053fb56 100644 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -636,6 +636,7 @@ internal void OnDisconnected(PhysicalBridge bridge) if (bridge == interactive) { CompletePendingConnectionMonitors("Disconnected"); + if (KnowOrAssumeResp3()) Multiplexer.UpdateSubscriptions(); } else if (bridge == subscription) { diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index a51f91c05..55e134268 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -749,6 +749,60 @@ public void ConnectIncludesSubscriber() } } + [Fact] + public async Task TestSubscriberReconnected() + { + var channel = RedisChannel.Sharded("testShardChannel"); + using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false); + Assert.True(conn.IsConnected); + var db = conn.GetDatabase(); + Assert.Equal(0, await db.PublishAsync(channel, "noClientReceivesThis")); + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + + var pubsub = conn.GetSubscriber(); + List<(RedisChannel, RedisValue)> received = new(); + var queue = await pubsub.SubscribeAsync(channel); + _ = Task.Run(async () => + { + // use queue API to have control over order + await foreach (var item in queue) + { + lock (received) + { + if (item.Channel.IsSharded && item.Channel == channel) received.Add((item.Channel, item.Message)); + } + } + }); + Assert.Equal(1, conn.GetSubscriptionsCount()); + + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + await db.PingAsync(); + + for (int i = 0; i < 5; i++) + { + // check we get a hit + Assert.Equal(1, await db.PublishAsync(channel, i.ToString())); + } + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + + // this is endpoint at index 1 which has the hashslot for "testShardChannel" + var server = conn.GetServer(conn.GetEndPoints()[1]); + server.SimulateConnectionFailure(SimulatedFailureType.All); + SetExpectedAmbientFailureCount(2); + + await Task.Delay(4000); + for (int i = 0; i < 5; i++) + { + // check we get a hit + Assert.Equal(1, await db.PublishAsync(channel, i.ToString())); + } + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + + Assert.Equal(1, conn.GetSubscriptionsCount()); + Assert.Equal(10, received.Count); + ClearAmbientFailures(); + } + [Theory] [InlineData(true)] [InlineData(false)] From c05fdf65e573bda45b8324fcbc7e76a3bb2798c1 Mon Sep 17 00:00:00 2001 From: atakavci Date: Wed, 4 Jun 2025 12:41:07 +0300 Subject: [PATCH 05/12] test hashslot migration with subscription to sharded pubsub --- .../StackExchange.Redis.Tests/ClusterTests.cs | 114 +++++++++++++++++- 1 file changed, 113 insertions(+), 1 deletion(-) diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index 55e134268..9820ab5b3 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -750,7 +750,7 @@ public void ConnectIncludesSubscriber() } [Fact] - public async Task TestSubscriberReconnected() + public async Task TestShardedPubsubSubscriberAgainstReconnects() { var channel = RedisChannel.Sharded("testShardChannel"); using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false); @@ -803,6 +803,118 @@ public async Task TestSubscriberReconnected() ClearAmbientFailures(); } + [Fact] + public async Task TestShardedPubsubSubscriberAgainsHashSlotMigration() + { + var channel = RedisChannel.Sharded("testShardChannel"); + using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false); + Assert.True(conn.IsConnected); + var db = conn.GetDatabase(); + Assert.Equal(0, await db.PublishAsync(channel, "noClientReceivesThis")); + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + + var pubsub = conn.GetSubscriber(); + List<(RedisChannel, RedisValue)> received = new(); + var queue = await pubsub.SubscribeAsync(channel); + _ = Task.Run(async () => + { + // use queue API to have control over order + await foreach (var item in queue) + { + lock (received) + { + if (item.Channel.IsSharded && item.Channel == channel) received.Add((item.Channel, item.Message)); + } + } + }); + Assert.Equal(1, conn.GetSubscriptionsCount()); + + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + await db.PingAsync(); + + for (int i = 0; i < 5; i++) + { + // check we get a hit + Assert.Equal(1, await db.PublishAsync(channel, i.ToString())); + } + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + + // lets migrate the slot for "testShardChannel" to another node + DoHashSlotMigration(); + + await Task.Delay(4000); + for (int i = 0; i < 5; i++) + { + // check we get a hit + Assert.Equal(1, await db.PublishAsync(channel, i.ToString())); + } + await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) + + Assert.Equal(1, conn.GetSubscriptionsCount()); + Assert.Equal(10, received.Count); + RollbackHashSlotMigration(); + ClearAmbientFailures(); + } + + private void DoHashSlotMigration() + { + MigrateSlotForTestShardChannel(false); + } + private void RollbackHashSlotMigration() + { + MigrateSlotForTestShardChannel(true); + } + + private void MigrateSlotForTestShardChannel(bool rollback) + { + int hashSlotForTestShardChannel = 7177; + using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 5000, shared: false); + var servers = conn.GetServers(); + IServer? serverWithPort7000 = null; + IServer? serverWithPort7001 = null; + + string nodeIdForPort7000 = "780813af558af81518e58e495d63b6e248e80adf"; + string nodeIdForPort7001 = "ea828c6074663c8bd4e705d3e3024d9d1721ef3b"; + foreach (var server in servers) + { + string id = server.Execute("CLUSTER", "MYID").ToString(); + if (id == nodeIdForPort7000) + { + serverWithPort7000 = server; + } + if (id == nodeIdForPort7001) + { + serverWithPort7001 = server; + } + } + + IServer fromServer, toServer; + string fromNode, toNode; + int toPort; + if (rollback) + { + fromServer = serverWithPort7000!; + fromNode = nodeIdForPort7000; + toServer = serverWithPort7001!; + toNode = nodeIdForPort7001; + toPort = 7001; + } + else + { + fromServer = serverWithPort7001!; + fromNode = nodeIdForPort7001; + toServer = serverWithPort7000!; + toNode = nodeIdForPort7000; + toPort = 7000; + } + + Assert.Equal("OK", toServer.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "IMPORTING", fromNode).ToString()); + Assert.Equal("OK", fromServer.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "MIGRATING", toNode).ToString()); + Assert.Equal("OK", fromServer.Execute("MIGRATE", "127.0.0.1", toPort, "", 0, 5000, "KEYS", "testShardChannel").ToString()); + Assert.Equal("OK", toServer.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "NODE", toNode).ToString()); + Assert.Equal("OK", fromServer!.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "NODE", toNode).ToString()); + } + [Theory] [InlineData(true)] [InlineData(false)] From 8403aed945014379a22b2474c651c075e2efbdf1 Mon Sep 17 00:00:00 2001 From: atakavci Date: Wed, 4 Jun 2025 12:51:43 +0300 Subject: [PATCH 06/12] fix test missing migration key --- tests/StackExchange.Redis.Tests/ClusterTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index 9820ab5b3..c368ef9a6 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -810,6 +810,7 @@ public async Task TestShardedPubsubSubscriberAgainsHashSlotMigration() using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false); Assert.True(conn.IsConnected); var db = conn.GetDatabase(); + db.StringSet("testShardChannel", "testValue"); Assert.Equal(0, await db.PublishAsync(channel, "noClientReceivesThis")); await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) From 609ee9675628e93e9e8c2ed05ca573e318d8a18b Mon Sep 17 00:00:00 2001 From: atakavci Date: Wed, 4 Jun 2025 16:26:58 +0300 Subject: [PATCH 07/12] clean up --- tests/StackExchange.Redis.Tests/ClusterTests.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index c368ef9a6..7ed59a5aa 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -810,7 +810,6 @@ public async Task TestShardedPubsubSubscriberAgainsHashSlotMigration() using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false); Assert.True(conn.IsConnected); var db = conn.GetDatabase(); - db.StringSet("testShardChannel", "testValue"); Assert.Equal(0, await db.PublishAsync(channel, "noClientReceivesThis")); await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note) @@ -891,14 +890,12 @@ private void MigrateSlotForTestShardChannel(bool rollback) IServer fromServer, toServer; string fromNode, toNode; - int toPort; if (rollback) { fromServer = serverWithPort7000!; fromNode = nodeIdForPort7000; toServer = serverWithPort7001!; toNode = nodeIdForPort7001; - toPort = 7001; } else { @@ -906,12 +903,10 @@ private void MigrateSlotForTestShardChannel(bool rollback) fromNode = nodeIdForPort7001; toServer = serverWithPort7000!; toNode = nodeIdForPort7000; - toPort = 7000; } Assert.Equal("OK", toServer.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "IMPORTING", fromNode).ToString()); Assert.Equal("OK", fromServer.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "MIGRATING", toNode).ToString()); - Assert.Equal("OK", fromServer.Execute("MIGRATE", "127.0.0.1", toPort, "", 0, 5000, "KEYS", "testShardChannel").ToString()); Assert.Equal("OK", toServer.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "NODE", toNode).ToString()); Assert.Equal("OK", fromServer!.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "NODE", toNode).ToString()); } From e9523c3ca41880fbd10492d31b051042ec088bae Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Mon, 9 Jun 2025 15:40:40 +0100 Subject: [PATCH 08/12] make RESP3 detection more accurate "what was" rather than "what might have been" --- src/StackExchange.Redis/PhysicalBridge.cs | 7 +++++-- src/StackExchange.Redis/PhysicalConnection.cs | 6 +++++- src/StackExchange.Redis/ServerEndPoint.cs | 5 ++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index a10b241cb..f5d75c188 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -124,9 +124,12 @@ public enum State : byte public RedisCommand LastCommand { get; private set; } /// - /// If we have a connection, report the protocol being used. + /// If we have (or had) a connection, report the protocol being used. /// - public RedisProtocol? Protocol => physical?.Protocol; + /// The value remains after disconnect, so that appropriate follow-up actions (pub/sub etc) can work reliably. + public RedisProtocol? Protocol => _protocol == 0 ? default(RedisProtocol?) : _protocol; + private RedisProtocol _protocol; // note starts at zero, not RESP2 + internal void SetProtocol(RedisProtocol protocol) => _protocol = protocol; public void Dispose() { diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index d9f03c4de..4e5675d68 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -276,7 +276,11 @@ private enum ReadMode : byte private RedisProtocol _protocol; // note starts at **zero**, not RESP2 public RedisProtocol? Protocol => _protocol == 0 ? null : _protocol; - internal void SetProtocol(RedisProtocol value) => _protocol = value; + internal void SetProtocol(RedisProtocol value) + { + _protocol = value; + BridgeCouldBeNull?.SetProtocol(value); + } [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")] internal void Shutdown() diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index 1e053fb56..55fe6cff3 100644 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -636,7 +636,10 @@ internal void OnDisconnected(PhysicalBridge bridge) if (bridge == interactive) { CompletePendingConnectionMonitors("Disconnected"); - if (KnowOrAssumeResp3()) Multiplexer.UpdateSubscriptions(); + if (Protocol is RedisProtocol.Resp3) + { + Multiplexer.UpdateSubscriptions(); + } } else if (bridge == subscription) { From 417d72cecae5840668ad09d8cc5755c93495e0ce Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Mon, 9 Jun 2025 16:09:27 +0100 Subject: [PATCH 09/12] guard sharded pub/sub tests to v7 --- tests/StackExchange.Redis.Tests/ClusterTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index 7ed59a5aa..fc8251ad0 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -753,7 +753,7 @@ public void ConnectIncludesSubscriber() public async Task TestShardedPubsubSubscriberAgainstReconnects() { var channel = RedisChannel.Sharded("testShardChannel"); - using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false); + using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false, require: RedisFeatures.v7_0_0_rc1); Assert.True(conn.IsConnected); var db = conn.GetDatabase(); Assert.Equal(0, await db.PublishAsync(channel, "noClientReceivesThis")); @@ -807,7 +807,7 @@ public async Task TestShardedPubsubSubscriberAgainstReconnects() public async Task TestShardedPubsubSubscriberAgainsHashSlotMigration() { var channel = RedisChannel.Sharded("testShardChannel"); - using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false); + using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false, require: RedisFeatures.v7_0_0_rc1); Assert.True(conn.IsConnected); var db = conn.GetDatabase(); Assert.Equal(0, await db.PublishAsync(channel, "noClientReceivesThis")); @@ -918,7 +918,7 @@ public async Task ClusterPubSub(bool sharded) { var guid = Guid.NewGuid().ToString(); var channel = sharded ? RedisChannel.Sharded(guid) : RedisChannel.Literal(guid); - using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false); + using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false, require: sharded ? RedisFeatures.v7_0_0_rc1 : RedisFeatures.v2_0_0); Assert.True(conn.IsConnected); var pubsub = conn.GetSubscriber(); From 3f6a898e0438f097e839715e1a26146d45c07117 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Mon, 9 Jun 2025 16:13:06 +0100 Subject: [PATCH 10/12] release notes --- docs/ReleaseNotes.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 264c0d52b..5eb4aae41 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -9,6 +9,8 @@ Current package versions: ## Unreleased No pending unreleased changes +- Add support for sharded pub/sub via `RedisChannel.Sharded` - ([#2887 by vandyvilla, atakavci and mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2887)) + ## 2.8.37 - Add `ConfigurationOptions.SetUserPemCertificate(...)` and `ConfigurationOptions.SetUserPfxCertificate(...)` methods to simplify using client certificates ([#2873 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2873)) From 339c769b120ed3bdc4a1a5fe1b9c6ce12859bfe3 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Tue, 10 Jun 2025 15:40:07 +0100 Subject: [PATCH 11/12] use Me for channel tests --- tests/StackExchange.Redis.Tests/ClusterTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index fc8251ad0..b1f9d01cd 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -752,7 +752,7 @@ public void ConnectIncludesSubscriber() [Fact] public async Task TestShardedPubsubSubscriberAgainstReconnects() { - var channel = RedisChannel.Sharded("testShardChannel"); + var channel = RedisChannel.Sharded(Me()); using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false, require: RedisFeatures.v7_0_0_rc1); Assert.True(conn.IsConnected); var db = conn.GetDatabase(); @@ -806,7 +806,7 @@ public async Task TestShardedPubsubSubscriberAgainstReconnects() [Fact] public async Task TestShardedPubsubSubscriberAgainsHashSlotMigration() { - var channel = RedisChannel.Sharded("testShardChannel"); + var channel = RedisChannel.Sharded(Me()); using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false, require: RedisFeatures.v7_0_0_rc1); Assert.True(conn.IsConnected); var db = conn.GetDatabase(); From 316889a7fe22151f94ac2b3f1a8d264fe6d175d9 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Tue, 10 Jun 2025 16:17:18 +0100 Subject: [PATCH 12/12] additional comments from final eyeballs --- src/StackExchange.Redis/Interfaces/ISubscriber.cs | 1 + src/StackExchange.Redis/PhysicalConnection.cs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/StackExchange.Redis/Interfaces/ISubscriber.cs b/src/StackExchange.Redis/Interfaces/ISubscriber.cs index e0c509f49..a9c0bf298 100644 --- a/src/StackExchange.Redis/Interfaces/ISubscriber.cs +++ b/src/StackExchange.Redis/Interfaces/ISubscriber.cs @@ -110,6 +110,7 @@ public interface ISubscriber : IRedis /// See /// , /// . + /// . /// void UnsubscribeAll(CommandFlags flags = CommandFlags.None); diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 4e5675d68..c92290696 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -1678,7 +1678,7 @@ private void MatchResult(in RawResult result) channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); Trace("MESSAGE: " + channel); } - else + else // see check on outer-if that restricts to message / smessage { channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); Trace("SMESSAGE: " + channel);