diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index c8da4209033f54..44626dd7b3a4a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; +import java.io.IOException; import java.util.Collections; import java.util.List; @@ -129,18 +130,45 @@ public void resetCloseFuture() { } public static final String NONE_KEY = "NONE_KEY"; + protected byte[] peekStickyKey(ByteBuf metadataAndPayload) { - metadataAndPayload.markReaderIndex(); + int readerIndex = metadataAndPayload.readerIndex(); PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); - metadataAndPayload.resetReaderIndex(); - String key = metadata.getPartitionKey(); - if (log.isDebugEnabled()) { - log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey()); - } - if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) { - return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes(); + + try { + if (metadata.hasNumMessagesInBatch()) { + // If the message was part of a batch (eg: a batch of 1 message), we need + // to read the key from the first single-message-metadata entry + PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata + .newBuilder(); + ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(metadataAndPayload, + singleMessageMetadataBuilder, 0, metadata.getNumMessagesInBatch()); + try { + if (singleMessageMetadataBuilder.hasOrderingKey()) { + return singleMessageMetadataBuilder.getOrderingKey().toByteArray(); + } else if (singleMessageMetadataBuilder.hasPartitionKey()) { + return singleMessageMetadataBuilder.getPartitionKey().getBytes(); + } + } finally { + singleMessagePayload.release(); + singleMessageMetadataBuilder.recycle(); + } + } else { + // Message was not part of a batch + if (metadata.hasOrderingKey()) { + return metadata.getOrderingKey().toByteArray(); + } else if (metadata.hasPartitionKey()) { + return metadata.getPartitionKey().getBytes(); + } + } + + return NONE_KEY.getBytes(); + } catch (IOException e) { + // If we fail to deserialize medata, return null key + return NONE_KEY.getBytes(); + } finally { + metadataAndPayload.readerIndex(readerIndex); + metadata.recycle(); } - metadata.recycle(); - return NONE_KEY.getBytes(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index b5b6271d6fa7d1..4ae57b3ff781cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -121,6 +121,43 @@ public void testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(St receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3)); } + @Test(dataProvider = "data") + public void testSendAndReceiveWithBatching(String topicType, boolean enableBatch) + throws PulsarClientException { + this.conf.setSubscriptionKeySharedEnable(true); + String topic = topicType + "://public/default/key_shared-" + UUID.randomUUID(); + + @Cleanup + Consumer consumer1 = createConsumer(topic); + + @Cleanup + Consumer consumer2 = createConsumer(topic); + + @Cleanup + Consumer consumer3 = createConsumer(topic); + + @Cleanup + Producer producer = createProducer(topic, enableBatch); + + for (int i = 0; i < 1000; i++) { + // Send the same key twice so that we'll have a batch message + String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS)); + producer.newMessage() + .key(key) + .value(i) + .sendAsync(); + + producer.newMessage() + .key(key) + .value(i) + .sendAsync(); + } + + producer.flush(); + + receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3)); + } + @Test(dataProvider = "batch") public void testSendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException { this.conf.setSubscriptionKeySharedEnable(true);