Skip to content

Commit ef06691

Browse files
authored
PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184)
fix api, buffer-access, duplicate code
1 parent 27dd63f commit ef06691

File tree

23 files changed

+445
-134
lines changed

23 files changed

+445
-134
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -4029,4 +4029,4 @@ public void testPartitionTopicsOnSeparateListner() throws Exception {
40294029
blockedMessageLatch.countDown();
40304030
log.info("-- Exiting {} test --", methodName);
40314031
}
4032-
}
4032+
}

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java

+104-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import static java.nio.charset.StandardCharsets.UTF_8;
2122
import static java.util.UUID.randomUUID;
2223
import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
2324
import static org.mockito.Mockito.any;
@@ -30,12 +31,14 @@
3031
import static org.mockito.Mockito.verify;
3132
import static org.testng.Assert.assertEquals;
3233
import static org.testng.Assert.assertFalse;
34+
import static org.testng.Assert.assertNotEquals;
3335
import static org.testng.Assert.assertNotNull;
3436
import static org.testng.Assert.assertNull;
3537
import static org.testng.Assert.assertTrue;
3638
import static org.testng.Assert.fail;
37-
39+
import io.netty.buffer.ByteBuf;
3840
import java.lang.reflect.Field;
41+
import java.nio.ByteBuffer;
3942
import java.nio.charset.StandardCharsets;
4043
import java.security.GeneralSecurityException;
4144
import java.util.ArrayList;
@@ -55,7 +58,6 @@
5558

5659
import com.fasterxml.jackson.databind.ObjectMapper;
5760
import lombok.Cleanup;
58-
5961
import lombok.EqualsAndHashCode;
6062
import lombok.Getter;
6163
import lombok.Setter;
@@ -132,6 +134,11 @@ public Object[][] subType() {
132134
return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } };
133135
}
134136

137+
@DataProvider(name = "booleanFlagProvider")
138+
public Object[][] booleanFlagProvider() {
139+
return new Object[][] { { true }, { false } };
140+
}
141+
135142
/**
136143
* Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle.
137144
*
@@ -918,4 +925,98 @@ public void testJsonSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws
918925
private static final class TestMessageObject{
919926
private String value;
920927
}
921-
}
928+
929+
/**
930+
* It validates pooled message consumption for batch and non-batch messages.
931+
*
932+
* @throws Exception
933+
*/
934+
@Test(dataProvider = "booleanFlagProvider")
935+
public void testConsumerWithPooledMessages(boolean isBatchingEnabled) throws Exception {
936+
log.info("-- Starting {} test --", methodName);
937+
938+
@Cleanup
939+
PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
940+
941+
final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled;
942+
943+
@Cleanup
944+
Consumer<ByteBuffer> consumer = newPulsarClient.newConsumer(Schema.BYTEBUFFER).topic(topic)
945+
.subscriptionName("my-sub").poolMessages(true).subscribe();
946+
947+
@Cleanup
948+
Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();
949+
950+
final int numMessages = 100;
951+
for (int i = 0; i < numMessages; i++) {
952+
producer.newMessage().value(("value-" + i).getBytes(UTF_8))
953+
.eventTime((i + 1) * 100L).sendAsync();
954+
}
955+
producer.flush();
956+
957+
// Reuse pre-allocated pooled buffer to process every message
958+
byte[] val = null;
959+
int size = 0;
960+
for (int i = 0; i < numMessages; i++) {
961+
Message<ByteBuffer> msg = consumer.receive();
962+
ByteBuffer value;
963+
try {
964+
value = msg.getValue();
965+
int capacity = value.remaining();
966+
// expand the size of buffer if needed
967+
if (capacity > size) {
968+
val = new byte[capacity];
969+
size = capacity;
970+
}
971+
// read message into pooled buffer
972+
value.get(val, 0, capacity);
973+
// process the message
974+
assertEquals(("value-" + i), new String(val, 0, capacity));
975+
} finally {
976+
msg.release();
977+
}
978+
}
979+
consumer.close();
980+
producer.close();
981+
}
982+
983+
/**
984+
* It verifies that expiry/redelivery of messages relesaes the messages without leak.
985+
*
986+
* @param isBatchingEnabled
987+
* @throws Exception
988+
*/
989+
@Test(dataProvider = "booleanFlagProvider")
990+
public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Exception {
991+
log.info("-- Starting {} test --", methodName);
992+
993+
@Cleanup
994+
PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
995+
996+
final String topic = "persistent://my-property/my-ns/testPooledMessageWithAckTimeout" + isBatchingEnabled;
997+
998+
@Cleanup
999+
ConsumerImpl<ByteBuffer> consumer = (ConsumerImpl<ByteBuffer>) newPulsarClient.newConsumer(Schema.BYTEBUFFER)
1000+
.topic(topic).subscriptionName("my-sub").poolMessages(true).subscribe();
1001+
1002+
@Cleanup
1003+
Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled)
1004+
.create();
1005+
1006+
final int numMessages = 100;
1007+
for (int i = 0; i < numMessages; i++) {
1008+
producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync();
1009+
}
1010+
producer.flush();
1011+
1012+
retryStrategically((test) -> consumer.incomingMessages.peek() != null, 5, 500);
1013+
MessageImpl<ByteBuffer> msg = (MessageImpl) consumer.incomingMessages.peek();
1014+
assertNotNull(msg);
1015+
ByteBuf payload = ((MessageImpl) msg).getPayload();
1016+
assertNotEquals(payload.refCnt(), 0);
1017+
consumer.redeliverUnacknowledgedMessages();
1018+
assertEquals(payload.refCnt(), 0);
1019+
consumer.close();
1020+
producer.close();
1021+
}
1022+
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java

