From 315c4af9272cd61cb6f88d8d5d37031d18a35b78 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Fri, 18 Oct 2024 19:31:48 +0800 Subject: [PATCH] [improve][test] Added message properties tests for batch and non-batch messages (#23473) (cherry picked from commit 8de27a2116596ce63d7cd09534b168dbb64f64da) (cherry picked from commit 986a4db22f46d1bbeb878a7133863b192157ec59) --- .../broker/admin/PersistentTopicsTest.java | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 42912d17c1c2b..bf4c1f938f035 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1893,4 +1893,76 @@ public void testCreateMissingPartitions() throws Exception { String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testCreateMissingPartitions"; assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName)); } + + @Test + public void testPeekMessageWithProperties() throws Exception { + String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testPeekMessageWithProperties"; + admin.topics().createNonPartitionedTopic(topicName); + + // Test non-batch messages + @Cleanup + Producer nonBatchProducer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + + Map props1 = new HashMap<>(); + props1.put("key1", "value1"); + props1.put("KEY2", "VALUE2"); + props1.put("KeY3", "VaLuE3"); + + nonBatchProducer.newMessage() + .properties(props1) + .value("non-batch-message") + .send(); + + Message peekedMessage = admin.topics().peekMessages(topicName, "sub-peek", 1).get(0); + assertEquals(new String(peekedMessage.getData()), "non-batch-message"); + assertEquals(peekedMessage.getProperties().size(), 3); + assertEquals(peekedMessage.getProperties().get("key1"), "value1"); + assertEquals(peekedMessage.getProperties().get("KEY2"), "VALUE2"); + assertEquals(peekedMessage.getProperties().get("KeY3"), "VaLuE3"); + + admin.topics().truncate(topicName); + + // Test batch messages + @Cleanup + Producer batchProducer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(true) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .batchingMaxMessages(2) + .create(); + + Map props2 = new HashMap<>(); + props2.put("batch-key1", "batch-value1"); + props2.put("BATCH-KEY2", "BATCH-VALUE2"); + props2.put("BaTcH-kEy3", "BaTcH-vAlUe3"); + + batchProducer.newMessage() + .properties(props2) + .value("batch-message-1") + .sendAsync(); + + batchProducer.newMessage() + .properties(props2) + .value("batch-message-2") + .send(); + + List> peekedMessages = admin.topics().peekMessages(topicName, "sub-peek", 2); + assertEquals(peekedMessages.size(), 2); + + for (int i = 0; i < 2; i++) { + Message batchMessage = peekedMessages.get(i); + assertEquals(new String(batchMessage.getData()), "batch-message-" + (i + 1)); + assertEquals(batchMessage.getProperties().size(), + 3 + 2 // 3 properties from the message + 2 properties from the batch + ); + assertEquals(batchMessage.getProperties().get("X-Pulsar-num-batch-message"), "2"); + assertNotNull(batchMessage.getProperties().get("X-Pulsar-batch-size")); + assertEquals(batchMessage.getProperties().get("batch-key1"), "batch-value1"); + assertEquals(batchMessage.getProperties().get("BATCH-KEY2"), "BATCH-VALUE2"); + assertEquals(batchMessage.getProperties().get("BaTcH-kEy3"), "BaTcH-vAlUe3"); + } + } }