Skip to content

Commit

Permalink
Change synchronized on FileSystemContext to avoid deadlock
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Please outline the changes and how this PR fixes the issue.

### Why are the changes needed?

Please clarify why the changes are needed. For instance,
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, describe the bug.

### Does this PR introduce any user facing changes?

Please list the user-facing changes introduced by your change, including
1. change in user-facing APIs
2. addition or removal of property keys
3. webui

pr-link: #16219
change-id: cid-f6e6af4556026e122b2e672fad6ab87a0a9f507e
  • Loading branch information
elega authored and yuzhu committed Nov 27, 2024
1 parent 122a77c commit 36f7e7e
Showing 1 changed file with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -156,11 +157,11 @@ public class FileSystemContext implements Closeable {
private boolean mUriValidationEnabled = true;

/** Cached map for workers. */
@GuardedBy("this")
private volatile List<BlockWorkerInfo> mWorkerInfoList = null;
@GuardedBy("mWorkerInfoList")
private final AtomicReference<List<BlockWorkerInfo>> mWorkerInfoList = new AtomicReference<>();

/** The policy to refresh workers list. */
@GuardedBy("this")
@GuardedBy("mWorkerInfoList")
private final RefreshPolicy mWorkerRefreshPolicy;

/**
Expand Down Expand Up @@ -632,11 +633,14 @@ public synchronized WorkerNetAddress getNodeLocalWorker() throws IOException {
*
* @return the info of all block workers eligible for reads and writes
*/
public synchronized List<BlockWorkerInfo> getCachedWorkers() throws IOException {
if (mWorkerInfoList == null || mWorkerInfoList.isEmpty() || mWorkerRefreshPolicy.attempt()) {
mWorkerInfoList = getAllWorkers();
public List<BlockWorkerInfo> getCachedWorkers() throws IOException {
synchronized (mWorkerInfoList) {
if (mWorkerInfoList.get() == null || mWorkerInfoList.get().isEmpty()
|| mWorkerRefreshPolicy.attempt()) {
mWorkerInfoList.set(getAllWorkers());
}
return mWorkerInfoList.get();
}
return mWorkerInfoList;
}

/**
Expand Down

0 comments on commit 36f7e7e

Please sign in to comment.