Skip to content

Commit 5b50789

Browse files
committed
Add overloads for XAddOptions in StreamOperations.
Closes #2915
1 parent 8b5f29e commit 5b50789

File tree

7 files changed

+311
-1
lines changed

7 files changed

+311
-1
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

+32-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
3333
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
3434
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
35+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
3536
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
3637
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3738
import org.springframework.data.redis.connection.stream.Consumer;
@@ -58,6 +59,7 @@
5859
* @author Tugdual Grall
5960
* @author Dengliming
6061
* @author Mark John Moreno
62+
* @author jinkshower
6163
* @since 2.2
6264
*/
6365
public interface ReactiveStreamCommands {
@@ -394,11 +396,40 @@ default Mono<RecordId> xAdd(ByteBufferRecord record) {
394396
return xAdd(Mono.just(AddStreamRecord.of(record))).next().map(CommandResponse::getOutput);
395397
}
396398

399+
/**
400+
* Add stream record with the specified options.
401+
*
402+
* @param record must not be {@literal null}.
403+
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
404+
* @return {@link Mono} the {@link RecordId id}.
405+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
406+
* @since 3.3
407+
*/
408+
default Mono<RecordId> xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {
409+
410+
Assert.notNull(record, "Record must not be null");
411+
Assert.notNull(xAddOptions, "XAddOptions must not be null");
412+
413+
AddStreamRecord addStreamRecord = AddStreamRecord.of(record)
414+
.approximateTrimming(xAddOptions.isApproximateTrimming())
415+
.makeNoStream(xAddOptions.isNoMkStream());
416+
417+
if (xAddOptions.hasMaxlen()) {
418+
addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen());
419+
}
420+
421+
if (xAddOptions.hasMinId()) {
422+
addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId());
423+
}
424+
425+
return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput);
426+
}
427+
397428
/**
398429
* Add stream record with given {@literal body} to {@literal key}.
399430
*
400431
* @param commands must not be {@literal null}.
401-
* @return {@link Flux} emitting the {@link RecordId} on by for for the given {@link AddStreamRecord} commands.
432+
* @return {@link Flux} emitting the {@link RecordId} on by for the given {@link AddStreamRecord} commands.
402433
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
403434
*/
404435
Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStreamRecord> commands);

src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

+14
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.data.redis.connection.Limit;
3434
import org.springframework.data.redis.connection.ReactiveStreamCommands;
3535
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
36+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
3637
import org.springframework.data.redis.connection.convert.Converters;
3738
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3839
import org.springframework.data.redis.connection.stream.Consumer;
@@ -60,6 +61,7 @@
6061
* @author Christoph Strobl
6162
* @author Marcin Zielinski
6263
* @author John Blum
64+
* @author jinkshower
6365
* @since 2.2
6466
*/
6567
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
@@ -146,6 +148,18 @@ public Mono<RecordId> add(Record<K, ?> record) {
146148
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input)));
147149
}
148150

151+
@Override
152+
public Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions) {
153+
154+
Assert.notNull(record.getStream(), "Key must not be null");
155+
Assert.notNull(record.getValue(), "Body must not be null");
156+
Assert.notNull(xAddOptions, "XAddOptions must not be null");
157+
158+
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
159+
160+
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input), xAddOptions));
161+
}
162+
149163
@Override
150164
public Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
151165

src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java

+17
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.data.domain.Range;
2727
import org.springframework.data.redis.connection.Limit;
2828
import org.springframework.data.redis.connection.RedisConnection;
29+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
2930
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
3031
import org.springframework.data.redis.connection.stream.ByteRecord;
3132
import org.springframework.data.redis.connection.stream.Consumer;
@@ -54,6 +55,7 @@
5455
* @author Christoph Strobl
5556
* @author Marcin Zielinski
5657
* @author John Blum
58+
* @author jinkshower
5759
* @since 2.2
5860
*/
5961
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
@@ -136,6 +138,21 @@ public RecordId add(Record<K, ?> record) {
136138
return execute(connection -> connection.xAdd(binaryRecord));
137139
}
138140

141+
@Nullable
142+
@Override
143+
@SuppressWarnings("unchecked")
144+
public RecordId add(Record<K , ?> record, XAddOptions options) {
145+
146+
Assert.notNull(record, "Record must not be null");
147+
Assert.notNull(options, "XAddOptions must not be null");
148+
149+
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
150+
151+
ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer());
152+
153+
return execute(connection -> connection.streamCommands().xAdd(binaryRecord, options));
154+
}
155+
139156
@Override
140157
public List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
141158

src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java

