Skip to content

Commit 416daff

Browse files
propose support for XREADGROUP CLAIM (#2972)
* propose support for XREADGROUP CLAIM * release notes * comma nit * add integration tests for `claimMinIdleTime` * support claimMinIdleTime on single-stream read API * u8-ify the stream messages * Update src/StackExchange.Redis/APITypes/StreamEntry.cs Co-authored-by: Philo <philon@microsoft.com> --------- Co-authored-by: Philo <philon@microsoft.com>
1 parent 24ed30c commit 416daff

File tree

13 files changed

+304
-57
lines changed

13 files changed

+304
-57
lines changed

docs/ReleaseNotes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Current package versions:
88

99
## Unreleased
1010

11+
- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972))
1112
- Support `MSETEX` (Redis 8.4.0) for multi-key operations with expiration ([#2977 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2977))
1213

1314
## 2.9.32

src/StackExchange.Redis/APITypes/StreamEntry.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,19 @@ public StreamEntry(RedisValue id, NameValueEntry[] values)
1414
{
1515
Id = id;
1616
Values = values;
17+
IdleTime = null;
18+
DeliveryCount = 0;
19+
}
20+
21+
/// <summary>
22+
/// Creates a stream entry.
23+
/// </summary>
24+
public StreamEntry(RedisValue id, NameValueEntry[] values, TimeSpan? idleTime, int deliveryCount)
25+
{
26+
Id = id;
27+
Values = values;
28+
IdleTime = idleTime;
29+
DeliveryCount = deliveryCount;
1730
}
1831

1932
/// <summary>
@@ -51,6 +64,18 @@ public RedisValue this[RedisValue fieldName]
5164
}
5265
}
5366

67+
/// <summary>
68+
/// Delivery count - the number of times this entry has been delivered: 0 for new messages that haven't been delivered before,
69+
/// 1+ for claimed messages (previously unacknowledged entries).
70+
/// </summary>
71+
public int DeliveryCount { get; }
72+
73+
/// <summary>
74+
/// Idle time in milliseconds - the number of milliseconds elapsed since this entry was last delivered to a consumer.
75+
/// </summary>
76+
/// <remarks>This member is populated when using <c>XREADGROUP</c> with <c>CLAIM</c>.</remarks>
77+
public TimeSpan? IdleTime { get; }
78+
5479
/// <summary>
5580
/// Indicates that the Redis Stream Entry is null.
5681
/// </summary>

src/StackExchange.Redis/Interfaces/IDatabase.cs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2971,7 +2971,22 @@ IEnumerable<SortedSetEntry> SortedSetScan(
29712971
/// <param name="flags">The flags to use for this operation.</param>
29722972
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
29732973
/// <remarks><seealso href="https://redis.io/commands/xreadgroup"/></remarks>
2974-
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
2974+
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, bool noAck, CommandFlags flags);
2975+
2976+
/// <summary>
2977+
/// Read messages from a stream into an associated consumer group.
2978+
/// </summary>
2979+
/// <param name="key">The key of the stream.</param>
2980+
/// <param name="groupName">The name of the consumer group.</param>
2981+
/// <param name="consumerName">The consumer name.</param>
2982+
/// <param name="position">The position from which to read the stream. Defaults to <see cref="StreamPosition.NewMessages"/> when <see langword="null"/>.</param>
2983+
/// <param name="count">The maximum number of messages to return.</param>
2984+
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
2985+
/// <param name="claimMinIdleTime">Auto-claim messages that have been idle for at least this long.</param>
2986+
/// <param name="flags">The flags to use for this operation.</param>
2987+
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
2988+
/// <remarks><seealso href="https://redis.io/commands/xreadgroup"/></remarks>
2989+
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
29752990

29762991
/// <summary>
29772992
/// Read from multiple streams into the given consumer group.
@@ -3004,7 +3019,25 @@ IEnumerable<SortedSetEntry> SortedSetScan(
30043019
/// <para>Equivalent of calling <c>XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</c>.</para>
30053020
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
30063021
/// </remarks>
3007-
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
3022+
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags);
3023+
3024+
/// <summary>
3025+
/// Read from multiple streams into the given consumer group.
3026+
/// The consumer group with the given <paramref name="groupName"/> will need to have been created for each stream prior to calling this method.
3027+
/// </summary>
3028+
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
3029+
/// <param name="groupName">The name of the consumer group.</param>
3030+
/// <param name="consumerName">The name of the consumer.</param>
3031+
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
3032+
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
3033+
/// <param name="claimMinIdleTime">Auto-claim messages that have been idle for at least this long.</param>
3034+
/// <param name="flags">The flags to use for this operation.</param>
3035+
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
3036+
/// <remarks>
3037+
/// <para>Equivalent of calling <c>XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</c>.</para>
3038+
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
3039+
/// </remarks>
3040+
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
30083041

30093042
/// <summary>
30103043
/// Trim the stream to a specified maximum length.

src/StackExchange.Redis/Interfaces/IDatabaseAsync.VectorSets.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,7 @@ Task<bool> VectorSetSetAttributesJsonAsync(
9292
RedisKey key,
9393
VectorSetSimilaritySearchRequest query,
9494
CommandFlags flags = CommandFlags.None);
95+
96+
/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, TimeSpan?, CommandFlags)"/>
97+
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
9598
}

src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -725,13 +725,16 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
725725
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags);
726726

727727
/// <inheritdoc cref="IDatabase.StreamReadGroup(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, CommandFlags)"/>
728-
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
728+
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, bool noAck, CommandFlags flags);
729+
730+
/// <inheritdoc cref="IDatabase.StreamReadGroup(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, TimeSpan?, CommandFlags)"/>
731+
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
729732

730733
/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, CommandFlags)"/>
731734
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags);
732735

733736
/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, CommandFlags)"/>
734-
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
737+
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags);
735738

736739
/// <inheritdoc cref="IDatabase.StreamTrim(RedisKey, int, bool, CommandFlags)"/>
737740
Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags);

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,12 +690,18 @@ public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupNa
690690
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
691691
Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, noAck, flags);
692692

693+
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
694+
Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, noAck, claimMinIdleTime, flags);
695+
693696
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags) =>
694697
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, flags);
695698

696699
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
697700
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, flags);
698701

702+
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
703+
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags);
704+
699705
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
700706
Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags);
701707

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,12 +672,18 @@ public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisVa
672672
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
673673
Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, noAck, flags);
674674

675+
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
676+
Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, noAck, claimMinIdleTime, flags);
677+
675678
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags) =>
676679
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, flags);
677680

678681
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
679682
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, flags);
680683

684+
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
685+
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags);
686+
681687
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
682688
Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags);
683689

0 commit comments

Comments
 (0)