Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Dec 17, 2024
1 parent 485d42d commit b5f0211
Showing 1 changed file with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ protected List<AssignedJob> insideMachineParallelization(
ConnectContext context = statementContext.getConnectContext();
boolean useLocalShuffleToAddParallel = useLocalShuffleToAddParallel();
List<AssignedJob> instances = Lists.newArrayList();
int localShuffleParallelExecNum = fragment.getParallelExecNum();
for (Entry<DistributedPlanWorker, UninstancedScanSource> entry : workerToScanRanges.entrySet()) {
DistributedPlanWorker worker = entry.getKey();

Expand All @@ -86,16 +85,19 @@ protected List<AssignedJob> insideMachineParallelization(
// scan tbl1: [tablet_10001, tablet_10002, tablet_10003, tablet_10004] // no instances
// }
ScanSource scanSource = entry.getValue().scanSource;
if (useLocalShuffleToAddParallel) {
assignLocalShuffleJobs(scanSource, localShuffleParallelExecNum, instances, context, worker);
} else {
// usually, its tablets num, or buckets num
int scanSourceMaxParallel = scanSource.maxParallel(scanNodes);

// now we should compute how many instances to process the data,
// for example: two instances
int instanceNum = degreeOfParallelism(scanSourceMaxParallel);
// usually, its tablets num, or buckets num
int scanSourceMaxParallel = Math.max(scanSource.maxParallel(scanNodes), 1);
int maxParallel = useLocalShuffleToAddParallel
? Math.max(fragment.getParallelExecNum(), scanSourceMaxParallel)
: scanSourceMaxParallel;

// now we should compute how many instances to process the data,
// for example: two instances
int instanceNum = degreeOfParallelism(maxParallel);
if (useLocalShuffleToAddParallel) {
assignLocalShuffleJobs(scanSource, instanceNum, instances, context, worker);
} else {
assignedDefaultJobs(scanSource, instanceNum, instances, context, worker);
}
}
Expand Down Expand Up @@ -180,7 +182,7 @@ protected int degreeOfParallelism(int maxParallel) {
}

// the scan instance num should not larger than the tablets num
return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1));
return maxParallel;
}

protected List<AssignedJob> fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) {
Expand Down

0 comments on commit b5f0211

Please sign in to comment.