Skip to content

Commit

Permalink
Fix XAddOptions maxlen handling and XPendingOptions validation
Browse files Browse the repository at this point in the history
  • Loading branch information
jinkshower committed Sep 8, 2024
1 parent fb0f0bc commit d631e5c
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public Long getMaxlen() {
* @since 2.3
*/
public boolean hasMaxlen() {
return maxlen != null && maxlen > 0;
return maxlen != null;
}

/**
Expand Down Expand Up @@ -685,7 +685,7 @@ default Mono<PendingMessagesSummary> xPending(ByteBuffer key, String groupName)
Assert.notNull(key, "Key must not be null");
Assert.notNull(groupName, "GroupName must not be null");

return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next()
return xPendingSummary(Mono.just(PendingRecordsCommand.pending(key, groupName))).next()
.map(CommandResponse::getOutput);
}

Expand Down Expand Up @@ -726,7 +726,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer) {
*/
@Nullable
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next()
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName))).next()
.map(CommandResponse::getOutput);
}

Expand All @@ -743,7 +743,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
* @since 2.3
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next()
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count))).next()
.map(CommandResponse::getOutput);
}

Expand Down Expand Up @@ -779,8 +779,8 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
Long count) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next()
.map(CommandResponse::getOutput);
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)))
.next().map(CommandResponse::getOutput);
}

