Skip to content

Commit

Permalink
Add support for SUNSUBSCRIBE #2759 (#2851)
Browse files Browse the repository at this point in the history
* Add support for `SUNSUBSCRIBE` #2759

* replace junit.Assert with assertj
  • Loading branch information
atakavci authored May 20, 2024
1 parent cc9d240 commit 76efb8b
Show file tree
Hide file tree
Showing 24 changed files with 207 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ public void ssubscribed(K channel, long count) {
notifications.ssubscribed(getNode(), channel, count);
}

@Override
public void sunsubscribed(K channel, long count) {
notifications.sunsubscribed(getNode(), channel, count);
}

private RedisClusterNode getNode() {
return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartitions().getPartition(host, port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ protected void notifyListeners(PubSubMessage<K, V> output) {
case ssubscribe:
multicast.ssubscribed(clusterNode, output.channel(), output.count());
break;
case sunsubscribe:
multicast.sunsubscribed(clusterNode, output.channel(), output.count());
break;
default:
throw new UnsupportedOperationException("Operation " + output.type() + " not supported");
}
Expand Down Expand Up @@ -207,6 +210,12 @@ public void ssubscribed(RedisClusterNode node, K channel, long count) {
clusterListeners.forEach(listener -> listener.ssubscribed(node, channel, count));
}

@Override
public void sunsubscribed(RedisClusterNode node, K channel, long count) {
getListeners().forEach(listener -> listener.sunsubscribed(channel, count));
clusterListeners.forEach(listener -> listener.sunsubscribed(node, channel, count));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public void ssubscribed(RedisClusterNode node, K channel, long count) {
// empty adapter method
}

@Override
public void sunsubscribed(RedisClusterNode node, K channel, long count) {
// empty adapter method
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,16 @@ default void ssubscribed(RedisClusterNode node, K shardChannel, long count) {
subscribed(node, shardChannel, count);
}

/**
* Unsubscribed from a shard channel.
*
* @param node the {@link RedisClusterNode} from which the {@code message} originates.
* @param shardChannel Shard channel
* @param count Subscription count.
* @since 7.0
*/
default void sunsubscribed(RedisClusterNode node, K shardChannel, long count) {
unsubscribed(node, shardChannel, count);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,13 @@ public interface NodeSelectionPubSubAsyncCommands<K, V> {
*/
AsyncExecutions<Void> ssubscribe(K... shardChannels);

/**
* Stop listening for messages posted to the given shard channels.
*
* @param shardChannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code unsubscribe} completion.
* @since 7.0
*/
AsyncExecutions<Void> sunsubscribe(K... shardChannels);

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,13 @@ public interface NodeSelectionPubSubReactiveCommands<K, V> {
*/
ReactiveExecutions<Void> ssubscribe(K... shardCchannels);

/**
* Stop listening for messages posted to the given shard channels.
*
* @param shardCchannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code unsubscribe} completion.
* @since 7.0
*/
ReactiveExecutions<Void> sunsubscribe(K... shardCchannels);

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,13 @@ public interface NodeSelectionPubSubCommands<K, V> {
*/
Executions<Void> ssubscribe(K... shardChannels);

/**
* Stop listening for messages posted to the given channels.
*
* @param shardChannels the channels
* @return Executions Future to synchronize {@code unsubscribe} completion.
* @since 7.0
*/
Executions<Void> sunsubscribe(K... shardChannels);

}
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandType.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword {

// Pub/Sub

PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, SPUBLISH,
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, SPUBLISH, SUNSUBSCRIBE,

// Sets

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ final Command<K, V, V> subscribe(K... channels) {
return pubSubCommand(SUBSCRIBE, new PubSubOutput<>(codec), channels);
}

@SafeVarargs
final Command<K, V, V> sunsubscribe(K... shardChannels) {
LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKeys(shardChannels);
return createCommand(SUNSUBSCRIBE, new PubSubOutput<>(codec), args);
}

@SafeVarargs
final Command<K, V, V> unsubscribe(K... channels) {
return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ private boolean shouldCompleteCommand(PubSubOutput.Type type, RedisCommand<?, ?,
case unsubscribe:
return commandType.equalsIgnoreCase("UNSUBSCRIBE");

case sunsubscribe:
return commandType.equalsIgnoreCase("SUNSUBSCRIBE");

case punsubscribe:
return commandType.equalsIgnoreCase("PUNSUBSCRIBE");
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ protected void notifyListeners(PubSubMessage<K, V> message) {
case ssubscribe:
listener.ssubscribed(message.channel(), message.count());
break;
case sunsubscribe:
listener.sunsubscribed(message.channel(), message.count());
break;
default:
throw new UnsupportedOperationException("Operation " + message.type() + " not supported");
}
Expand All @@ -293,6 +296,9 @@ private void updateInternalState(PubSubMessage<K, V> message) {
case ssubscribe:
shardChannels.add(new Wrapper<>(message.channel()));
break;
case sunsubscribe:
shardChannels.remove(new Wrapper<>(message.channel()));
break;
default:
break;
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/pubsub/PubSubOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> implements PubSub

public enum Type {

message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe, smessage;
message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe, smessage, sunsubscribe;

private final static Set<String> names = new HashSet<>();

Expand Down Expand Up @@ -124,6 +124,7 @@ private void handleOutput(ByteBuffer bytes) {
case subscribe:
case unsubscribe:
case ssubscribe:
case sunsubscribe:
channel = codec.decodeKey(bytes);
break;
default:
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,9 @@ public void ssubscribed(K shardChannel, long count) {
// empty adapter method
}

@Override
public void sunsubscribed(K shardChannel, long count) {
// empty adapter method
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ public RedisFuture<Void> ssubscribe(K... channels) {
return (RedisFuture<Void>) dispatch(commandBuilder.ssubscribe(channels));
}

@Override
@SuppressWarnings("unchecked")
public RedisFuture<Void> sunsubscribe(K... channels) {
return (RedisFuture<Void>) dispatch(commandBuilder.sunsubscribe(channels));
}

@Override
@SuppressWarnings("unchecked")
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ default void ssubscribed(K shardChannel, long count) {
subscribed(shardChannel, count);
}

/**
* Unsubscribed from a shard channel.
*
* @param shardChannel Channel
* @param count Subscription count.
* @since 7.0
*/
default void sunsubscribed(K shardChannel, long count) {
unsubscribed(shardChannel, count);
}

/**
* Message received from a shard channel subscription.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public Mono<Void> ssubscribe(K... shardChannels) {
return createFlux(() -> commandBuilder.ssubscribe(shardChannels)).then();
}

@Override
public Mono<Void> sunsubscribe(K... shardChannels) {
return createFlux(() -> commandBuilder.sunsubscribe(shardChannels)).then();
}

@Override
@SuppressWarnings("unchecked")
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,13 @@ public interface RedisPubSubAsyncCommands<K, V> extends RedisAsyncCommands<K, V>
*/
RedisFuture<Void> ssubscribe(K... shardChannels);

/**
* Stop listening for messages posted to the given channels.
*
* @param shardChannels the shard channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code unsubscribe} completion.
* @since 7.0
*/
RedisFuture<Void> sunsubscribe(K... shardChannels);

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ public interface RedisPubSubReactiveCommands<K, V> extends RedisReactiveCommands
*/
Mono<Void> ssubscribe(K... shardChannels);

/**
* Stop listening for messages posted to the given channels. The {@link Mono} completes without a result as soon as the
* subscription is unregistered.
*
* @param shardChannels the channels.
* @return Mono&lt;Void&gt; Mono for {@code unsubscribe} command.
* @since 7.0
*/
Mono<Void> sunsubscribe(K... shardChannels);

/**
* @return the underlying connection.
* @since 6.2, will be removed with Lettuce 7 to avoid exposing the underlying connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public interface RedisPubSubCommands<K, V> extends RedisCommands<K, V> {
*/
void ssubscribe(K... shardChannels);

/**
* Stop listening for messages posted to the given channels.
*
* @param shardChannels the channels
* @since 7.0
*/
void sunsubscribe(K... shardChannels);

/**
* @return the underlying connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,22 @@ void publishToShardChannelViaNewClientWithNoRedirects() throws Exception {
cmd.getStatefulConnection().close();
}

@Test
@EnabledOnCommand("SSUBSCRIBE")
void unubscribeFromShardChannel() {
pubSubConnection.sync().ssubscribe(shardChannel);
pubSubConnection.sync().spublish(shardChannel, "msg1");

pubSubConnection.sync().sunsubscribe(shardChannel);
pubSubConnection.sync().spublish(shardChannel, "msg2");

pubSubConnection.sync().ssubscribe(shardChannel);
pubSubConnection.sync().spublish(shardChannel, "msg3");

Wait.untilEquals("msg1", connectionListener.getMessages()::poll).waitOrTimeout();
Wait.untilEquals("msg3", connectionListener.getMessages()::poll).waitOrTimeout();
}

@Test
void myIdWorksAfterDisconnect() throws InterruptedException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,15 @@ void ssubscribe() {
assertThat(command.getOutput()).isInstanceOf(PubSubOutput.class);
}

@Test
void sunsubscribe() {
String channel = "channelPattern";
Command<String, String, String> command = this.commandBuilder.sunsubscribe(channel);

assertThat(command.getType()).isEqualTo(SUNSUBSCRIBE);
assertThat(command.getArgs()).isInstanceOf(CommandArgs.class);
assertThat(command.getArgs().toCommandString()).isEqualTo("key<channelPattern>");
assertThat(command.getOutput()).isInstanceOf(PubSubOutput.class);
}

}
13 changes: 13 additions & 0 deletions src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,13 @@ void ssubscribe() throws Exception {
assertThat((long) counts.take()).isGreaterThan(0);
}

@Test
void sunsubscribe() throws Exception {
StepVerifier.create(pubsub.sunsubscribe(channel)).verifyComplete();
assertThat(shardChannels.take()).isEqualTo(channel);
assertThat((long) counts.take()).isEqualTo(0);
}

@Test
void pubsubCloseOnClientShutdown() {

Expand Down Expand Up @@ -515,6 +522,12 @@ public void ssubscribed(String shardChannel, long count) {
counts.add(count);
}

@Override
public void sunsubscribed(String shardChannel, long count) {
shardChannels.add(shardChannel);
counts.add(count);
}

<T> T block(Mono<T> mono) {
return mono.block();
}
Expand Down
Loading

0 comments on commit 76efb8b

Please sign in to comment.