Skip to content

Commit d7f996f

Browse files
RobertIndiecodelipenghui
authored andcommitted
[fix][client] Fix the startMessageId can't be respected as the ChunkMessageID (apache#16154)
### Motivation This is the same problem as when the consumer inclusive seeks the chunked message. See more detail in [PIP-107](apache#12402) ### Modifications * Use the first chunk message id as the startMessageId when creating the consumer/reader. (cherry picked from commit 33cf2d0)
1 parent b68fa32 commit d7f996f

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java

+9
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,15 @@ public void testSeekChunkMessages() throws PulsarClientException {
488488
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
489489
}
490490

491+
Reader<byte[]> reader = pulsarClient.newReader()
492+
.topic(topicName)
493+
.startMessageIdInclusive()
494+
.startMessageId(msgIds.get(1))
495+
.create();
496+
497+
Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
498+
assertEquals(msgIds.get(1), readMsg.getMessageId());
499+
491500
consumer1.close();
492501
consumer2.close();
493502
producer.close();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,14 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
251251
interceptors);
252252
this.consumerId = client.newConsumerId();
253253
this.subscriptionMode = conf.getSubscriptionMode();
254-
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
254+
if (startMessageId != null) {
255+
if (startMessageId instanceof ChunkMessageIdImpl) {
256+
this.startMessageId = new BatchMessageIdImpl(
257+
((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId());
258+
} else {
259+
this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId);
260+
}
261+
}
255262
this.initialStartMessageId = this.startMessageId;
256263
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
257264
AVAILABLE_PERMITS_UPDATER.set(this, 0);

0 commit comments

Comments
 (0)