From 2b0c2eaf0c28e3788c9a4759d2ad402e6a63aeda Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 1 Nov 2024 16:07:34 +0800 Subject: [PATCH 1/2] [improve][io] Support update subscription position for sink connector --- .../org/apache/pulsar/functions/utils/SinkConfigUtils.java | 4 +++- .../apache/pulsar/functions/utils/SinkConfigUtilsTest.java | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 6631c053fac49..65b6b97fc6ee9 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -724,7 +724,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne if (newConfig.getTransformFunctionConfig() != null) { mergedConfig.setTransformFunctionConfig(newConfig.getTransformFunctionConfig()); } - + if (newConfig.getSourceSubscriptionPosition() != null) { + mergedConfig.setSourceSubscriptionPosition(newConfig.getSourceSubscriptionPosition()); + } return mergedConfig; } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 5c2b6d92b9366..9657183f701ba 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -38,6 +38,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; @@ -565,6 +566,7 @@ private SinkConfig createSinkConfig() { inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); sinkConfig.setInputSpecs(inputSpecs); sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Earliest); sinkConfig.setRetainOrdering(false); sinkConfig.setRetainKeyOrdering(false); sinkConfig.setConfigs(new HashMap<>()); From 10d457b9dbbdd5b4c22e80c56d84a084beaed143 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 1 Nov 2024 16:45:00 +0800 Subject: [PATCH 2/2] Add unit test --- .../functions/utils/SinkConfigUtilsTest.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 9657183f701ba..c4c79a635eac0 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -225,6 +225,18 @@ public void testCleanSubscriptionField() throws IOException { } } + @Test + public void testUpdateSubscriptionPosition() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createSinkConfig(); + newSinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Earliest); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + new Gson().toJson(newSinkConfig), + new Gson().toJson(mergedConfig) + ); + } + @Test public void testMergeEqual() { SinkConfig sinkConfig = createSinkConfig(); @@ -566,7 +578,7 @@ private SinkConfig createSinkConfig() { inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); sinkConfig.setInputSpecs(inputSpecs); sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); - sinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Earliest); + sinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Latest); sinkConfig.setRetainOrdering(false); sinkConfig.setRetainKeyOrdering(false); sinkConfig.setConfigs(new HashMap<>());