Skip to content

Commit

Permalink
[ClientAPI]Fix hasMessageAvailable() (apache#6362)
Browse files Browse the repository at this point in the history
Fixes apache#6333

Previously, `hasMoreMessages` was tested against:
```
return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0
                && incomingMessages.size() > 0;
```
However, `incomingMessages.size()` could be 0 when the consumer/reader has just started and hasn't received any messages yet.

In this PR, the last entry is retrieved and decoded to get its message metadata, which is used to populate the batchIndex field.
(cherry picked from commit baf155f)
  • Loading branch information
yjshen authored and jiazhai committed May 17, 2020
1 parent f6470cc commit 234a61f
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -59,6 +63,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -1396,22 +1401,83 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
Topic topic = consumer.getSubscription().getTopic();
Position position = topic.getLastMessageId();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), consumer.getSubscription().getName(), position, partitionIndex);
}
MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(((PositionImpl)position).getLedgerId())
.setEntryId(((PositionImpl)position).getEntryId())
.setPartition(partitionIndex)
.build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) position,
partitionIndex,
requestId,
consumer.getSubscription().getName());

} else {
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
}
}

/**
 * Responds to a GetLastMessageId request, adding the largest batch index of the last entry
 * when that entry exists.
 *
 * <p>If {@code position} does not point to a valid entry (entry id {@code -1}), the position is
 * returned as-is and nothing is read. Otherwise the entry is read from the managed ledger, the
 * batch size is taken from its message metadata, and the response carries
 * {@code batchSize - 1} as batch index (or {@code -1} when the entry is not a multi-message batch).
 * Exactly one response (success or error) is written for {@code requestId}.
 *
 * @param topic            topic whose last message id is requested; cast to {@link PersistentTopic}
 *                         when the entry has to be read
 * @param position         last position reported by the topic
 * @param partitionIndex   partition index to put in the returned message id
 * @param requestId        id of the request being answered
 * @param subscriptionName subscription name, used for debug logging only
 */
private void getLargestBatchIndexWhenPossible(
        Topic topic,
        PositionImpl position,
        int partitionIndex,
        long requestId,
        String subscriptionName) {

    // If it's not pointing to a valid entry, respond messageId of the current position.
    if (position.getEntryId() == -1) {
        MessageIdData messageId = MessageIdData.newBuilder()
                .setLedgerId(position.getLedgerId())
                .setEntryId(position.getEntryId())
                .setPartition(partitionIndex).build();

        ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
        // Stop here: there is no entry to read, and falling through would send a second
        // response for the same request id.
        return;
    }

    // Only resolve the managed ledger once we know an entry actually has to be read.
    PersistentTopic persistentTopic = (PersistentTopic) topic;
    ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();

    // For a valid position, we read the entry out and parse the batch size from its metadata.
    CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
    ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
        @Override
        public void readEntryComplete(Entry entry, Object callbackCtx) {
            entryFuture.complete(entry);
        }

        @Override
        public void readEntryFailed(ManagedLedgerException exception, Object callbackCtx) {
            entryFuture.completeExceptionally(exception);
        }
    }, null);

    CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
        try {
            MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
            return metadata.getNumMessagesInBatch();
        } finally {
            // Release even when parsing fails so the entry is never leaked.
            entry.release();
        }
    });

    batchSizeFuture.whenComplete((batchSize, e) -> {
        if (e != null) {
            ctx.writeAndFlush(Commands.newError(
                    requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage()));
        } else {
            // A batch size of 0 or 1 is reported as "no batch index" (-1).
            int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1;

            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
                        topic.getName(), subscriptionName, position, partitionIndex);
            }

            MessageIdData messageId = MessageIdData.newBuilder()
                    .setLedgerId(position.getLedgerId())
                    .setEntryId(position.getEntryId())
                    .setPartition(partitionIndex)
                    .setBatchIndex(largestBatchIndex).build();

            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
        }
    });
}

@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand Down Expand Up @@ -95,7 +96,10 @@ private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
} else {
log.info("Commencing phase one of compaction for {}, reading to {}",
reader.getTopic(), lastMessageId);
phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey,
// Each entry is processed as a whole, discard the batchIndex part deliberately.
MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), lastImpl.getPartitionIndex());
phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey,
loopPromise);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ public static Object[][] variationsForResetOnLatestMsg() {
};
}

