Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix XAddOptions maxlen handling and XPendingOptions validation #2985

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public Long getMaxlen() {
* @since 2.3
*/
public boolean hasMaxlen() {
return maxlen != null && maxlen > 0;
return maxlen != null;
Copy link
Contributor Author

@jinkshower jinkshower Sep 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For context, Lettuce and Jedis treat maxlen edge case differently. XAddArgs in Lettuce asserts maxlen > 0 whereas Jedis doesn't check on maxlen so it follows default redis behaviour which asserts maxlen >=0.

My best choice was just let the user know. So I didn't put any assertion on maxlen. If we validate against negative values (maxlen < 0), it could be confusing since maxlen is validated differently across multiple layers.

java.lang.IllegalArgumentException: Maxlen must be greater 0

	at io.lettuce.core.internal.LettuceAssert.isTrue(LettuceAssert.java:193)
	at io.lettuce.core.XAddArgs.maxlen(XAddArgs.java:121)
	at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xAdd(LettuceStreamCommands.java:84)
	at org.springframework.data.redis.connection.DefaultedRedisConnection.xAdd(DefaultedRedisConnection.java:489)
	at org.springframework.data.redis.connection.DefaultStringRedisConnection.xAdd(DefaultStringRedisConnection.java:2918)
	at org.springframework.data.redis.core.DefaultStreamOperations.lambda$add$2(DefaultStreamOperations.java:153)
127.0.0.1:6379> xadd mystream maxlen -1 * test "value"
(error) ERR The MAXLEN argument must be >= 0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is fine to resort to driver defaults.

}

/**
Expand Down Expand Up @@ -685,7 +685,7 @@ default Mono<PendingMessagesSummary> xPending(ByteBuffer key, String groupName)
Assert.notNull(key, "Key must not be null");
Assert.notNull(groupName, "GroupName must not be null");

return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next()
return xPendingSummary(Mono.just(PendingRecordsCommand.pending(key, groupName))).next()
.map(CommandResponse::getOutput);
}

Expand Down Expand Up @@ -726,7 +726,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer) {
*/
@Nullable
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next()
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName))).next()
.map(CommandResponse::getOutput);
}

Expand All @@ -743,7 +743,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
* @since 2.3
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next()
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count))).next()
.map(CommandResponse::getOutput);
}

Expand Down Expand Up @@ -779,8 +779,8 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
Long count) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next()
.map(CommandResponse::getOutput);
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)))
.next().map(CommandResponse::getOutput);
}

