Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.qe;

import org.apache.commons.collections.map.HashedMap;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.catalog.Catalog;
Expand Down Expand Up @@ -977,7 +978,9 @@ private void computeFragmentHosts() throws Exception {
}

//for ColocateJoin fragment
if (bucketSeqToAddress.size() > 0 && isColocateJoin(fragment.getPlanRoot())) {
if (isColocateJoin(fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
&& fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0) {
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragment.getFragmentId());
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, bucketSeqToAddress.get(scanRanges.getKey()), 0, params);

Expand Down Expand Up @@ -1127,11 +1130,16 @@ private void computeScanRangeAssignmentByColocate(
final OlapScanNode scanNode,
FragmentScanRangeAssignment assignment) throws Exception {

if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
}
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());

for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
//fill scanRangeParamsList
List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
if (!bucketSeqToAddress.containsKey(bucketSeq)) {
getExecHostPortForBucketSeq(locations.get(0), bucketSeq);
getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq);
}

for(TScanRangeLocations location: locations) {
Expand All @@ -1150,15 +1158,15 @@ private void computeScanRangeAssignmentByColocate(
}

// randomly choose a backend from the TScanRangeLocations for a certain bucket sequence.
private void getExecHostPortForBucketSeq(TScanRangeLocations seqLocation, Integer bucketSeq) throws Exception {
private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq) throws Exception {
int randomLocation = new Random().nextInt(seqLocation.locations.size());
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef);
if (execHostPort == null) {
throw new UserException("there is no scanNode Backend");
}
this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
this.bucketSeqToAddress.put(bucketSeq, execHostPort);
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
}

private void computeScanRangeAssignmentByScheduler(
Expand Down Expand Up @@ -1349,7 +1357,7 @@ class BucketSeqToScanRange extends HashMap<Integer, Map<Integer, List<TScanRange
}

private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
private Map<Integer, TNetworkAddress> bucketSeqToAddress = Maps.newHashMap();
private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
private Set<Integer> colocateFragmentIds = new HashSet<>();

// record backend execute state
Expand Down