Skip to content

Commit

Permalink
[fix][broker] Fix thread unsafe access on the bundle range cache for …
Browse files Browse the repository at this point in the history
…load managers

### Background Knowledge

A concurrent hash map (no matter the `ConcurrentOpenHashMap` in Pulsar
or the official `ConcurrentHashMap`) is not a synchronized hash map.

For example, given a `ConcurrentHashMap<Integer, Integer>` object `m`,

```java
synchronized (m) {
    m.computeIfAbsent(1, __ -> 100); // [1]
    m.computeIfAbsent(2, __ -> 200); // [2]
}
```

```java
m.computeIfAbsent(1, __ -> 300); // [3]
```

If these two code blocks are called in two threads, `[1]->[3]->[2]` is a
possible case.

### Motivation

`SimpleLoadManagerImpl` and `ModularLoadManagerImpl` both maintain the
bundle range cache:

```java
// The 1st key is broker, the 2nd key is namespace
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
    brokerToNamespaceToBundleRange;
```

However, when accessing the `namespace -> bundle` map, it still needs a
synchronized code block:

https://github.com/apache/pulsar/blob/1c495e190b3c569e9dfd44acef2a697c93a1f771/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java#L591-L595

The code above is a composite operation of `clear()` and multiple
`computeIfAbsent` operations on the `ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>` object.

So the other place that access this map also requires the same lock even
if the operation itself is thread safe:

https://github.com/apache/pulsar/blob/1c495e190b3c569e9dfd44acef2a697c93a1f771/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java#L882-L886

P.S. `SimpleLoadManagerImpl` does not apply the synchronized block.

However, when accessing `brokerToNamespaceToBundleRange` in the static
methods of `LoadManagerShared`, they are not synchronized. So the access
on the `Map<String, Set<String>>` value is not thread safe.

### Modifications

Add a `BundleRangeCache` abstraction to provide some methods to support
required operations on the bundle range cache.
  • Loading branch information
BewareMyPower committed Aug 25, 2024
1 parent 1c495e1 commit a86fe70
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 178 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.stream.Stream;

public class BundleRangeCache {

// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evenly across brokers.
private final Map<String, Map<String, Set<String>>> data = new ConcurrentHashMap<>();

public void reloadFromBundles(String broker, Stream<String> bundles) {
final var namespaceToBundleRange = new ConcurrentHashMap<String, Set<String>>();
bundles.forEach(bundleName -> {
final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
initConcurrentHashSet(namespaceToBundleRange, namespace).add(bundleRange);
});
data.put(broker, namespaceToBundleRange);
}

public void addBundleRange(String broker, String namespace, String bundleRange) {
getBundleRangeSet(broker, namespace).add(bundleRange);
}

public int getBundles(String broker, String namespace) {
return getBundleRangeSet(broker, namespace).size();
}

public List<CompletableFuture<Void>> runTasks(
BiFunction<String/* broker */, String/* namespace */, CompletableFuture<Void>> task) {
return data.entrySet().stream().flatMap(e -> {
final var broker = e.getKey();
return e.getValue().entrySet().stream().filter(__ -> !__.getValue().isEmpty()).map(Map.Entry::getKey)
.map(namespace -> task.apply(broker, namespace));
}).toList();
}

private Set<String> getBundleRangeSet(String broker, String namespace) {
return initConcurrentHashSet(data.computeIfAbsent(broker, __ -> new ConcurrentHashMap<>()), namespace);
}

private static Set<String> initConcurrentHashSet(Map<String, Set<String>> namespaceToBundleRangeSet,
String namespace) {
return namespaceToBundleRangeSet.computeIfAbsent(namespace, __ -> ConcurrentHashMap.newKeySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
Expand Down Expand Up @@ -282,24 +280,6 @@ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
return brokerCandidateCache;
});
}
/**
* Using the given bundles, populate the namespace to bundle range map.
*
* @param bundles
* Bundles with which to populate.
* @param target
* Map to fill.
*/
public static void fillNamespaceToBundlesMap(final Set<String> bundles,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> target) {
bundles.forEach(bundleName -> {
final String namespaceName = getNamespaceNameFromBundleName(bundleName);
final String bundleRange = getBundleRangeFromBundleName(bundleName);
target.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
});
}

