diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 91ec701c5c1e70..2cdffaaf6d09ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1252,12 +1252,12 @@ private void computeScanRangeAssignmentByColocate( fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap()); } Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); - + HashMap assignedBytesPerHost = Maps.newHashMap(); for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) { //fill scanRangeParamsList List locations = scanNode.bucketSeq2locations.get(bucketSeq); if (!bucketSeqToAddress.containsKey(bucketSeq)) { - getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq); + getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost); } for(TScanRangeLocations location: locations) { @@ -1275,50 +1275,51 @@ private void computeScanRangeAssignmentByColocate( } } - // randomly choose a backend from the TScanRangeLocations for a certain bucket sequence. 
- private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq) throws Exception { - int randomLocation = new Random().nextInt(seqLocation.locations.size()); + // ensure bucket sequences are distributed evenly across hosts + private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq, + HashMap assignedBytesPerHost) throws Exception { Reference backendIdRef = new Reference(); - TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef); + selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, backendIdRef); + Backend backend = this.idToBackend.get(backendIdRef.getRef()); + TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort()); this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } + public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation, + HashMap assignedBytesPerHost, + Reference backendIdRef) throws UserException { + Long minAssignedBytes = Long.MAX_VALUE; + TScanRangeLocation minLocation = null; + Long step = 1L; + for (final TScanRangeLocation location : seqLocation.getLocations()) { + Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L); + if (assignedBytes < minAssignedBytes) { + minAssignedBytes = assignedBytes; + minLocation = location; + } + } + TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, seqLocation.locations, this.idToBackend, backendIdRef); + if (assignedBytesPerHost.containsKey(location.server)) { + assignedBytesPerHost.put(location.server, + assignedBytesPerHost.get(location.server) + step); + } else { + assignedBytesPerHost.put(location.server, step); + } + return location; + } + private void 
computeScanRangeAssignmentByScheduler( final ScanNode scanNode, final List locations, FragmentScanRangeAssignment assignment) throws Exception { HashMap assignedBytesPerHost = Maps.newHashMap(); - Long step = 1L; for (TScanRangeLocations scanRangeLocations : locations) { - // assign this scan range to the host w/ the fewest assigned bytes - Long minAssignedBytes = Long.MAX_VALUE; - TScanRangeLocation minLocation = null; - for (final TScanRangeLocation location : scanRangeLocations.getLocations()) { - Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L); - if (assignedBytes < minAssignedBytes) { - minAssignedBytes = assignedBytes; - minLocation = location; - } - } - assignedBytesPerHost.put(minLocation.server, - assignedBytesPerHost.get(minLocation.server) + step); - Reference backendIdRef = new Reference(); - TNetworkAddress execHostPort = SimpleScheduler.getHost(minLocation.backend_id, - scanRangeLocations.getLocations(), this.idToBackend, backendIdRef); - if (!execHostPort.hostname.equals(minLocation.server.hostname) || - execHostPort.port != minLocation.server.port) { - assignedBytesPerHost.put(minLocation.server, - assignedBytesPerHost.get(minLocation.server) - step); - Long id = assignedBytesPerHost.get(execHostPort); - if (id == null) { - assignedBytesPerHost.put(execHostPort, 0L); - } else { - assignedBytesPerHost.put(execHostPort, id + step); - } - } + TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations, assignedBytesPerHost, backendIdRef); + Backend backend = this.idToBackend.get(backendIdRef.getRef()); + TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort()); this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); Map> scanRanges = findOrInsert(assignment, execHostPort, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java index d2c88fe7c59725..3943a13291cb29 100644 
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java @@ -92,6 +92,38 @@ public static TNetworkAddress getHost(long backendId, backends, locations.size())); } + public static TScanRangeLocation getLocation(TScanRangeLocation minLocation, + List locations, + ImmutableMap backends, + Reference backendIdRef) + throws UserException { + if (CollectionUtils.isEmpty(locations) || backends == null || backends.isEmpty()) { + throw new UserException("scan range location or candidate backends is empty"); + } + Backend backend = backends.get(minLocation.backend_id); + if (isAvailable(backend)) { + backendIdRef.setRef(minLocation.backend_id); + return minLocation; + } else { + for (TScanRangeLocation location : locations) { + if (location.backend_id == minLocation.backend_id) { + continue; + } + // choose the first alive backend(in analysis stage, the locations are random) + Backend candidateBackend = backends.get(location.backend_id); + if (isAvailable(candidateBackend)) { + backendIdRef.setRef(location.backend_id); + return location; + } + } + } + + // no backend returned + throw new UserException("there is no scanNode Backend. 
" + + getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()), + backends, locations.size())); + } + public static TNetworkAddress getHost(ImmutableMap backends, Reference backendIdRef) throws UserException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index b2f780f6794d75..35efae288aa887 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -59,6 +59,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.collect.Maps; +import org.apache.commons.collections.map.HashedMap; public class CoordinatorTest extends Coordinator { static Planner planner = new Planner(); @@ -469,5 +471,64 @@ public void testComputeScanRangeAssignmentByScheduler() { } } } + + @Test + public void testGetExecHostPortForFragmentIDAndBucketSeq() { + Coordinator coordinator = new Coordinator(context, analyzer, planner); + PlanFragmentId planFragmentId = new PlanFragmentId(1); + // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2} + TScanRangeLocations tScanRangeLocations = new TScanRangeLocations(); + TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation(); + tScanRangeLocation0.backend_id = 0; + tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050); + TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation(); + tScanRangeLocation1.backend_id = 1; + tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050); + TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation(); + tScanRangeLocation2.backend_id = 2; + tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050); + tScanRangeLocations.locations = new ArrayList<>(); + tScanRangeLocations.locations.add(tScanRangeLocation0); + tScanRangeLocations.locations.add(tScanRangeLocation1); + 
tScanRangeLocations.locations.add(tScanRangeLocation2); + + // init all backend + Backend backend0 = new Backend(0, "0.0.0.0", 9060); + backend0.setAlive(true); + backend0.setBePort(9050); + Backend backend1 = new Backend(1, "0.0.0.1", 9060); + backend1.setAlive(true); + backend1.setBePort(9050); + Backend backend2 = new Backend(2, "0.0.0.2", 9060); + backend2.setAlive(true); + backend2.setBePort(9050); + + ImmutableMap idToBackend = + new ImmutableMap.Builder(). + put(0l, backend0). + put(1l, backend1). + put(2l, backend2).build(); + Deencapsulation.setField(coordinator, "idToBackend", idToBackend); + Map> fragmentIdToSeqToAddressMap = Maps.newHashMap(); + fragmentIdToSeqToAddressMap.put(planFragmentId, new HashedMap()); + Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap); + List locations = new ArrayList<>(); + locations.add(tScanRangeLocations); + + HashMap assignedBytesPerHost = Maps.newHashMap(); + Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq",tScanRangeLocations, + planFragmentId, 1, assignedBytesPerHost); + Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq",tScanRangeLocations, + planFragmentId, 2, assignedBytesPerHost); + Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq",tScanRangeLocations, + planFragmentId, 3, assignedBytesPerHost); + List hosts = new ArrayList<>(); + for (Map.Entry item:assignedBytesPerHost.entrySet()) { + Assert.assertTrue((Long)item.getValue() == 1); + TNetworkAddress addr = (TNetworkAddress)item.getKey(); + hosts.add(addr.hostname); + } + Assert.assertTrue(hosts.size() == 3); + } }