Skip to content

Commit

Permalink
Exposing the broker entry metadata to client (apache#11553)
Browse files Browse the repository at this point in the history
This is an improvement to [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata). Now the client is able to get the broker entry metadata.

1. increment the client protocol version from 17 to 18.
2. add a configuration `enableExposingBrokerEntryMetadataToClient` in the broker configuration.
3. Let the broker send the broker entry metadata to the client when `client protocol version >= 18` and `enableExposingBrokerEntryMetadataToClient` is true.
4. Add client support for broker entry metadata. Add two methods `getBrokerPublishTime()` and `getIndex()` to utilize the broker entry metadata in `Message`.

(cherry picked from commit a7a3721)
  • Loading branch information
LeBW authored and dlg99 committed Jun 15, 2022
1 parent 0dce444 commit 8689d7b
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "List of interceptors for entry metadata.")
private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable exposing broker entry metadata to client.")
private boolean exposingBrokerEntryMetadataToClientEnabled = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable namespaceIsolation policy update take effect ontime or not," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,14 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
// increment ref-count of data and release at the end of process:
// so, we can get chance to call entry.release
metadataAndPayload.retain();
// skip raw message metadata since broker timestamp only used in broker side
Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
// skip broker entry metadata if consumer-client doesn't support broker entry metadata or the
// features is not enabled
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()
|| !cnx.supportBrokerMetadata()
|| !cnx.getBrokerService().getPulsar().getConfig()
.isExposingBrokerEntryMetadataToClientEnabled()) {
Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
}
// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getValue()) {
Commands.skipChecksumIfPresent(metadataAndPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ protected Set<ServerCnx> initialValue() throws Exception {
}
};


enum State {
Start, Connected, Failed, Connecting
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ protected void setup() throws Exception {
"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
"org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"
));
conf.setExposingBrokerEntryMetadataToClientEnabled(true);
baseSetup();
}

Expand Down Expand Up @@ -222,43 +223,83 @@ public void testGetLastMessageId() throws Exception {
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(subscription)
.subscribe();
consumer.getLastMessageId();
}

@Test
public void testManagedLedgerTotalSize() throws Exception {
@Test(timeOut = 20000)
public void testConsumerGetBrokerEntryMetadataForIndividualMessage() throws Exception {
final String topic = newTopicName();
final int messages = 10;

admin.topics().createNonPartitionedTopic(topic);
admin.lookups().lookupTopic(topic);
final ManagedLedgerImpl managedLedger = pulsar.getBrokerService().getTopicIfExists(topic).get()
.map(topicObject -> (ManagedLedgerImpl) ((PersistentTopic) topicObject).getManagedLedger())
.orElse(null);
Assert.assertNotNull(managedLedger);
final ManagedCursor cursor = managedLedger.openCursor("cursor"); // prevent ledgers being removed
final String subscription = "my-sub";

@Cleanup
final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(subscription)
.subscribe();

long sendTime = System.currentTimeMillis();

final int messages = 10;
for (int i = 0; i < messages; i++) {
producer.send("msg-" + i);
producer.send(String.valueOf(i).getBytes());
}

Assert.assertTrue(managedLedger.getTotalSize() > 0);
for (int i = 0; i < messages; i++) {
Message<byte[]> received = consumer.receive();
Assert.assertTrue(
received.hasBrokerPublishTime() && received.getBrokerPublishTime().orElse(-1L) >= sendTime);
Assert.assertTrue(received.hasIndex() && received.getIndex().orElse(-1L) == i);
}

managedLedger.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
managedLedger.getConfig().setMaxEntriesPerLedger(1);
managedLedger.rollCurrentLedgerIfFull();
producer.close();
consumer.close();
}

Awaitility.await().atMost(Duration.ofSeconds(3))
.until(() -> managedLedger.getLedgersInfo().size() > 1);
@Test(timeOut = 20000)
public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws Exception {
final String topic = newTopicName();
final String subscription = "my-sub";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MINUTES)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(subscription)
.subscribe();

long sendTime = System.currentTimeMillis();

final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList();
Assert.assertEquals(ledgerInfoList.size(), 2);
Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
int numOfMessages;
// batch 1
for (numOfMessages = 0; numOfMessages < 5; numOfMessages++) {
producer.sendAsync(String.valueOf(numOfMessages).getBytes());
}
producer.flush();
// batch 2
for (; numOfMessages < 10; numOfMessages++) {
producer.sendAsync(String.valueOf(numOfMessages).getBytes());
}
producer.flush();

for (int i = 0; i < numOfMessages; i++) {
Message<byte[]> received = consumer.receive();
Assert.assertTrue(
received.hasBrokerPublishTime() && received.getBrokerPublishTime().orElse(-1L) >= sendTime);
Assert.assertTrue(received.hasIndex() && received.getIndex().orElse(-1L) == i);
}

cursor.close();
producer.close();
consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -53,6 +54,8 @@ protected void cleanup() throws Exception {
public void testCompactedOutMessages() throws Exception {
final String topic1 = "persistent://my-property/my-ns/my-topic";

BrokerEntryMetadata brokerEntryMetadata = new BrokerEntryMetadata().setBrokerTimestamp(1).setBrokerTimestamp(1);

MessageMetadata metadata = new MessageMetadata()
.setProducerName("foobar")
.setSequenceId(1)
Expand All @@ -78,9 +81,8 @@ public void testCompactedOutMessages() throws Exception {
= (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1)
.subscriptionName("my-subscriber-name").subscribe()) {
// shove it in the sideways
consumer.receiveIndividualMessagesFromBatch(metadata, 0, null, batchBuffer,
new MessageIdData().setLedgerId(1234)
.setEntryId(567), consumer.cnx());
consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null,
batchBuffer, new MessageIdData().setLedgerId(1234).setEntryId(567), consumer.cnx());
Message<?> m = consumer.receive();
assertEquals(((BatchMessageIdImpl)m.getMessageId()).getLedgerId(), 1234);
assertEquals(((BatchMessageIdImpl)m.getMessageId()).getEntryId(), 567);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,38 @@ default Optional<Schema<?>> getReaderSchema() {
* @since 2.8.0
*/
void release();

/**
* Check whether the message has a broker publish time
*
* @since 2.9.0
* @return true if the message has a broker publish time, otherwise false.
*/
boolean hasBrokerPublishTime();

/**
* Get broker publish time from broker entry metadata.
* Note that only if the feature is enabled in the broker then the value is available.
*
* @since 2.9.0
* @return broker publish time from broker entry metadata, or empty if the feature is not enabled in the broker.
*/
Optional<Long> getBrokerPublishTime();

/**
* Check whether the message has a index.
*
* @since 2.9.0
* @return true if the message has a index, otherwise false.
*/
boolean hasIndex();

/**
* Get index from broker entry metadata.
* Note that only if the feature is enabled in the broker then the value is available.
*
* @since 2.9.0
* @return index from broker entry metadata, or empty if the feature is not enabled in the broker.
*/
Optional<Long> getIndex();
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -1000,8 +1001,10 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
return;
}

BrokerEntryMetadata brokerEntryMetadata;
MessageMetadata msgMetadata;
try {
brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayload);
msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
} catch (Throwable t) {
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
Expand Down Expand Up @@ -1069,6 +1072,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount,
poolMessages);
uncompressedPayload.release();
message.setBrokerEntryMetadata(brokerEntryMetadata);

// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
Expand All @@ -1087,7 +1091,7 @@ uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redelive
});
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx);
receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx);

