[fix][broker] Fix thread unsafe access on the bundle range cache for load manager #23217

Merged

Conversation

@BewareMyPower BewareMyPower commented Aug 25, 2024

Background Knowledge

A concurrent hash map (whether Pulsar's ConcurrentOpenHashMap or the JDK's ConcurrentHashMap) is not a synchronized hash map: its methods do not acquire the monitor of the map object itself.

For example, given a ConcurrentHashMap<Integer, Integer> object m,

// Thread 1
synchronized (m) {
    m.computeIfAbsent(1, __ -> 100); // [1]
    m.computeIfAbsent(2, __ -> 200); // [2]
}

// Thread 2
m.computeIfAbsent(1, __ -> 300); // [3]

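If these two code blocks run in two different threads, the order [1] -> [3] -> [2] is possible, because [3] does not acquire the monitor of m and so is not blocked by the synchronized block.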
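The interleaving can be forced deterministically with two latches. The following is only a minimal sketch: the class name `InterleavingDemo`, the helper methods and the 5-second timeout are illustrative assumptions and not part of Pulsar.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Pulsar.
final class InterleavingDemo {

    /** Forces the order [1] -> [3] -> [2] and reports what [3] observed. */
    static String run() {
        final ConcurrentHashMap<Integer, Integer> m = new ConcurrentHashMap<>();
        final CountDownLatch firstDone = new CountDownLatch(1); // released after [1]
        final CountDownLatch thirdDone = new CountDownLatch(1); // released after [3]
        final String[] observed = new String[1];

        final Thread t1 = new Thread(() -> {
            synchronized (m) {
                m.computeIfAbsent(1, k -> 100); // [1]
                firstDone.countDown();
                // Keep holding the monitor of m until [3] has run (give up after 5 seconds).
                awaitQuietly(thirdDone);
                m.computeIfAbsent(2, k -> 200); // [2]
            }
        });
        final Thread t2 = new Thread(() -> {
            awaitQuietly(firstDone);
            // [3] never asks for the monitor of m, so thread 1 cannot block it.
            final Integer value = m.computeIfAbsent(1, k -> 300); // [3]
            observed[0] = "value=" + value + ", key2Present=" + m.containsKey(2);
            thirdDone.countDown();
        });
        t1.start();
        t2.start();
        joinQuietly(t1);
        joinQuietly(t2);
        return observed[0];
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // value=100, key2Present=false
    }
}
```

If the methods of ConcurrentHashMap were synchronized on the map, thread 2 would stay blocked until thread 1 left the synchronized block, and key 2 would already be present when [3] ran.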

Motivation

SimpleLoadManagerImpl and ModularLoadManagerImpl both maintain the bundle range cache:

// The 1st key is broker, the 2nd key is namespace
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
    brokerToNamespaceToBundleRange;

However, ModularLoadManagerImpl still has to wrap its access to the namespace -> bundle map in a synchronized code block:

synchronized (namespaceToBundleRange) {
    namespaceToBundleRange.clear();
    LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
    LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), namespaceToBundleRange);
}

The code above is a composite operation of clear() and multiple computeIfAbsent operations on the ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> object.

So every other place that accesses this map must also take the same lock, even if the operation itself is thread safe:

synchronized (namespaceToBundleRange) {
    namespaceToBundleRange.computeIfAbsent(namespaceName,
            k -> ConcurrentOpenHashSet.<String>newBuilder().build())
            .add(bundleRange);
}

P.S. SimpleLoadManagerImpl does not use this synchronized block.

However, the static methods of LoadManagerShared access brokerToNamespaceToBundleRange without any synchronization, so their access to the inner Map<String, Set<String>> value is not thread safe.

Modifications

Add a BundleRangeCache abstraction that provides the operations required on the bundle range cache. Apply the synchronized keyword to every access to an internal map (namespace -> bundle range set) to guarantee thread safety.
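To illustrate the idea, here is a rough sketch of what such an abstraction could look like. It is not the class added by this PR: the name `BundleRangeCacheSketch`, the method names and the use of JDK collections instead of Pulsar's `ConcurrentOpenHashMap` are all assumptions made for the example. The point is only that every access to an inner map, including the composite clear-and-refill, goes through the same lock.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the BundleRangeCache class from this PR.
final class BundleRangeCacheSketch {

    // broker -> (namespace -> bundle ranges). Each inner map is a plain HashMap
    // that is only read or written while holding its own monitor.
    private final Map<String, Map<String, Set<String>>> data = new ConcurrentHashMap<>();

    /** Composite operation: clear and refill one broker's map as a single step. */
    void reload(String broker, Map<String, List<String>> namespaceToRanges) {
        final Map<String, Set<String>> inner = data.computeIfAbsent(broker, k -> new HashMap<>());
        synchronized (inner) {
            inner.clear();
            namespaceToRanges.forEach((namespace, ranges) ->
                    inner.computeIfAbsent(namespace, k -> new HashSet<>()).addAll(ranges));
        }
    }

    /** Single insertion; it takes the same lock so it cannot interleave with reload(). */
    void add(String broker, String namespace, String bundleRange) {
        final Map<String, Set<String>> inner = data.computeIfAbsent(broker, k -> new HashMap<>());
        synchronized (inner) {
            inner.computeIfAbsent(namespace, k -> new HashSet<>()).add(bundleRange);
        }
    }

    /** Returns a copy, so callers never iterate over the shared set. */
    Set<String> getBundleRanges(String broker, String namespace) {
        final Map<String, Set<String>> inner = data.get(broker);
        if (inner == null) {
            return Collections.emptySet();
        }
        synchronized (inner) {
            final Set<String> ranges = inner.get(namespace);
            return ranges == null ? Collections.emptySet() : new HashSet<>(ranges);
        }
    }
}
```

Because callers can no longer reach the inner maps directly, the locking rule is enforced in one place instead of at every call site.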

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 25, 2024
@BewareMyPower BewareMyPower force-pushed the bewaremypower/load-manager-cache branch from ead813a to 27e7ac6 Compare August 25, 2024 14:16
@BewareMyPower BewareMyPower self-assigned this Aug 25, 2024
@BewareMyPower BewareMyPower added type/bug The PR fixed a bug or issue reported a bug area/broker labels Aug 25, 2024
@BewareMyPower BewareMyPower force-pushed the bewaremypower/load-manager-cache branch from 27e7ac6 to a86fe70 Compare August 25, 2024 14:28

lhotari commented Aug 26, 2024

A concurrent hash map (no matter the ConcurrentOpenHashMap in Pulsar or the official ConcurrentHashMap) is not a synchronized hash map.

@BewareMyPower By the way, this is not exactly the same thing, but it might be useful information: Some map implementations don't have an atomic computeIfAbsent implementation, for example ConcurrentSkipListMap. Issue #21301 was about that.


BewareMyPower commented Aug 26, 2024

Some map implementations don't have an atomic computeIfAbsent implementation

A more accurate description is that ConcurrentSkipListMap does not guarantee that, when the mappingFunction argument is called, the computed value is the one inserted into the map.

Given the following example:

Thread 1:

final var result1 = map.computeIfAbsent("key", __ -> {
    System.out.println("value1");
    return "value1";
});

Thread 2:

final var result2 = map.computeIfAbsent("key", __ -> {
    System.out.println("value2");
    return "value2";
});

It is possible that both "value1" and "value2" are printed, yet result1 and result2 are the same (both "value1" or both "value2").
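This case can be reproduced on purpose by making both mapping functions wait for each other before returning. The snippet below is only a sketch: `SkipListComputeDemo`, the barrier and the 5-second timeout are assumptions added for the demonstration.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Pulsar.
final class SkipListComputeDemo {

    /** Reports how often a mapping function ran and whether both callers got the same value. */
    static String run() {
        final ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>();
        final CyclicBarrier bothInside = new CyclicBarrier(2);
        final AtomicInteger invocations = new AtomicInteger();
        final String[] results = new String[2];

        final Thread t1 = new Thread(() -> {
            results[0] = map.computeIfAbsent("key", k -> {
                invocations.incrementAndGet();
                meet(bothInside); // do not return until thread 2 is inside its function too
                return "value1";
            });
        });
        final Thread t2 = new Thread(() -> {
            results[1] = map.computeIfAbsent("key", k -> {
                invocations.incrementAndGet();
                meet(bothInside);
                return "value2";
            });
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        final boolean same = results[0] != null && results[0].equals(results[1]);
        return "invocations=" + invocations.get() + ", sameResult=" + same;
    }

    private static void meet(CyclicBarrier barrier) {
        try {
            barrier.await(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // invocations=2, sameResult=true
    }
}
```

Both functions run, but only one computed value is inserted, and both callers receive that value. The same barrier trick would not work with ConcurrentHashMap, whose documentation says other update attempts may be blocked while the computation is in progress.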

However, this is allowed, because the java.util.concurrent package documentation only guarantees happens-before relationships for concurrent collections; see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly

The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-level synchronization. In particular:

  • Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.

Let's simplify and extend the example above.

Thread 1:

final var result1 = map.computeIfAbsent("key", __ -> "value1"); // A
final var result3 = map.get("key"); // C

Thread 2:

final var result2 = map.computeIfAbsent("key", __ -> "value2"); // B
final var result4 = map.get("key"); // D

There are only two possible outcomes for ConcurrentSkipListMap (and ConcurrentHashMap): the key ends up mapped to "value1" or to "value2". If it is "value1", then A happens-before B, because the behavior is just like:

  1. A inserted "key -> value1" and succeeded
  2. A returned "value1" because it's inserted
  3. B inserted "key -> value2" and failed
  4. B returned the existing value "value1"

The "concurrent" map only guarantees that B cannot return a value other than "value1": the last write before B's read is the one made by A, so B can only see the "value1" written by A.

Besides, the concurrent map only guarantees:

  • result3 is "value1" because A happens-before C, A returns "value1" and there is no other write operation between A and C
  • result4 is "value1" because B happens-before D, B returns "value1" and there is no other write operation between B and D
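The guarantees above mean that all four results must agree, whichever thread wins. A small sketch that checks this invariant with ConcurrentHashMap (the class name `HappensBeforeDemo` and its method are made up for illustration):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class, not part of Pulsar.
final class HappensBeforeDemo {

    /** Returns true if result1..result4 are all the same value. */
    static boolean allResultsAgree() {
        final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        final String[] results = new String[4];

        final Thread t1 = new Thread(() -> {
            results[0] = map.computeIfAbsent("key", k -> "value1"); // A
            results[2] = map.get("key");                            // C
        });
        final Thread t2 = new Thread(() -> {
            results[1] = map.computeIfAbsent("key", k -> "value2"); // B
            results[3] = map.get("key");                            // D
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Which value wins depends on scheduling, but nobody may observe the loser.
        return results[0] != null
                && results[0].equals(results[1])
                && results[0].equals(results[2])
                && results[0].equals(results[3]);
    }

    public static void main(String[] args) {
        System.out.println(allResultsAgree());
    }
}
```

The check does not say which value ends up in the map, only that no thread ever sees the value that lost the race.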


lhotari commented Aug 26, 2024

A more accurate description is that ConcurrentSkipListMap does not guarantee that, when the mappingFunction argument is called, the computed value is the one inserted into the map.

@BewareMyPower Yes, I agree. I mentioned it since #21301 has been an issue in the past.

BewareMyPower commented

BTW, the API documentation of ConcurrentSkipListMap#computeIfAbsent is clear about this behavior:

     * The function
     * is <em>NOT</em> guaranteed to be applied once atomically only
     * if the value is not present.

It's required by the Map#computeIfAbsent API documentation:

     * In particular, all implementations of
     * subinterface {@link java.util.concurrent.ConcurrentMap} must document
     * whether the mapping function is applied once atomically only if the value
     * is not present. 

Here is the documentation for ConcurrentHashMap:

     * The supplied
     * function is invoked exactly once per invocation of this method
     * if the key is absent, else not at all.  Some attempted update
     * operations on this map by other threads may be blocked while
     * computation is in progress, so the computation should be short
     * and simple.
     *
     * <p>The mapping function must not modify this map during computation.

I believe many calls to ConcurrentHashMap#computeIfAbsent in Pulsar are improper (not short and simple) or even wrong (they try to modify the map), but that's another topic.

@thetumbled thetumbled left a comment

LGTM.

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 74.54%. Comparing base (bbc6224) to head (99598f6).
Report is 548 commits behind head on master.

@@             Coverage Diff              @@
##             master   #23217      +/-   ##
============================================
+ Coverage     73.57%   74.54%   +0.97%     
- Complexity    32624    34233    +1609     
============================================
  Files          1877     1923      +46     
  Lines        139502   144762    +5260     
  Branches      15299    15838     +539     
============================================
+ Hits         102638   107918    +5280     
+ Misses        28908    28577     -331     
- Partials       7956     8267     +311     
Flag Coverage Δ
inttests 27.67% <56.00%> (+3.08%) ⬆️
systests 24.64% <56.00%> (+0.32%) ⬆️
unittests 73.90% <100.00%> (+1.06%) ⬆️

Files Coverage Δ
...lsar/broker/loadbalance/impl/BundleRangeCache.java 100.00% <100.00%> (ø)
...sar/broker/loadbalance/impl/LoadManagerShared.java 74.15% <100.00%> (-1.57%) ⬇️
...roker/loadbalance/impl/ModularLoadManagerImpl.java 83.49% <100.00%> (-1.49%) ⬇️
...broker/loadbalance/impl/SimpleLoadManagerImpl.java 75.59% <100.00%> (+4.70%) ⬆️

... and 539 files with indirect coverage changes

@BewareMyPower BewareMyPower merged commit 325c6a5 into apache:master Aug 28, 2024
51 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/load-manager-cache branch August 28, 2024 11:26
grssam pushed a commit to grssam/pulsar that referenced this pull request Sep 4, 2024
@lhotari lhotari added this to the 4.0.0 milestone Oct 14, 2024
Labels
area/broker doc-not-needed Your PR changes do not impact docs ready-to-test type/bug The PR fixed a bug or issue reported a bug