diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index b3546e969..615f44497 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -8,6 +8,8 @@ Current package versions: ## Unreleased +- Fix `SSUBSCRIBE` routing during slot migrations ([#2969 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2969)) + ## 2.9.25 - (build) Fix SNK on non-Windows builds ([#2963 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2963)) diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index a3c19ab93..0c9eb4c92 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -856,7 +856,7 @@ protected override void WriteImpl(PhysicalConnection physical) internal abstract class CommandChannelBase : Message { - protected readonly RedisChannel Channel; + internal readonly RedisChannel Channel; protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, in RedisChannel channel) : base(db, flags, command) { diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index c21bc07fc..57bcd608d 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -29,8 +29,6 @@ internal sealed partial class PhysicalConnection : IDisposable private const int DefaultRedisDatabaseCount = 16; - private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage"; - private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select( i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray(); @@ -1669,6 +1667,130 @@ internal async ValueTask ConnectedAsync(Socket? socket, ILogger? log, Sock } } + private enum PushKind + { + None, + Message, + PMessage, + SMessage, + Subscribe, + PSubscribe, + SSubscribe, + Unsubscribe, + PUnsubscribe, + SUnsubscribe, + } + private PushKind GetPushKind(in Sequence result, out RedisChannel channel) + { + var len = result.Length; + if (len < 2) + { + // for supported cases, we demand at least the kind and the subscription channel + channel = default; + return PushKind.None; + } + + const int MAX_LEN = 16; + Debug.Assert(MAX_LEN >= Enumerable.Max( + [ + PushMessage.Length, PushPMessage.Length, PushSMessage.Length, + PushSubscribe.Length, PushPSubscribe.Length, PushSSubscribe.Length, + PushUnsubscribe.Length, PushPUnsubscribe.Length, PushSUnsubscribe.Length, + ])); + ref readonly RawResult pushKind = ref result[0]; + var multiSegmentPayload = pushKind.Payload; + if (multiSegmentPayload.Length <= MAX_LEN) + { + var span = multiSegmentPayload.IsSingleSegment + ? multiSegmentPayload.First.Span + : CopyTo(stackalloc byte[MAX_LEN], multiSegmentPayload); + + var hash = FastHash.Hash64(span); + RedisChannel.RedisChannelOptions channelOptions = RedisChannel.RedisChannelOptions.None; + PushKind kind; + switch (hash) + { + case PushMessage.Hash when PushMessage.Is(hash, span) & len >= 3: + kind = PushKind.Message; + break; + case PushPMessage.Hash when PushPMessage.Is(hash, span) & len >= 4: + channelOptions = RedisChannel.RedisChannelOptions.Pattern; + kind = PushKind.PMessage; + break; + case PushSMessage.Hash when PushSMessage.Is(hash, span) & len >= 3: + channelOptions = RedisChannel.RedisChannelOptions.Sharded; + kind = PushKind.SMessage; + break; + case PushSubscribe.Hash when PushSubscribe.Is(hash, span): + kind = PushKind.Subscribe; + break; + case PushPSubscribe.Hash when PushPSubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Pattern; + kind = PushKind.PSubscribe; + break; + case PushSSubscribe.Hash when PushSSubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Sharded; + kind = PushKind.SSubscribe; + break; + case PushUnsubscribe.Hash when PushUnsubscribe.Is(hash, span): + kind = PushKind.Unsubscribe; + break; + case PushPUnsubscribe.Hash when PushPUnsubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Pattern; + kind = PushKind.PUnsubscribe; + break; + case PushSUnsubscribe.Hash when PushSUnsubscribe.Is(hash, span): + channelOptions = RedisChannel.RedisChannelOptions.Sharded; + kind = PushKind.SUnsubscribe; + break; + default: + kind = PushKind.None; + break; + } + if (kind != PushKind.None) + { + // the channel is always the second element + channel = result[1].AsRedisChannel(ChannelPrefix, channelOptions); + return kind; + } + } + channel = default; + return PushKind.None; + + static ReadOnlySpan CopyTo(Span target, in ReadOnlySequence source) + { + source.CopyTo(target); + return target.Slice(0, (int)source.Length); + } + } + + [FastHash("message")] + private static partial class PushMessage { } + + [FastHash("pmessage")] + private static partial class PushPMessage { } + + [FastHash("smessage")] + private static partial class PushSMessage { } + + [FastHash("subscribe")] + private static partial class PushSubscribe { } + + [FastHash("psubscribe")] + private static partial class PushPSubscribe { } + + [FastHash("ssubscribe")] + private static partial class PushSSubscribe { } + + [FastHash("unsubscribe")] + private static partial class PushUnsubscribe { } + + [FastHash("punsubscribe")] + private static partial class PushPUnsubscribe { } + + [FastHash("sunsubscribe")] + private static partial class PushSUnsubscribe { } + private void MatchResult(in RawResult result) { // check to see if it could be an out-of-band pubsub message @@ -1679,85 +1801,87 @@ private void MatchResult(in RawResult result) // out of band message does not match to a queued message var items = result.GetItems(); - if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage))) + var kind = GetPushKind(items, out var subscriptionChannel); + switch (kind) { - _readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage; + case PushKind.Message: + case PushKind.SMessage: + _readStatus = kind is PushKind.Message ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage; - // special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry) - var configChanged = muxer.ConfigurationChangedChannel; - if (configChanged != null && items[1].IsEqual(configChanged)) - { - EndPoint? blame = null; - try + // special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry) + var configChanged = muxer.ConfigurationChangedChannel; + if (configChanged != null && items[1].IsEqual(configChanged)) { - if (!items[2].IsEqual(CommonReplies.wildcard)) + EndPoint? blame = null; + try { - // We don't want to fail here, just trying to identify - _ = Format.TryParseEndPoint(items[2].GetString(), out blame); + if (!items[2].IsEqual(CommonReplies.wildcard)) + { + // We don't want to fail here, just trying to identify + _ = Format.TryParseEndPoint(items[2].GetString(), out blame); + } + } + catch + { + /* no biggie */ } - } - catch { /* no biggie */ } - Trace("Configuration changed: " + Format.ToString(blame)); - _readStatus = ReadStatus.Reconfigure; - muxer.ReconfigureIfNeeded(blame, true, "broadcast"); - } - // invoke the handlers - RedisChannel channel; - if (items[0].IsEqual(message)) - { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); - Trace("MESSAGE: " + channel); - } - else // see check on outer-if that restricts to message / smessage - { - channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); - Trace("SMESSAGE: " + channel); - } - if (!channel.IsNull) - { - if (TryGetPubSubPayload(items[2], out var payload)) - { - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(channel, channel, payload); + Trace("Configuration changed: " + Format.ToString(blame)); + _readStatus = ReadStatus.Reconfigure; + muxer.ReconfigureIfNeeded(blame, true, "broadcast"); } - // could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507 - else if (TryGetMultiPubSubPayload(items[2], out var payloads)) + + // invoke the handlers + if (!subscriptionChannel.IsNull) { - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(channel, channel, payloads); + Trace($"{kind}: {subscriptionChannel}"); + if (TryGetPubSubPayload(items[2], out var payload)) + { + _readStatus = ReadStatus.InvokePubSub; + muxer.OnMessage(subscriptionChannel, subscriptionChannel, payload); + } + // could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507 + else if (TryGetMultiPubSubPayload(items[2], out var payloads)) + { + _readStatus = ReadStatus.InvokePubSub; + muxer.OnMessage(subscriptionChannel, subscriptionChannel, payloads); + } } - } - return; // AND STOP PROCESSING! - } - else if (items.Length >= 4 && items[0].IsEqual(pmessage)) - { - _readStatus = ReadStatus.PubSubPMessage; + return; // and stop processing + case PushKind.PMessage: + _readStatus = ReadStatus.PubSubPMessage; - var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); - - Trace("PMESSAGE: " + channel); - if (!channel.IsNull) - { - if (TryGetPubSubPayload(items[3], out var payload)) + var messageChannel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); + if (!messageChannel.IsNull) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); - - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(sub, channel, payload); + Trace($"{kind}: {messageChannel} via {subscriptionChannel}"); + if (TryGetPubSubPayload(items[3], out var payload)) + { + _readStatus = ReadStatus.InvokePubSub; + muxer.OnMessage(subscriptionChannel, messageChannel, payload); + } + else if (TryGetMultiPubSubPayload(items[3], out var payloads)) + { + _readStatus = ReadStatus.InvokePubSub; + muxer.OnMessage(subscriptionChannel, messageChannel, payloads); + } } - else if (TryGetMultiPubSubPayload(items[3], out var payloads)) + return; // and stop processing + case PushKind.SUnsubscribe when !PeekChannelMessage(RedisCommand.SUNSUBSCRIBE, subscriptionChannel): + // then it was *unsolicited* - this probably means the slot was migrated + // (otherwise, we'll let the command-processor deal with it) + _readStatus = ReadStatus.PubSubUnsubscribe; + var server = BridgeCouldBeNull?.ServerEndPoint; + if (server is not null && muxer.TryGetSubscription(subscriptionChannel, out var subscription)) { - var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern); - - _readStatus = ReadStatus.InvokePubSub; - muxer.OnMessage(sub, channel, payloads); + // wipe and reconnect; but: to where? + // counter-intuitively, the only server we *know* already knows the new route is: + // the outgoing server, since it had to change to MIGRATING etc; the new INCOMING server + // knows, but *we don't know who that is*, and other nodes: aren't guaranteed to know (yet) + muxer.DefaultSubscriber.ResubscribeToServer(subscription, subscriptionChannel, server, cause: PushSUnsubscribe.Text); } - } - return; // AND STOP PROCESSING! + return; // and STOP PROCESSING; unsolicited } - - // if it didn't look like "[p|s]message", then we still need to process the pending queue } Trace("Matching result..."); @@ -1875,6 +1999,19 @@ static bool TryGetMultiPubSubPayload(in RawResult value, out Sequence } } + private bool PeekChannelMessage(RedisCommand command, RedisChannel channel) + { + Message? msg; + bool haveMsg; + lock (_writtenAwaitingResponse) + { + haveMsg = _writtenAwaitingResponse.TryPeek(out msg); + } + + return haveMsg && msg is CommandChannelBase typed + && typed.Command == command && typed.Channel == channel; + } + private volatile Message? _activeMessage; internal void GetHeadMessages(out Message? now, out Message? next) @@ -2168,6 +2305,7 @@ internal enum ReadStatus MatchResultComplete, ResetArena, ProcessBufferComplete, + PubSubUnsubscribe, NA = -1, } private volatile ReadStatus _readStatus; diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index 8ff9610b0..9ade78c2d 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.SymbolStore; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -13,7 +14,7 @@ namespace StackExchange.Redis public partial class ConnectionMultiplexer { private RedisSubscriber? _defaultSubscriber; - private RedisSubscriber DefaultSubscriber => _defaultSubscriber ??= new RedisSubscriber(this, null); + internal RedisSubscriber DefaultSubscriber => _defaultSubscriber ??= new RedisSubscriber(this, null); private readonly ConcurrentDictionary subscriptions = new(); @@ -282,6 +283,17 @@ internal void GetSubscriberCounts(out int handlers, out int queues) internal ServerEndPoint? GetCurrentServer() => Volatile.Read(ref CurrentServer); internal void SetCurrentServer(ServerEndPoint? server) => CurrentServer = server; + // conditional clear + internal bool ClearCurrentServer(ServerEndPoint expected) + { + if (CurrentServer == expected) + { + CurrentServer = null; + return true; + } + + return false; + } /// /// Evaluates state and if we're not currently connected, clears the server reference. @@ -404,7 +416,7 @@ public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = return queue; } - public bool Subscribe(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags) + private bool Subscribe(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags) { ThrowIfNull(channel); if (handler == null && queue == null) { return true; } @@ -425,35 +437,58 @@ internal bool EnsureSubscribedToServer(Subscription sub, RedisChannel channel, C return ExecuteSync(message, sub.Processor, selected); } + internal void ResubscribeToServer(Subscription sub, RedisChannel channel, ServerEndPoint serverEndPoint, string cause) + { + // conditional: only if that's the server we were connected to, or "none"; we don't want to end up duplicated + if (sub.ClearCurrentServer(serverEndPoint) || !sub.IsConnected) + { + if (serverEndPoint.IsSubscriberConnected) + { + // we'll *try* for a simple resubscribe, following any -MOVED etc, but if that fails: fall back + // to full reconfigure; importantly, note that we've already recorded the disconnect + var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, CommandFlags.None, false); + _ = ExecuteAsync(message, sub.Processor, serverEndPoint).ContinueWith( + t => multiplexer.ReconfigureIfNeeded(serverEndPoint.EndPoint, false, cause: cause), + TaskContinuationOptions.OnlyOnFaulted); + } + else + { + multiplexer.ReconfigureIfNeeded(serverEndPoint.EndPoint, false, cause: cause); + } + } + } + Task ISubscriber.SubscribeAsync(RedisChannel channel, Action handler, CommandFlags flags) => SubscribeAsync(channel, handler, null, flags); - public async Task SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) + Task ISubscriber.SubscribeAsync(RedisChannel channel, CommandFlags flags) => SubscribeAsync(channel, flags); + + public async Task SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None, ServerEndPoint? server = null) { var queue = new ChannelMessageQueue(channel, this); - await SubscribeAsync(channel, null, queue, flags).ForAwait(); + await SubscribeAsync(channel, null, queue, flags, server).ForAwait(); return queue; } - public Task SubscribeAsync(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags) + private Task SubscribeAsync(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags, ServerEndPoint? server = null) { ThrowIfNull(channel); if (handler == null && queue == null) { return CompletedTask.Default(null); } var sub = multiplexer.GetOrAddSubscription(channel, flags); sub.Add(handler, queue); - return EnsureSubscribedToServerAsync(sub, channel, flags, false); + return EnsureSubscribedToServerAsync(sub, channel, flags, false, server); } - public Task EnsureSubscribedToServerAsync(Subscription sub, RedisChannel channel, CommandFlags flags, bool internalCall) + public Task EnsureSubscribedToServerAsync(Subscription sub, RedisChannel channel, CommandFlags flags, bool internalCall, ServerEndPoint? server = null) { if (sub.IsConnected) { return CompletedTask.Default(null); } // TODO: Cleanup old hangers here? sub.SetCurrentServer(null); // we're not appropriately connected, so blank it out for eligible reconnection var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, flags, internalCall); - var selected = multiplexer.SelectServer(message); - return ExecuteAsync(message, sub.Processor, selected); + server ??= multiplexer.SelectServer(message); + return ExecuteAsync(message, sub.Processor, server); } public EndPoint? SubscribedEndpoint(RedisChannel channel) => multiplexer.GetSubscribedServer(channel)?.EndPoint; diff --git a/src/StackExchange.Redis/ServerSelectionStrategy.cs b/src/StackExchange.Redis/ServerSelectionStrategy.cs index 4084b4c33..ca247c38b 100644 --- a/src/StackExchange.Redis/ServerSelectionStrategy.cs +++ b/src/StackExchange.Redis/ServerSelectionStrategy.cs @@ -328,7 +328,7 @@ private ServerEndPoint[] MapForMutation() return arr; } - private ServerEndPoint? Select(int slot, RedisCommand command, CommandFlags flags, bool allowDisconnected) + internal ServerEndPoint? Select(int slot, RedisCommand command, CommandFlags flags, bool allowDisconnected) { // Only interested in primary/replica preferences flags = Message.GetPrimaryReplicaFlags(flags); diff --git a/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs b/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs index dd57483b9..8af0a1c7b 100644 --- a/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterShardedTests.cs @@ -1,4 +1,7 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; using System.Threading.Tasks; using Xunit; @@ -174,4 +177,174 @@ private async Task MigrateSlotForTestShardChannelAsync(bool rollback) Log("Slot already migrated."); } } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task SubscribeToWrongServerAsync(bool sharded) + { + // the purpose of this test is to simulate subscribing while a node move is happening, i.e. we send + // the SSUBSCRIBE to the wrong server, get a -MOVED, and redirect; in particular: do we end up *knowing* + // where we actually subscribed to? + // + // note: to check our thinking, we also do this for regular non-sharded channels too; the point here + // being that this should behave *differently*, since there will be no -MOVED + var name = $"{Me()}:{Guid.NewGuid()}"; + var channel = sharded ? RedisChannel.Sharded(name) : RedisChannel.Literal(name).WithKeyRouting(); + await using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var asKey = (RedisKey)(byte[])channel!; + Assert.False(asKey.IsEmpty); + var shouldBeServer = conn.GetServer(asKey); // this is where it *should* go + + // now intentionally choose *a different* server + var server = conn.GetServers().First(s => !Equals(s.EndPoint, shouldBeServer.EndPoint)); + Log($"Should be {Format.ToString(shouldBeServer.EndPoint)}; routing via {Format.ToString(server.EndPoint)}"); + + var subscriber = Assert.IsType(conn.GetSubscriber()); + var serverEndpoint = conn.GetServerEndPoint(server.EndPoint); + Assert.Equal(server.EndPoint, serverEndpoint.EndPoint); + var queue = await subscriber.SubscribeAsync(channel, server: serverEndpoint); + await Task.Delay(50); + var actual = subscriber.SubscribedEndpoint(channel); + + if (sharded) + { + // we should end up at the correct node, following the -MOVED + Assert.Equal(shouldBeServer.EndPoint, actual); + } + else + { + // we should end up where we *actually sent the message* - there is no -MOVED + Assert.Equal(serverEndpoint.EndPoint, actual); + } + + Log("Unsubscribing..."); + await queue.UnsubscribeAsync(); + Log("Unsubscribed."); + } + + [Fact] + public async Task KeepSubscribedThroughSlotMigrationAsync() + { + await using var conn = Create(require: RedisFeatures.v7_0_0_rc1, allowAdmin: true); + var name = $"{Me()}:{Guid.NewGuid()}"; + var channel = RedisChannel.Sharded(name); + var subscriber = conn.GetSubscriber(); + var queue = await subscriber.SubscribeAsync(channel); + await Task.Delay(50); + var actual = subscriber.SubscribedEndpoint(channel); + Assert.NotNull(actual); + + var asKey = (RedisKey)(byte[])channel!; + Assert.False(asKey.IsEmpty); + var slot = conn.GetHashSlot(asKey); + var viaMap = conn.ServerSelectionStrategy.Select(slot, RedisCommand.SSUBSCRIBE, CommandFlags.None, allowDisconnected: false); + + Log($"Slot {slot}, subscribed to {Format.ToString(actual)} (mapped to {Format.ToString(viaMap?.EndPoint)})"); + Assert.NotNull(viaMap); + Assert.Equal(actual, viaMap.EndPoint); + + var oldServer = conn.GetServer(asKey); // this is where it *should* go + + using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5))) + { + // now publish... we *expect* things to have sorted themselves out + var msg = Guid.NewGuid().ToString(); + var count = await subscriber.PublishAsync(channel, msg); + Assert.Equal(1, count); + + Log("Waiting for message on original subscription..."); + var received = await queue.ReadAsync(timeout.Token); + Log($"Message received: {received.Message}"); + Assert.Equal(msg, (string)received.Message!); + } + + // now intentionally choose *a different* server + var newServer = conn.GetServers().First(s => !Equals(s.EndPoint, oldServer.EndPoint)); + + var nodes = await newServer.ClusterNodesAsync(); + Assert.NotNull(nodes); + var fromNode = nodes[oldServer.EndPoint]?.NodeId; + var toNode = nodes[newServer.EndPoint]?.NodeId; + Assert.NotNull(fromNode); + Assert.NotNull(toNode); + Assert.Equal(oldServer.EndPoint, nodes.GetBySlot(slot)?.EndPoint); + + var ep = subscriber.SubscribedEndpoint(channel); + Log($"Endpoint before migration: {Format.ToString(ep)}"); + Log($"Migrating slot {slot} to {Format.ToString(newServer.EndPoint)}; node {fromNode} -> {toNode}..."); + + // see https://redis.io/docs/latest/commands/cluster-setslot/#redis-cluster-live-resharding-explained + WriteLog("IMPORTING", await newServer.ExecuteAsync("CLUSTER", "SETSLOT", slot, "IMPORTING", fromNode)); + WriteLog("MIGRATING", await oldServer.ExecuteAsync("CLUSTER", "SETSLOT", slot, "MIGRATING", toNode)); + + while (true) + { + var keys = (await oldServer.ExecuteAsync("CLUSTER", "GETKEYSINSLOT", slot, 100)).AsRedisKeyArray()!; + Log($"Migrating {keys.Length} keys..."); + if (keys.Length == 0) break; + foreach (var key in keys) + { + await conn.GetDatabase().KeyMigrateAsync(key, newServer.EndPoint, migrateOptions: MigrateOptions.None); + } + } + + WriteLog("NODE (old)", await newServer.ExecuteAsync("CLUSTER", "SETSLOT", slot, "NODE", toNode)); + WriteLog("NODE (new)", await oldServer.ExecuteAsync("CLUSTER", "SETSLOT", slot, "NODE", toNode)); + + void WriteLog(string caption, RedisResult result) + { + if (result.IsNull) + { + Log($"{caption}: null"); + } + else if (result.Length >= 0) + { + var arr = result.AsRedisValueArray()!; + Log($"{caption}: {arr.Length} items"); + foreach (var item in arr) + { + Log($" {item}"); + } + } + else + { + Log($"{caption}: {result}"); + } + } + + Log("Migration initiated; checking node state..."); + await Task.Delay(100); + ep = subscriber.SubscribedEndpoint(channel); + Log($"Endpoint after migration: {Format.ToString(ep)}"); + Assert.True( + ep is null || ep == newServer.EndPoint, + "Target server after migration should be null or the new server"); + + nodes = await newServer.ClusterNodesAsync(); + Assert.NotNull(nodes); + Assert.Equal(newServer.EndPoint, nodes.GetBySlot(slot)?.EndPoint); + await conn.ConfigureAsync(); + Assert.Equal(newServer, conn.GetServer(asKey)); + + using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5))) + { + // now publish... we *expect* things to have sorted themselves out + var msg = Guid.NewGuid().ToString(); + var count = await subscriber.PublishAsync(channel, msg); + Assert.Equal(1, count); + + Log("Waiting for message on moved subscription..."); + var received = await queue.ReadAsync(timeout.Token); + Log($"Message received: {received.Message}"); + Assert.Equal(msg, (string)received.Message!); + ep = subscriber.SubscribedEndpoint(channel); + Log($"Endpoint after receiving message: {Format.ToString(ep)}"); + } + + Log("Unsubscribing..."); + await queue.UnsubscribeAsync(); + Log("Unsubscribed."); + } }