From 024ff7574b55104b2460c7969eb127577bfb54dc Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Fri, 22 Nov 2024 09:05:01 +0800 Subject: [PATCH] [fix][fn] ack messages for window function when its result is null (#23618) --- .../windowing/WindowFunctionExecutor.java | 7 +++++++ .../functions/PulsarFunctionsTest.java | 20 ++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java index c6ca4e65d33c0..1e492d74aa605 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java @@ -238,6 +238,13 @@ private void processWindow(Context context, List> tuples, List record : tuples) { + record.ack(); + } + } } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index b78a832f60933..694dcba5eaf61 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.fasterxml.jackson.databind.JsonNode; @@ -322,7 +323,17 @@ protected void testWindowFunction(String type, String[] expectedResults) throws .enableBatching(false) .create(); - for (int i = 0; i < NUM_OF_MESSAGES; i++) { + // send 3 messages first, and it won't trigger the window and so these 3 messages will not be acked + for (int i = 0; i < 3; i++) { + producer.send(String.format("%d", i).getBytes()); + } + TopicStats stats = pulsarAdmin.topics().getStats(inputTopicName, true); + SubscriptionStats subStats = stats.getSubscriptions().get("public/default/" + functionName); + assertNotNull(subStats); + assertEquals(3, subStats.getMsgBacklog()); + assertEquals(3, subStats.getUnackedMessages()); + + for (int i = 3; i < NUM_OF_MESSAGES; i++) { producer.send(String.format("%d", i).getBytes()); } @@ -348,6 +359,13 @@ protected void testWindowFunction(String type, String[] expectedResults) throws // in case last commit is not updated assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1); + // test that all messages are acked + stats = pulsarAdmin.topics().getStats(inputTopicName, true); + subStats = stats.getSubscriptions().get("public/default/" + functionName); + assertNotNull(subStats); + assertEquals(0, subStats.getMsgBacklog()); + assertEquals(0, subStats.getUnackedMessages()); + deleteFunction(functionName); getFunctionInfoNotFound(functionName);