+44
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.data.domain.Range;
2727
import org.springframework.data.redis.connection.Limit;
2828
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
29+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
2930
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3031
import org.springframework.data.redis.connection.stream.Consumer;
3132
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -54,6 +55,7 @@
5455
* @author Dengliming
5556
* @author Marcin Zielinski
5657
* @author John Blum
58+
* @author jinkshower
5759
* @since 2.2
5860
*/
5961
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -94,6 +96,48 @@ default Mono<Long> acknowledge(String group, Record<K, ?> record) {
9496
return acknowledge(record.getRequiredStream(), group, record.getId());
9597
}
9698

99+
/**
100+
* Append a record to the stream {@code key} with the specified options.
101+
*
102+
* @param key the stream key.
103+
* @param content record content as Map.
104+
* @param xAddOptions parameters for the {@literal XADD} call.
105+
* @return the {@link Mono} emitting the {@link RecordId}.
106+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
107+
* @since 3.3
108+
*/
109+
default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
110+
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
111+
}
112+
113+
/**
114+
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
115+
*
116+
* @param record the record to append.
117+
* @param xAddOptions parameters for the {@literal XADD} call.
118+
* @return the {@link Mono} emitting the {@link RecordId}.
119+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
120+
* @since 3.3
121+
*/
122+
@SuppressWarnings("unchecked")
123+
default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
124+
return add((Record) record, xAddOptions);
125+
}
126+
127+
/**
128+
* Append the record, backed by the given value, to the stream with the specified options.
129+
* The value will be hashed and serialized.
130+
*
131+
* @param record must not be {@literal null}.
132+
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
133+
* @return the {@link Mono} emitting the {@link RecordId}.
134+
* @see MapRecord
135+
* @see ObjectRecord
136+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
137+
* @since 3.3
138+
*/
139+
Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions);
140+
97141
/**
98142
* Append one or more records to the stream {@code key}.
99143
*

src/main/java/org/springframework/data/redis/core/StreamOperations.java

+49
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.data.domain.Range;
2626
import org.springframework.data.redis.connection.Limit;
2727
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
28+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
2829
import org.springframework.data.redis.connection.stream.ByteRecord;
2930
import org.springframework.data.redis.connection.stream.Consumer;
3031
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -53,6 +54,7 @@
5354
* @author Dengliming
5455
* @author Marcin Zielinski
5556
* @author John Blum
57+
* @author jinkshower
5658
* @since 2.2
5759
*/
5860
public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -95,6 +97,53 @@ default Long acknowledge(String group, Record<K, ?> record) {
9597
return acknowledge(record.getRequiredStream(), group, record.getId());
9698
}
9799

100+
/**
101+
* Append a record to the stream {@code key} with the specified options.
102+
*
103+
* @param key the stream key.
104+
* @param content record content as Map.
105+
* @param xAddOptions additional parameters for the {@literal XADD} call.
106+
* @return the record Id. {@literal null} when used in pipeline / transaction.
107+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
108+
* @since 3.3
109+
*/
110+
@SuppressWarnings("unchecked")
111+
@Nullable
112+
default RecordId add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
113+
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
114+
}
115+
116+
/**
117+
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
118+
*
119+
* @param record the record to append.
120+
* @param xAddOptions additional parameters for the {@literal XADD} call.
121+
* @return the record Id. {@literal null} when used in pipeline / transaction.
122+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
123+
* @since 3.3
124+
*/
125+
@SuppressWarnings("unchecked")
126+
@Nullable
127+
default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
128+
return add((Record) record, xAddOptions);
129+
}
130+
131+
/**
132+
* Append the record, backed by the given value, to the stream with the specified options.
133+
* The value will be hashed and serialized.
134+
*
135+
* @param record must not be {@literal null}.
136+
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
137+
* @return the record Id. {@literal null} when used in pipeline / transaction.
138+
* @see MapRecord
139+
* @see ObjectRecord
140+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
141+
* @since 3.3
142+
*/
143+
@SuppressWarnings("unchecked")
144+
@Nullable
145+
RecordId add(Record<K, ?> record, XAddOptions xAddOptions);
146+
98147
/**
99148
* Append a record to the stream {@code key}.
100149
*

src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java

+98
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.data.redis.connection.Limit;
3636
import org.springframework.data.redis.connection.RedisConnection;
3737
import org.springframework.data.redis.connection.RedisConnectionFactory;
38+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
3839
import org.springframework.data.redis.connection.stream.Consumer;
3940
import org.springframework.data.redis.connection.stream.MapRecord;
4041
import org.springframework.data.redis.connection.stream.ReadOffset;
@@ -61,6 +62,7 @@
6162
* @author Mark Paluch
6263
* @author Christoph Strobl
6364
* @author Marcin Zielinski
65+
* @author jinkshower
6466
*/
6567
@MethodSource("testParams")
6668
@SuppressWarnings("unchecked")
@@ -192,6 +194,102 @@ void addShouldAddReadSimpleMessageWithRawSerializer() {
192194
.verifyComplete();
193195
}
194196

