Skip to content

Commit 41685a5

Browse files
committed
SUNSUBSCRIBE handling; if possible, use the active connection to find where we should be subscribing
1 parent de1da5b commit 41685a5

File tree

3 files changed

+41
-6
lines changed

3 files changed

+41
-6
lines changed

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1870,11 +1870,11 @@ private void MatchResult(in RawResult result)
18701870
var server = BridgeCouldBeNull?.ServerEndPoint;
18711871
if (server is not null && muxer.TryGetSubscription(subscriptionChannel, out var subscription))
18721872
{
1873-
if (subscription.GetCurrentServer() == server)
1874-
{
1875-
subscription.SetCurrentServer(null); // wipe
1876-
muxer.ReconfigureIfNeeded(server.EndPoint, fromBroadcast: true, PushSUnsubscribe.Text);
1877-
}
1873+
// wipe and reconnect; but: to where?
1874+
// counter-intuitively, the only server we *know* already knows the new route is:
1875+
// the outgoing server, since it had to change to MIGRATING etc; the new INCOMING server
1876+
// knows, but *we don't know who that is*, and other nodes: aren't guaranteed to know (yet)
1877+
muxer.DefaultSubscriber.ResubscribeToServer(subscription, subscriptionChannel, server, cause: PushSUnsubscribe.Text);
18781878
}
18791879
return; // and STOP PROCESSING; unsolicited
18801880
}

src/StackExchange.Redis/RedisSubscriber.cs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.Diagnostics.CodeAnalysis;
4+
using System.Diagnostics.SymbolStore;
45
using System.Net;
56
using System.Threading;
67
using System.Threading.Tasks;
@@ -13,7 +14,7 @@ namespace StackExchange.Redis
1314
public partial class ConnectionMultiplexer
1415
{
1516
private RedisSubscriber? _defaultSubscriber;
16-
private RedisSubscriber DefaultSubscriber => _defaultSubscriber ??= new RedisSubscriber(this, null);
17+
internal RedisSubscriber DefaultSubscriber => _defaultSubscriber ??= new RedisSubscriber(this, null);
1718

1819
private readonly ConcurrentDictionary<RedisChannel, Subscription> subscriptions = new();
1920

@@ -282,6 +283,17 @@ internal void GetSubscriberCounts(out int handlers, out int queues)
282283

283284
internal ServerEndPoint? GetCurrentServer() => Volatile.Read(ref CurrentServer);
284285
internal void SetCurrentServer(ServerEndPoint? server) => CurrentServer = server;
286+
// conditional clear
287+
internal bool ClearCurrentServer(ServerEndPoint expected)
288+
{
289+
if (CurrentServer == expected)
290+
{
291+
CurrentServer = null;
292+
return true;
293+
}
294+
295+
return false;
296+
}
285297

286298
/// <summary>
287299
/// Evaluates state and if we're not currently connected, clears the server reference.
@@ -425,6 +437,27 @@ internal bool EnsureSubscribedToServer(Subscription sub, RedisChannel channel, C
425437
return ExecuteSync(message, sub.Processor, selected);
426438
}
427439

440+
internal void ResubscribeToServer(Subscription sub, RedisChannel channel, ServerEndPoint serverEndPoint, string cause)
441+
{
442+
// conditional: only if that's the server we were connected to, or "none"; we don't want to end up duplicated
443+
if (sub.ClearCurrentServer(serverEndPoint) || !sub.IsConnected)
444+
{
445+
if (serverEndPoint.IsSubscriberConnected)
446+
{
447+
// we'll *try* for a simple resubscribe, following any -MOVED etc, but if that fails: fall back
448+
// to full reconfigure; importantly, note that we've already recorded the disconnect
449+
var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, CommandFlags.None, false);
450+
_ = ExecuteAsync(message, sub.Processor, serverEndPoint).ContinueWith(
451+
t => multiplexer.ReconfigureIfNeeded(serverEndPoint.EndPoint, false, cause: cause),
452+
TaskContinuationOptions.OnlyOnFaulted);
453+
}
454+
else
455+
{
456+
multiplexer.ReconfigureIfNeeded(serverEndPoint.EndPoint, false, cause: cause);
457+
}
458+
}
459+
}
460+
428461
Task ISubscriber.SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags)
429462
=> SubscribeAsync(channel, handler, null, flags);
430463

tests/StackExchange.Redis.Tests/ClusterShardedTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,8 @@ void WriteLog(string caption, RedisResult result)
339339
var received = await queue.ReadAsync(timeout.Token);
340340
Log($"Message received: {received.Message}");
341341
Assert.Equal(msg, (string)received.Message!);
342+
ep = subscriber.SubscribedEndpoint(channel);
343+
Log($"Endpoint after receiving message: {Format.ToString(ep)}");
342344
}
343345

344346
Log("Unsubscribing...");

0 commit comments

Comments
 (0)