diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 01da5c6dd71449..09e38734752857 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.commons.collections.map.HashedMap; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.catalog.Catalog; @@ -977,7 +978,9 @@ private void computeFragmentHosts() throws Exception { } //for ColocateJoin fragment - if (bucketSeqToAddress.size() > 0 && isColocateJoin(fragment.getPlanRoot())) { + if (isColocateJoin(fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId()) + && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0) { + Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()); for (Map.Entry>> scanRanges : bucketSeqToScanRange.entrySet()) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, bucketSeqToAddress.get(scanRanges.getKey()), 0, params); @@ -1127,11 +1130,16 @@ private void computeScanRangeAssignmentByColocate( final OlapScanNode scanNode, FragmentScanRangeAssignment assignment) throws Exception { + if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { + fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap()); + } + Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); + for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) { //fill scanRangeParamsList List locations = scanNode.bucketSeq2locations.get(bucketSeq); if (!bucketSeqToAddress.containsKey(bucketSeq)) { - getExecHostPortForBucketSeq(locations.get(0), bucketSeq); + getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq); } for(TScanRangeLocations location: locations) { @@ -1150,7 +1158,7 @@ private void computeScanRangeAssignmentByColocate( } // randomly choose a backend from the TScanRangeLocations for a certain bucket sequence. - private void getExecHostPortForBucketSeq(TScanRangeLocations seqLocation, Integer bucketSeq) throws Exception { + private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq) throws Exception { int randomLocation = new Random().nextInt(seqLocation.locations.size()); Reference backendIdRef = new Reference(); TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef); @@ -1158,7 +1166,7 @@ private void getExecHostPortForBucketSeq(TScanRangeLocations seqLocation, Intege throw new UserException("there is no scanNode Backend"); } this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); - this.bucketSeqToAddress.put(bucketSeq, execHostPort); + this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } private void computeScanRangeAssignmentByScheduler( @@ -1349,7 +1357,7 @@ class BucketSeqToScanRange extends HashMap bucketSeqToAddress = Maps.newHashMap(); + private Map> fragmentIdToSeqToAddressMap = Maps.newHashMap(); private Set colocateFragmentIds = new HashSet<>(); // record backend execute state