Merge branch 'up/master' into website/branch-2.7.2-chapter-4
* up/master:
  [C++] Fixed connection read error logging (apache#12492)
  [Pulsar SQL] Pulsar SQL support query big entry data (apache#12448)
  [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order (apache#12456)
  Allow to have different instances LocalMemoryMetadataStore that share the same state (apache#12390)
  Remove unused ConsumerImpl.isTxnMessage (apache#12472)
Yan Zhang committed Oct 26, 2021
2 parents d23dde2 + 6edcaa7 commit fa9166a
Showing 18 changed files with 449 additions and 163 deletions.
3 changes: 2 additions & 1 deletion conf/presto/catalog/pulsar.properties
@@ -42,7 +42,8 @@ pulsar.max-split-queue-cache-size=-1
# to prevent erroneous rewriting
pulsar.namespace-delimiter-rewrite-enable=false
pulsar.rewrite-namespace-delimiter=/

# max size of one batch message (default value is 5MB)
# pulsar.max-message-size=5242880

####### TIERED STORAGE OFFLOADER CONFIGS #######

@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.client.api;

import static org.mockito.ArgumentMatchers.any;
@@ -24,16 +25,24 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -70,6 +79,7 @@ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws
// method calls on the interface.
Mockito.withSettings().defaultAnswer(AdditionalAnswers.delegatesTo(internalExecutorService)));
}

@Override
public ExecutorService getInternalExecutorService() {
return internalExecutorServiceDelegate;
@@ -119,4 +129,69 @@ public void testMultiTopicsConsumerCloses() throws Exception {
verify(internalExecutorServiceDelegate, times(0))
.schedule(any(Runnable.class), anyLong(), any());
}

// test that reproduces the issue that PR https://github.com/apache/pulsar/pull/12456 fixes
// where MultiTopicsConsumerImpl has a data race that causes out-of-order delivery of messages
@Test
public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException,
TimeoutException {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

Producer<Long>[] producers = new Producer[numPartitions];

for (int i = 0; i < numPartitions; i++) {
producers[i] = pulsarClient.newProducer(Schema.INT64)
// produce to each partition directly so that order can be maintained in sending
.topic(topicName + "-partition-" + i)
.enableBatching(true)
.maxPendingMessages(30000)
.maxPendingMessagesAcrossPartitions(60000)
.batchingMaxMessages(10000)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
.batchingMaxBytes(4 * 1024 * 1024)
.blockIfQueueFull(true)
.create();
}

@Cleanup
Consumer<Long> consumer = pulsarClient
.newConsumer(Schema.INT64)
// consume on the partitioned topic
.topic(topicName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(numMessages)
.subscriptionName(methodName)
.subscribe();

// produce sequence numbers to each partition topic
long sequenceNumber = 1L;
for (int i = 0; i < numMessages; i++) {
for (Producer<Long> producer : producers) {
producer.newMessage()
.value(sequenceNumber)
.sendAsync();
}
sequenceNumber++;
}
for (Producer<Long> producer : producers) {
producer.close();
}

// receive and validate sequences in the partitioned topic
Map<String, AtomicLong> receivedSequences = new HashMap<>();
int receivedCount = 0;
while (receivedCount < numPartitions * numMessages) {
Message<Long> message = consumer.receiveAsync().get(5, TimeUnit.SECONDS);
consumer.acknowledge(message);
receivedCount++;
AtomicLong receivedSequenceCounter =
receivedSequences.computeIfAbsent(message.getTopicName(), k -> new AtomicLong(1L));
Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement());
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}
}
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/ClientConnection.cc
@@ -584,9 +584,9 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b
if (err || bytesTransferred == 0) {
if (err) {
if (err == boost::asio::error::operation_aborted) {
- LOG_DEBUG(cnxString_ << "Read failed: " << err.message());
+ LOG_DEBUG(cnxString_ << "Read operation was canceled: " << err.message());
} else {
- LOG_ERROR(cnxString_ << "Read operation was cancelled");
+ LOG_ERROR(cnxString_ << "Read operation failed: " << err.message());
}
} // else: bytesTransferred == 0, which means server has closed the connection
close();
@@ -427,8 +427,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
if (message == null) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
- }
- if (message != null) {
+ } else {
messageProcessed(message);
result.complete(beforeConsume(message));
}
@@ -1239,10 +1238,6 @@ private void tryTriggerListener() {
}
}

- private boolean isTxnMessage(MessageMetadata messageMetadata) {
-     return messageMetadata.hasTxnidMostBits() && messageMetadata.hasTxnidLeastBits();
- }

private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
MessageIdData messageId, ClientCnx cnx) {

@@ -245,7 +245,7 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
}

private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
- consumer.receiveAsync().thenAccept(message -> {
+ consumer.receiveAsync().thenAcceptAsync(message -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
@@ -260,16 +260,16 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
// or if any consumer is already paused (to create fair chance for already paused consumers)
pausedConsumers.add(consumer);

- // Since we din't get a mutex, the condition on the incoming queue might have changed after
+ // Since we didn't get a mutex, the condition on the incoming queue might have changed after
// we have paused the current consumer. We need to re-check in order to avoid this consumer
// from getting stalled.
resumeReceivingFromPausedConsumersIfNeeded();
} else {
- // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
- // recursion and stack overflow
- internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
+ // Call receiveAsync() if the incoming queue is not full. Because this block is run with
+ // thenAcceptAsync, there is no chance for recursion that would lead to stack overflow.
+ receiveMessageFromConsumer(consumer);
}
- }).exceptionally(ex -> {
+ }, internalPinnedExecutor).exceptionally(ex -> {
if (ex instanceof PulsarClientException.AlreadyClosedException
|| ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
// ignore the exception that happens when the consumer is closed
@@ -281,6 +281,7 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
});
}

// Must be called from the internalPinnedExecutor thread
private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
checkArgument(message instanceof MessageImpl);
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(),
@@ -409,17 +410,19 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
- Message<T> message = incomingMessages.poll();
- if (message == null) {
-     pendingReceives.add(result);
-     cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
- } else {
-     decreaseIncomingMessageSize(message);
-     checkState(message instanceof TopicMessageImpl);
-     unAckedMessageTracker.add(message.getMessageId());
-     resumeReceivingFromPausedConsumersIfNeeded();
-     result.complete(message);
- }
+ internalPinnedExecutor.execute(() -> {
+     Message<T> message = incomingMessages.poll();
+     if (message == null) {
+         pendingReceives.add(result);
+         cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
+     } else {
+         decreaseIncomingMessageSize(message);
+         checkState(message instanceof TopicMessageImpl);
+         unAckedMessageTracker.add(message.getMessageId());
+         resumeReceivingFromPausedConsumersIfNeeded();
+         result.complete(message);
+     }
+ });
return result;
}

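The comments in the receiveMessageFromConsumer() change above argue that chaining the next receive through thenAcceptAsync on the consumer's single pinned executor removes the recursion risk. A minimal standalone sketch of that idea (hypothetical class and method names, not the Pulsar implementation): each completion is re-submitted to the executor as a fresh task, so the receive loop never grows the call stack even when futures complete immediately.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ReceiveLoopSketch {
    private static final ExecutorService pinnedExecutor = Executors.newSingleThreadExecutor();
    private static final AtomicInteger remaining = new AtomicInteger(100_000);

    // Stand-in for a receiveAsync() call; completes immediately, like a consumer
    // whose incoming queue already holds a message.
    private static CompletableFuture<String> receiveAsync() {
        return CompletableFuture.completedFuture("msg");
    }

    private static void receiveLoop() {
        // thenAcceptAsync hands the callback to the pinned executor as a new task,
        // so every iteration starts on a fresh stack frame. A plain thenAccept() on an
        // already-completed future would run the callback inline on the calling thread,
        // and this loop would keep deepening the stack until it overflowed.
        receiveAsync().thenAcceptAsync(msg -> {
            // ... process the message, then schedule the next receive ...
            if (remaining.decrementAndGet() > 0) {
                receiveLoop();
            } else {
                pinnedExecutor.shutdown();
            }
        }, pinnedExecutor);
    }

    public static void main(String[] args) {
        receiveLoop();
    }
}
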
@@ -36,6 +36,7 @@
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.testng.annotations.AfterMethod;
@@ -165,7 +166,7 @@ public void testReceiveAsyncCanBeCancelled() {
// given
MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
- assertTrue(consumer.hasNextPendingReceive());
+ Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasNextPendingReceive()));
// when
future.cancel(true);
// then
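
The Awaitility change above follows from the internalReceiveAsync() rework: pendingReceives is now populated on the internal pinned executor after receiveAsync() returns, so the test can no longer assert the pending receive synchronously and instead polls until the state becomes visible. A rough illustration of the pattern with made-up names (not the Pulsar test code):

import static org.testng.Assert.assertFalse;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.awaitility.Awaitility;

public class AsyncVisibilitySketch {
    public static void main(String[] args) {
        ExecutorService pinnedExecutor = Executors.newSingleThreadExecutor();
        ConcurrentLinkedQueue<Object> pendingReceives = new ConcurrentLinkedQueue<>();

        // The state change now happens on another thread, possibly after this call returns.
        pinnedExecutor.execute(() -> pendingReceives.add(new Object()));

        // So the test polls (by default for up to 10 seconds) until the assertion holds,
        // instead of asserting immediately and racing the executor.
        Awaitility.await().untilAsserted(() -> assertFalse(pendingReceives.isEmpty()));

        pinnedExecutor.shutdown();
    }
}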