Skip to content

Commit

Permalink
Check super stream management commands are supported
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Nov 13, 2023
1 parent fa1d133 commit 6868fb3
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ public long applyAsLong(Object value) {
private final Duration rpcTimeout;
private final List<String> saslMechanisms;
private volatile ShutdownReason shutdownReason = null;
private final Runnable exchangeCommandVersionsCheck;
private final Runnable streamStatsCommandVersionsCheck;
private final boolean filteringSupported;
private final Runnable superStreamManagementCommandVersionsCheck;

public Client() {
this(new ClientParameters());
Expand Down Expand Up @@ -376,25 +377,36 @@ public void initChannel(SocketChannel ch) {
tuneState.getHeartbeat());
this.connectionProperties = open(parameters.virtualHost);
Set<FrameHandlerInfo> supportedCommands = maybeExchangeCommandVersions();
AtomicReference<Runnable> exchangeCommandVersionsCheckReference = new AtomicReference<>();
AtomicBoolean streamStatsSupported = new AtomicBoolean(false);
AtomicBoolean filteringSupportedReference = new AtomicBoolean(false);
AtomicBoolean superStreamManagementSupported = new AtomicBoolean(false);
supportedCommands.forEach(
c -> {
if (c.getKey() == COMMAND_STREAM_STATS) {
exchangeCommandVersionsCheckReference.set(() -> {});
streamStatsSupported.set(true);
}
if (c.getKey() == COMMAND_PUBLISH && c.getMaxVersion() >= VERSION_2) {
filteringSupportedReference.set(true);
}
if (c.getKey() == COMMAND_CREATE_SUPER_STREAM) {
superStreamManagementSupported.set(true);
}
});
this.exchangeCommandVersionsCheck =
exchangeCommandVersionsCheckReference.get() == null
? () -> {
this.streamStatsCommandVersionsCheck =
streamStatsSupported.get()
? () -> {}
: () -> {
throw new UnsupportedOperationException(
"QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
}
: exchangeCommandVersionsCheckReference.get();
};
this.filteringSupported = filteringSupportedReference.get();
this.superStreamManagementCommandVersionsCheck =
superStreamManagementSupported.get()
? () -> {}
: () -> {
throw new UnsupportedOperationException(
"Super stream management is available only on RabbitMQ 3.13 or more.");
};
started.set(true);
this.metricsCollector.openConnection();
} catch (RuntimeException e) {
Expand Down Expand Up @@ -678,6 +690,7 @@ Response createSuperStream(
List<String> partitions,
List<String> routingKeys,
Map<String, String> arguments) {
this.superStreamManagementCommandVersionsCheck.run();
if (partitions.isEmpty() || routingKeys.isEmpty()) {
throw new IllegalArgumentException(
"Partitions and routing keys of a super stream cannot be empty");
Expand Down Expand Up @@ -724,6 +737,7 @@ Response createSuperStream(
}

Response deleteSuperStream(String superStream) {
this.superStreamManagementCommandVersionsCheck.run();
int length = 2 + 2 + 4 + 2 + superStream.length();
int correlationId = correlationSequence.incrementAndGet();
try {
Expand Down Expand Up @@ -1594,7 +1608,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
}

StreamStatsResponse streamStats(String stream) {
this.exchangeCommandVersionsCheck.run();
this.streamStatsCommandVersionsCheck.run();
if (stream == null) {
throw new IllegalArgumentException("stream must not be null");
}
Expand Down

0 comments on commit 6868fb3

Please sign in to comment.