Skip to content

Commit 673d07f

Browse files
RobertIndiewuzhanpeng
authored and
wuzhanpeng
committed
[PIP 107][Client] Introduce chunk message ID (apache#12403)
Master Issue: apache#12402 ### Motivation This is an implementation of the PIP: apache#12402 ### Modifications * Introduce a new message-id type: the chunk message-id. The chunk message-id inherits from MessageIdImpl and adds two new methods: getFirstChunkMessageId and getLastChunkMessageId. All other methods operate on the last chunk's message-id directly, which keeps the new type compatible with most of the existing business logic. * Return the chunk message-id to the user when the producer publishes a chunked message or when the consumer receives a chunked message. * In consumer.seek, use the first chunk's message-id of the chunk message-id. This solves the problem caused by seeking to chunked messages, and it is also the only impact of this PIP on the original business logic.
1 parent ea60449 commit 673d07f

File tree

9 files changed

+383
-19
lines changed

9 files changed

+383
-19
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java

+58
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,64 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
412412
clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
413413
}
414414

415+
@Test
416+
public void testSeekChunkMessages() throws PulsarClientException {
417+
log.info("-- Starting {} test --", methodName);
418+
this.conf.setMaxMessageSize(5);
419+
final int totalMessages = 5;
420+
final String topicName = "persistent://my-property/my-ns/test-seek-chunk";
421+
422+
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
423+
424+
Producer<byte[]> producer = producerBuilder
425+
.enableChunking(true)
426+
.enableBatching(false)
427+
.create();
428+
429+
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
430+
.topic(topicName)
431+
.subscriptionName("inclusive-seek")
432+
.startMessageIdInclusive()
433+
.subscribe();
434+
435+
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
436+
.topic(topicName)
437+
.subscriptionName("default-seek")
438+
.subscribe();
439+
440+
for (int i = 0; i < totalMessages; i++) {
441+
String message = createMessagePayload(10);
442+
producer.send(message.getBytes());
443+
}
444+
445+
Message<byte[]> msg = null;
446+
List<MessageId> msgIds = Lists.newArrayList();
447+
for (int i = 0; i < totalMessages; i++) {
448+
msg = consumer1.receive(5, TimeUnit.SECONDS);
449+
String receivedMessage = new String(msg.getData());
450+
log.info("[{}] - Received message: [{}]", i, receivedMessage);
451+
msgIds.add(msg.getMessageId());
452+
}
453+
454+
consumer1.seek(msgIds.get(1));
455+
for (int i = 1; i < totalMessages; i++) {
456+
Message<byte[]> msgAfterSeek = consumer1.receive(5, TimeUnit.SECONDS);
457+
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
458+
}
459+
460+
consumer2.seek(msgIds.get(1));
461+
for (int i = 2; i < totalMessages; i++) {
462+
Message<byte[]> msgAfterSeek = consumer2.receive(5, TimeUnit.SECONDS);
463+
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
464+
}
465+
466+
consumer1.close();
467+
consumer2.close();
468+
producer.close();
469+
470+
log.info("-- Exiting {} test --", methodName);
471+
}
472+
415473
private String createMessagePayload(int size) {
416474
StringBuilder str = new StringBuilder();
417475
Random rand = new Random();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.Unpooled;
23+
import java.util.Objects;
24+
import org.apache.pulsar.client.api.MessageId;
25+
import org.apache.pulsar.common.api.proto.MessageIdData;
26+
27+
public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {
28+
private final MessageIdImpl firstChunkMsgId;
29+
30+
public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) {
31+
super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex());
32+
this.firstChunkMsgId = firstChunkMsgId;
33+
}
34+
35+
public MessageIdImpl getFirstChunkMessageId() {
36+
return firstChunkMsgId;
37+
}
38+
39+
public MessageIdImpl getLastChunkMessageId() {
40+
return this;
41+
}
42+
43+
@Override
44+
public String toString() {
45+
return firstChunkMsgId.toString() + ';' + super.toString();
46+
}
47+
48+
@Override
49+
public byte[] toByteArray() {
50+
51+
// write last chunk message id
52+
MessageIdData msgId = super.writeMessageIdData(null, -1, 0);
53+
54+
// write first chunk message id
55+
msgId.setFirstChunkMessageId();
56+
firstChunkMsgId.writeMessageIdData(msgId.getFirstChunkMessageId(), -1, 0);
57+
58+
int size = msgId.getSerializedSize();
59+
ByteBuf serialized = Unpooled.buffer(size, size);
60+
msgId.writeTo(serialized);
61+
62+
return serialized.array();
63+
}
64+
65+
@Override
66+
public boolean equals(Object o) {
67+
return super.equals(o);
68+
}
69+
70+
@Override
71+
public int hashCode() {
72+
return Objects.hash(super.hashCode(), firstChunkMsgId.hashCode());
73+
}
74+
75+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+22-10
Original file line numberDiff line numberDiff line change
@@ -1216,6 +1216,24 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
12161216
if (uncompressedPayload == null) {
12171217
return;
12181218
}
1219+
1220+
// last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
1221+
if (log.isDebugEnabled()) {
1222+
log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
1223+
msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId,
1224+
msgMetadata.getSequenceId());
1225+
}
1226+
1227+
// remove buffer from the map, set the chunk message id
1228+
ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.remove(msgMetadata.getUuid());
1229+
if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
1230+
msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
1231+
chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
1232+
}
1233+
// add chunked messageId to unack-message tracker, and reduce pending-chunked-message count
1234+
unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
1235+
pendingChunkedMessageCount--;
1236+
chunkedMsgCtx.recycle();
12191237
}
12201238

