Skip to content

Commit

Permalink
Added infinite time retention configuration option (#1135)
Browse files Browse the repository at this point in the history
* Added infinite time retention configuration option

* Fixed test

* Updated CLI docs
  • Loading branch information
merlimat authored Jan 31, 2018
1 parent 99fb872 commit 1bca601
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,16 @@ public ManagedLedgerConfig setThrottleMarkDelete(double throttleMarkDelete) {
}

/**
* Set the retention time for the ManagedLedger
* <p>
* Retention time will prevent data from being deleted for at least the specified amount of time, even if no cursors
* are created, or if all the cursors have marked the data for deletion.
* <p>
* A retention time of 0 (the default), will to have no time based retention.
* <p>
* Specifying a negative retention time will make the data to be retained indefinitely, based on the
* {@link #setRetentionSizeInMB(long)} value.
*
* @param retentionTime
* duration for which messages should be retained
* @param unit
Expand All @@ -338,6 +348,15 @@ public long getRetentionTimeMillis() {
}

/**
* The retention size is used to set a maximum retention size quota on the ManagedLedger.
* <p>
* This setting works in conjuction with {@link #setRetentionSizeInMB(long)} and places a max size for retention,
* after which the data is deleted.
* <p>
* A retention size of 0, will make data to be deleted immediately.
* <p>
* A retention size of -1, means to have an unlimited retention size.
*
* @param retentionSizeInMB
* quota for message retention
*/
Expand All @@ -357,7 +376,7 @@ public long getRetentionSizeInMB() {
/**
* Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
* corrupted at bookkeeper and managed-cursor is stuck at that ledger.
*
*
* @param autoSkipNonRecoverableData
*/
public boolean isAutoSkipNonRecoverableData() {
Expand All @@ -384,10 +403,10 @@ public ManagedLedgerConfig setMaxUnackedRangesToPersist(int maxUnackedRangesToPe
this.maxUnackedRangesToPersist = maxUnackedRangesToPersist;
return this;
}

/**
* @return max unacked message ranges up to which it can store in Zookeeper
*
*
*/
public int getMaxUnackedRangesToPersistInZk() {
return maxUnackedRangesToPersistInZk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final static long WaitTimeAfterLedgerCreationFailureMs = 10000;

volatile PositionImpl lastConfirmedEntry;

protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;

Expand Down Expand Up @@ -1485,10 +1485,21 @@ private void scheduleDeferredTrimming() {
}

private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
if (config.getRetentionTimeMillis() < 0) {
// Negative retention time equates to infinite retention
return false;
}

long elapsedMs = System.currentTimeMillis() - ledgerTimestamp;
return elapsedMs > config.getRetentionTimeMillis();
}

private boolean isLedgerRetentionOverSizeQuota() {
// Handle the -1 size limit as "infinite" size quota
return config.getRetentionSizeInMB() > 0
&& TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024;
}

/**
* Checks whether there are ledger that have been fully consumed and deletes them
*
Expand Down Expand Up @@ -1537,7 +1548,7 @@ void internalTrimConsumedLedgers() {
// skip ledger if retention constraint met
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
boolean overRetentionQuota = TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024;
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();

if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -1714,7 +1725,7 @@ private void asyncDeleteLedger(long ledgerId, long retry) {
}
}, null);
}

private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
List<LedgerInfo> ledgers = Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values());
AtomicInteger ledgersToDelete = new AtomicInteger(ledgers.size());
Expand Down Expand Up @@ -2199,7 +2210,7 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod
return new ManagedLedgerException(BKException.getMessage(bkErrorCode));
}
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -1668,11 +1668,35 @@ public void testDeletionAfterRetention() throws Exception {
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
// let retention expire
Thread.sleep(1000);
ml.close();
// sleep for trim
Thread.sleep(100);
ml.internalTrimConsumedLedgers();

assertTrue(ml.getLedgersInfoAsList().size() <= 1);
assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length);
ml.close();
}

@Test
public void testInfiniteRetention() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(-1);
config.setRetentionTime(-1, TimeUnit.HOURS);
config.setMaxEntriesPerLedger(1);

ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("retention_test_ledger", config);
ManagedCursor c1 = ml.openCursor("c1");
ml.addEntry("iamaverylongmessagethatshouldberetained".getBytes());
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
ml.close();

// reopen ml
ml = (ManagedLedgerImpl) factory.open("retention_test_ledger", config);
c1 = ml.openCursor("c1");
ml.addEntry("shortmessage".getBytes());
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
ml.close();
assertTrue(ml.getLedgersInfoAsList().size() > 1);
assertTrue(ml.getTotalSize() > "shortmessage".getBytes().length);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -86,7 +87,6 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down Expand Up @@ -1323,9 +1323,13 @@ private boolean shouldTopicBeRetained() {
Optional<Policies> policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()));
// If no policies, the default is to have no retention and delete the inactive topic
return policies.map(p -> p.retention_policies)
.map(rp -> System.nanoTime() - lastActive < TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes()))
.orElse(false).booleanValue();
return policies.map(p -> p.retention_policies).map(rp -> {
long retentionTime = TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes());

// Negative retention time means the topic should be retained indefinitely,
// because its own data has to be retained
return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
}).orElse(false).booleanValue();
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,53 @@ public void testGcAndRetentionPolicy() throws Exception {
assertNull(pulsar.getBrokerService().getTopicReference(topicName));
}

/**
* A topic that has retention policy set to -1, should not be GCed
* until it has been inactive for at least the retention time and the data
* should never be deleted
*/
@Test
public void testInfiniteRetentionPolicy() throws Exception {
// Retain data forever
admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(-1, -1));

// 1. Simple successful GC
String topicName = "persistent://prop/use/ns-abc/topic-10";
Producer producer = pulsarClient.createProducer(topicName);
producer.close();

assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
runGC();
// Should not have been deleted, since we have retention
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));


// Remove retention
admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(0, 10));
Thread.sleep(300);

