Skip to content

Commit

Permalink
Add overloads for XAddOptions in StreamOpe
Browse files Browse the repository at this point in the history
Closes #2915
  • Loading branch information
jinkshower committed Jul 24, 2024
1 parent 8b5f29e commit 3130562
Show file tree
Hide file tree
Showing 7 changed files with 488 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
Expand All @@ -58,6 +59,7 @@
* @author Tugdual Grall
* @author Dengliming
* @author Mark John Moreno
* @author jinkshower
* @since 2.2
*/
public interface ReactiveStreamCommands {
Expand Down Expand Up @@ -394,11 +396,40 @@ default Mono<RecordId> xAdd(ByteBufferRecord record) {
return xAdd(Mono.just(AddStreamRecord.of(record))).next().map(CommandResponse::getOutput);
}

/**
* Add stream record with the specified options.
*
* @param record must not be {@literal null}.
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return {@link Mono} the {@link RecordId id}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Mono<RecordId> xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {

Assert.notNull(record, "Record must not be null");
Assert.notNull(xAddOptions, "XAddOptions must not be null");

AddStreamRecord addStreamRecord = AddStreamRecord.of(record)
.approximateTrimming(xAddOptions.isApproximateTrimming())
.makeNoStream(xAddOptions.isNoMkStream());

if (xAddOptions.hasMaxlen()) {
addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen());
}

if (xAddOptions.hasMinId()) {
addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId());
}

return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput);
}

/**
* Add stream record with given {@literal body} to {@literal key}.
*
* @param commands must not be {@literal null}.
* @return {@link Flux} emitting the {@link RecordId} on by for for the given {@link AddStreamRecord} commands.
* @return {@link Flux} emitting the {@link RecordId} on by for the given {@link AddStreamRecord} commands.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
*/
Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStreamRecord> commands);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
Expand Down Expand Up @@ -60,6 +61,7 @@
* @author Christoph Strobl
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
Expand Down Expand Up @@ -146,6 +148,18 @@ public Mono<RecordId> add(Record<K, ?> record) {
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input)));
}

@Override
public Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions) {

Assert.notNull(record.getStream(), "Key must not be null");
Assert.notNull(record.getValue(), "Body must not be null");
Assert.notNull(xAddOptions, "XAddOptions must not be null");

MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);

return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input), xAddOptions));
}

@Override
public Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
Expand Down Expand Up @@ -54,6 +55,7 @@
* @author Christoph Strobl
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
Expand Down Expand Up @@ -136,6 +138,21 @@ public RecordId add(Record<K, ?> record) {
return execute(connection -> connection.xAdd(binaryRecord));
}

@Nullable
@Override
@SuppressWarnings("unchecked")
public RecordId add(Record<K , ?> record, XAddOptions options) {

Assert.notNull(record, "Record must not be null");
Assert.notNull(options, "XAddOptions must not be null");

MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);

ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer());

return execute(connection -> connection.streamCommands().xAdd(binaryRecord, options));
}

@Override
public List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
Expand Down Expand Up @@ -54,6 +55,7 @@
* @author Dengliming
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
Expand Down Expand Up @@ -94,6 +96,63 @@ default Mono<Long> acknowledge(String group, Record<K, ?> record) {
return acknowledge(record.getRequiredStream(), group, record.getId());
}

/**
* Append one or more records to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param bodyPublisher record body {@link Publisher}.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the record Ids.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Flux<RecordId> add (K key, Publisher<? extends Map<? extends HK, ? extends HV>> bodyPublisher,
XAddOptions xAddOptions) {
return Flux.from(bodyPublisher).flatMap(it -> add(key, it, xAddOptions));
}

/**
* Append a record to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param content record content as Map.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
}

/**
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
*
* @param record the record to append.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
return add((Record) record, xAddOptions);
}

/**
* Append the record, backed by the given value, to the stream with the specified options.
* The value will be hashed and serialized.
*
* @param record must not be {@literal null}.
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see MapRecord
* @see ObjectRecord
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions);

/**
* Append one or more records to the stream {@code key}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
Expand Down Expand Up @@ -53,6 +54,7 @@
* @author Dengliming
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
Expand Down Expand Up @@ -95,6 +97,53 @@ default Long acknowledge(String group, Record<K, ?> record) {
return acknowledge(record.getRequiredStream(), group, record.getId());
}

/**
* Append a record to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param content record content as Map.
* @param xAddOptions additional parameters for the {@literal XADD} call.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
@Nullable
default RecordId add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
}

/**
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
*
* @param record the record to append.
* @param xAddOptions additional parameters for the {@literal XADD} call.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
@Nullable
default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
return add((Record) record, xAddOptions);
}

/**
* Append the record, backed by the given value, to the stream with the specified options.
* The value will be hashed and serialized.
*
* @param record must not be {@literal null}.
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see MapRecord
* @see ObjectRecord
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
@Nullable
RecordId add(Record<K, ?> record, XAddOptions xAddOptions);

/**
* Append a record to the stream {@code key}.
*
Expand Down
Loading

0 comments on commit 3130562

Please sign in to comment.