Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class ShuffleTaskManager {
private ShuffleBufferManager shuffleBufferManager;
private Map<String, Long> appIds = Maps.newConcurrentMap();
// appId -> shuffleId -> commit count
private Map<String, Map<Long, AtomicInteger>> commitCounts = Maps.newConcurrentMap();
private Map<String, Map<Integer, AtomicInteger>> commitCounts = Maps.newConcurrentMap();
private Map<String, Map<Integer, Object>> commitLocks = Maps.newConcurrentMap();
// appId -> shuffleId -> blockIds
private Map<String, Map<Integer, Roaring64NavigableMap>> cachedBlockIds = Maps.newConcurrentMap();
Expand Down Expand Up @@ -229,9 +229,9 @@ public void addFinishedBlockIds(
}
}

public int updateAndGetCommitCount(String appId, long shuffleId) {
public int updateAndGetCommitCount(String appId, int shuffleId) {
commitCounts.putIfAbsent(appId, Maps.newConcurrentMap());
Map<Long, AtomicInteger> shuffleCommit = commitCounts.get(appId);
Map<Integer, AtomicInteger> shuffleCommit = commitCounts.get(appId);
shuffleCommit.putIfAbsent(shuffleId, new AtomicInteger(0));
AtomicInteger commitNum = shuffleCommit.get(shuffleId);
return commitNum.incrementAndGet();
Expand Down