Skip to content

Commit

Permalink
add interface isMetadataServiceAvailableAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 committed Jul 22, 2024
1 parent 243aa81 commit a196001
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,11 @@ void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFut
* @return properties of this managedLedger.
*/
CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name);

/**
* Check managed ledger metadata service is available.
*
* @return a future represents the result of the metadata service is available.
*/
CompletableFuture<Boolean> isMetadataServiceAvailableAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,11 @@ public CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(St
return store.getManagedLedgerPropertiesAsync(name);
}

@Override
public CompletableFuture<Boolean> isMetadataServiceAvailableAsync() {
return CompletableFuture.completedFuture(metadataServiceAvailable);
}

public MetaStore getMetaStore() {
return store;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -59,17 +58,19 @@ public void run() {
if (isCancel) {
return;
}
if (factory instanceof ManagedLedgerFactoryImpl
&& !((ManagedLedgerFactoryImpl) factory).isMetadataServiceAvailable()) {
return;
}
try {
loadManager.get().doLoadShedding();
} catch (Exception e) {
LOG.warn("Error during the load shedding", e);
} finally {
start();
}
this.factory.isMetadataServiceAvailableAsync().whenCompleteAsync((isAvailable, ex) -> {
if (ex == null && !isAvailable) {
LOG.warn("Metadata service is not available now, stop the current LoadSheddingTask.");
return;
}
try {
loadManager.get().doLoadShedding();
} catch (Exception e) {
LOG.warn("Error during the load shedding", e);
} finally {
start();
}
}, loadManagerExecutor);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Field;
import java.net.URL;
Expand All @@ -37,8 +38,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -59,6 +63,7 @@
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
Expand All @@ -78,6 +83,8 @@
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -498,16 +505,37 @@ public void testTask() throws Exception {

@Test
public void testMetadataServiceNotAvailable() {
ScheduledExecutorService loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
LoadManager loadManager = mock(LoadManager.class);
AtomicReference<LoadManager> atomicLoadManager = new AtomicReference<>(loadManager);
ManagedLedgerFactoryImpl factory = mock(ManagedLedgerFactoryImpl.class);
doReturn(false).when(factory).isMetadataServiceAvailable();
LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, null, factory);
task2.run();
verify(loadManager, times(0)).doLoadShedding();
doReturn(true).when(factory).isMetadataServiceAvailable();
task2.run();
verify(loadManager, times(1)).doLoadShedding();
doReturn(CompletableFuture.completedFuture(false)).when(factory).isMetadataServiceAvailableAsync();
LoadSheddingTask task = new LoadSheddingTask(atomicLoadManager, loadManagerExecutor, null, factory);
task.run();
try {
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
try {
verify(loadManager, times(0)).doLoadShedding();
return false;
} catch (Exception e) {
return true;
}
});
fail();
} catch (Throwable e) {
Assert.assertTrue(e instanceof ConditionTimeoutException);
}
doReturn(CompletableFuture.completedFuture(true)).when(factory).isMetadataServiceAvailableAsync();
task.run();
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
try {
verify(loadManager, times(1)).doLoadShedding();
return true;
} catch (Exception e) {
return false;
}
});
}

@Test
Expand Down

0 comments on commit a196001

Please sign in to comment.