+10
Original file line numberDiff line numberDiff line change
@@ -731,4 +731,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
731731
* @return
732732
*/
733733
ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);
734+
735+
/**
736+
* Enable pooling of messages and the underlying data buffers.
737+
* <p/>
738+
* When pooling is enabled, the application is responsible for calling Message.release() after the handling of every
739+
* received message. If “release()” is not called on a received message, there will be a memory leak. If an
740+
* application attempts to use and already “released” message, it might experience undefined behavior (eg: memory
741+
* corruption, deserialization error, etc.).
742+
*/
743+
ConsumerBuilder<T> poolMessages(boolean poolMessages);
734744
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java

+15
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ public interface Message<T> {
6666
*/
6767
byte[] getData();
6868

69+
/**
70+
* Get the uncompressed message payload size in bytes.
71+
*
72+
* @return size in bytes.
73+
*/
74+
int size();
75+
6976
/**
7077
* Get the de-serialized value of the message, according the configured {@link Schema}.
7178
*
@@ -217,4 +224,12 @@ public interface Message<T> {
217224
* @return the name of cluster, from which the message is replicated.
218225
*/
219226
String getReplicatedFrom();
227+
228+
/**
229+
* Release a message back to the pool. This is required only if the consumer was created with the option to pool
230+
* messages, otherwise it will have no effect.
231+
*
232+
* @since 2.8.0
233+
*/
234+
void release();
220235
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java

+17
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
2122
import java.nio.ByteBuffer;
2223
import java.sql.Time;
2324
import java.sql.Timestamp;
@@ -120,6 +121,22 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
120121
return decode(bytes);
121122
}
122123

124+
/**
125+
* Decode a ByteBuffer into an object using a given version. <br/>
126+
*
127+
* @param data
128+
* the ByteBuffer to decode
129+
* @param schemaVersion
130+
* the schema version to decode the object. null indicates using latest version.
131+
* @return the deserialized object
132+
*/
133+
default T decode(ByteBuffer data, byte[] schemaVersion) {
134+
if (data == null) {
135+
return null;
136+
}
137+
return decode(getBytes(data), schemaVersion);
138+
}
139+
123140
/**
124141
* @return an object that represents the Schema associated metadata
125142
*/

pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java

+20
Original file line numberDiff line numberDiff line change
@@ -494,4 +494,24 @@ public static BatcherBuilder newKeyBasedBatcherBuilder() {
494494
() -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.KeyBasedBatcherBuilder")
495495
.newInstance());
496496
}
497+
498+
/**
499+
* Retrieves ByteBuffer data into byte[].
500+
*
501+
* @param byteBuffer
502+
* @return
503+
*/
504+
public static byte[] getBytes(ByteBuffer byteBuffer) {
505+
if (byteBuffer == null) {
506+
return null;
507+
}
508+
if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0
509+
&& byteBuffer.array().length == byteBuffer.remaining()) {
510+
return byteBuffer.array();
511+
}
512+
// Direct buffer is not backed by array and it needs to be read from direct memory
513+
byte[] array = new byte[byteBuffer.remaining()];
514+
byteBuffer.get(array);
515+
return array;
516+
}
497517
}

pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java

+22-12
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.pulsar.client.cli;
2020

