Skip to content

Commit

Permalink
[improve][offload]keep topic/ns set-offload-policies consistent behav…
Browse files Browse the repository at this point in the history
…ior logic (#20646)

Co-authored-by: tison <wander4096@gmail.com>
  • Loading branch information
ethqunzhong and tisonkun authored Jun 29, 2023
1 parent 9cf17c6 commit 204905e
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,33 @@ public void tenants() throws Exception {
}

@Test
public void namespacesSetOffloadPolicies() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
Lookup mockLookup = mock(Lookup.class);
when(admin.lookups()).thenReturn(mockLookup);

// filesystem offload
CmdNamespaces namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split(
"set-offload-policies myprop/clust/ns2 -d filesystem -oat 100M -oats 1h -oae 1h -orp bookkeeper-first"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns2",
OffloadPoliciesImpl.create("filesystem", null, null,
null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024,
100 * 1024 * 1024L, 3600L, 3600 * 1000L, OffloadedReadPriority.BOOKKEEPER_FIRST));

// S3 offload
CmdNamespaces namespaces2 = new CmdNamespaces(() -> admin);
namespaces2.run(split(
"set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oats 100 -oae 10s -orp tiered-storage-first"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
10 * 1024 * 1024L, 100L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST));
}

@Test
public void namespaces() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
Expand Down Expand Up @@ -1455,6 +1482,34 @@ public void topicPolicies() throws Exception {
verify(mockGlobalTopicsPolicies).removeAutoSubscriptionCreation("persistent://prop/clust/ns1/ds1");
}

@Test
public void topicsSetOffloadPolicies() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Topics mockTopics = mock(Topics.class);
when(admin.topics()).thenReturn(mockTopics);
Schemas mockSchemas = mock(Schemas.class);
when(admin.schemas()).thenReturn(mockSchemas);
Lookup mockLookup = mock(Lookup.class);
when(admin.lookups()).thenReturn(mockLookup);

// filesystem offload
CmdTopics cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d filesystem -oat 100M -oats 1h -oae 1h -orp bookkeeper-first"));
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("filesystem", null, null
, null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024,
100 * 1024 * 1024L, 3600L, 3600 * 1000L, OffloadedReadPriority.BOOKKEEPER_FIRST);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies);