uncompressedPayload.release();
}
Expand Down Expand Up @@ -1228,8 +1232,9 @@ private void interceptAndComplete(final Message<T> message, final CompletableFut
completePendingReceive(receivedFuture, interceptMessage);
}

void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
MessageIdData messageId, ClientCnx cnx) {
void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,
int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
MessageIdData messageId, ClientCnx cnx) {
int batchSize = msgMetadata.getNumMessagesInBatch();

// create ack tracker for entry aka batch
Expand Down Expand Up @@ -1289,6 +1294,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
final MessageImpl<T> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
msgMetadata, singleMessageMetadata, singleMessagePayload,
createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, poolMessages);
message.setBrokerEntryMetadata(brokerEntryMetadata);
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,14 @@ public static MessageImpl<byte[]> deserializeSkipBrokerEntryMetaData(
@SuppressWarnings("unchecked")
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();

BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
msg.payload = headersAndPayloadWithBrokerEntryMetadata;
msg.messageId = null;
msg.topic = null;
msg.cnx = null;
msg.properties = Collections.emptyMap();
msg.brokerEntryMetadata = null;
msg.brokerEntryMetadata = brokerEntryMetadata;
return msg;
}

Expand Down Expand Up @@ -648,6 +649,37 @@ public void release() {
}
}

@Override
public boolean hasBrokerPublishTime() {
return brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp();
}

@Override
public Optional<Long> getBrokerPublishTime() {
if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) {
return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
}
return Optional.empty();
}

@Override
public boolean hasIndex() {
return brokerEntryMetadata != null && brokerEntryMetadata.hasIndex();
}

@Override
public Optional<Long> getIndex() {
if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof BatchMessageIdImpl) {
int batchSize = ((BatchMessageIdImpl) messageId).getBatchSize();
int batchIndex = ((BatchMessageIdImpl) messageId).getBatchIndex();
return Optional.of(brokerEntryMetadata.getIndex() - batchSize + batchIndex + 1);
}
return Optional.of(brokerEntryMetadata.getIndex());
}
return Optional.empty();
}

private MessageImpl(Handle<MessageImpl<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
this.redeliveryCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,24 @@ public Optional<Schema<?>> getReaderSchema() {
public void release() {
msg.release();
}

@Override
public boolean hasBrokerPublishTime() {
return msg.hasBrokerPublishTime();
}

@Override
public Optional<Long> getBrokerPublishTime() {
return msg.getBrokerPublishTime();
}

@Override
public boolean hasIndex() {
return msg.hasIndex();
}

@Override
public Optional<Long> getIndex() {
return msg.getIndex();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;

Expand Down Expand Up @@ -178,9 +179,9 @@ protected void tryTriggerListener() {
}

@Override
void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount,
List<Long> ackSet,
ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) {
void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,
int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
MessageIdData messageId, ClientCnx cnx) {
log.warn(
"Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size",
subscription, consumerName);
Expand Down
3 changes: 2 additions & 1 deletion pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,9 @@ enum ProtocolVersion {
v14 = 14; // Add CommandAuthChallenge and CommandAuthResponse for mutual auth
// Added Key_Shared subscription
v15 = 15; // Add CommandGetOrCreateSchema and CommandGetOrCreateSchemaResponse
v16 = 16; // Add support for raw message metadata
v16 = 16; // Add support for broker entry metadata
v17 = 17; // Added support ack receipt
v18 = 18; // Add client support for broker entry metadata
}

message CommandConnect {
Expand Down

0 comments on commit 8689d7b

Please sign in to comment.