Skip to content

Commit 486dc97

Browse files
jinkshowerchristophstrobl
authored andcommitted
Add overloads for StreamOperations add with additional XAddOptions.
Closes: #2915 Original Pull Request: #2936
1 parent bfa62bf commit 486dc97

File tree

7 files changed

+488
-1
lines changed

7 files changed

+488
-1
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

+32-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
3333
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
3434
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
35+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
3536
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
3637
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3738
import org.springframework.data.redis.connection.stream.Consumer;
@@ -58,6 +59,7 @@
5859
* @author Tugdual Grall
5960
* @author Dengliming
6061
* @author Mark John Moreno
62+
* @author jinkshower
6163
* @since 2.2
6264
*/
6365
public interface ReactiveStreamCommands {
@@ -394,11 +396,40 @@ default Mono<RecordId> xAdd(ByteBufferRecord record) {
394396
return xAdd(Mono.just(AddStreamRecord.of(record))).next().map(CommandResponse::getOutput);
395397
}
396398

399+
/**
400+
* Add stream record with the specified options.
401+
*
402+
* @param record must not be {@literal null}.
403+
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
404+
* @return {@link Mono} the {@link RecordId id}.
405+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
406+
* @since 3.3
407+
*/
408+
default Mono<RecordId> xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {
409+
410+
Assert.notNull(record, "Record must not be null");
411+
Assert.notNull(xAddOptions, "XAddOptions must not be null");
412+
413+
AddStreamRecord addStreamRecord = AddStreamRecord.of(record)
414+
.approximateTrimming(xAddOptions.isApproximateTrimming())
415+
.makeNoStream(xAddOptions.isNoMkStream());
416+
417+
if (xAddOptions.hasMaxlen()) {
418+
addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen());
419+
}
420+
421+
if (xAddOptions.hasMinId()) {
422+
addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId());
423+
}
424+
425+
return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput);
426+
}
427+
397428
/**
398429
* Add stream record with given {@literal body} to {@literal key}.
399430
*
400431
* @param commands must not be {@literal null}.
401-
* @return {@link Flux} emitting the {@link RecordId} on by for for the given {@link AddStreamRecord} commands.
432+
* @return {@link Flux} emitting the {@link RecordId} on by for the given {@link AddStreamRecord} commands.
402433
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
403434
*/
404435
Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStreamRecord> commands);

src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

+14
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.data.redis.connection.Limit;
3434
import org.springframework.data.redis.connection.ReactiveStreamCommands;
3535
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
36+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
3637
import org.springframework.data.redis.connection.convert.Converters;
3738
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3839
import org.springframework.data.redis.connection.stream.Consumer;
@@ -60,6 +61,7 @@
6061
* @author Christoph Strobl
6162
* @author Marcin Zielinski
6263
* @author John Blum
64+
* @author jinkshower
6365
* @since 2.2
6466
*/
6567
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
@@ -146,6 +148,18 @@ public Mono<RecordId> add(Record<K, ?> record) {
146148
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input)));
147149
}
148150

151+
@Override
152+
public Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions) {
153+
154+
Assert.notNull(record.getStream(), "Key must not be null");
155+
Assert.notNull(record.getValue(), "Body must not be null");
156+
Assert.notNull(xAddOptions, "XAddOptions must not be null");
157+
158+
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
159+
160+
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input), xAddOptions));
161+
}
162+
149163
@Override
150164
public Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
151165

src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java

+17
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.data.domain.Range;
2727
import org.springframework.data.redis.connection.Limit;
2828
import org.springframework.data.redis.connection.RedisConnection;
29+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
2930
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
3031
import org.springframework.data.redis.connection.stream.ByteRecord;
3132
import org.springframework.data.redis.connection.stream.Consumer;
@@ -54,6 +55,7 @@
5455
* @author Christoph Strobl
5556
* @author Marcin Zielinski
5657
* @author John Blum
58+
* @author jinkshower
5759
* @since 2.2
5860
*/
5961
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
@@ -136,6 +138,21 @@ public RecordId add(Record<K, ?> record) {
136138
return execute(connection -> connection.xAdd(binaryRecord));
137139
}
138140

141+
@Nullable
142+
@Override
143+
@SuppressWarnings("unchecked")
144+
public RecordId add(Record<K , ?> record, XAddOptions options) {
145+
146+
Assert.notNull(record, "Record must not be null");
147+
Assert.notNull(options, "XAddOptions must not be null");
148+
149+
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
150+
151+
ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer());
152+
153+
return execute(connection -> connection.streamCommands().xAdd(binaryRecord, options));
154+
}
155+
139156
@Override
140157
public List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
141158

src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java

