Skip to content

Commit

Permalink
[improve][cli] Add --cleanupSubscription to pulsar-admin
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng committed Apr 11, 2023
1 parent 81971e2 commit 3f1e386
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected String className;
@Parameter(names = { "-t", "--function-type" }, description = "The built-in Pulsar Function type")
protected String functionType;
@Parameter(names = { "--cleanup-subscription" }, description = "Whether delete the subscription when function is deleted")
protected Boolean cleanupSubscription;
@Parameter(names = "--jar", description = "Path to the JAR file for the function "
+ "(if the function is written in Java). It also supports URL path [http/https/file "
+ "(file protocol assumes that file already exists on worker host)/function "
Expand Down Expand Up @@ -471,6 +473,10 @@ void processArguments() throws Exception {
}
}

if (null != cleanupSubscription) {
functionConfig.setCleanupSubscription(cleanupSubscription);
}

if (null != inputs) {
List<String> inputTopics = Arrays.asList(inputs.split(","));
functionConfig.setInputs(inputTopics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = { "-t", "--sink-type" }, description = "The sinks's connector provider")
protected String sinkType;

@Parameter(names = { "--cleanup-subscription" }, description = "Whether delete the subscription when sink is deleted")
protected Boolean cleanupSubscription;

@Parameter(names = { "-i",
"--inputs" }, description = "The sink's input topic or topics "
+ "(multiple topics can be specified as a comma-separated list)")
Expand Down Expand Up @@ -469,6 +472,10 @@ void processArguments() throws Exception {
sinkConfig.setProcessingGuarantees(processingGuarantees);
}

if (null != cleanupSubscription) {
sinkConfig.setCleanupSubscription(cleanupSubscription);
}

if (retainOrdering != null) {
sinkConfig.setRetainOrdering(retainOrdering);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,6 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs());
}

if (sinkConfig.getCleanupSubscription() != null) {
sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
} else {
sourceSpecBuilder.setCleanupSubscription(true);
}

if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST);
} else {
Expand Down Expand Up @@ -329,7 +323,6 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
// Set subscription position
sinkConfig.setSourceSubscriptionPosition(
convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
sinkConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());

if (functionDetails.getSource().getTimeoutMs() != 0) {
sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
Expand Down Expand Up @@ -671,9 +664,6 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
}
if (newConfig.getCleanupSubscription() != null) {
mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
}
if (newConfig.getTransformFunction() != null) {
mergedConfig.setTransformFunction(newConfig.getTransformFunction());
}
Expand Down

0 comments on commit 3f1e386

Please sign in to comment.