Skip to content

Commit b0467f0

Browse files
AnonHxynicklixinyang
authored and
nicklixinyang
committed
[improve][client]PIP-189: No batching if only one message in batch (apache#16605)
[improve][client]PIP-189: No batching if only one message in batch apache#16605 ### Motivation * See apache#16619 ### Modifications * See apache#16619 * Most of the modifications relate to `BatchMessageContainerImpl` * Some batching tests also need to be modified, because a batching-enabled producer can now publish non-batched messages when this PIP applies. The tests include: * `RGUsageMTAggrWaitForAllMsgsTest` * `BatchMessageTest` * `BrokerEntryMetadataE2ETest` * `ClientDeduplicationTest` * `TopicReaderTest` * `PulsarClientToolTest`
1 parent 5f9df13 commit b0467f0

File tree

9 files changed

+240
-35
lines changed

9 files changed

+240
-35
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -769,8 +769,8 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
769769
Assert.assertNotEquals(ninthPercentileValue, 0);
770770
}
771771

772-
// Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime.
773-
private static final int PER_MESSAGE_METADATA_OHEAD = 42;
772+
// Empirically, there appears to be a 31-byte overhead for metadata, imposed by Pulsar runtime.
773+
private static final int PER_MESSAGE_METADATA_OHEAD = 31;
774774

775775
private static final int PUBLISH_INTERVAL_SECS = 10;
776776
private static final int NUM_PRODUCERS = 4;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java

+36-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.pulsar.client.api.SubscriptionType;
5757
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
5858
import org.apache.pulsar.client.impl.ConsumerImpl;
59+
import org.apache.pulsar.client.impl.MessageIdImpl;
5960
import org.apache.pulsar.common.naming.TopicName;
6061
import org.apache.pulsar.common.util.FutureUtil;
6162
import org.awaitility.Awaitility;
@@ -865,6 +866,37 @@ public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientExc
865866
producer.close();
866867
}
867868