/**
 * Supplies every (batching, start-inclusive) combination for the hasMessageAvailable test.
 *
 * @return four rows of two booleans each: {batching enabled, start message id inclusive},
 *         ordered (true, true), (true, false), (false, true), (false, false)
 */
@DataProvider
public static Object[][] variationsForHasMessageAvailable() {
    final boolean[] flagValues = {true, false};
    final Object[][] variations = new Object[flagValues.length * flagValues.length][];

    int row = 0;
    for (boolean batching : flagValues) {
        for (boolean startInclusive : flagValues) {
            // batching / start-inclusive
            variations[row++] = new Object[] {batching, startInclusive};
        }
    }
    return variations;
}

@Test
public void testSimpleReader() throws Exception {
Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
Expand Down Expand Up @@ -531,6 +542,68 @@ public void testMessageAvailableAfterRestart() throws Exception {

}

/**
 * Verifies {@code Reader.hasMessageAvailable()} for a reader created at each published
 * message id, with and without batching and with and without an inclusive start position.
 *
 * <p>Expectation: a reader reports an available message unless it starts (exclusively) at the
 * very last published message id.
 *
 * @param enableBatch    whether the producer batches messages (max 10 per batch)
 * @param startInclusive whether the reader's start message id is inclusive
 */
@Test(dataProvider = "variationsForHasMessageAvailable")
public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception {
    final String topicName = "persistent://my-property/my-ns/HasMessageAvailable";
    final int numOfMessage = 100;

    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
            .topic(topicName);

    if (enableBatch) {
        producerBuilder
                .enableBatching(true)
                .batchingMaxMessages(10);
    } else {
        producerBuilder
                .enableBatching(false);
    }

    Producer<byte[]> producer = producerBuilder.create();
    try {
        CountDownLatch latch = new CountDownLatch(numOfMessage);

        List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());
        List<Throwable> sendFailures = Collections.synchronizedList(new ArrayList<>());

        for (int i = 0; i < numOfMessage; i++) {
            producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
                // Never throw from the callback: the exception would be swallowed by the future
                // and the latch would never reach zero. Record the failure and assert afterwards.
                if (e != null) {
                    sendFailures.add(e);
                } else {
                    allIds.add(mid);
                }
                latch.countDown();
            });
        }

        latch.await();
        Assert.assertTrue(sendFailures.isEmpty(), "Failed to send messages: " + sendFailures);

        allIds.sort(null); // make sure the largest mid appears at last.
        final MessageId lastId = allIds.get(allIds.size() - 1);

        for (MessageId id : allIds) {
            Reader<byte[]> reader;

            if (startInclusive) {
                reader = pulsarClient.newReader().topic(topicName)
                        .startMessageId(id).startMessageIdInclusive().create();
            } else {
                reader = pulsarClient.newReader().topic(topicName)
                        .startMessageId(id).create();
            }

            try {
                // Only an exclusive start at the last message leaves nothing to read.
                boolean expectAvailable = startInclusive || !id.equals(lastId);
                if (expectAvailable) {
                    assertTrue(reader.hasMessageAvailable());
                } else {
                    assertFalse(reader.hasMessageAvailable());
                }
            } finally {
                // Close the reader even when the assertion fails.
                reader.close();
            }
        }
    } finally {
        producer.close();
    }
}

@Test
public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
final int numOfMessage = 10;
Expand Down Expand Up @@ -794,7 +867,7 @@ public void testReaderStartInMiddleOfBatch() throws Exception {
.batchingMaxMessages(10)
.create();

CountDownLatch latch = new CountDownLatch(100);
CountDownLatch latch = new CountDownLatch(numOfMessage);

List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
while (reader.hasMessageAvailable()) {
Assert.assertTrue(keys.remove(reader.readNext().getKey()));
}
Assert.assertTrue(keys.isEmpty());
// start from latest with start message inclusive should only read the last message in batch
Assert.assertTrue(keys.size() == 9);
Assert.assertFalse(keys.contains("key9"));
Assert.assertFalse(reader.hasMessageAvailable());
}

Expand Down
Loading

0 comments on commit 234a61f

Please sign in to comment.