diff --git a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java index 747018ca60..2340ef9344 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java @@ -177,6 +177,11 @@ public void ssubscribed(K channel, long count) { notifications.ssubscribed(getNode(), channel, count); } + @Override + public void sunsubscribed(K channel, long count) { + notifications.sunsubscribed(getNode(), channel, count); + } + private RedisClusterNode getNode() { return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartitions().getPartition(host, port); } diff --git a/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java b/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java index 9288dd8f85..9ec0a019c6 100644 --- a/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java +++ b/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java @@ -94,6 +94,9 @@ protected void notifyListeners(PubSubMessage output) { case ssubscribe: multicast.ssubscribed(clusterNode, output.channel(), output.count()); break; + case sunsubscribe: + multicast.sunsubscribed(clusterNode, output.channel(), output.count()); + break; default: throw new UnsupportedOperationException("Operation " + output.type() + " not supported"); } @@ -207,6 +210,12 @@ public void ssubscribed(RedisClusterNode node, K channel, long count) { clusterListeners.forEach(listener -> listener.ssubscribed(node, channel, count)); } + @Override + public void sunsubscribed(RedisClusterNode node, K channel, long count) { + getListeners().forEach(listener -> listener.sunsubscribed(channel, count)); + clusterListeners.forEach(listener -> listener.sunsubscribed(node, channel, count)); + } + } } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java index 554efa3009..3c7822af6b 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java @@ -52,4 +52,9 @@ public void ssubscribed(RedisClusterNode node, K channel, long count) { // empty adapter method } + @Override + public void sunsubscribed(RedisClusterNode node, K channel, long count) { + // empty adapter method + } + } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java index 93da2f5313..b1755f80b9 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java @@ -91,4 +91,16 @@ default void ssubscribed(RedisClusterNode node, K shardChannel, long count) { subscribed(node, shardChannel, count); } + /** + * Unsubscribed from a shard channel. + * + * @param node the {@link RedisClusterNode} from which the {@code message} originates. + * @param shardChannel Shard channel + * @param count Subscription count. + * @since 7.0 + */ + default void sunsubscribed(RedisClusterNode node, K shardChannel, long count) { + unsubscribed(node, shardChannel, count); + } + } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java index 512afbaabd..19f1980e01 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java @@ -51,4 +51,13 @@ public interface NodeSelectionPubSubAsyncCommands { */ AsyncExecutions ssubscribe(K... shardChannels); + /** + * Stop listening for messages posted to the given shard channels. + * + * @param shardChannels the channels + * @return RedisFuture<Void> Future to synchronize {@code unsubscribe} completion. + * @since 7.0 + */ + AsyncExecutions sunsubscribe(K... shardChannels); + } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java b/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java index b61395cf2e..3da91dddd1 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java @@ -51,4 +51,13 @@ public interface NodeSelectionPubSubReactiveCommands { */ ReactiveExecutions ssubscribe(K... shardCchannels); + /** + * Stop listening for messages posted to the given shard channels. + * + * @param shardCchannels the channels + * @return RedisFuture<Void> Future to synchronize {@code unsubscribe} completion. + * @since 7.0 + */ + ReactiveExecutions sunsubscribe(K... shardCchannels); + } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java b/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java index ba7fe76af7..6a3716c9fc 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java @@ -51,4 +51,13 @@ public interface NodeSelectionPubSubCommands { */ Executions ssubscribe(K... shardChannels); + /** + * Stop listening for messages posted to the given channels. + * + * @param shardChannels the channels + * @return Executions Future to synchronize {@code unsubscribe} completion. + * @since 7.0 + */ + Executions sunsubscribe(K... shardChannels); + } diff --git a/src/main/java/io/lettuce/core/protocol/CommandType.java b/src/main/java/io/lettuce/core/protocol/CommandType.java index da1f6499c8..37ae008fb1 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandType.java +++ b/src/main/java/io/lettuce/core/protocol/CommandType.java @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword { // Pub/Sub - PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, SPUBLISH, + PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, SPUBLISH, SUNSUBSCRIBE, // Sets diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java index 3da52d877f..65c226593e 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java @@ -118,6 +118,14 @@ final Command subscribe(K... channels) { return pubSubCommand(SUBSCRIBE, new PubSubOutput<>(codec), channels); } + @SafeVarargs + final Command sunsubscribe(K... shardChannels) { + LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY); + + CommandArgs args = new CommandArgs<>(codec).addKeys(shardChannels); + return createCommand(SUNSUBSCRIBE, new PubSubOutput<>(codec), args); + } + @SafeVarargs final Command unsubscribe(K... channels) { return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels); diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubCommandHandler.java b/src/main/java/io/lettuce/core/pubsub/PubSubCommandHandler.java index 0cb9bd1a55..30d811c71f 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubCommandHandler.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubCommandHandler.java @@ -227,6 +227,9 @@ private boolean shouldCompleteCommand(PubSubOutput.Type type, RedisCommand message) { case ssubscribe: listener.ssubscribed(message.channel(), message.count()); break; + case sunsubscribe: + listener.sunsubscribed(message.channel(), message.count()); + break; default: throw new UnsupportedOperationException("Operation " + message.type() + " not supported"); } @@ -293,6 +296,9 @@ private void updateInternalState(PubSubMessage message) { case ssubscribe: shardChannels.add(new Wrapper<>(message.channel())); break; + case sunsubscribe: + shardChannels.remove(new Wrapper<>(message.channel())); + break; default: break; } diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java b/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java index 18c19ccddb..0ec41ff646 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java @@ -38,7 +38,7 @@ public class PubSubOutput extends CommandOutput implements PubSub public enum Type { - message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe, smessage; + message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe, smessage, sunsubscribe; private final static Set names = new HashSet<>(); @@ -124,6 +124,7 @@ private void handleOutput(ByteBuffer bytes) { case subscribe: case unsubscribe: case ssubscribe: + case sunsubscribe: channel = codec.decodeKey(bytes); break; default: diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java index 8b061bacf0..20f03c1bbc 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java @@ -69,4 +69,9 @@ public void ssubscribed(K shardChannel, long count) { // empty adapter method } + @Override + public void sunsubscribed(K shardChannel, long count) { + // empty adapter method + } + } diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java index 20515e60fd..3e282b5410 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java @@ -111,6 +111,12 @@ public RedisFuture ssubscribe(K... channels) { return (RedisFuture) dispatch(commandBuilder.ssubscribe(channels)); } + @Override + @SuppressWarnings("unchecked") + public RedisFuture sunsubscribe(K... channels) { + return (RedisFuture) dispatch(commandBuilder.sunsubscribe(channels)); + } + @Override @SuppressWarnings("unchecked") public StatefulRedisPubSubConnection getStatefulConnection() { diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java index ccc1f0e1e1..ae0aff2e4a 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java @@ -88,6 +88,17 @@ default void ssubscribed(K shardChannel, long count) { subscribed(shardChannel, count); } + /** + * Unsubscribed from a shard channel. + * + * @param shardChannel Channel + * @param count Subscription count. + * @since 7.0 + */ + default void sunsubscribed(K shardChannel, long count) { + unsubscribed(shardChannel, count); + } + /** * Message received from a shard channel subscription. * diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index adff1b58fa..19e878f3f6 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -169,6 +169,11 @@ public Mono ssubscribe(K... shardChannels) { return createFlux(() -> commandBuilder.ssubscribe(shardChannels)).then(); } + @Override + public Mono sunsubscribe(K... shardChannels) { + return createFlux(() -> commandBuilder.sunsubscribe(shardChannels)).then(); + } + @Override @SuppressWarnings("unchecked") public StatefulRedisPubSubConnection getStatefulConnection() { diff --git a/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java b/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java index 90c301a58e..8750588552 100644 --- a/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java +++ b/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java @@ -60,4 +60,13 @@ public interface RedisPubSubAsyncCommands extends RedisAsyncCommands */ RedisFuture ssubscribe(K... shardChannels); + /** + * Stop listening for messages posted to the given channels. + * + * @param shardChannels the shard channels + * @return RedisFuture<Void> Future to synchronize {@code unsubscribe} completion. + * @since 7.0 + */ + RedisFuture sunsubscribe(K... shardChannels); + } diff --git a/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java b/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java index d6f0476d67..6f1f0e0817 100644 --- a/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java +++ b/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java @@ -107,6 +107,16 @@ public interface RedisPubSubReactiveCommands extends RedisReactiveCommands */ Mono ssubscribe(K... shardChannels); + /** + * Stop listening for messages posted to the given channels. The {@link Mono} completes without a result as soon as the + * subscription is unregistered. + * + * @param shardChannels the channels. + * @return Mono<Void> Mono for {@code unsubscribe} command. + * @since 7.0 + */ + Mono sunsubscribe(K... shardChannels); + /** * @return the underlying connection. * @since 6.2, will be removed with Lettuce 7 to avoid exposing the underlying connection. diff --git a/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java b/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java index 44a59b6c6d..b3ea69fb62 100644 --- a/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java +++ b/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java @@ -49,6 +49,14 @@ public interface RedisPubSubCommands extends RedisCommands { */ void ssubscribe(K... shardChannels); + /** + * Stop listening for messages posted to the given channels. + * + * @param shardChannels the channels + * @since 7.0 + */ + void sunsubscribe(K... shardChannels); + /** * @return the underlying connection. */ diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 772af41dab..cd08f520e6 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -211,6 +211,22 @@ void publishToShardChannelViaNewClientWithNoRedirects() throws Exception { cmd.getStatefulConnection().close(); } + @Test + @EnabledOnCommand("SSUBSCRIBE") + void unubscribeFromShardChannel() { + pubSubConnection.sync().ssubscribe(shardChannel); + pubSubConnection.sync().spublish(shardChannel, "msg1"); + + pubSubConnection.sync().sunsubscribe(shardChannel); + pubSubConnection.sync().spublish(shardChannel, "msg2"); + + pubSubConnection.sync().ssubscribe(shardChannel); + pubSubConnection.sync().spublish(shardChannel, "msg3"); + + Wait.untilEquals("msg1", connectionListener.getMessages()::poll).waitOrTimeout(); + Wait.untilEquals("msg3", connectionListener.getMessages()::poll).waitOrTimeout(); + } + @Test void myIdWorksAfterDisconnect() throws InterruptedException { diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandBuilderUnitTests.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandBuilderUnitTests.java index 4d4e798187..89e93b234e 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandBuilderUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandBuilderUnitTests.java @@ -139,4 +139,15 @@ void ssubscribe() { assertThat(command.getOutput()).isInstanceOf(PubSubOutput.class); } + @Test + void sunsubscribe() { + String channel = "channelPattern"; + Command command = this.commandBuilder.sunsubscribe(channel); + + assertThat(command.getType()).isEqualTo(SUNSUBSCRIBE); + assertThat(command.getArgs()).isInstanceOf(CommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("key"); + assertThat(command.getOutput()).isInstanceOf(PubSubOutput.class); + } + } diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java index e5d91a156e..7dd7d11001 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java @@ -357,6 +357,13 @@ void ssubscribe() throws Exception { assertThat((long) counts.take()).isGreaterThan(0); } + @Test + void sunsubscribe() throws Exception { + StepVerifier.create(pubsub.sunsubscribe(channel)).verifyComplete(); + assertThat(shardChannels.take()).isEqualTo(channel); + assertThat((long) counts.take()).isEqualTo(0); + } + @Test void pubsubCloseOnClientShutdown() { @@ -515,6 +522,12 @@ public void ssubscribed(String shardChannel, long count) { counts.add(count); } + @Override + public void sunsubscribed(String shardChannel, long count) { + shardChannels.add(shardChannel); + counts.add(count); + } + T block(Mono mono) { return mono.block(); } diff --git a/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java b/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java index 65b0a960aa..49dee5c2d8 100644 --- a/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java @@ -15,7 +15,6 @@ import static io.lettuce.core.protocol.CommandType.*; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -50,7 +49,7 @@ void psubscribe() throws ExecutionException, InterruptedException { assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } @Test @@ -69,7 +68,7 @@ void punsubscribe() throws ExecutionException, InterruptedException { assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } @Test @@ -88,7 +87,7 @@ void subscribe() throws ExecutionException, InterruptedException { assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } @Test @@ -107,7 +106,7 @@ void unsubscribe() throws ExecutionException, InterruptedException { assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } @Test @@ -128,7 +127,7 @@ void publish() throws ExecutionException, InterruptedException { assertInstanceOf(IntegerOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key value"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } @Test @@ -148,7 +147,7 @@ void pubsubChannels() throws ExecutionException, InterruptedException { assertInstanceOf(KeyListOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("CHANNELS key"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } @Test @@ -168,7 +167,7 @@ void pubsubNumsub() throws ExecutionException, InterruptedException { assertInstanceOf(MapOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("NUMSUB key"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } @Test @@ -188,7 +187,7 @@ void pubsubShardChannels() throws ExecutionException, InterruptedException { assertInstanceOf(KeyListOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("SHARDCHANNELS key"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } @Test @@ -208,7 +207,7 @@ void pubsubShardNumsub() throws ExecutionException, InterruptedException { assertInstanceOf(MapOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("SHARDNUMSUB key"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } @Test @@ -227,7 +226,26 @@ void ssubscribe() throws ExecutionException, InterruptedException { assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); - assertNotEquals(capturedCommand.getValue(), dispachedMock); + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); + } + + @Test + void sunsubscribe() throws ExecutionException, InterruptedException { + String pattern = "channelPattern"; + AsyncCommand dispachedMock = mock(AsyncCommand.class); + when(mockedConnection.dispatch((RedisCommand) any())).thenReturn(dispachedMock); + + commands.sunsubscribe(pattern).get(); + + ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class); + + verify(mockedConnection).dispatch(capturedCommand.capture()); + + assertThat(capturedCommand.getValue().getType()).isEqualTo(SUNSUBSCRIBE); + assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); + + assertThat(capturedCommand.getValue()).isNotEqualTo(dispachedMock); } } diff --git a/src/test/java/io/lettuce/core/support/PubSubTestListener.java b/src/test/java/io/lettuce/core/support/PubSubTestListener.java index 1afcb42766..fa7d0f6fc0 100644 --- a/src/test/java/io/lettuce/core/support/PubSubTestListener.java +++ b/src/test/java/io/lettuce/core/support/PubSubTestListener.java @@ -74,6 +74,12 @@ public void ssubscribed(String shardChannel, long count) { shardCounts.add(count); } + @Override + public void sunsubscribed(String shardChannel, long count) { + shardChannels.add(shardChannel); + counts.add(count); + } + @Override public void psubscribed(String pattern, long count) { patterns.add(pattern);