From 3f1e3864e7f11a7772799303df004d69a319fe0e Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Thu, 6 Apr 2023 17:03:46 +0800 Subject: [PATCH 1/2] [improve][cli] Add `--cleanupSubscription` to pulsar-admin --- .../java/org/apache/pulsar/admin/cli/CmdFunctions.java | 6 ++++++ .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 7 +++++++ .../apache/pulsar/functions/utils/SinkConfigUtils.java | 10 ---------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 05bab9c6f198b..aabf664fa2b3f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -201,6 +201,8 @@ abstract class FunctionDetailsCommand extends BaseCommand { protected String className; @Parameter(names = { "-t", "--function-type" }, description = "The built-in Pulsar Function type") protected String functionType; + @Parameter(names = { "--cleanup-subscription" }, description = "Whether delete the subscription when function is deleted") + protected Boolean cleanupSubscription; @Parameter(names = "--jar", description = "Path to the JAR file for the function " + "(if the function is written in Java). It also supports URL path [http/https/file " + "(file protocol assumes that file already exists on worker host)/function " @@ -471,6 +473,10 @@ void processArguments() throws Exception { } } + if (null != cleanupSubscription) { + functionConfig.setCleanupSubscription(cleanupSubscription); + } + if (null != inputs) { List inputTopics = Arrays.asList(inputs.split(",")); functionConfig.setInputs(inputTopics); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 4af9221dd2eab..d7f00d1a28d64 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -291,6 +291,9 @@ abstract class SinkDetailsCommand extends BaseCommand { @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's connector provider") protected String sinkType; + @Parameter(names = { "--cleanup-subscription" }, description = "Whether delete the subscription when sink is deleted") + protected Boolean cleanupSubscription; + @Parameter(names = { "-i", "--inputs" }, description = "The sink's input topic or topics " + "(multiple topics can be specified as a comma-separated list)") @@ -469,6 +472,10 @@ void processArguments() throws Exception { sinkConfig.setProcessingGuarantees(processingGuarantees); } + if (null != cleanupSubscription) { + sinkConfig.setCleanupSubscription(cleanupSubscription); + } + if (retainOrdering != null) { sinkConfig.setRetainOrdering(retainOrdering); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index d79f787588c95..1a4009f5295e2 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -202,12 +202,6 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs()); } - if (sinkConfig.getCleanupSubscription() != null) { - sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription()); - } else { - sourceSpecBuilder.setCleanupSubscription(true); - } - if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) { sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST); } else { @@ -329,7 +323,6 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) { // Set subscription position sinkConfig.setSourceSubscriptionPosition( convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition())); - sinkConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription()); if (functionDetails.getSource().getTimeoutMs() != 0) { sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); @@ -671,9 +664,6 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) { mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions()); } - if (newConfig.getCleanupSubscription() != null) { - mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription()); - } if (newConfig.getTransformFunction() != null) { mergedConfig.setTransformFunction(newConfig.getTransformFunction()); } From 762dda15b8f320151dd2420ec674ee1f2b36d837 Mon Sep 17 00:00:00 2001 From: "jiang.pengcheng" Date: Tue, 11 Apr 2023 15:20:51 +0800 Subject: [PATCH 2/2] Fix style --- .../main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 3 ++- .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index aabf664fa2b3f..91ec7183ff13c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -201,7 +201,8 @@ abstract class FunctionDetailsCommand extends BaseCommand { protected String className; @Parameter(names = { "-t", "--function-type" }, description = "The built-in Pulsar Function type") protected String functionType; - @Parameter(names = { "--cleanup-subscription" }, description = "Whether delete the subscription when function is deleted") + @Parameter(names = "--cleanup-subscription", description = "Whether delete the subscription " + + "when function is deleted") protected Boolean cleanupSubscription; @Parameter(names = "--jar", description = "Path to the JAR file for the function " + "(if the function is written in Java). It also supports URL path [http/https/file " diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index d7f00d1a28d64..0b27dd8d0a737 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -291,7 +291,8 @@ abstract class SinkDetailsCommand extends BaseCommand { @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's connector provider") protected String sinkType; - @Parameter(names = { "--cleanup-subscription" }, description = "Whether delete the subscription when sink is deleted") + @Parameter(names = "--cleanup-subscription", description = "Whether delete the subscription " + + "when sink is deleted") protected Boolean cleanupSubscription; @Parameter(names = { "-i",