Skip to content

Commit

Permalink
Add retry with backoff to loading namespace bundles
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Dec 3, 2021
1 parent fc9b480 commit 1a35080
Showing 1 changed file with 29 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand All @@ -63,6 +66,7 @@ public class NamespaceBundleFactory {

private final PulsarService pulsar;
private final MetadataCache<Policies> policiesCache;
private final Duration MAX_RETRY_DURATION = Duration.ofSeconds(10);

public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
this.hashFunc = hashFunc;
Expand Down Expand Up @@ -90,30 +94,51 @@ private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace,
}

CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
doLoadBundles(namespace, path, future, createBackoff(), System.nanoTime() + MAX_RETRY_DURATION.toNanos());
return future;
}

private void doLoadBundles(NamespaceName namespace, String path, CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline) {
// Read the static bundle data from the policies
pulsar.getLocalMetadataStore().get(path).thenAccept(result -> {

if (result.isPresent()) {
try {
future.complete(readBundles(namespace,
result.get().getValue(), result.get().getStat().getVersion()));
} catch (IOException e) {
future.completeExceptionally(e);
handleLoadBundlesRetry(namespace, path, future, backoff, retryDeadline, e);
}
} else {
// If no local policies defined for namespace, copy from global config
copyToLocalPolicies(namespace)
.thenAccept(b -> future.complete(b))
.exceptionally(ex -> {
future.completeExceptionally(ex);
handleLoadBundlesRetry(namespace, path, future, backoff, retryDeadline, ex);
return null;
});
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
}

private void handleLoadBundlesRetry(NamespaceName namespace, String path,
CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline, Throwable e) {
if (e instanceof Error || System.nanoTime() > retryDeadline) {
future.completeExceptionally(e);
} else {
LOG.warn("Error loading bundle for {} from path {}. Retrying exception", namespace, path, e);
long retryDelay = backoff.next();
pulsar.getExecutor().schedule(() ->
doLoadBundles(namespace, path, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
}
}

private Backoff createBackoff() {
return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
}

private NamespaceBundles readBundles(NamespaceName namespace, byte[] value, long version) throws IOException {
Expand Down

0 comments on commit 1a35080

Please sign in to comment.