197+
@ParameterizedRedisTest // GH-2915
198+
void addShouldAddMessageWithOptions() {
199+
200+
K key = keyFactory.instance();
201+
HK hashKey = hashKeyFactory.instance();
202+
HV value = valueFactory.instance();
203+
204+
streamOperations.add(key, Collections.singletonMap(hashKey, value)).block();
205+
206+
HV newValue = valueFactory.instance();
207+
XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
208+
209+
RecordId messageId = streamOperations.add(key, Collections.singletonMap(hashKey, newValue), options).block();
210+
211+
streamOperations.range(key, Range.unbounded()) //
212+
.as(StepVerifier::create) //
213+
.consumeNextWith(actual -> {
214+
215+
assertThat(actual.getId()).isEqualTo(messageId);
216+
assertThat(actual.getStream()).isEqualTo(key);
217+
assertThat(actual).hasSize(1);
218+
219+
if (!(key instanceof byte[] || value instanceof byte[])) {
220+
assertThat(actual.getValue()).containsEntry(hashKey, newValue);
221+
}
222+
223+
}) //
224+
.verifyComplete();
225+
}
226+
227+
@ParameterizedRedisTest // GH-2915
228+
void addShouldAddReadSimpleMessageWithOptions() {
229+
230+
assumeTrue(!(serializer instanceof Jackson2JsonRedisSerializer)
231+
&& !(serializer instanceof GenericJackson2JsonRedisSerializer)
232+
&& !(serializer instanceof JdkSerializationRedisSerializer) && !(serializer instanceof OxmSerializer));
233+
234+
K key = keyFactory.instance();
235+
HV value = valueFactory.instance();
236+
237+
streamOperations.add(StreamRecords.objectBacked(value).withStreamKey(key)).block();
238+
239+
HV newValue = valueFactory.instance();
240+
XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
241+
242+
RecordId messageId = streamOperations.add(StreamRecords.objectBacked(newValue).withStreamKey(key), options).block();
243+
244+
streamOperations.range((Class<HV>) value.getClass(), key, Range.unbounded())
245+
.as(StepVerifier::create) //
246+
.consumeNextWith(actual -> {
247+
assertThat(actual.getId()).isEqualTo(messageId);
248+
assertThat(actual.getStream()).isEqualTo(key);
249+
250+
assertThat(actual.getValue()).isEqualTo(newValue);
251+
}) //
252+
.expectNextCount(0)
253+
.verifyComplete();
254+
}
255+
256+
@ParameterizedRedisTest // GH-2915
257+
void addShouldAddReadSimpleMessageWithRawSerializerWithOptions() {
258+
259+
assumeTrue(!(serializer instanceof Jackson2JsonRedisSerializer)
260+
&& !(serializer instanceof GenericJackson2JsonRedisSerializer));
261+
262+
SerializationPair<K> keySerializer = redisTemplate.getSerializationContext().getKeySerializationPair();
263+
264+
RedisSerializationContext<K, String> serializationContext = RedisSerializationContext
265+
.<K, String> newSerializationContext(StringRedisSerializer.UTF_8).key(keySerializer)
266+
.hashValue(SerializationPair.raw()).hashKey(SerializationPair.raw()).build();
267+
268+
ReactiveRedisTemplate<K, String> raw = new ReactiveRedisTemplate<>(redisTemplate.getConnectionFactory(),
269+
serializationContext);
270+
271+
K key = keyFactory.instance();
272+
Person value = new PersonObjectFactory().instance();
273+
274+
raw.opsForStream().add(StreamRecords.objectBacked(value).withStreamKey(key)).block();
275+
276+
Person newValue = new PersonObjectFactory().instance();
277+
XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
278+
279+
RecordId messageId = raw.opsForStream().add(StreamRecords.objectBacked(newValue).withStreamKey(key), options).block();
280+
281+
raw.opsForStream().range((Class<HV>) value.getClass(), key, Range.unbounded())
282+
.as(StepVerifier::create) //
283+
.consumeNextWith(it -> {
284+
285+
assertThat(it.getId()).isEqualTo(messageId);
286+
assertThat(it.getStream()).isEqualTo(key);
287+
assertThat(it.getValue()).isEqualTo(newValue);
288+
}) //
289+
.expectNextCount(0)
290+
.verifyComplete();
291+
}
292+
195293
@ParameterizedRedisTest // DATAREDIS-864
196294
void rangeShouldReportMessages() {
197295

0 commit comments

Comments
 (0)