From 8689d7b4dbf54226010fdc2e28f6c5fda4c8e2b6 Mon Sep 17 00:00:00 2001 From: Bowen Li <27091925@qq.com> Date: Mon, 23 Aug 2021 20:25:46 +0800 Subject: [PATCH] Exposing the broker entry metadata to client (#11553) This is an improvement to [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata). Now the client is able to get the broker entry metadata. 1. increment the client protocol version from 17 to 18. 2. add a configuration `enableExposingBrokerEntryMetadataToClient` in the broker configuration. 3. Let the broker send the broker entry metadata to the client when `client protocol version >= 18` and `enableExposingBrokerEntryMetadataToClient` is true. 4. Add client support for broker entry metadata. Add two methods `getBrokerPublishTime()` and `getIndex()` to utilize the broker entry metadata in `Message`. (cherry picked from commit a7a37219aba89995f68448d2b011e192f75a916a) --- .../pulsar/broker/ServiceConfiguration.java | 5 ++ .../service/PulsarCommandSenderImpl.java | 10 ++- .../pulsar/broker/service/ServerCnx.java | 1 + .../service/BrokerEntryMetadataE2ETest.java | 89 ++++++++++++++----- .../impl/CompactedOutBatchMessageTest.java | 8 +- .../org/apache/pulsar/client/api/Message.java | 34 +++++++ .../pulsar/client/impl/ConsumerImpl.java | 12 ++- .../pulsar/client/impl/MessageImpl.java | 34 ++++++- .../pulsar/client/impl/TopicMessageImpl.java | 20 +++++ .../client/impl/ZeroQueueConsumerImpl.java | 7 +- pulsar-common/src/main/proto/PulsarApi.proto | 3 +- 11 files changed, 186 insertions(+), 37 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 139edcf40d691..3b125615f748d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1071,6 +1071,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "List of interceptors for entry metadata.") private Set brokerEntryMetadataInterceptors = new HashSet<>(); + @FieldContext( + category = CATEGORY_SERVER, + doc = "Enable or disable exposing broker entry metadata to client.") + private boolean exposingBrokerEntryMetadataToClientEnabled = false; + @FieldContext( category = CATEGORY_SERVER, doc = "Enable namespaceIsolation policy update take effect ontime or not," + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index 35a026ffc3601..883f30d5ceded 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -243,8 +243,14 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, // increment ref-count of data and release at the end of process: // so, we can get chance to call entry.release metadataAndPayload.retain(); - // skip raw message metadata since broker timestamp only used in broker side - Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload); + // skip broker entry metadata if consumer-client doesn't support broker entry metadata or the + // features is not enabled + if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue() + || !cnx.supportBrokerMetadata() + || !cnx.getBrokerService().getPulsar().getConfig() + .isExposingBrokerEntryMetadataToClientEnabled()) { + Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload); + } // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getValue()) { Commands.skipChecksumIfPresent(metadataAndPayload); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index b0ff87f8d70c0..76b6014b15efe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -218,6 +218,7 @@ protected Set initialValue() throws Exception { } }; + enum State { Start, Connected, Failed, Connecting } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index b87353d95f8a4..01159b47f485e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -68,6 +68,7 @@ protected void setup() throws Exception { "org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor", "org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor" )); + conf.setExposingBrokerEntryMetadataToClientEnabled(true); baseSetup(); } @@ -222,43 +223,83 @@ public void testGetLastMessageId() throws Exception { .subscriptionType(SubscriptionType.Exclusive) .subscriptionName(subscription) .subscribe(); - consumer.getLastMessageId(); } - @Test - public void testManagedLedgerTotalSize() throws Exception { + @Test(timeOut = 20000) + public void testConsumerGetBrokerEntryMetadataForIndividualMessage() throws Exception { final String topic = newTopicName(); - final int messages = 10; - - admin.topics().createNonPartitionedTopic(topic); - admin.lookups().lookupTopic(topic); - final ManagedLedgerImpl managedLedger = pulsar.getBrokerService().getTopicIfExists(topic).get() - .map(topicObject -> (ManagedLedgerImpl) ((PersistentTopic) topicObject).getManagedLedger()) - .orElse(null); - Assert.assertNotNull(managedLedger); - final ManagedCursor cursor = managedLedger.openCursor("cursor"); // prevent ledgers being removed + final String subscription = "my-sub"; @Cleanup - final Producer producer = pulsarClient.newProducer(Schema.STRING) + Producer producer = pulsarClient.newProducer() .topic(topic) + .enableBatching(false) .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(subscription) + .subscribe(); + + long sendTime = System.currentTimeMillis(); + + final int messages = 10; for (int i = 0; i < messages; i++) { - producer.send("msg-" + i); + producer.send(String.valueOf(i).getBytes()); } - Assert.assertTrue(managedLedger.getTotalSize() > 0); + for (int i = 0; i < messages; i++) { + Message received = consumer.receive(); + Assert.assertTrue( + received.hasBrokerPublishTime() && received.getBrokerPublishTime().orElse(-1L) >= sendTime); + Assert.assertTrue(received.hasIndex() && received.getIndex().orElse(-1L) == i); + } - managedLedger.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS); - managedLedger.getConfig().setMaxEntriesPerLedger(1); - managedLedger.rollCurrentLedgerIfFull(); + producer.close(); + consumer.close(); + } - Awaitility.await().atMost(Duration.ofSeconds(3)) - .until(() -> managedLedger.getLedgersInfo().size() > 1); + @Test(timeOut = 20000) + public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws Exception { + final String topic = newTopicName(); + final String subscription = "my-sub"; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(true) + .batchingMaxPublishDelay(1, TimeUnit.MINUTES) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(subscription) + .subscribe(); + + long sendTime = System.currentTimeMillis(); - final List ledgerInfoList = managedLedger.getLedgersInfoAsList(); - Assert.assertEquals(ledgerInfoList.size(), 2); - Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize()); + int numOfMessages; + // batch 1 + for (numOfMessages = 0; numOfMessages < 5; numOfMessages++) { + producer.sendAsync(String.valueOf(numOfMessages).getBytes()); + } + producer.flush(); + // batch 2 + for (; numOfMessages < 10; numOfMessages++) { + producer.sendAsync(String.valueOf(numOfMessages).getBytes()); + } + producer.flush(); + + for (int i = 0; i < numOfMessages; i++) { + Message received = consumer.receive(); + Assert.assertTrue( + received.hasBrokerPublishTime() && received.getBrokerPublishTime().orElse(-1L) >= sendTime); + Assert.assertTrue(received.hasIndex() && received.getIndex().orElse(-1L) == i); + } - cursor.close(); + producer.close(); + consumer.close(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java index f074847392f9f..2358e01dec74c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -53,6 +54,8 @@ protected void cleanup() throws Exception { public void testCompactedOutMessages() throws Exception { final String topic1 = "persistent://my-property/my-ns/my-topic"; + BrokerEntryMetadata brokerEntryMetadata = new BrokerEntryMetadata().setBrokerTimestamp(1).setBrokerTimestamp(1); + MessageMetadata metadata = new MessageMetadata() .setProducerName("foobar") .setSequenceId(1) @@ -78,9 +81,8 @@ public void testCompactedOutMessages() throws Exception { = (ConsumerImpl) pulsarClient.newConsumer().topic(topic1) .subscriptionName("my-subscriber-name").subscribe()) { // shove it in the sideways - consumer.receiveIndividualMessagesFromBatch(metadata, 0, null, batchBuffer, - new MessageIdData().setLedgerId(1234) - .setEntryId(567), consumer.cnx()); + consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null, + batchBuffer, new MessageIdData().setLedgerId(1234).setEntryId(567), consumer.cnx()); Message m = consumer.receive(); assertEquals(((BatchMessageIdImpl)m.getMessageId()).getLedgerId(), 1234); assertEquals(((BatchMessageIdImpl)m.getMessageId()).getEntryId(), 567); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java index b456373a8feb8..c033e467917ca 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java @@ -245,4 +245,38 @@ default Optional> getReaderSchema() { * @since 2.8.0 */ void release(); + + /** + * Check whether the message has a broker publish time + * + * @since 2.9.0 + * @return true if the message has a broker publish time, otherwise false. + */ + boolean hasBrokerPublishTime(); + + /** + * Get broker publish time from broker entry metadata. + * Note that only if the feature is enabled in the broker then the value is available. + * + * @since 2.9.0 + * @return broker publish time from broker entry metadata, or empty if the feature is not enabled in the broker. + */ + Optional getBrokerPublishTime(); + + /** + * Check whether the message has a index. + * + * @since 2.9.0 + * @return true if the message has a index, otherwise false. + */ + boolean hasIndex(); + + /** + * Get index from broker entry metadata. + * Note that only if the feature is enabled in the broker then the value is available. + * + * @since 2.9.0 + * @return index from broker entry metadata, or empty if the feature is not enabled in the broker. + */ + Optional getIndex(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index bbe8f26ec0b97..18fc9bae95a3c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -84,6 +84,7 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandAck.ValidationError; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; @@ -1000,8 +1001,10 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List ac return; } + BrokerEntryMetadata brokerEntryMetadata; MessageMetadata msgMetadata; try { + brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayload); msgMetadata = Commands.parseMessageMetadata(headersAndPayload); } catch (Throwable t) { discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch); @@ -1069,6 +1072,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List ac uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, poolMessages); uncompressedPayload.release(); + message.setBrokerEntryMetadata(brokerEntryMetadata); // Enqueue the message so that it can be retrieved when application calls receive() // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. @@ -1087,7 +1091,7 @@ uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redelive }); } else { // handle batch message enqueuing; uncompressed payload has all messages in batch - receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx); + receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx); uncompressedPayload.release(); } @@ -1228,8 +1232,9 @@ private void interceptAndComplete(final Message message, final CompletableFut completePendingReceive(receivedFuture, interceptMessage); } - void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, List ackSet, ByteBuf uncompressedPayload, - MessageIdData messageId, ClientCnx cnx) { + void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata, + int redeliveryCount, List ackSet, ByteBuf uncompressedPayload, + MessageIdData messageId, ClientCnx cnx) { int batchSize = msgMetadata.getNumMessagesInBatch(); // create ack tracker for entry aka batch @@ -1289,6 +1294,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv final MessageImpl message = MessageImpl.create(topicName.toString(), batchMessageIdImpl, msgMetadata, singleMessageMetadata, singleMessagePayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, poolMessages); + message.setBrokerEntryMetadata(brokerEntryMetadata); if (possibleToDeadLetter != null) { possibleToDeadLetter.add(message); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index bf25c068903a5..2a70e8ee12d30 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -295,13 +295,14 @@ public static MessageImpl deserializeSkipBrokerEntryMetaData( @SuppressWarnings("unchecked") MessageImpl msg = (MessageImpl) RECYCLER.get(); + BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata); Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata); msg.payload = headersAndPayloadWithBrokerEntryMetadata; msg.messageId = null; msg.topic = null; msg.cnx = null; msg.properties = Collections.emptyMap(); - msg.brokerEntryMetadata = null; + msg.brokerEntryMetadata = brokerEntryMetadata; return msg; } @@ -648,6 +649,37 @@ public void release() { } } + @Override + public boolean hasBrokerPublishTime() { + return brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp(); + } + + @Override + public Optional getBrokerPublishTime() { + if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) { + return Optional.of(brokerEntryMetadata.getBrokerTimestamp()); + } + return Optional.empty(); + } + + @Override + public boolean hasIndex() { + return brokerEntryMetadata != null && brokerEntryMetadata.hasIndex(); + } + + @Override + public Optional getIndex() { + if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { + if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof BatchMessageIdImpl) { + int batchSize = ((BatchMessageIdImpl) messageId).getBatchSize(); + int batchIndex = ((BatchMessageIdImpl) messageId).getBatchIndex(); + return Optional.of(brokerEntryMetadata.getIndex() - batchSize + batchIndex + 1); + } + return Optional.of(brokerEntryMetadata.getIndex()); + } + return Optional.empty(); + } + private MessageImpl(Handle> recyclerHandle) { this.recyclerHandle = recyclerHandle; this.redeliveryCount = 0; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index e41d1b2fe1982..d33c3cdffc3ca 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -199,4 +199,24 @@ public Optional> getReaderSchema() { public void release() { msg.release(); } + + @Override + public boolean hasBrokerPublishTime() { + return msg.hasBrokerPublishTime(); + } + + @Override + public Optional getBrokerPublishTime() { + return msg.getBrokerPublishTime(); + } + + @Override + public boolean hasIndex() { + return msg.hasIndex(); + } + + @Override + public Optional getIndex() { + return msg.getIndex(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index 66ba0d47ae754..6375e37a89cc9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -178,9 +179,9 @@ protected void tryTriggerListener() { } @Override - void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, - List ackSet, - ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) { + void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata, + int redeliveryCount, List ackSet, ByteBuf uncompressedPayload, + MessageIdData messageId, ClientCnx cnx) { log.warn( "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", subscription, consumerName); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index cabd099633776..320c51c920b1d 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -254,8 +254,9 @@ enum ProtocolVersion { v14 = 14; // Add CommandAuthChallenge and CommandAuthResponse for mutual auth // Added Key_Shared subscription v15 = 15; // Add CommandGetOrCreateSchema and CommandGetOrCreateSchemaResponse - v16 = 16; // Add support for raw message metadata + v16 = 16; // Add support for broker entry metadata v17 = 17; // Added support ack receipt + v18 = 18; // Add client support for broker entry metadata } message CommandConnect {