diff --git a/src/main/java/io/lettuce/core/ClaimedStreamMessage.java b/src/main/java/io/lettuce/core/ClaimedStreamMessage.java new file mode 100644 index 000000000..73b029900 --- /dev/null +++ b/src/main/java/io/lettuce/core/ClaimedStreamMessage.java @@ -0,0 +1,40 @@ +package io.lettuce.core; + +import java.time.Duration; +import java.util.Map; + +/** + * Stream message returned by XREADGROUP when entries were claimed from the PEL using CLAIM min-idle-time. Contains additional + * metadata: milliseconds since last delivery and redelivery count. + */ +public class ClaimedStreamMessage extends StreamMessage { + + private final long msSinceLastDelivery; + + private final long redeliveryCount; + + public ClaimedStreamMessage(K stream, String id, Map body, long msSinceLastDelivery, long redeliveryCount) { + super(stream, id, body); + this.msSinceLastDelivery = msSinceLastDelivery; + this.redeliveryCount = redeliveryCount; + } + + public long getMsSinceLastDelivery() { + return msSinceLastDelivery; + } + + public Duration getSinceLastDelivery() { + return Duration.ofMillis(msSinceLastDelivery); + } + + public long getRedeliveryCount() { + return redeliveryCount; + } + + @Override + public boolean isClaimed() { + // "Really claimed" implies it was previously delivered at least once. + return redeliveryCount >= 1; + } + +} diff --git a/src/main/java/io/lettuce/core/StreamMessage.java b/src/main/java/io/lettuce/core/StreamMessage.java index a38dd07a3..f27de0781 100644 --- a/src/main/java/io/lettuce/core/StreamMessage.java +++ b/src/main/java/io/lettuce/core/StreamMessage.java @@ -46,6 +46,16 @@ public Map getBody() { return body; } + /** + * Whether this message was reclaimed from the pending entries list (PEL) using XREADGROUP … CLAIM. Default: false. + * + * Note: When CLAIM is used, servers may attach delivery metadata to all entries in the reply (including fresh ones). Use + * this indicator to distinguish actually reclaimed entries (true) from normal entries (false). + */ + public boolean isClaimed() { + return false; + } + @Override public boolean equals(Object o) { if (this == o) diff --git a/src/main/java/io/lettuce/core/XReadArgs.java b/src/main/java/io/lettuce/core/XReadArgs.java index b6fa3ead9..049d59004 100644 --- a/src/main/java/io/lettuce/core/XReadArgs.java +++ b/src/main/java/io/lettuce/core/XReadArgs.java @@ -23,6 +23,8 @@ public class XReadArgs implements CompositeArgument { private boolean noack; + private Long claimMinIdleTime; + /** * Builder entry points for {@link XReadArgs}. */ @@ -90,6 +92,21 @@ public static XReadArgs noack(boolean noack) { return new XReadArgs().noack(noack); } + /** + * Create a new {@link XReadArgs} and set CLAIM min-idle-time (milliseconds). Only valid for XREADGROUP. + */ + public static XReadArgs claim(long milliseconds) { + return new XReadArgs().claim(milliseconds); + } + + /** + * Create a new {@link XReadArgs} and set CLAIM min-idle-time. Only valid for XREADGROUP. + */ + public static XReadArgs claim(Duration timeout) { + LettuceAssert.notNull(timeout, "Claim timeout must not be null"); + return claim(timeout.toMillis()); + } + } /** @@ -141,6 +158,29 @@ public XReadArgs noack(boolean noack) { return this; } + /** + * Claim idle pending messages first with a minimum idle time (milliseconds). Only valid for XREADGROUP. + * + * @since 7.0 + */ + public XReadArgs claim(long milliseconds) { + + this.claimMinIdleTime = milliseconds; + return this; + } + + /** + * Claim idle pending messages first with a minimum idle time. Only valid for XREADGROUP. + * + * @since 7.0 + */ + public XReadArgs claim(Duration timeout) { + + LettuceAssert.notNull(timeout, "Claim timeout must not be null"); + + return claim(timeout.toMillis()); + } + public void build(CommandArgs args) { if (block != null) { @@ -154,6 +194,10 @@ public void build(CommandArgs args) { if (noack) { args.add(CommandKeyword.NOACK); } + + if (claimMinIdleTime != null) { + args.add("CLAIM").add(claimMinIdleTime); + } } /** diff --git a/src/main/java/io/lettuce/core/output/StreamReadOutput.java b/src/main/java/io/lettuce/core/output/StreamReadOutput.java index 977b7bf86..8fd202516 100644 --- a/src/main/java/io/lettuce/core/output/StreamReadOutput.java +++ b/src/main/java/io/lettuce/core/output/StreamReadOutput.java @@ -8,6 +8,8 @@ import io.lettuce.core.StreamMessage; import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.ClaimedStreamMessage; + import io.lettuce.core.internal.LettuceAssert; /** @@ -31,6 +33,10 @@ public class StreamReadOutput extends CommandOutput body; + private Long msSinceLastDelivery; + + private Long redeliveryCount; + private boolean bodyReceived = false; public StreamReadOutput(RedisCodec codec) { @@ -51,6 +57,20 @@ public void set(ByteBuffer bytes) { return; } + // Handle extra metadata for claimed entries that may arrive as bulk strings (RESP2/RESP3) + if (id != null && bodyReceived && key == null && bytes != null) { + // Use a duplicate so decoding doesn't advance the original buffer position. + String s = decodeString(bytes.duplicate()); + if (msSinceLastDelivery == null && isDigits(s)) { + msSinceLastDelivery = Long.parseLong(s); + return; + } + if (redeliveryCount == null && isDigits(s)) { + redeliveryCount = Long.parseLong(s); + return; + } + } + if (id == null) { id = decodeString(bytes); return; @@ -75,6 +95,23 @@ public void set(ByteBuffer bytes) { key = null; } + @Override + public void set(long integer) { + + // Extra integers appear only for claimed entries (XREADGROUP with CLAIM) + if (id != null && bodyReceived) { + if (msSinceLastDelivery == null) { + msSinceLastDelivery = integer; + return; + } + if (redeliveryCount == null) { + redeliveryCount = integer; + return; + } + } + super.set(integer); + } + @Override public void multi(int count) { @@ -91,15 +128,25 @@ public void multi(int count) { @Override public void complete(int depth) { - if (depth == 3 && bodyReceived) { - subscriber.onNext(output, new StreamMessage<>(stream, id, body == null ? Collections.emptyMap() : body)); + // Emit the message when the entry array (id/body[/extras]) completes. + if (depth == 2 && bodyReceived) { + Map map = body == null ? Collections.emptyMap() : body; + if (msSinceLastDelivery != null || redeliveryCount != null) { + subscriber.onNext(output, + new ClaimedStreamMessage<>(stream, id, map, msSinceLastDelivery == null ? 0L : msSinceLastDelivery, + redeliveryCount == null ? 0L : redeliveryCount)); + } else { + subscriber.onNext(output, new StreamMessage<>(stream, id, map)); + } bodyReceived = false; key = null; body = null; id = null; + msSinceLastDelivery = null; + redeliveryCount = null; } - // RESP2/RESP3 compat + // RESP2/RESP3 compat for stream key reset upon finishing the outer array element if (depth == 2 && skipStreamKeyReset) { skipStreamKeyReset = false; } @@ -113,6 +160,17 @@ public void complete(int depth) { } } + private static boolean isDigits(String s) { + if (s == null || s.isEmpty()) + return false; + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + if (c < '0' || c > '9') + return false; + } + return true; + } + @Override public void setSubscriber(Subscriber> subscriber) { LettuceAssert.notNull(subscriber, "Subscriber must not be null"); diff --git a/src/test/java/io/lettuce/core/XReadGroupClaimIntegrationTests.java b/src/test/java/io/lettuce/core/XReadGroupClaimIntegrationTests.java new file mode 100644 index 000000000..16cd8b6af --- /dev/null +++ b/src/test/java/io/lettuce/core/XReadGroupClaimIntegrationTests.java @@ -0,0 +1,343 @@ +package io.lettuce.core; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import io.lettuce.core.models.stream.PendingMessages; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import javax.inject.Inject; +import org.junit.jupiter.api.extension.ExtendWith; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.condition.RedisConditions; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; + +/** + * Integration tests for XREADGROUP with CLAIM min-idle-time using a Redis at localhost:6379. These tests are tolerant to + * servers that do not yet support CLAIM: if the server returns an error for CLAIM, the test will be skipped. + */ +@Tag(INTEGRATION_TEST) +@ExtendWith(LettuceExtension.class) +class XReadGroupClaimIntegrationTests { + + private final RedisClient client; + + private StatefulRedisConnection connection; + + private RedisCommands sync; + + @Inject + XReadGroupClaimIntegrationTests(RedisClient client) { + this.client = client; + } + + @BeforeEach + void setup() { + connection = client.connect(); + sync = connection.sync(); + // Require Redis 8.3.224+ (RC1) for XREADGROUP CLAIM support + assumeTrue(RedisConditions.of(sync).hasVersionGreaterOrEqualsTo("8.3.224"), + "Redis 8.3.224+ required for XREADGROUP CLAIM"); + } + + @AfterEach + void tearDown() { + if (connection != null) + connection.close(); + } + + @Test + void claimPendingEntriesReturnsClaimedStreamMessageWithMetadata() throws Exception { + String key = "it:stream:claim:" + UUID.randomUUID(); + String group = "g"; + String c1 = "c1"; + String c2 = "c2"; + + // Clean slate + try { + sync.xgroupDestroy(key, group); + } catch (Exception ignore) { + } + sync.del(key); + + // Produce two entries + Map body = new HashMap<>(); + body.put("f", "v"); + sync.xadd(key, body); + sync.xadd(key, body); + + // Create group at 0-0 and consume with c1 to move entries to PEL + sync.xgroupCreate(XReadArgs.StreamOffset.from(key, "0-0"), group); + sync.xreadgroup(Consumer.from(group, c1), XReadArgs.Builder.count(10), XReadArgs.StreamOffset.lastConsumed(key)); + + // Ensure idle time + Thread.sleep(51); + + // Produce fresh entries that are NOT claimed (not pending) + sync.xadd(key, body); + sync.xadd(key, body); + + try { + XReadArgs readArgs = XReadArgs.Builder.claim(Duration.ofMillis(50)); + readArgs.count(10); + List> res = sync.xreadgroup(Consumer.from(group, c2), readArgs, + XReadArgs.StreamOffset.lastConsumed(key)); + + assertThat(res).isNotNull(); + assertThat(res).isNotEmpty(); + + long claimedCount = res.stream().filter(StreamMessage::isClaimed).count(); + long freshCount = res.size() - claimedCount; + + assertThat(claimedCount).isEqualTo(2); + assertThat(freshCount).isEqualTo(2); + + // Order: claimed entries first, then fresh entries + int firstNonClaimedIdx = -1; + for (int i = 0; i < res.size(); i++) { + if (!res.get(i).isClaimed()) { + firstNonClaimedIdx = i; + break; + } + } + assertThat(firstNonClaimedIdx).isGreaterThanOrEqualTo(0); + for (int i = 0; i < firstNonClaimedIdx; i++) { + assertThat(res.get(i).isClaimed()).isTrue(); + } + for (int i = firstNonClaimedIdx; i < res.size(); i++) { + assertThat(res.get(i).isClaimed()).isFalse(); + } + + // Validate metadata on one claimed entry + ClaimedStreamMessage first = (ClaimedStreamMessage) res.stream() + .filter(StreamMessage::isClaimed).findFirst().get(); + assertThat(first.getMsSinceLastDelivery()).isGreaterThanOrEqualTo(51); + assertThat(first.getRedeliveryCount()).isGreaterThanOrEqualTo(1); + assertThat(first.getBody()).containsEntry("f", "v"); + } finally { + try { + sync.xgroupDestroy(key, group); + } catch (Exception ignore) { + } + sync.del(key); + } + } + + @Test + void claimMovesPendingFromC1ToC2AndRemainsPendingUntilAck() throws Exception { + String key = "it:stream:claim:move:" + UUID.randomUUID(); + String group = "g"; + String c1 = "c1"; + String c2 = "c2"; + + // Clean slate + try { + sync.xgroupDestroy(key, group); + } catch (Exception ignore) { + } + sync.del(key); + + Map body = new HashMap<>(); + body.put("f", "v"); + + // Produce two entries + String id1 = sync.xadd(key, body); + String id2 = sync.xadd(key, body); + + // Create group and consume with c1 so entries become pending for c1 + sync.xgroupCreate(XReadArgs.StreamOffset.from(key, "0-0"), group); + sync.xreadgroup(Consumer.from(group, c1), XReadArgs.Builder.count(10), XReadArgs.StreamOffset.lastConsumed(key)); + + // Verify pending belongs to c1 + PendingMessages before = sync.xpending(key, group); + assertThat(before.getCount()).isEqualTo(2); + assertThat(before.getConsumerMessageCount().getOrDefault(c1, 0L)).isEqualTo(2); + + // Ensure idle time so entries are claimable + Thread.sleep(51); + + try { + // Claim with c2 + XReadArgs readArgs = XReadArgs.Builder.claim(Duration.ofMillis(50)); + readArgs.count(10); + List> res = sync.xreadgroup(Consumer.from(group, c2), readArgs, + XReadArgs.StreamOffset.lastConsumed(key)); + + assertThat(res).isNotNull(); + assertThat(res).isNotEmpty(); + long claimed = res.stream().filter(StreamMessage::isClaimed).count(); + assertThat(claimed).isEqualTo(2); + + // After claim: entries are pending for c2 (moved), not acked yet + PendingMessages afterClaim = sync.xpending(key, group); + assertThat(afterClaim.getCount()).isEqualTo(2); + assertThat(afterClaim.getConsumerMessageCount().getOrDefault(c1, 0L)).isEqualTo(0); + assertThat(afterClaim.getConsumerMessageCount().getOrDefault(c2, 0L)).isEqualTo(2); + + // XACK the claimed entries -> PEL should become empty + long acked = sync.xack(key, group, res.get(0).getId(), res.get(1).getId()); + assertThat(acked).isEqualTo(2); + + PendingMessages afterAck = sync.xpending(key, group); + assertThat(afterAck.getCount()).isEqualTo(0); + } finally { + try { + sync.xgroupDestroy(key, group); + } catch (Exception ignore) { + } + sync.del(key); + } + } + + @Test + void claimWithNoackDoesNotCreatePendingAndRemovesClaimedFromPel() throws Exception { + String key = "it:stream:claim:noack:" + UUID.randomUUID(); + String group = "g"; + String c1 = "c1"; + String c2 = "c2"; + + // Clean slate + try { + sync.xgroupDestroy(key, group); + } catch (Exception ignore) { + } + sync.del(key); + + // Produce two entries that will become pending for c1 + Map body = new HashMap<>(); + body.put("f", "v"); + sync.xadd(key, body); + sync.xadd(key, body); + + // Create group at 0-0 and consume with c1 to move entries to PEL + sync.xgroupCreate(XReadArgs.StreamOffset.from(key, "0-0"), group); + sync.xreadgroup(Consumer.from(group, c1), XReadArgs.Builder.count(10), XReadArgs.StreamOffset.lastConsumed(key)); + + // Verify pending belongs to c1 + PendingMessages before = sync.xpending(key, group); + assertThat(before.getCount()).isEqualTo(2); + assertThat(before.getConsumerMessageCount().getOrDefault(c1, 0L)).isEqualTo(2); + + // Ensure idle time so entries are claimable + Thread.sleep(51); + + // Also produce fresh entries that should not be added to PEL when NOACK is set + sync.xadd(key, body); + sync.xadd(key, body); + + try { + // Claim with NOACK using c2 + XReadArgs readArgs = XReadArgs.Builder.claim(Duration.ofMillis(50)); + readArgs.noack(true).count(10); + List> res = sync.xreadgroup(Consumer.from(group, c2), readArgs, + XReadArgs.StreamOffset.lastConsumed(key)); + + assertThat(res).isNotNull(); + assertThat(res).isNotEmpty(); + + long claimedCount = res.stream().filter(StreamMessage::isClaimed).count(); + long freshCount = res.size() - claimedCount; + assertThat(claimedCount).isEqualTo(2); + assertThat(freshCount).isEqualTo(2); + + // After NOACK read, previously pending entries remain pending (NOACK does not remove them) + PendingMessages afterNoack = sync.xpending(key, group); + assertThat(afterNoack.getCount()).isEqualTo(2); + // Claimed entries remain pending and are now owned by c2 (CLAIM reassigns ownership). Fresh entries were not added + // to PEL. + assertThat(afterNoack.getConsumerMessageCount().getOrDefault(c1, 0L)).isEqualTo(0); + assertThat(afterNoack.getConsumerMessageCount().getOrDefault(c2, 0L)).isEqualTo(2); + } finally { + try { + sync.xgroupDestroy(key, group); + } catch (Exception ignore) { + } + sync.del(key); + } + } + + @Test + void claimHonorsMinIdleTimeGatingAndRedeliversAfterDelay() throws Exception { + String key = "it:stream:claim:auto:" + UUID.randomUUID(); + String group = "g"; + String consumer = "c1"; + + // Clean slate + try { + sync.xgroupDestroy(key, group); + } catch (Exception ignore) { + } + sync.del(key); + + try { + // Create group at "$" and create stream if missing + sync.xgroupCreate(XReadArgs.StreamOffset.latest(key), group, XGroupCreateArgs.Builder.mkstream()); + + // Add entries after group creation so they are delivered as fresh + Map body = new HashMap<>(); + body.put("f", "v"); + sync.xadd(key, body); + sync.xadd(key, body); + + // First read with CLAIM(minIdle=100ms); entries should be fresh (not claimed) + XReadArgs args = XReadArgs.Builder.claim(Duration.ofMillis(50)).count(10); + List> first = sync.xreadgroup(Consumer.from(group, consumer), args, + XReadArgs.StreamOffset.lastConsumed(key)); + + assertThat(first).isNotNull(); + assertThat(first.size()).isEqualTo(2); + for (StreamMessage m : first) { + // With CLAIM present, server attaches extras even for fresh entries (redelivery=0) + assertThat(m).isInstanceOf(ClaimedStreamMessage.class); + assertThat(m.isClaimed()).isFalse(); + ClaimedStreamMessage cm = (ClaimedStreamMessage) m; + assertThat(cm.getRedeliveryCount()).isEqualTo(0); + assertThat(cm.getMsSinceLastDelivery()).isEqualTo(0); + } + + // Immediate repeat: should be empty (idle < minIdle) + List> immediate = sync.xreadgroup(Consumer.from(group, consumer), args, + XReadArgs.StreamOffset.lastConsumed(key)); + assertThat(immediate).isEmpty(); + + // After sufficient idle time, entries are claimed/redelivered with redeliveryCount >= 1 + Thread.sleep(51); + List> afterIdle = sync.xreadgroup(Consumer.from(group, consumer), args, + XReadArgs.StreamOffset.lastConsumed(key)); + + assertThat(afterIdle).isNotNull(); + assertThat(afterIdle.size()).isEqualTo(2); + for (StreamMessage m : afterIdle) { + assertThat(m).isInstanceOf(ClaimedStreamMessage.class); + assertThat(m.isClaimed()).isTrue(); + ClaimedStreamMessage cm = (ClaimedStreamMessage) m; + assertThat(cm.getRedeliveryCount()).isGreaterThanOrEqualTo(1); + assertThat(cm.getMsSinceLastDelivery()).isGreaterThanOrEqualTo(51); + } + + // PEL remains assigned to the consumer until XACK + PendingMessages pel = sync.xpending(key, group); + assertThat(pel.getCount()).isEqualTo(2); + assertThat(pel.getConsumerMessageCount().getOrDefault(consumer, 0L)).isEqualTo(2); + } finally { + try { + sync.xgroupDestroy(key, group); + } catch (Exception ignore) { + } + sync.del(key); + } + } + +} diff --git a/src/test/java/io/lettuce/core/output/StreamReadOutputUnitTests.java b/src/test/java/io/lettuce/core/output/StreamReadOutputUnitTests.java index af7a7883d..ec7a3b4f0 100644 --- a/src/test/java/io/lettuce/core/output/StreamReadOutputUnitTests.java +++ b/src/test/java/io/lettuce/core/output/StreamReadOutputUnitTests.java @@ -11,6 +11,8 @@ import io.lettuce.core.StreamMessage; import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.ClaimedStreamMessage; + /** * Unit tests for {@link StreamReadOutput}. * @@ -179,4 +181,180 @@ void shouldDecodeFromTwoStreams() { assertThat(streamMessage2.getBody()).hasSize(1).containsEntry("key2", "value2"); } + @Test + void shouldDecodeClaimedEntryWithMetadata() { + + // Stream and single claimed entry + sut.multi(2); + sut.set(ByteBuffer.wrap("stream-key".getBytes())); + sut.complete(1); + sut.multi(1); + sut.multi(4); + sut.set(ByteBuffer.wrap("1234-12".getBytes())); + sut.complete(3); + sut.multi(2); + sut.set(ByteBuffer.wrap("key".getBytes())); + sut.complete(4); + sut.set(ByteBuffer.wrap("value".getBytes())); + sut.complete(4); + // extras for claimed pending entry + sut.set(5000); + sut.set(2); + sut.complete(3); + sut.complete(2); + sut.complete(1); + sut.complete(0); + + assertThat(sut.get()).hasSize(1); + StreamMessage streamMessage = sut.get().get(0); + assertThat(streamMessage).isInstanceOf(ClaimedStreamMessage.class); + ClaimedStreamMessage claimed = (ClaimedStreamMessage) streamMessage; + assertThat(claimed.getMsSinceLastDelivery()).isEqualTo(5000); + assertThat(claimed.getRedeliveryCount()).isEqualTo(2); + assertThat(claimed.getBody()).hasSize(1).containsEntry("key", "value"); + } + + @Test + void shouldDecodeClaimedEntryWithMetadataAsBulkStrings() { + + // Stream and single claimed entry with extras as bulk strings (RESP2/RESP3 variant) + sut.multi(2); + sut.set(ByteBuffer.wrap("stream-key".getBytes())); + sut.complete(1); + sut.multi(1); + sut.multi(4); + sut.set(ByteBuffer.wrap("1234-12".getBytes())); + sut.complete(3); + sut.multi(2); + sut.set(ByteBuffer.wrap("key".getBytes())); + sut.complete(4); + sut.set(ByteBuffer.wrap("value".getBytes())); + sut.complete(4); + // extras for claimed pending entry as bulk strings + sut.set(ByteBuffer.wrap("5000".getBytes())); + sut.set(ByteBuffer.wrap("2".getBytes())); + sut.complete(3); + sut.complete(2); + sut.complete(1); + sut.complete(0); + + assertThat(sut.get()).hasSize(1); + StreamMessage streamMessage = sut.get().get(0); + assertThat(streamMessage).isInstanceOf(ClaimedStreamMessage.class); + ClaimedStreamMessage claimed = (ClaimedStreamMessage) streamMessage; + assertThat(claimed.getMsSinceLastDelivery()).isEqualTo(5000); + assertThat(claimed.getRedeliveryCount()).isEqualTo(2); + assertThat(claimed.getBody()).hasSize(1).containsEntry("key", "value"); + } + + @Test + void shouldDecodeFreshEntryWithZeroRedeliveriesAsNotClaimed() { + + // Stream and single entry that carries extras with redeliveryCount=0 + sut.multi(2); + sut.set(ByteBuffer.wrap("stream-key".getBytes())); + sut.complete(1); + sut.multi(1); + sut.multi(4); + sut.set(ByteBuffer.wrap("1234-12".getBytes())); + sut.complete(3); + sut.multi(2); + sut.set(ByteBuffer.wrap("key".getBytes())); + sut.complete(4); + sut.set(ByteBuffer.wrap("value".getBytes())); + sut.complete(4); + // extras indicate not previously delivered (redeliveryCount=0) + sut.set(1000); // ms since last delivery + sut.set(0); // redeliveryCount + sut.complete(3); + sut.complete(2); + sut.complete(1); + sut.complete(0); + + assertThat(sut.get()).hasSize(1); + StreamMessage streamMessage = sut.get().get(0); + assertThat(streamMessage).isInstanceOf(ClaimedStreamMessage.class); + assertThat(streamMessage.isClaimed()).isFalse(); + ClaimedStreamMessage claimed = (ClaimedStreamMessage) streamMessage; + assertThat(claimed.getMsSinceLastDelivery()).isEqualTo(1000); + assertThat(claimed.getRedeliveryCount()).isEqualTo(0); + } + + @Test + void shouldDecodeMixedBatchClaimedFirstThenFresh() { + + // One stream with three entries: two claimed (redelivery >= 1) then one fresh (redelivery == 0) + sut.multi(2); + sut.set(ByteBuffer.wrap("stream-key".getBytes())); + sut.complete(1); + sut.multi(3); + + // Entry #1 (claimed) + sut.multi(4); + sut.set(ByteBuffer.wrap("1-0".getBytes())); + sut.complete(3); + sut.multi(2); + sut.set(ByteBuffer.wrap("f1".getBytes())); + sut.complete(4); + sut.set(ByteBuffer.wrap("v1".getBytes())); + sut.complete(4); + sut.set(1500); // msSinceLastDelivery + sut.set(2); // redeliveryCount + sut.complete(3); + sut.complete(2); + + // Entry #2 (claimed) + sut.multi(4); + sut.set(ByteBuffer.wrap("2-0".getBytes())); + sut.complete(3); + sut.multi(2); + sut.set(ByteBuffer.wrap("f2".getBytes())); + sut.complete(4); + sut.set(ByteBuffer.wrap("v2".getBytes())); + sut.complete(4); + sut.set(1200); + sut.set(1); + sut.complete(3); + sut.complete(2); + + // Entry #3 (fresh, still carries metadata with redeliveryCount=0) + sut.multi(4); + sut.set(ByteBuffer.wrap("3-0".getBytes())); + sut.complete(3); + sut.multi(2); + sut.set(ByteBuffer.wrap("f3".getBytes())); + sut.complete(4); + sut.set(ByteBuffer.wrap("v3".getBytes())); + sut.complete(4); + sut.set(10); + sut.set(0); + sut.complete(3); + + sut.complete(2); + + sut.complete(1); + sut.complete(0); + + assertThat(sut.get()).hasSize(3); + StreamMessage m1 = sut.get().get(0); + StreamMessage m2 = sut.get().get(1); + StreamMessage m3 = sut.get().get(2); + + // All entries carry extras => ClaimedStreamMessage type, but isClaimed reflects redeliveryCount >= 1 + assertThat(m1).isInstanceOf(ClaimedStreamMessage.class); + assertThat(m2).isInstanceOf(ClaimedStreamMessage.class); + assertThat(m3).isInstanceOf(ClaimedStreamMessage.class); + + assertThat(m1.isClaimed()).isTrue(); + assertThat(m2.isClaimed()).isTrue(); + assertThat(m3.isClaimed()).isFalse(); + + ClaimedStreamMessage c1 = (ClaimedStreamMessage) m1; + ClaimedStreamMessage c2 = (ClaimedStreamMessage) m2; + ClaimedStreamMessage c3 = (ClaimedStreamMessage) m3; + assertThat(c1.getRedeliveryCount()).isEqualTo(2); + assertThat(c2.getRedeliveryCount()).isEqualTo(1); + assertThat(c3.getRedeliveryCount()).isEqualTo(0); + } + }