Skip to content

Commit

Permalink
Randomly distribute spooling directories across buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
linzebing committed Sep 5, 2022
1 parent eb548b0 commit 9d57b98
Showing 1 changed file with 3 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class FileSystemExchange
private final long exchangeSourceHandleTargetDataSizeInBytes;
private final ExecutorService executor;

private final Map<Integer, String> randomizedPrefixes = new ConcurrentHashMap<>();
private final Map<Integer, URI> outputDirectories = new ConcurrentHashMap<>();

@GuardedBy("this")
private final Set<Integer> allSinks = new HashSet<>();
Expand Down Expand Up @@ -270,12 +270,10 @@ private ListenableFuture<Multimap<Integer, FileStatus>> getCommittedPartitions(i

/**
 * Returns the output directory for the given task partition.
 *
 * <p>The directory is chosen the first time a partition id is seen and cached in
 * {@code outputDirectories}, so every later call with the same id returns the same URI.
 * The base directory is picked at random from {@code baseDirectories} instead of by
 * {@code taskPartitionId % baseDirectories.size()}, so the choice of bucket does not
 * depend on the partition id.
 *
 * @param taskPartitionId id of the task partition; used as the cache key and as the last
 *         dot-separated component of the directory name
 * @return the selected base directory resolved against
 *         {@code {randomizedHexPrefix}.{queryId}.{exchangeId}.{taskPartitionId}} followed by
 *         {@code PATH_SEPARATOR}
 */
private URI getTaskOutputDirectory(int taskPartitionId)
{
    // Add a randomized prefix to evenly distribute data into different S3 shards
    // Data output file path format: {randomizedHexPrefix}.{queryId}.{stageId}.{sinkPartitionId}/{attemptId}/{sourcePartitionId}_{splitId}.data
    // Only the leading directory component of that path is built here.
    return outputDirectories.computeIfAbsent(taskPartitionId, ignored -> {
        // Both random choices (bucket and hex prefix) happen inside the mapping function,
        // so they are made once per partition and then reused from the cache.
        URI baseDirectory = baseDirectories.get(ThreadLocalRandom.current().nextInt(baseDirectories.size()));
        String directoryName = generateRandomizedHexPrefix()
                + "." + exchangeContext.getQueryId()
                + "." + exchangeContext.getExchangeId()
                + "." + taskPartitionId
                + PATH_SEPARATOR;
        return baseDirectory.resolve(directoryName);
    });
}

@Override
Expand Down

0 comments on commit 9d57b98

Please sign in to comment.