// 2. Topic is not GCed with live connection
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
String subName = "sub1";
Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);

runGC();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));

// 3. Topic with subscription is not GCed even with no connections
consumer.close();

runGC();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));

// 4. Topic can be GCed after unsubscribe
admin.persistentTopics().deleteSubscription(topicName, subName);

runGC();
assertNull(pulsar.getBrokerService().getTopicReference(topicName));
}

@Test
public void testMessageExpiry() throws Exception {
int messageTTLSecs = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void run() throws PulsarAdminException {
private class GetAntiAffinityGroup extends CliCommand {
@Parameter(description = "property/cluster/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
Expand Down Expand Up @@ -263,14 +263,14 @@ void run() throws PulsarAdminException {
private class DeleteAntiAffinityGroup extends CliCommand {
@Parameter(description = "property/cluster/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
}
}


@Parameters(commandDescription = "Enable or disable deduplication for a namespace")
private class SetDeduplication extends CliCommand {
Expand Down Expand Up @@ -300,10 +300,12 @@ private class SetRetention extends CliCommand {
private java.util.List<String> params;

@Parameter(names = { "--time",
"-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = true)
"-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w). "
+ "0 means no retention and -1 means infinite time retention", required = true)
private String retentionTimeStr;

@Parameter(names = { "--size", "-s" }, description = "Retention size limit (eg: 10M, 16G)", required = true)
@Parameter(names = { "--size", "-s" }, description = "Retention size limit (eg: 10M, 16G, 3T). "
+ "0 means no retention and -1 means infinite size retention", required = true)
private String limitStr;

@Override
Expand Down Expand Up @@ -625,6 +627,10 @@ private static long validateSizeString(String s) {
case 'G':
return Long.parseLong(subStr) * 1024 * 1024 * 1024;

case 't':
case 'T':
return Long.parseLong(subStr) * 1024 * 1024 * 1024 * 1024;

default:
return Long.parseLong(s);
}
Expand Down Expand Up @@ -680,7 +686,7 @@ public CmdNamespaces(PulsarAdmin admin) {

jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());

jcommander.addCommand("get-anti-affinity-group", new GetAntiAffinityGroup());
jcommander.addCommand("set-anti-affinity-group", new SetAntiAffinityGroup());
jcommander.addCommand("get-anti-affinity-namespaces", new GetAntiAffinityNamespaces());
Expand Down
8 changes: 4 additions & 4 deletions site/_data/cli/pulsar-admin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ commands:
argument: property/cluster/namespace
options:
- flags: -s, --size
description: The retention size limits (for example `10M` or `16G`)
description: The retention size limits (for example `10M`, `16G` or `3T`). 0 means no retention and -1 means infinite size retention
- flags: -t, --time
description: "The retention time in minutes, hours, days, or weeks. Examples: `100m`, `13h`, `2d`, `5w`."
description: "The retention time in minutes, hours, days, or weeks. Examples: `100m`, `13h`, `2d`, `5w`. 0 means no retention and -1 means infinite time retention"
- name: unload
description: Unload a namespace or namespace bundle from the current serving broker.
argument: property/cluster/namespace
Expand Down Expand Up @@ -295,14 +295,14 @@ commands:
description: Look up a topic from the current serving broker
argument: persistent://property/cluster/namespace/topic
- name: bundle-range
description: Get the namespace bundle which contains the given topic
description: Get the namespace bundle which contains the given topic
argument: persistent://property/cluster/namespace/topic
- name: delete
description: Delete a topic. The topic cannot be deleted if there are any active subscriptions or producers connected to the topic.
argument: persistent://property/cluster/namespace/topic
- name: unload
description: Unload a topic
argument: persistent://property/cluster/namespace/topic
argument: persistent://property/cluster/namespace/topic
- name: subscriptions
description: Get the list of subscriptions on the topic
argument: persistent://property/cluster/namespace/topic
Expand Down

0 comments on commit 1bca601

Please sign in to comment.