+59
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.data.domain.Range;
2727
import org.springframework.data.redis.connection.Limit;
2828
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
29+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
2930
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3031
import org.springframework.data.redis.connection.stream.Consumer;
3132
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -54,6 +55,7 @@
5455
* @author Dengliming
5556
* @author Marcin Zielinski
5657
* @author John Blum
58+
* @author jinkshower
5759
* @since 2.2
5860
*/
5961
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -94,6 +96,63 @@ default Mono<Long> acknowledge(String group, Record<K, ?> record) {
9496
return acknowledge(record.getRequiredStream(), group, record.getId());
9597
}
9698

99+
/**
100+
* Append one or more records to the stream {@code key} with the specified options.
101+
*
102+
* @param key the stream key.
103+
* @param bodyPublisher record body {@link Publisher}.
104+
* @param xAddOptions parameters for the {@literal XADD} call.
105+
* @return the record Ids.
106+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
107+
* @since 3.3
108+
*/
109+
default Flux<RecordId> add (K key, Publisher<? extends Map<? extends HK, ? extends HV>> bodyPublisher,
110+
XAddOptions xAddOptions) {
111+
return Flux.from(bodyPublisher).flatMap(it -> add(key, it, xAddOptions));
112+
}
113+
114+
/**
115+
* Append a record to the stream {@code key} with the specified options.
116+
*
117+
* @param key the stream key.
118+
* @param content record content as Map.
119+
* @param xAddOptions parameters for the {@literal XADD} call.
120+
* @return the {@link Mono} emitting the {@link RecordId}.
121+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
122+
* @since 3.3
123+
*/
124+
default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
125+
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
126+
}
127+
128+
/**
129+
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
130+
*
131+
* @param record the record to append.
132+
* @param xAddOptions parameters for the {@literal XADD} call.
133+
* @return the {@link Mono} emitting the {@link RecordId}.
134+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
135+
* @since 3.3
136+
*/
137+
@SuppressWarnings("unchecked")
138+
default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
139+
return add((Record) record, xAddOptions);
140+
}
141+
142+
/**
143+
* Append the record, backed by the given value, to the stream with the specified options.
144+
* The value will be hashed and serialized.
145+
*
146+
* @param record must not be {@literal null}.
147+
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
148+
* @return the {@link Mono} emitting the {@link RecordId}.
149+
* @see MapRecord
150+
* @see ObjectRecord
151+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
152+
* @since 3.3
153+
*/
154+
Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions);
155+
97156
/**
98157
* Append one or more records to the stream {@code key}.
99158
*

src/main/java/org/springframework/data/redis/core/StreamOperations.java

+49
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.data.domain.Range;
2626
import org.springframework.data.redis.connection.Limit;
2727
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
28+
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
2829
import org.springframework.data.redis.connection.stream.ByteRecord;
2930
import org.springframework.data.redis.connection.stream.Consumer;
3031
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -53,6 +54,7 @@
5354
* @author Dengliming
5455
* @author Marcin Zielinski
5556
* @author John Blum
57+
* @author jinkshower
5658
* @since 2.2
5759
*/
5860
public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -95,6 +97,53 @@ default Long acknowledge(String group, Record<K, ?> record) {
9597
return acknowledge(record.getRequiredStream(), group, record.getId());
9698
}
9799

100+
/**
101+
* Append a record to the stream {@code key} with the specified options.
102+
*
103+
* @param key the stream key.
104+
* @param content record content as Map.
105+
* @param xAddOptions additional parameters for the {@literal XADD} call.
106+
* @return the record Id. {@literal null} when used in pipeline / transaction.
107+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
108+
* @since 3.3
109+
*/
110+
@SuppressWarnings("unchecked")
111+
@Nullable
112+
default RecordId add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
113+
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
114+
}
115+
116+
/**
117+
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
118+
*
119+
* @param record the record to append.
120+
* @param xAddOptions additional parameters for the {@literal XADD} call.
121+
* @return the record Id. {@literal null} when used in pipeline / transaction.
122+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
123+
* @since 3.3
124+
*/
125+
@SuppressWarnings("unchecked")
126+
@Nullable
127+
default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
128+
return add((Record) record, xAddOptions);
129+
}
130+
131+
/**
132+
* Append the record, backed by the given value, to the stream with the specified options.
133+
* The value will be hashed and serialized.
134+
*
135+
* @param record must not be {@literal null}.
136+
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
137+
* @return the record Id. {@literal null} when used in pipeline / transaction.
138+
* @see MapRecord
139+
* @see ObjectRecord
140+
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
141+
* @since 3.3
142+
*/
143+
@SuppressWarnings("unchecked")
144+
@Nullable
145+
RecordId add(Record<K, ?> record, XAddOptions xAddOptions);
146+
98147
/**
99148
* Append a record to the stream {@code key}.
100149
*

0 commit comments

Comments
 (0)