Skip to content

Commit 33cf2d0

Browse files
authored
[fix][client] Fix the startMessageId can't be respected as the ChunkMessageID (apache#16154)
### Motivation This is the same problem as when the consumer inclusive seeks the chunked message. See more detail in [PIP-107](apache#12402) ### Modifications * Use the first chunk message id as the startMessageId when creating the consumer/reader.
1 parent 54e9c75 commit 33cf2d0

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,15 @@ public void testSeekChunkMessages() throws PulsarClientException {
520520
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
521521
}
522522

523+
Reader<byte[]> reader = pulsarClient.newReader()
524+
.topic(topicName)
525+
.startMessageIdInclusive()
526+
.startMessageId(msgIds.get(1))
527+
.create();
528+
529+
Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
530+
assertEquals(msgIds.get(1), readMsg.getMessageId());
531+
523532
consumer1.close();
524533
consumer2.close();
525534
producer.close();
@@ -549,5 +558,5 @@ private String createMessagePayload(int size) {
549558
}
550559
return str.toString();
551560
}
552-
561+
553562
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,14 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
249249
interceptors);
250250
this.consumerId = client.newConsumerId();
251251
this.subscriptionMode = conf.getSubscriptionMode();
252-
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
252+
if (startMessageId != null) {
253+
if (startMessageId instanceof ChunkMessageIdImpl) {
254+
this.startMessageId = new BatchMessageIdImpl(
255+
((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId());
256+
} else {
257+
this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId);
258+
}
259+
}
253260
this.initialStartMessageId = this.startMessageId;
254261
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
255262
AVAILABLE_PERMITS_UPDATER.set(this, 0);

0 commit comments

Comments
 (0)