// S3 offload
CmdTopics cmdTopics2 = new CmdTopics(() -> admin);
cmdTopics2.run(split("set-offload-policies persistent://myprop/clust/ns1/ds2 -d s3 -r region -b bucket -e endpoint -ts 50 -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
OffloadPoliciesImpl offloadPolicies2 = OffloadPoliciesImpl.create("s3", "region", "bucket"
, "endpoint", null, null, null, null,
8, 9, 10L, 50L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds2", offloadPolicies2);
}


@Test
public void topics() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2229,7 +2229,7 @@ private class SetOffloadPolicies extends CliCommand {
@Parameter(
names = {"--bucket", "-b"},
description = "Bucket to place offloaded ledger into",
required = true)
required = false)
private String bucket;

@Parameter(
Expand Down Expand Up @@ -2265,7 +2265,8 @@ private class SetOffloadPolicies extends CliCommand {

@Parameter(
names = {"--maxBlockSize", "-mbs"},
description = "Max block size (eg: 32M, 64M), default is 64MB",
description = "Max block size (eg: 32M, 64M), default is 64MB"
+ "s3 and google-cloud-storage requires this parameter",
required = false)
private String maxBlockSizeStr;

Expand All @@ -2277,7 +2278,8 @@ private class SetOffloadPolicies extends CliCommand {

@Parameter(
names = {"--offloadAfterElapsed", "-oae"},
description = "Offload after elapsed in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w).",
description = "Delay time in Millis for deleting the bookkeeper ledger after offload "
+ "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 2d, 5w).",
required = false)
private String offloadAfterElapsedStr;

Expand All @@ -2289,7 +2291,7 @@ private class SetOffloadPolicies extends CliCommand {

@Parameter(
names = {"--offloadAfterThresholdInSeconds", "-oats"},
description = "Offload after threshold seconds (eg: 1,5,10)",
description = "Offload after threshold seconds (or minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).",
required = false
)
private String offloadAfterThresholdInSecondsStr;
Expand Down Expand Up @@ -2390,7 +2392,13 @@ && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) {

Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) {
long offloadThresholdInSeconds0 = Long.parseLong(offloadAfterThresholdInSecondsStr.trim());
Long offloadThresholdInSeconds0;
try {
offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds(
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim()));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (maxValueCheck("OffloadAfterThresholdInSeconds", offloadThresholdInSeconds0, Long.MAX_VALUE)) {
offloadThresholdInSeconds = offloadThresholdInSeconds0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
Expand All @@ -47,6 +48,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
Expand All @@ -67,6 +69,7 @@
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
Expand Down Expand Up @@ -2143,27 +2146,32 @@ private class SetOffloadPolicies extends CliCommand {
, description = "S3 role session name used for STSAssumeRoleSessionCredentialsProvider")
private String s3RoleSessionName;

@Parameter(names = {"-m", "--maxBlockSizeInBytes"},
description = "ManagedLedger offload max block Size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int maxBlockSizeInBytes;
@Parameter(
names = {"-m", "--maxBlockSizeInBytes", "--maxBlockSize", "-mbs"},
description = "Max block size (eg: 32M, 64M), default is 64MB"
+ "s3 and google-cloud-storage requires this parameter",
required = false)
private String maxBlockSizeStr;

@Parameter(names = {"-rb", "--readBufferSizeInBytes"},
description = "ManagedLedger offload read buffer size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int readBufferSizeInBytes;
@Parameter(
names = {"-rb", "--readBufferSizeInBytes", "--readBufferSize", "-rbs"},
description = "Read buffer size (eg: 1M, 5M), default is 1MB"
+ "s3 and google-cloud-storage requires this parameter",
required = false)
private String readBufferSizeStr;

@Parameter(names = {"-t", "--offloadThresholdInBytes"}
, description = "ManagedLedger offload threshold in bytes", required = true)
private long offloadThresholdInBytes;
@Parameter(names = {"-t", "--offloadThresholdInBytes", "--offloadAfterThreshold", "-oat"}
, description = "Offload after threshold size (eg: 1M, 5M)", required = false)
private String offloadAfterThresholdStr;

@Parameter(names = {"-ts", "--offloadThresholdInSeconds"}
, description = "ManagedLedger offload threshold in seconds")
private Long offloadThresholdInSeconds;
@Parameter(names = {"-ts", "--offloadThresholdInSeconds", "--offloadAfterThresholdInSeconds", "-oats"},
description = "Offload after threshold seconds (or minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).")
private String offloadAfterThresholdInSecondsStr;

@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
, description = "ManagedLedger offload deletion lag in bytes")
private Long offloadDeletionLagInMillis;
@Parameter(names = {"-dl", "--offloadDeletionLagInMillis", "--offloadAfterElapsed", "-oae"}
, description = "Delay time in Millis for deleting the bookkeeper ledger after offload "
+ "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 2d, 5w).")
private String offloadAfterElapsedStr;

@Parameter(names = {"--offloadedReadPriority", "-orp"},
description = "Read priority for offloaded messages. "
Expand All @@ -2175,10 +2183,102 @@ private class SetOffloadPolicies extends CliCommand {
)
private String offloadReadPriorityStr;

public final List<String> driverNames = OffloadPoliciesImpl.DRIVER_NAMES;

public boolean driverSupported(String driver) {
return driverNames.stream().anyMatch(d -> d.equalsIgnoreCase(driver));
}

public boolean isS3Driver(String driver) {
if (StringUtils.isEmpty(driver)) {
return false;
}
return driver.equalsIgnoreCase(driverNames.get(0)) || driver.equalsIgnoreCase(driverNames.get(1));
}

public boolean positiveCheck(String paramName, long value) {
if (value <= 0) {
throw new ParameterException(paramName + " is not be negative or 0!");
}
return true;
}

public boolean maxValueCheck(String paramName, long value, long maxValue) {
if (value > maxValue) {
throw new ParameterException(paramName + " is not bigger than " + maxValue + "!");
}
return true;
}

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);

if (!driverSupported(driver)) {
throw new ParameterException(
"The driver " + driver + " is not supported, "
+ "(Possible values: " + String.join(",", driverNames) + ").");
}
if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
throw new ParameterException(
"Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set"
+ " if s3 offload enabled");
}

int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
if (StringUtils.isNotEmpty(maxBlockSizeStr)) {
long maxBlockSize = validateSizeString(maxBlockSizeStr);
if (positiveCheck("MaxBlockSize", maxBlockSize)
&& maxValueCheck("MaxBlockSize", maxBlockSize, Integer.MAX_VALUE)) {
maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue();
}
}

int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
if (StringUtils.isNotEmpty(readBufferSizeStr)) {
long readBufferSize = validateSizeString(readBufferSizeStr);
if (positiveCheck("ReadBufferSize", readBufferSize)
&& maxValueCheck("ReadBufferSize", readBufferSize, Integer.MAX_VALUE)) {
readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue();
}
}

Long offloadAfterElapsedInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
if (StringUtils.isNotEmpty(offloadAfterElapsedStr)) {
Long offloadAfterElapsed;
try {
offloadAfterElapsed = TimeUnit.SECONDS.toMillis(
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) {
offloadAfterElapsedInMillis = offloadAfterElapsed;
}
}

Long offloadAfterThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
if (StringUtils.isNotEmpty(offloadAfterThresholdStr)) {
long offloadAfterThreshold = validateSizeString(offloadAfterThresholdStr);
if (maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE)) {
offloadAfterThresholdInBytes = offloadAfterThreshold;
}
}

Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) {
Long offloadThresholdInSeconds0;
try {
offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds(
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim()));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (maxValueCheck("OffloadAfterThresholdInSeconds", offloadThresholdInSeconds0, Long.MAX_VALUE)) {
offloadThresholdInSeconds = offloadThresholdInSeconds0;
}
}

OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;

if (this.offloadReadPriorityStr != null) {
Expand All @@ -2193,12 +2293,11 @@ void run() throws PulsarAdminException {
}
}

OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
OffloadPolicies offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
s3Role, s3RoleSessionName,
awsId, awsSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes, offloadThresholdInBytes, offloadThresholdInSeconds,
offloadDeletionLagInMillis, offloadedReadPriority);
maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes,
offloadThresholdInSeconds, offloadAfterElapsedInMillis, offloadedReadPriority);

getTopics().setOffloadPolicies(persistentTopic, offloadPolicies);
}
Expand Down

0 comments on commit 204905e

Please sign in to comment.