Skip to content

Commit

Permalink
[fix][broker] Fix ResourceGroups loading (apache#21781)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 80b491d)
(cherry picked from commit dfb1a67)
  • Loading branch information
nodece authored and nikhil-ctds committed Apr 4, 2024
1 parent 0a5100f commit 6cd168b
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
Expand All @@ -47,46 +54,59 @@ public class ResourceGroupConfigListener implements Consumer<Notification> {
private final ResourceGroupService rgService;
private final PulsarService pulsarService;
private final ResourceGroupResources rgResources;
private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener;
private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener;

public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) {
this.rgService = rgService;
this.pulsarService = pulsarService;
this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources();
loadAllResourceGroups();
this.rgResources.getStore().registerListener(this);
rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(
rgService, pulsarService, this);
execute(() -> loadAllResourceGroupsWithRetryAsync(0));
}

private void loadAllResourceGroups() {
rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) -> {
if (ex != null) {
LOG.error("Exception when fetching resource groups", ex);
return;
private void loadAllResourceGroupsWithRetryAsync(long retry) {
loadAllResourceGroupsAsync().thenAccept(__ -> {
if (rgNamespaceConfigListener == null) {
rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(rgService, pulsarService, this);
}
}).exceptionally(e -> {
long nextRetry = retry + 1;
long delay = 500 * nextRetry;
LOG.error("Failed to load all resource groups during initialization, retrying after {}ms: ", delay, e);
schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), delay);
return null;
});
}

private CompletableFuture<Void> loadAllResourceGroupsAsync() {
return rgResources.listResourceGroupsAsync().thenCompose(rgList -> {
final Set<String> existingSet = rgService.resourceGroupGetAll();
HashSet<String> newSet = new HashSet<>();

newSet.addAll(rgList);

final Sets.SetView<String> deleteList = Sets.difference(existingSet, newSet);

for (String rgName: deleteList) {
for (String rgName : deleteList) {
deleteResourceGroup(rgName);
}

final Sets.SetView<String> addList = Sets.difference(newSet, existingSet);
for (String rgName: addList) {
pulsarService.getPulsarResources().getResourcegroupResources()
.getResourceGroupAsync(rgName).thenAcceptAsync(optionalRg -> {
ResourceGroup rg = optionalRg.get();
createResourceGroup(rgName, rg);
}).exceptionally((ex1) -> {
LOG.error("Failed to fetch resourceGroup", ex1);
return null;
});
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String rgName : addList) {
futures.add(pulsarService.getPulsarResources()
.getResourcegroupResources()
.getResourceGroupAsync(rgName)
.thenAccept(optionalRg -> {
if (optionalRg.isPresent()) {
ResourceGroup rg = optionalRg.get();
createResourceGroup(rgName, rg);
}
})
);
}

return FutureUtil.waitForAll(futures);
});
}

Expand Down Expand Up @@ -140,7 +160,10 @@ public void accept(Notification notification) {
Optional<String> rgName = ResourceGroupResources.resourceGroupNameFromPath(notifyPath);
if ((notification.getType() == NotificationType.ChildrenChanged)
|| (notification.getType() == NotificationType.Created)) {
loadAllResourceGroups();
loadAllResourceGroupsAsync().exceptionally((ex) -> {
LOG.error("Exception when fetching resource groups", ex);
return null;
});
} else if (rgName.isPresent()) {
switch (notification.getType()) {
case Modified:
Expand All @@ -151,4 +174,17 @@ public void accept(Notification notification) {
}
}
}

protected void execute(Runnable runnable) {
pulsarService.getExecutor().execute(catchingAndLoggingThrowables(runnable));
}

protected void schedule(Runnable runnable, long delayMs) {
pulsarService.getExecutor().schedule(catchingAndLoggingThrowables(runnable), delayMs, TimeUnit.MILLISECONDS);
}

@VisibleForTesting
ResourceGroupNamespaceConfigListener getRgNamespaceConfigListener() {
return rgNamespaceConfigListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,31 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -288,4 +299,41 @@ private void prepareData() throws PulsarAdminException {
testAddRg.setDispatchRateInBytes(200L);

}

@Test
public void testNewResourceGroupNamespaceConfigListener() {
PulsarService pulsarService = mock(PulsarService.class);
PulsarResources pulsarResources = mock(PulsarResources.class);
doReturn(pulsarResources).when(pulsarService).getPulsarResources();
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
doReturn(scheduledExecutorService).when(pulsarService).getExecutor();

ResourceGroupService resourceGroupService = mock(ResourceGroupService.class);
ResourceGroupResources resourceGroupResources = mock(ResourceGroupResources.class);
RuntimeException exception = new RuntimeException("listResourceGroupsAsync error");
doReturn(CompletableFuture.failedFuture(exception))
.when(resourceGroupResources).listResourceGroupsAsync();
doReturn(mock(MetadataStore.class))
.when(resourceGroupResources).getStore();
doReturn(resourceGroupResources).when(pulsarResources).getResourcegroupResources();

ServiceConfiguration ServiceConfiguration = new ServiceConfiguration();
doReturn(ServiceConfiguration).when(pulsarService).getConfiguration();

ResourceGroupConfigListener resourceGroupConfigListener =
new ResourceGroupConfigListener(resourceGroupService, pulsarService);

// getResourcegroupResources() returns an error, ResourceGroupNamespaceConfigListener doesn't be created.
Awaitility.await().pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> {
assertNull(resourceGroupConfigListener.getRgNamespaceConfigListener());
});

// ResourceGroupNamespaceConfigListener will be created, and uses real pulsar resource.
doReturn(CompletableFuture.completedFuture(new ArrayList<String>()))
.when(resourceGroupResources).listResourceGroupsAsync();
doReturn(pulsar.getPulsarResources()).when(pulsarService).getPulsarResources();
Awaitility.await().untilAsserted(() -> {
assertNotNull(resourceGroupConfigListener.getRgNamespaceConfigListener());
});
}
}

0 comments on commit 6cd168b

Please sign in to comment.