12211239
// If the topic is non-persistent, we should not ignore any messages.
@@ -1321,18 +1339,8 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
13211339
return null;
13221340
}
13231341

1324-
// last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
1325-
if (log.isDebugEnabled()) {
1326-
log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
1327-
msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId, msgMetadata.getSequenceId());
1328-
}
1329-
// remove buffer from the map, add chunked messageId to unack-message tracker, and reduce pending-chunked-message count
1330-
chunkedMessagesMap.remove(msgMetadata.getUuid());
1331-
unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
1332-
pendingChunkedMessageCount--;
13331342
compressedPayload.release();
13341343
compressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
1335-
chunkedMsgCtx.recycle();
13361344
ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, compressedPayload, cnx, false);
13371345
compressedPayload.release();
13381346
return uncompressedPayload;
@@ -1990,6 +1998,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
19901998
ackSet.recycle();
19911999

19922000
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
2001+
} else if (messageId instanceof ChunkMessageIdImpl) {
2002+
ChunkMessageIdImpl msgId = (ChunkMessageIdImpl) messageId;
2003+
seek = Commands.newSeek(consumerId, requestId, msgId.getFirstChunkMessageId().getLedgerId(),
2004+
msgId.getFirstChunkMessageId().getEntryId(), new long[0]);
19932005
} else {
19942006
MessageIdImpl msgId = (MessageIdImpl) messageId;
19952007
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java

+21-6
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
120120
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
121121
idData.getBatchIndex());
122122
}
123+
} else if (idData.hasFirstChunkMessageId()) {
124+
MessageIdData firstChunkIdData = idData.getFirstChunkMessageId();
125+
messageId = new ChunkMessageIdImpl(
126+
new MessageIdImpl(firstChunkIdData.getLedgerId(), firstChunkIdData.getEntryId(),
127+
firstChunkIdData.getPartition()),
128+
new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()));
123129
} else {
124130
messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
125131
}
@@ -166,12 +172,14 @@ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName)
166172
return messageId;
167173
}
168174

169-
// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
170-
protected byte[] toByteArray(int batchIndex, int batchSize) {
171-
MessageIdData msgId = LOCAL_MESSAGE_ID.get()
172-
.clear()
173-
.setLedgerId(ledgerId)
174-
.setEntryId(entryId);
175+
protected MessageIdData writeMessageIdData(MessageIdData msgId, int batchIndex, int batchSize) {
176+
if(msgId == null) {
177+
msgId = LOCAL_MESSAGE_ID.get()
178+
.clear();
179+
}
180+
181+
msgId.setLedgerId(ledgerId).setEntryId(entryId);
182+
175183
if (partitionIndex >= 0) {
176184
msgId.setPartition(partitionIndex);
177185
}
@@ -184,6 +192,13 @@ protected byte[] toByteArray(int batchIndex, int batchSize) {
184192
msgId.setBatchSize(batchSize);
185193
}
186194

195+
return msgId;
196+
}
197+
198+
// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
199+
protected byte[] toByteArray(int batchIndex, int batchSize) {
200+
MessageIdData msgId = writeMessageIdData(null, batchIndex, batchSize);
201+
187202
int size = msgId.getSerializedSize();
188203
ByteBuf serialized = Unpooled.buffer(size, size);
189204
msgId.writeTo(serialized);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ public boolean hasReplicateFrom() {
712712
return msgMetadata.hasReplicatedFrom();
713713
}
714714

715-
void setMessageId(MessageIdImpl messageId) {
715+
void setMessageId(MessageId messageId) {
716716
this.messageId = messageId;
717717
}
718718

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

+66-2
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
3333
import com.google.common.annotations.VisibleForTesting;
3434
import io.netty.buffer.ByteBuf;
35+
import io.netty.util.AbstractReferenceCounted;
3536
import io.netty.util.Recycler;
3637
import io.netty.util.Recycler.Handle;
3738
import io.netty.util.ReferenceCountUtil;
39+
import io.netty.util.ReferenceCounted;
3840
import io.netty.util.Timeout;
3941
import io.netty.util.TimerTask;
4042
import io.netty.util.concurrent.ScheduledFuture;
@@ -485,6 +487,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
485487
sequenceId = msgMetadata.getSequenceId();
486488
}
487489
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
490+
ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
488491
byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion() ?
489492
msg.getMessageBuilder().getSchemaVersion() : null;
490493
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
@@ -497,7 +500,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
497500
}
498501
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
499502
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
500-
compressedPayload.readableBytes(), uncompressedSize, callback);
503+
compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx);
501504
readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
502505
}
503506
}
@@ -517,7 +520,7 @@ public int getNumOfPartitions() {
517520
private void serializeAndSendMessage(MessageImpl<?> msg, ByteBuf payload,
518521
long sequenceId, String uuid, int chunkId, int totalChunks, int readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
519522
boolean compressed, int compressedPayloadSize,
520-
int uncompressedSize, SendCallback callback) throws IOException, InterruptedException {
523+
int uncompressedSize, SendCallback callback, ChunkedMessageCtx chunkedMessageCtx) throws IOException, InterruptedException {
521524
ByteBuf chunkPayload = compressedPayload;
522525
MessageMetadata msgMetadata = msg.getMessageBuilder();
523526
if (totalChunks > 1 && TopicName.get(topic).isPersistent()) {
@@ -612,6 +615,7 @@ private void serializeAndSendMessage(MessageImpl<?> msg, ByteBuf payload,
612615
op.totalChunks = totalChunks;
613616
op.chunkId = chunkId;
614617
}
618+
op.chunkedMessageCtx = chunkedMessageCtx;
615619
lastSendFuture = callback.getFuture();
616620
processOpSendMsg(op);
617621
}
@@ -1022,6 +1026,16 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le
10221026
OpSendMsg finalOp = op;
10231027
LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp)));
10241028
op.setMessageId(ledgerId, entryId, partitionIndex);
1029+
if (op.totalChunks > 1) {
1030+
if (op.chunkId == 0) {
1031+
op.chunkedMessageCtx.firstChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
1032+
} else if (op.chunkId == op.totalChunks - 1) {
1033+
op.chunkedMessageCtx.lastChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
1034+
op.setMessageId(op.chunkedMessageCtx.getChunkMessageId());
1035+
}
1036+
}
1037+
1038+
10251039
// if message is chunked then call callback only on last chunk
10261040
if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
10271041
try {
@@ -1169,12 +1183,54 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
11691183
}
11701184
}
11711185

