Skip to content

Commit

Permalink
Fixed KeyShared routing when messages are sent in batches of 1
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed May 29, 2020
1 parent ad2e39b commit 6ca3a75
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -129,18 +130,45 @@ public void resetCloseFuture() {
}

public static final String NONE_KEY = "NONE_KEY";

protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
metadataAndPayload.markReaderIndex();
int readerIndex = metadataAndPayload.readerIndex();
PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.resetReaderIndex();
String key = metadata.getPartitionKey();
if (log.isDebugEnabled()) {
log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
}
if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes();

try {
if (metadata.hasNumMessagesInBatch()) {
// If the message was part of a batch (eg: a batch of 1 message), we need
// to read the key from the first single-message-metadata entry
PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
.newBuilder();
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(metadataAndPayload,
singleMessageMetadataBuilder, 0, metadata.getNumMessagesInBatch());
try {
if (singleMessageMetadataBuilder.hasOrderingKey()) {
return singleMessageMetadataBuilder.getOrderingKey().toByteArray();
} else if (singleMessageMetadataBuilder.hasPartitionKey()) {
return singleMessageMetadataBuilder.getPartitionKey().getBytes();
}
} finally {
singleMessagePayload.release();
singleMessageMetadataBuilder.recycle();
}
} else {
// Message was not part of a batch
if (metadata.hasOrderingKey()) {
return metadata.getOrderingKey().toByteArray();
} else if (metadata.hasPartitionKey()) {
return metadata.getPartitionKey().getBytes();
}
}

return NONE_KEY.getBytes();
} catch (IOException e) {
// If we fail to deserialize medata, return null key
return NONE_KEY.getBytes();
} finally {
metadataAndPayload.readerIndex(readerIndex);
metadata.recycle();
}
metadata.recycle();
return NONE_KEY.getBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,43 @@ public void testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(St
receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
}

@Test(dataProvider = "data")
public void testSendAndReceiveWithBatching(String topicType, boolean enableBatch)
throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = topicType + "://public/default/key_shared-" + UUID.randomUUID();

@Cleanup
Consumer<Integer> consumer1 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer2 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer3 = createConsumer(topic);

@Cleanup
Producer<Integer> producer = createProducer(topic, enableBatch);

for (int i = 0; i < 1000; i++) {
// Send the same key twice so that we'll have a batch message
String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
producer.newMessage()
.key(key)
.value(i)
.sendAsync();

producer.newMessage()
.key(key)
.value(i)
.sendAsync();
}

producer.flush();

receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
}

@Test(dataProvider = "batch")
public void testSendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
Expand Down

0 comments on commit 6ca3a75

Please sign in to comment.