2121
import static org.apache.commons.lang3.StringUtils.isNotBlank;
22-
22+
import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
2323
import com.beust.jcommander.Parameter;
2424
import com.beust.jcommander.ParameterException;
2525
import com.beust.jcommander.Parameters;
@@ -31,6 +31,7 @@
3131
import java.io.ByteArrayOutputStream;
3232
import java.io.IOException;
3333
import java.net.URI;
34+
import java.nio.ByteBuffer;
3435
import java.util.ArrayList;
3536
import java.util.Arrays;
3637
import java.util.Base64;
@@ -122,7 +123,9 @@ public class CmdConsume {
122123
@Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
123124
private String schematype = "bytes";
124125

125-
126+
@Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
127+
private boolean poolMessages = true;
128+
126129
private ClientBuilder clientBuilder;
127130
private Authentication authentication;
128131
private String serviceURL;
@@ -171,6 +174,8 @@ private String interpretMessage(Message<?> message, boolean displayHex) throws I
171174
} else if (value instanceof GenericRecord) {
172175
Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
173176
data = asMap.toString();
177+
} else if (value instanceof ByteBuffer) {
178+
data = new String(getBytes((ByteBuffer) value));
174179
} else {
175180
data = value.toString();
176181
}
@@ -233,7 +238,7 @@ private int consume(String topic) {
233238
try {
234239
ConsumerBuilder<?> builder;
235240
PulsarClient client = clientBuilder.build();
236-
Schema<?> schema = Schema.BYTES;
241+
Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
237242
if ("auto_consume".equals(schematype)) {
238243
schema = Schema.AUTO_CONSUME();
239244
} else if (!"bytes".equals(schematype)) {
@@ -243,7 +248,8 @@ private int consume(String topic) {
243248
.subscriptionName(this.subscriptionName)
244249
.subscriptionType(subscriptionType)
245250
.subscriptionMode(subscriptionMode)
246-
.subscriptionInitialPosition(subscriptionInitialPosition);
251+
.subscriptionInitialPosition(subscriptionInitialPosition)
252+
.poolMessages(poolMessages);
247253

248254
if (isRegex) {
249255
builder.topicsPattern(Pattern.compile(topic));
@@ -275,15 +281,19 @@ private int consume(String topic) {
275281
if (msg == null) {
276282
LOG.debug("No message to consume after waiting for 5 seconds.");
277283
} else {
278-
numMessagesConsumed += 1;
279-
if (!hideContent) {
280-
System.out.println(MESSAGE_BOUNDARY);
281-
String output = this.interpretMessage(msg, displayHex);
282-
System.out.println(output);
283-
} else if (numMessagesConsumed % 1000 == 0) {
284-
System.out.println("Received " + numMessagesConsumed + " messages");
284+
try {
285+
numMessagesConsumed += 1;
286+
if (!hideContent) {
287+
System.out.println(MESSAGE_BOUNDARY);
288+
String output = this.interpretMessage(msg, displayHex);
289+
System.out.println(output);
290+
} else if (numMessagesConsumed % 1000 == 0) {
291+
System.out.println("Received " + numMessagesConsumed + " messages");
292+
}
293+
consumer.acknowledge(msg);
294+
} finally {
295+
msg.release();
285296
}
286-
consumer.acknowledge(msg);
287297
}
288298
}
289299
client.close();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -908,23 +908,28 @@ protected boolean hasPendingBatchReceive() {
908908
}
909909

910910
protected void increaseIncomingMessageSize(final Message<?> message) {
911-
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
912-
this, message.getData() == null ? 0 : message.getData().length);
911+
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
913912
}
914913

915914
protected void resetIncomingMessageSize() {
916915
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
917916
}
918917

919918
protected void decreaseIncomingMessageSize(final Message<?> message) {
920-
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
921-
(message.getData() != null) ? -message.getData().length : 0);
919+
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
922920
}
923921

924922
public long getIncomingMessageSize() {
925923
return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
926924
}
927925

926+
protected void clearIncomingMessages() {
927+
// release messages if they are pooled messages
928+
incomingMessages.forEach(Message::release);
929+
incomingMessages.clear();
930+
resetIncomingMessageSize();
931+
}
932+
928933
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
929934

930935
private ExecutorService getExecutor(Message<T> msg) {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java

+5
Original file line numberDiff line numberDiff line change
@@ -458,4 +458,9 @@ public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, Ti
458458
return this;
459459
}
460460

461+
@Override
462+
public ConsumerBuilder<T> poolMessages(boolean poolMessages) {
463+
conf.setPoolMessages(poolMessages);
464+
return this;
465+
}
461466
}

0 commit comments

Comments
 (0)