1186+
static class ChunkedMessageCtx extends AbstractReferenceCounted {
1187+
protected MessageIdImpl firstChunkMessageId;
1188+
protected MessageIdImpl lastChunkMessageId;
1189+
1190+
public ChunkMessageIdImpl getChunkMessageId() {
1191+
return new ChunkMessageIdImpl(firstChunkMessageId, lastChunkMessageId);
1192+
}
1193+
1194+
private static final Recycler<ProducerImpl.ChunkedMessageCtx> RECYCLER =
1195+
new Recycler<ProducerImpl.ChunkedMessageCtx>() {
1196+
protected ProducerImpl.ChunkedMessageCtx newObject(
1197+
Recycler.Handle<ProducerImpl.ChunkedMessageCtx> handle) {
1198+
return new ProducerImpl.ChunkedMessageCtx(handle);
1199+
}
1200+
};
1201+
1202+
public static ChunkedMessageCtx get(int totalChunks) {
1203+
ChunkedMessageCtx chunkedMessageCtx = RECYCLER.get();
1204+
chunkedMessageCtx.setRefCnt(totalChunks);
1205+
return chunkedMessageCtx;
1206+
}
1207+
1208+
private final Handle<ProducerImpl.ChunkedMessageCtx> recyclerHandle;
1209+
1210+
private ChunkedMessageCtx(Handle<ChunkedMessageCtx> recyclerHandle) {
1211+
this.recyclerHandle = recyclerHandle;
1212+
}
1213+
1214+
@Override
1215+
protected void deallocate() {
1216+
this.firstChunkMessageId = null;
1217+
this.lastChunkMessageId = null;
1218+
recyclerHandle.recycle(this);
1219+
}
1220+
1221+
@Override
1222+
public ReferenceCounted touch(Object hint) {
1223+
return this;
1224+
}
1225+
}
1226+
11721227
protected static final class OpSendMsg {
11731228
MessageImpl<?> msg;
11741229
List<MessageImpl<?>> msgs;
11751230
ByteBufPair cmd;
11761231
SendCallback callback;
11771232
Runnable rePopulate;
1233+
ChunkedMessageCtx chunkedMessageCtx;
11781234
long uncompressedSize;
11791235
long sequenceId;
11801236
long createdAt;
@@ -1277,6 +1333,8 @@ void recycle() {
12771333
retryCount = 0;
12781334
batchSizeByte = 0;
12791335
numMessagesInBatch = 1;
1336+
ReferenceCountUtil.safeRelease(chunkedMessageCtx);
1337+
chunkedMessageCtx = null;
12801338
recyclerHandle.recycle(this);
12811339
}
12821340

@@ -1299,6 +1357,12 @@ void setMessageId(long ledgerId, long entryId, int partitionIndex) {
12991357
}
13001358
}
13011359

1360+
void setMessageId(ChunkMessageIdImpl chunkMessageId) {
1361+
if (msg != null) {
1362+
msg.setMessageId(chunkMessageId);
1363+
}
1364+
}
1365+
13021366
private OpSendMsg(Handle<OpSendMsg> recyclerHandle) {
13031367
this.recyclerHandle = recyclerHandle;
13041368
}

0 commit comments

Comments
 (0)