// From a full bundle name, extract the bundle range.
public static String getBundleRangeFromBundleName(String bundleName) {
Expand Down Expand Up @@ -359,8 +339,7 @@ public static boolean isLoadSheddingEnabled(final PulsarService pulsar) {
public static void removeMostServicingBrokersForNamespace(
final String assignedBundleName,
final Set<String> candidates,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange) {
final BundleRangeCache brokerToNamespaceToBundleRange) {
if (candidates.isEmpty()) {
return;
}
Expand All @@ -369,13 +348,7 @@ public static void removeMostServicingBrokersForNamespace(
int leastBundles = Integer.MAX_VALUE;

for (final String broker : candidates) {
int bundles = (int) brokerToNamespaceToBundleRange
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.size();
int bundles = brokerToNamespaceToBundleRange.getBundles(broker, namespaceName);
leastBundles = Math.min(leastBundles, bundles);
if (leastBundles == 0) {
break;
Expand All @@ -386,13 +359,8 @@ public static void removeMostServicingBrokersForNamespace(
// `leastBundles` may differ from the actual value.

final int finalLeastBundles = leastBundles;
candidates.removeIf(
broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.size() > finalLeastBundles);
candidates.removeIf(broker ->
brokerToNamespaceToBundleRange.getBundles(broker, namespaceName) > finalLeastBundles);
}

/**
Expand Down Expand Up @@ -426,8 +394,7 @@ public static void removeMostServicingBrokersForNamespace(
public static void filterAntiAffinityGroupOwnedBrokers(
final PulsarService pulsar, final String assignedBundleName,
final Set<String> candidates,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange,
final BundleRangeCache brokerToNamespaceToBundleRange,
Map<String, String> brokerToDomainMap) {
if (candidates.isEmpty()) {
return;
Expand Down Expand Up @@ -572,8 +539,7 @@ private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
*/
public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
final PulsarService pulsar, final String namespaceName,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange) {
final BundleRangeCache brokerToNamespaceToBundleRange) {

CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
getNamespaceAntiAffinityGroupAsync(pulsar, namespaceName)
Expand All @@ -584,18 +550,11 @@ public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOw
}
final String antiAffinityGroup = antiAffinityGroupOptional.get();
final Map<String, Integer> brokerToAntiAffinityNamespaceCount = new ConcurrentHashMap<>();
final List<CompletableFuture<Void>> futures = new ArrayList<>();
brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> {
nsToBundleRange.forEach((ns, bundleRange) -> {
if (bundleRange.isEmpty()) {
return;
}

CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
countAntiAffinityNamespaceOwnedBrokers(broker, ns, future,
pulsar, antiAffinityGroup, brokerToAntiAffinityNamespaceCount);
});
final var futures = brokerToNamespaceToBundleRange.runTasks((broker, namespace) -> {
final var future = new CompletableFuture<Void>();
countAntiAffinityNamespaceOwnedBrokers(broker, namespace, future,
pulsar, antiAffinityGroup, brokerToAntiAffinityNamespaceCount);
return future;
});
FutureUtil.waitForAll(futures)
.thenAccept(r -> antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
Expand Down Expand Up @@ -698,7 +657,6 @@ public static Optional<String> getNamespaceAntiAffinityGroup(
* by different broker.
*
* @param namespace
* @param bundle
* @param currentBroker
* @param pulsar
* @param brokerToNamespaceToBundleRange
Expand All @@ -707,10 +665,9 @@ public static Optional<String> getNamespaceAntiAffinityGroup(
* @throws Exception
*/
public static boolean shouldAntiAffinityNamespaceUnload(
String namespace, String bundle, String currentBroker,
String namespace, String currentBroker,
final PulsarService pulsar,
final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange,
final BundleRangeCache brokerToNamespaceToBundleRange,
Set<String> candidateBrokers) throws Exception {

Map<String, Integer> brokerNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
Expand Down Expand Up @@ -72,8 +73,6 @@
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
Expand Down Expand Up @@ -116,10 +115,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// Broker host usage object used to calculate system resource usage.
private BrokerHostUsage brokerHostUsage;

// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evenly across brokers.
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange;
private final BundleRangeCache brokerToNamespaceToBundleRange = new BundleRangeCache();

// Path to the ZNode containing the LocalBrokerData json for this broker.
private String brokerZnodePath;
Expand Down Expand Up @@ -199,10 +195,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
*/
public ModularLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
brokerToNamespaceToBundleRange =
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
.build();
defaultStats = new NamespaceBundleStats();
filterPipeline = new ArrayList<>();
loadData = new LoadData();
Expand Down Expand Up @@ -582,17 +574,9 @@ private void updateBundleData() {
TimeAverageBrokerData timeAverageData = new TimeAverageBrokerData();
timeAverageData.reset(statsMap.keySet(), bundleData, defaultStats);
brokerData.setTimeAverageData(timeAverageData);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker, k ->
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), namespaceToBundleRange);
}

brokerToNamespaceToBundleRange.reloadFromBundles(broker,
Stream.of(statsMap.keySet(), preallocatedBundleData.keySet()).flatMap(Collection::stream));
}

// Remove not active bundle from loadData
Expand Down Expand Up @@ -736,7 +720,7 @@ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle
.getBundle(namespace, bundle);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(), brokerTopicLoadingPredicate);
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, pulsar,
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, currentBroker, pulsar,
brokerToNamespaceToBundleRange, brokerCandidateCache);
}

Expand Down Expand Up @@ -873,17 +857,7 @@ private void preallocateBundle(String bundle, String broker) {

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
brokerToNamespaceToBundleRange.addBundleRange(broker, namespaceName, bundleRange);
}

@VisibleForTesting
Expand Down
Loading

0 comments on commit a86fe70

Please sign in to comment.