/**
Expand Down Expand Up @@ -832,9 +832,15 @@ static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
/**
* Create new {@link PendingRecordsCommand} with given {@link Range} and limit.
*
* @param range must not be {@literal null}.
* @param count the max number of messages to return. Must not be negative.
* @return new instance of {@link XPendingOptions}.
*/
public PendingRecordsCommand range(Range<String> range, Long count) {
public PendingRecordsCommand range(Range<?> range, Long count) {

Assert.notNull(range, "Range must not be null");
Assert.isTrue(count > -1, "Count must not be negative");

return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
}

Expand Down Expand Up @@ -886,7 +892,7 @@ public boolean hasConsumer() {
* @return {@literal true} count is set.
*/
public boolean isLimited() {
return count != null && count > -1;
return count != null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public Long getMaxlen() {
* @return {@literal true} if {@literal MAXLEN} is set.
*/
public boolean hasMaxlen() {
return maxlen != null && maxlen > 0;
return maxlen != null;
}

/**
Expand Down Expand Up @@ -788,19 +788,28 @@ public static XPendingOptions unbounded() {
/**
* Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}).
*
* @param count the max number of messages to return. Must not be {@literal null}.
* @param count the max number of messages to return. Must not be negative.
* @return new instance of {@link XPendingOptions}.
*/
public static XPendingOptions unbounded(Long count) {

Assert.isTrue(count > -1, "Count must not be negative");

return new XPendingOptions(null, Range.unbounded(), count);
}

/**
* Create new {@link XPendingOptions} with given {@link Range} and limit.
*
* @param range must not be {@literal null}.
* @param count the max number of messages to return. Must not be negative.
* @return new instance of {@link XPendingOptions}.
*/
public static XPendingOptions range(Range<?> range, Long count) {

Assert.notNull(range, "Range must not be null");
Assert.isTrue(count > -1, "Count must not be negative");

return new XPendingOptions(null, range, count);
}

Expand Down Expand Up @@ -848,7 +857,7 @@ public boolean hasConsumer() {
* @return {@literal true} count is set.
*/
public boolean isLimited() {
return count != null && count > -1;
return count != null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public StreamReadOptions block(Duration timeout) {
*/
public StreamReadOptions count(long count) {

Assert.isTrue(count > 0, "Count must be greater or equal to zero");
Assert.isTrue(count > 0, "Count must be greater than zero");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix description to represent assertion


return new StreamReadOptions(block, count, noack);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.springframework.data.redis.connection;

import static org.assertj.core.api.Assertions.*;

import java.nio.ByteBuffer;

import org.junit.jupiter.api.Test;

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.ReactiveStreamCommands.PendingRecordsCommand;

/**
* Unit tests for {@link ReactiveStreamCommands}.
*
* @author jinkshower
*/
class ReactiveStreamCommandsUnitTests {

@Test // GH-2982
void pendingRecordsCommandRangeShouldThrowExceptionWhenRangeIsNull() {

ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
String groupName = "my-group";

PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);

assertThatThrownBy(() -> command.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-2982
void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() {

ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
String groupName = "my-group";

PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
Range<?> range = Range.closed("0", "10");

assertThatThrownBy(() -> command.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.springframework.data.redis.connection;

import static org.assertj.core.api.Assertions.*;

import org.junit.jupiter.api.Test;

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;

/**
* Unit tests for {@link RedisStreamCommands}.
*
* @author jinkshower
*/
class RedisStreamCommandsUnitTests {

@Test // GH-2982
void xPendingOptionsUnboundedShouldThrowExceptionWhenCountIsNegative() {

assertThatThrownBy(() -> XPendingOptions.unbounded(-1L)).isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-2982
void xPendingOptionsRangeShouldThrowExceptionWhenRangeIsNull() {

assertThatThrownBy(() -> XPendingOptions.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
}

@Test // GH-2982
void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() {

Range<?> range = Range.closed("0", "10");

assertThatThrownBy(() -> XPendingOptions.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,21 @@ void addMaxLenShouldLimitSimpleMessageWithRawSerializerSize() {
.verifyComplete();
}

@ParameterizedRedisTest // GH-2982
void addNegativeMaxlenShouldThrowException() {

K key = keyFactory.instance();
HK hashKey = hashKeyFactory.instance();
HV value = valueFactory.instance();

XAddOptions options = XAddOptions.maxlen(-1).approximateTrimming(false);

streamOperations.add(key, Collections.singletonMap(hashKey, value), options).as(StepVerifier::create)
.expectError(IllegalArgumentException.class).verify();

streamOperations.range(key, Range.unbounded()).as(StepVerifier::create).expectNextCount(0L).verifyComplete();
}

@ParameterizedRedisTest // GH-2915
void addMinIdShouldEvictLowerIdMessages() {

Expand Down Expand Up @@ -528,6 +543,21 @@ void pendingShouldReadMessageDetails() {

}

@ParameterizedRedisTest // GH-2982
void pendingNegativeCountShouldThrowException() {

K key = keyFactory.instance();
HK hashKey = hashKeyFactory.instance();
HV value = valueFactory.instance();

streamOperations.add(key, Collections.singletonMap(hashKey, value)).block();

streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").block();

streamOperations.pending(key, "my-group", Range.unbounded(), -1L).as(StepVerifier::create)
.expectError(IllegalArgumentException.class).verify();
}

@ParameterizedRedisTest // GH-2465
void claimShouldReadMessageDetails() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,19 @@ void addMaxLenShouldLimitSimpleMessagesSize() {
assertThat(message.getValue()).isEqualTo(newValue);
}

@ParameterizedRedisTest // GH-2982
void addNegativeMaxlenShouldThrowException() {

K key = keyFactory.instance();
HV value = hashValueFactory.instance();

XAddOptions options = XAddOptions.maxlen(-1).approximateTrimming(false);

assertThatThrownBy(() -> streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing Redis/Driver behavior exceeds the scope of what we want to test. We will remove these tests during the merge.


assertThat(streamOps.range(key, Range.unbounded())).isEmpty();
}

@ParameterizedRedisTest // GH-2915
void addMinIdShouldEvictLowerIdMessages() {

Expand Down Expand Up @@ -565,6 +578,19 @@ void pendingShouldReadMessageDetails() {
assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
}

@ParameterizedRedisTest // GH-2982
void pendingNegativeCountShouldThrowException() {
K key = keyFactory.instance();
HK hashKey = hashKeyFactory.instance();
HV value = hashValueFactory.instance();

streamOps.add(key, Collections.singletonMap(hashKey, value));
streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group");

assertThatThrownBy(() -> streamOps.pending(key, "my-group", Range.unbounded(), -1L))
.isInstanceOf(IllegalArgumentException.class);
}

@ParameterizedRedisTest // GH-2465
void claimShouldReadMessageDetails() {

Expand Down