diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java index c15edd2be4e43..4a5b8a8bcc244 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java @@ -18,15 +18,22 @@ */ package org.apache.pulsar.broker.resourcegroup; +import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.resources.ResourceGroupResources; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.ResourceGroup; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.slf4j.Logger; @@ -47,24 +54,32 @@ public class ResourceGroupConfigListener implements Consumer { private final ResourceGroupService rgService; private final PulsarService pulsarService; private final ResourceGroupResources rgResources; - private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; + private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) { this.rgService = rgService; this.pulsarService = pulsarService; this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources(); - loadAllResourceGroups(); this.rgResources.getStore().registerListener(this); - rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener( - rgService, pulsarService, this); + execute(() -> loadAllResourceGroupsWithRetryAsync(0)); } - private void loadAllResourceGroups() { - rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) -> { - if (ex != null) { - LOG.error("Exception when fetching resource groups", ex); - return; + private void loadAllResourceGroupsWithRetryAsync(long retry) { + loadAllResourceGroupsAsync().thenAccept(__ -> { + if (rgNamespaceConfigListener == null) { + rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(rgService, pulsarService, this); } + }).exceptionally(e -> { + long nextRetry = retry + 1; + long delay = 500 * nextRetry; + LOG.error("Failed to load all resource groups during initialization, retrying after {}ms: ", delay, e); + schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), delay); + return null; + }); + } + + private CompletableFuture loadAllResourceGroupsAsync() { + return rgResources.listResourceGroupsAsync().thenCompose(rgList -> { final Set existingSet = rgService.resourceGroupGetAll(); HashSet newSet = new HashSet<>(); @@ -72,21 +87,26 @@ private void loadAllResourceGroups() { final Sets.SetView deleteList = Sets.difference(existingSet, newSet); - for (String rgName: deleteList) { + for (String rgName : deleteList) { deleteResourceGroup(rgName); } final Sets.SetView addList = Sets.difference(newSet, existingSet); - for (String rgName: addList) { - pulsarService.getPulsarResources().getResourcegroupResources() - .getResourceGroupAsync(rgName).thenAcceptAsync(optionalRg -> { - ResourceGroup rg = optionalRg.get(); - createResourceGroup(rgName, rg); - }).exceptionally((ex1) -> { - LOG.error("Failed to fetch resourceGroup", ex1); - return null; - }); + List> futures = new ArrayList<>(); + for (String rgName : addList) { + futures.add(pulsarService.getPulsarResources() + .getResourcegroupResources() + .getResourceGroupAsync(rgName) + .thenAccept(optionalRg -> { + if (optionalRg.isPresent()) { + ResourceGroup rg = optionalRg.get(); + createResourceGroup(rgName, rg); + } + }) + ); } + + return FutureUtil.waitForAll(futures); }); } @@ -140,7 +160,10 @@ public void accept(Notification notification) { Optional rgName = ResourceGroupResources.resourceGroupNameFromPath(notifyPath); if ((notification.getType() == NotificationType.ChildrenChanged) || (notification.getType() == NotificationType.Created)) { - loadAllResourceGroups(); + loadAllResourceGroupsAsync().exceptionally((ex) -> { + LOG.error("Exception when fetching resource groups", ex); + return null; + }); } else if (rgName.isPresent()) { switch (notification.getType()) { case Modified: @@ -151,4 +174,17 @@ public void accept(Notification notification) { } } } + + protected void execute(Runnable runnable) { + pulsarService.getExecutor().execute(catchingAndLoggingThrowables(runnable)); + } + + protected void schedule(Runnable runnable, long delayMs) { + pulsarService.getExecutor().schedule(catchingAndLoggingThrowables(runnable), delayMs, TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + ResourceGroupNamespaceConfigListener getRgNamespaceConfigListener() { + return rgNamespaceConfigListener; + } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java index 90c26530850a3..4010635ed9952 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java @@ -18,20 +18,31 @@ */ package org.apache.pulsar.broker.resourcegroup; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.resources.ResourceGroupResources; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ResourceGroup; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.metadata.api.MetadataStore; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -288,4 +299,41 @@ private void prepareData() throws PulsarAdminException { testAddRg.setDispatchRateInBytes(200L); } + + @Test + public void testNewResourceGroupNamespaceConfigListener() { + PulsarService pulsarService = mock(PulsarService.class); + PulsarResources pulsarResources = mock(PulsarResources.class); + doReturn(pulsarResources).when(pulsarService).getPulsarResources(); + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + doReturn(scheduledExecutorService).when(pulsarService).getExecutor(); + + ResourceGroupService resourceGroupService = mock(ResourceGroupService.class); + ResourceGroupResources resourceGroupResources = mock(ResourceGroupResources.class); + RuntimeException exception = new RuntimeException("listResourceGroupsAsync error"); + doReturn(CompletableFuture.failedFuture(exception)) + .when(resourceGroupResources).listResourceGroupsAsync(); + doReturn(mock(MetadataStore.class)) + .when(resourceGroupResources).getStore(); + doReturn(resourceGroupResources).when(pulsarResources).getResourcegroupResources(); + + ServiceConfiguration ServiceConfiguration = new ServiceConfiguration(); + doReturn(ServiceConfiguration).when(pulsarService).getConfiguration(); + + ResourceGroupConfigListener resourceGroupConfigListener = + new ResourceGroupConfigListener(resourceGroupService, pulsarService); + + // getResourcegroupResources() returns an error, ResourceGroupNamespaceConfigListener doesn't be created. + Awaitility.await().pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertNull(resourceGroupConfigListener.getRgNamespaceConfigListener()); + }); + + // ResourceGroupNamespaceConfigListener will be created, and uses real pulsar resource. + doReturn(CompletableFuture.completedFuture(new ArrayList())) + .when(resourceGroupResources).listResourceGroupsAsync(); + doReturn(pulsar.getPulsarResources()).when(pulsarService).getPulsarResources(); + Awaitility.await().untilAsserted(() -> { + assertNotNull(resourceGroupConfigListener.getRgNamespaceConfigListener()); + }); + } }