/**
Expand Down Expand Up @@ -832,9 +832,15 @@ static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
/**
* Create new {@link PendingRecordsCommand} with given {@link Range} and limit.
*
* @param range must not be {@literal null}.
* @param count the max number of messages to return. Must not be negative.
* @return new instance of {@link XPendingOptions}.
*/
public PendingRecordsCommand range(Range<String> range, Long count) {
public PendingRecordsCommand range(Range<?> range, Long count) {

Assert.notNull(range, "Range must not be null");
Assert.isTrue(count > -1, "Count must not be negative");

return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
}

Expand Down Expand Up @@ -886,7 +892,7 @@ public boolean hasConsumer() {
* @return {@literal true} count is set.
*/
public boolean isLimited() {
return count != null && count > -1;
return count != null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public Long getMaxlen() {
* @return {@literal true} if {@literal MAXLEN} is set.
*/
public boolean hasMaxlen() {
return maxlen != null && maxlen > 0;
return maxlen != null;
}

/**
Expand Down Expand Up @@ -788,19 +788,28 @@ public static XPendingOptions unbounded() {
/**
* Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}).
*
* @param count the max number of messages to return. Must not be {@literal null}.
* @param count the max number of messages to return. Must not be negative.
* @return new instance of {@link XPendingOptions}.
*/
public static XPendingOptions unbounded(Long count) {

Assert.isTrue(count > -1, "Count must not be negative");

return new XPendingOptions(null, Range.unbounded(), count);
}

/**
* Create new {@link XPendingOptions} with given {@link Range} and limit.
*
* @param range must not be {@literal null}.
* @param count the max number of messages to return. Must not be negative.
* @return new instance of {@link XPendingOptions}.
*/
public static XPendingOptions range(Range<?> range, Long count) {

Assert.notNull(range, "Range must not be null");
Assert.isTrue(count > -1, "Count must not be negative");

return new XPendingOptions(null, range, count);
}

Expand Down Expand Up @@ -848,7 +857,7 @@ public boolean hasConsumer() {
* @return {@literal true} count is set.
*/
public boolean isLimited() {
return count != null && count > -1;
return count != null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public StreamReadOptions block(Duration timeout) {
*/
public StreamReadOptions count(long count) {

Assert.isTrue(count > 0, "Count must be greater or equal to zero");
Assert.isTrue(count > 0, "Count must be greater than zero");

return new StreamReadOptions(block, count, noack);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.springframework.data.redis.connection;

import static org.assertj.core.api.Assertions.*;

import java.nio.ByteBuffer;

import org.junit.jupiter.api.Test;

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.ReactiveStreamCommands.PendingRecordsCommand;

/**
* Unit tests for {@link ReactiveStreamCommands}.
*
* @author jinkshower
*/
class ReactiveStreamCommandsUnitTests {

@Test // GH-2982
void pendingRecordsCommandRangeShouldThrowExceptionWhenRangeIsNull() {

ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
String groupName = "my-group";

PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);

assertThatThrownBy(() -> command.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-2982
void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() {

ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
String groupName = "my-group";

PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
Range<?> range = Range.closed("0", "10");

assertThatThrownBy(() -> command.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.springframework.data.redis.connection;

import static org.assertj.core.api.Assertions.*;

import org.junit.jupiter.api.Test;

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;

/**
* Unit tests for {@link RedisStreamCommands}.
*
* @author jinkshower
*/
class RedisStreamCommandsUnitTests {

@Test // GH-2982
void xPendingOptionsUnboundedShouldThrowExceptionWhenCountIsNegative() {

assertThatThrownBy(() -> XPendingOptions.unbounded(-1L)).isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-2982
void xPendingOptionsRangeShouldThrowExceptionWhenRangeIsNull() {

assertThatThrownBy(() -> XPendingOptions.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-2982
void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() {

Range<?> range = Range.closed("0", "10");

assertThatThrownBy(() -> XPendingOptions.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,21 @@ void addMaxLenShouldLimitSimpleMessageWithRawSerializerSize() {
.verifyComplete();
}

@ParameterizedRedisTest // GH-2982
void addNegativeMaxlenShouldThrowException() {

K key = keyFactory.instance();
HK hashKey = hashKeyFactory.instance();
HV value = valueFactory.instance();

XAddOptions options = XAddOptions.maxlen(-1).approximateTrimming(false);

streamOperations.add(key, Collections.singletonMap(hashKey, value), options).as(StepVerifier::create)
.expectError(IllegalArgumentException.class).verify();

streamOperations.range(key, Range.unbounded()).as(StepVerifier::create).expectNextCount(0L).verifyComplete();
}

@ParameterizedRedisTest // GH-2915
void addMinIdShouldEvictLowerIdMessages() {

Expand Down Expand Up @@ -528,6 +543,21 @@ void pendingShouldReadMessageDetails() {

}

@ParameterizedRedisTest // GH-2982
void pendingNegativeCountShouldThrowException() {

K key = keyFactory.instance();
HK hashKey = hashKeyFactory.instance();
HV value = valueFactory.instance();

streamOperations.add(key, Collections.singletonMap(hashKey, value)).block();

streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").block();

streamOperations.pending(key, "my-group", Range.unbounded(), -1L).as(StepVerifier::create)
.expectError(IllegalArgumentException.class).verify();
}

@ParameterizedRedisTest // GH-2465
void claimShouldReadMessageDetails() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,19 @@ void addMaxLenShouldLimitSimpleMessagesSize() {
assertThat(message.getValue()).isEqualTo(newValue);
}

@ParameterizedRedisTest // GH-2982
void addNegativeMaxlenShouldThrowException() {

K key = keyFactory.instance();
HV value = hashValueFactory.instance();

XAddOptions options = XAddOptions.maxlen(-1).approximateTrimming(false);

assertThatThrownBy(() -> streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options));

assertThat(streamOps.range(key, Range.unbounded())).isEmpty();
}

@ParameterizedRedisTest // GH-2915
void addMinIdShouldEvictLowerIdMessages() {

Expand Down Expand Up @@ -565,6 +578,19 @@ void pendingShouldReadMessageDetails() {
assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
}

@ParameterizedRedisTest // GH-2982
void pendingNegativeCountShouldThrowException() {
K key = keyFactory.instance();
HK hashKey = hashKeyFactory.instance();
HV value = hashValueFactory.instance();

streamOps.add(key, Collections.singletonMap(hashKey, value));
streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group");

assertThatThrownBy(() -> streamOps.pending(key, "my-group", Range.unbounded(), -1L))
.isInstanceOf(IllegalArgumentException.class);
}

@ParameterizedRedisTest // GH-2465
void claimShouldReadMessageDetails() {

Expand Down

0 comments on commit d631e5c

Please sign in to comment.