869+
@Test(dataProvider = "containerBuilder")
870+
public void testBatchSendOneMessage(BatcherBuilder builder) throws Exception {
871+
final String topicName = "persistent://prop/ns-abc/testBatchSendOneMessage-" + UUID.randomUUID();
872+
final String subscriptionName = "sub-1";
873+
874+
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
875+
.subscriptionType(SubscriptionType.Shared).subscribe();
876+
877+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
878+
.batchingMaxPublishDelay(1, TimeUnit.SECONDS).batchingMaxMessages(10).enableBatching(true)
879+
.batcherBuilder(builder)
880+
.create();
881+
String msg = "my-message";
882+
MessageId messageId = producer.newMessage().value(msg.getBytes()).property("key1", "value1").send();
883+
884+
Assert.assertTrue(messageId instanceof MessageIdImpl);
885+
Assert.assertFalse(messageId instanceof BatchMessageIdImpl);
886+
887+
Message<byte[]> received = consumer.receive();
888+
assertEquals(received.getSequenceId(), 0);
889+
consumer.acknowledge(received);
890+
891+
Assert.assertEquals(new String(received.getData()), msg);
892+
Assert.assertFalse(received.getProperties().isEmpty());
893+
Assert.assertEquals(received.getProperties().get("key1"), "value1");
894+
Assert.assertFalse(received.getMessageId() instanceof BatchMessageIdImpl);
895+
896+
producer.close();
897+
consumer.close();
898+
}
899+
868900
@Test(dataProvider = "containerBuilder")
869901
public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws Exception {
870902

@@ -1034,7 +1066,10 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp
10341066
if (enableBatch) {
10351067
// only ack messages whose batch index < 2, which means we will not ack the
10361068
// whole batch for any batch that has more than 2 messages
1037-
if (((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) {
1069+
if ((message.getMessageId() instanceof BatchMessageIdImpl)
1070+
&& ((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) {
1071+
consumer.acknowledgeAsync(message).get();
1072+
} else if (!(message.getMessageId() instanceof BatchMessageIdImpl)){
10381073
consumer.acknowledgeAsync(message).get();
10391074
}
10401075
} else {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java

+44-23
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
2222

2323
import java.time.Duration;
24+
import java.util.ArrayList;
2425
import java.util.List;
26+
import java.util.concurrent.CompletableFuture;
2527
import java.util.concurrent.TimeUnit;
2628
import lombok.Cleanup;
2729
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -36,6 +38,7 @@
3638
import org.apache.pulsar.client.impl.MessageIdImpl;
3739
import org.apache.pulsar.client.impl.MessageImpl;
3840
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
41+
import org.apache.pulsar.common.util.FutureUtil;
3942
import org.assertj.core.util.Sets;
4043
import org.awaitility.Awaitility;
4144
import org.testng.Assert;
@@ -211,57 +214,75 @@ public void testBatchMessage() throws Exception {
211214
final String topic = newTopicName();
212215
final String subscription = "my-sub";
213216
final long eventTime= 200;
217+
final int msgNum = 2;
214218

215219
@Cleanup
216220
Producer<byte[]> producer = pulsarClient.newProducer()
217221
.topic(topic)
222+
// make sure 2 messages in one batch, because if only one message in batch,
223+
// producer will not send batched messages
224+
.batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
225+
.batchingMaxMessages(msgNum)
226+
.batchingMaxBytes(Integer.MAX_VALUE)
218227
.enableBatching(true)
219228
.create();
220229

221230
long sendTime = System.currentTimeMillis();
222-
// send message which is batch message and only contains one message, so do not set the deliverAtTime
223-
MessageIdImpl messageId = (MessageIdImpl) producer.newMessage()
231+
// send message which is batch message, so do not set the deliverAtTime
232+
List<CompletableFuture<MessageId>> messageIdsFuture = new ArrayList<>(msgNum);
233+
for (int i = 0; i < msgNum; ++i) {
234+
CompletableFuture<MessageId> messageId = producer.newMessage()
224235
.eventTime(eventTime)
225-
.value(("hello").getBytes())
226-
.send();
236+
.value(("hello" + i).getBytes())
237+
.sendAsync();
238+
messageIdsFuture.add(messageId);
239+
}
240+
FutureUtil.waitForAll(messageIdsFuture);
227241

228242
// 1. test for peekMessages
229243
admin.topics().createSubscription(topic, subscription, MessageId.earliest);
230-
final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, 1);
231-
Assert.assertEquals(messages.size(), 1);
232-
233-
MessageImpl message = (MessageImpl) messages.get(0);
234-
Assert.assertEquals(message.getData(), ("hello").getBytes());
235-
Assert.assertTrue(message.getPublishTime() >= sendTime);
236-
BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
237-
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
238-
Assert.assertEquals(entryMetadata.getIndex(), 0);
239-
System.out.println(message.getProperties());
240-
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
241-
// make sure BATCH_SIZE_HEADER > 0
242-
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
244+
final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, msgNum);
245+
Assert.assertEquals(messages.size(), msgNum);
246+
247+
MessageImpl message;
248+
BrokerEntryMetadata entryMetadata;
249+
for (int i = 0; i < msgNum; ++i) {
250+
message = (MessageImpl) messages.get(i);
251+
Assert.assertEquals(message.getData(), ("hello" + i).getBytes());
252+
Assert.assertTrue(message.getPublishTime() >= sendTime);
253+
entryMetadata = message.getBrokerEntryMetadata();
254+
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
255+
Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
256+
System.out.println(message.getProperties());
257+
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum);
258+
// make sure BATCH_SIZE_HEADER > 0
259+
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
260+
}
243261

262+
// getMessagesById and examineMessage only return the first message in the batch
244263
// 2. test for getMessagesById
264+
MessageIdImpl messageId = (MessageIdImpl) messageIdsFuture.get(0).get();
245265
message = (MessageImpl) admin.topics().getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId());
246-
Assert.assertEquals(message.getData(), ("hello").getBytes());
266+
// getMessagesById returns the first message in the batch
267+
Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
247268
Assert.assertTrue(message.getPublishTime() >= sendTime);
248269
entryMetadata = message.getBrokerEntryMetadata();
249270
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
250-
Assert.assertEquals(entryMetadata.getIndex(), 0);
271+
Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
251272
System.out.println(message.getProperties());
252-
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
273+
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum);
253274
// make sure BATCH_SIZE_HEADER > 0
254275
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
255276

256277
// 3. test for examineMessage
257278
message = (MessageImpl) admin.topics().examineMessage(topic, "earliest", 1);
258-
Assert.assertEquals(message.getData(), ("hello").getBytes());
279+
Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
259280
Assert.assertTrue(message.getPublishTime() >= sendTime);
260281
entryMetadata = message.getBrokerEntryMetadata();
261282
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
262-
Assert.assertEquals(entryMetadata.getIndex(), 0);
283+
Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
263284
System.out.println(message.getProperties());
264-
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
285+
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum);
265286
// make sure BATCH_SIZE_HEADER > 0
266287
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
267288
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.stream.Collectors;
3333
import lombok.extern.slf4j.Slf4j;
3434
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
35+
import org.apache.pulsar.client.impl.MessageIdImpl;
3536
import org.apache.pulsar.common.util.FutureUtil;
3637
import org.awaitility.Awaitility;
3738
import org.testng.annotations.AfterClass;
@@ -361,8 +362,10 @@ public void testKeyBasedBatchingOrder() throws Exception {
361362
for (int i = 0; i < 5; i++) {
362363
// Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned.
363364
final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send();
364-
assertTrue(messageId instanceof BatchMessageIdImpl);
365-
final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId;
365+
// a duplicated message is sent alone in its batch, so it is published as a non-batched message
366+
assertTrue(messageId instanceof MessageIdImpl);
367+
assertFalse(messageId instanceof BatchMessageIdImpl);
368+
final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
366369
assertEquals(messageIdImpl.getLedgerId(), -1L);
367370
assertEquals(messageIdImpl.getEntryId(), -1L);
368371
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1080,7 +1080,7 @@ public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive)
10801080
}
10811081

10821082
@Test(timeOut = 20000)
1083-
public void testHasMessageAvailableWithBatch() throws Exception {
1083+
public void testHasMessageAvailable() throws Exception {
10841084
final String topicName = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
10851085
final int numOfMessage = 10;
10861086

@@ -1092,11 +1092,11 @@ public void testHasMessageAvailableWithBatch() throws Exception {
10921092

10931093
//For batch-messages with single message, the type of client messageId should be the same as that of broker
10941094
MessageIdImpl messageId = (MessageIdImpl) producer.send("msg".getBytes());
1095-
assertTrue(messageId instanceof MessageIdImpl);
1095+
assertFalse(messageId instanceof BatchMessageIdImpl);
10961096
ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
10971097
.startMessageId(messageId).startMessageIdInclusive().create();
10981098
MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
1099-
assertTrue(messageId instanceof BatchMessageIdImpl);
1099+
assertFalse(lastMsgId instanceof BatchMessageIdImpl);
11001100
assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
11011101
assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
11021102
reader.close();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.cli;
20+
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.doAnswer;
23+
import static org.mockito.Mockito.doReturn;
24+
import static org.mockito.Mockito.spy;
25+
import java.util.Properties;
26+
import java.util.concurrent.TimeUnit;
27+
import org.apache.pulsar.client.api.Authentication;
28+
import org.apache.pulsar.client.api.ClientBuilder;
29+
import org.apache.pulsar.client.api.MessageId;
30+
import org.apache.pulsar.client.api.Producer;
31+
import org.apache.pulsar.client.api.ProducerBuilder;
32+
import org.apache.pulsar.client.api.Schema;
33+
import org.apache.pulsar.client.api.TypedMessageBuilder;
34+
import org.apache.pulsar.client.impl.PulsarClientImpl;
35+
import org.mockito.stubbing.Answer;
36+
import org.testng.Assert;
37+
38+
/**
39+
* An implementation of {@link PulsarClientTool} for tests, which publishes messages iff there are enough messages
40+
* in the batch.
41+
*/
42+
public class PulsarClientToolForceBatchNum extends PulsarClientTool{
43+
private final String topic;
44+
private final int batchNum;
45+
46+
/**
47+
*
48+
* @param properties properties
49+
* @param topic topic
50+
* @param batchNum the producer flushes and sends iff there are batchNum messages in the batch.
51+
*/
52+
public PulsarClientToolForceBatchNum(Properties properties, String topic, int batchNum) {
53+
super(properties);
54+
this.topic = topic;
55+
this.batchNum = batchNum;
56+
}
57+
58+
@Override
59+
protected void initJCommander() {
60+
super.initJCommander();
61+
produceCommand = new CmdProduce() {
62+
@Override
63+
public void updateConfig(ClientBuilder newBuilder, Authentication authentication, String serviceURL) {
64+
try {
65+
super.updateConfig(mockClientBuilder(newBuilder), authentication, serviceURL);
66+
} catch (Exception e) {
67+
Assert.fail("update config fail " + e.getMessage());
68+
}
69+
}
70+
};
71+
jcommander.addCommand("produce", produceCommand);
72+
}
73+
74+
private ClientBuilder mockClientBuilder(ClientBuilder newBuilder) throws Exception {
75+
PulsarClientImpl client = (PulsarClientImpl) newBuilder.build();
76+
ProducerBuilder<byte[]> producerBuilder = client.newProducer()
77+
.batchingMaxBytes(Integer.MAX_VALUE)
78+
.batchingMaxMessages(batchNum)
79+
.batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
80+
.topic(topic);
81+
Producer<byte[]> producer = producerBuilder.create();
82+
83+
PulsarClientImpl mockClient = spy(client);
84+
ProducerBuilder<byte[]> mockProducerBuilder = spy(producerBuilder);
85+
Producer<byte[]> mockProducer = spy(producer);
86+
ClientBuilder mockClientBuilder = spy(newBuilder);
87+
88+
doAnswer((Answer<TypedMessageBuilder>) invocation -> {
89+
TypedMessageBuilder typedMessageBuilder = spy((TypedMessageBuilder) invocation.callRealMethod());
90+
doAnswer((Answer<MessageId>) invocation1 -> {
91+
TypedMessageBuilder mock = ((TypedMessageBuilder) invocation1.getMock());
92+
// using sendAsync() to replace send()
93+
mock.sendAsync();
94+
return null;
95+
}).when(typedMessageBuilder).send();
96+
return typedMessageBuilder;
97+
}).when(mockProducer).newMessage();
98+
99+
doReturn(mockProducer).when(mockProducerBuilder).create();
100+
doReturn(mockProducerBuilder).when(mockClient).newProducer(any(Schema.class));
101+
doReturn(mockClient).when(mockClientBuilder).build();
102+
return mockClientBuilder;
103+
}
104+
}

pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -244,16 +244,19 @@ public void testDisableBatching() throws Exception {
244244
properties.setProperty("useTls", "false");
245245

246246
final String topicName = getTopicWithRandomSuffix("disable-batching");
247-
final int numberOfMessages = 5;
247+
// `numberOfMessages` must be an even number: `batchNum` is 2, so an odd count would leave a single message
248+
// in the last batch, and a single-message batch is sent as a non-batched message
249+
final int numberOfMessages = 6;
250+
final int batchNum = 2;
248251

249252
@Cleanup
250253
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
251254

252-
PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
255+
PulsarClientTool pulsarClientTool1 = new PulsarClientToolForceBatchNum(properties, topicName, batchNum);
253256
String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName};
254257
Assert.assertEquals(pulsarClientTool1.run(args1), 0);
255258

256-
PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
259+
PulsarClientTool pulsarClientTool2 = new PulsarClientToolForceBatchNum(properties, topicName, batchNum);
257260
String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName};
258261
Assert.assertEquals(pulsarClientTool2.run(args2), 0);
259262

0 commit comments

Comments
 (0)