Make backend selection in FileQueryScanNode explicit and scale up the
FederationBackendPolicy tests.

FileQueryScanNode.createScanRangeLocations(): the file-cache and HDFS
short-circuit-read switches are read once before the split loop, and the
nested ternary that picked the backend is replaced by an if / else-if / else
chain (consistent hash, then local BE, then getNextBe()).

FederationBackendPolicyTest: setUp() registers 200 alive and 10 dead backends;
both tests call the policy 10000 times, check that only expected hosts are
returned, and print the elapsed time in milliseconds.

NOTE(review): this patch was rebuilt from a copy whose line breaks were lost.
Leading whitespace on context and removed lines follows standard 4-space Java
nesting and should be confirmed against the base files (or applied with
whitespace-insensitive matching). The index hashes are unchanged from that copy.

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 5f0cf0b8a406a2..bab0a55c08dc96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -272,6 +272,8 @@ public void createScanRangeLocations() throws UserException {
             params.setProperties(locationProperties);
         }
 
+        boolean enableFileCache = ConnectContext.get().getSessionVariable().enableFileCache;
+        boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties);
         List pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
@@ -313,6 +315,7 @@ public void createScanRangeLocations() throws UserException {
                 tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
                 rangeDesc.setTableFormatParams(tableFormatFileDesc);
             }
+
             // external data lake table
             if (fileSplit instanceof IcebergSplit) {
                 // TODO: extract all data lake split to factory
@@ -323,14 +326,19 @@ public void createScanRangeLocations() throws UserException {
                 HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
             }
 
+            Backend selectedBackend;
+            if (enableFileCache) {
+                // Use consistent hashing so the same scan range goes to the same backend across queries
+                selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
+            } else if (enableShortCircuitRead) {
+                // Prefer a BE on one of the split's hosts when HDFS short-circuit read is enabled
+                selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
+            } else {
+                selectedBackend = backendPolicy.getNextBe();
+            }
+
             curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
             TScanRangeLocation location = new TScanRangeLocation();
-            // Use consistent hash to assign the same scan range into the same backend among different queries
-            Backend selectedBackend = ConnectContext.get().getSessionVariable().enableFileCache
-                    ? backendPolicy.getNextConsistentBe(curLocations)
-                    : (HdfsResource.enableShortCircuitRead(locationProperties)
-                    ? backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()))
-                    : backendPolicy.getNextBe());
             location.setBackendId(selectedBackend.getId());
             location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
             curLocations.addToLocations(location);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index 2ee5924f686837..0bb60733691c0f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -23,6 +23,7 @@
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 
+import com.google.common.base.Stopwatch;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
@@ -32,6 +33,7 @@
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class FederationBackendPolicyTest {
     @Mocked
@@ -39,23 +41,19 @@ public class FederationBackendPolicyTest {
 
     @Before
     public void setUp() {
-        Backend backend1 = new Backend(1L, "192.168.1.1", 9050);
-        backend1.setAlive(true);
-
-        Backend backend2 = new Backend(2L, "192.168.1.2", 9050);
-        backend2.setAlive(true);
-
-        Backend backend3 = new Backend(3L, "192.168.1.3", 9050);
-        backend3.setAlive(true);
-
-        Backend backend4 = new Backend(4L, "192.168.1.4", 9050);
-        backend4.setAlive(false);
 
         SystemInfoService service = new SystemInfoService();
-        service.addBackend(backend1);
-        service.addBackend(backend2);
-        service.addBackend(backend3);
-        service.addBackend(backend4);
+
+        for (int i = 0; i < 200; i++) {
+            Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050);
+            backend.setAlive(true);
+            service.addBackend(backend);
+        }
+        for (int i = 0; i < 10; i++) {
+            Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." + i, 9050);
+            backend.setAlive(false);
+            service.addBackend(backend);
+        }
 
         new MockUp() {
             @Mock
@@ -70,20 +68,32 @@ public SystemInfoService getCurrentSystemInfo() {
     public void testGetNextBe() throws UserException {
         FederationBackendPolicy policy = new FederationBackendPolicy();
         policy.init();
-        Assertions.assertTrue(policy.numBackends() == 3);
-        for (int i = 0; i < policy.numBackends(); i++) {
-            Assertions.assertNotEquals(policy.getNextBe().getHost(), "192.168.1.4");
+        int backendNum = 200;
+        int invokeTimes = 10000;
+        Assertions.assertEquals(backendNum, policy.numBackends());
+        Stopwatch sw = Stopwatch.createStarted();
+        for (int i = 0; i < invokeTimes; i++) {
+            Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2."));
         }
+        sw.stop();
+        System.out.println("Invoke getNextBe() " + invokeTimes
+                + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms");
     }
 
     @Test
     public void testGetNextLocalBe() throws UserException {
         FederationBackendPolicy policy = new FederationBackendPolicy();
         policy.init();
-        Assertions.assertTrue(policy.numBackends() == 3);
-        List localHosts = Arrays.asList("192.168.1.3");
-        for (int i = 0; i < 100; i++) {
-            Assertions.assertEquals(policy.getNextLocalBe(localHosts).getHost(), "192.168.1.3");
+        int backendNum = 200;
+        int invokeTimes = 10000;
+        Assertions.assertEquals(backendNum, policy.numBackends());
+        List<String> localHosts = Arrays.asList("192.168.1.197", "192.168.1.198", "192.168.1.199");
+        Stopwatch sw = Stopwatch.createStarted();
+        for (int i = 0; i < invokeTimes; i++) {
+            Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
         }
+        sw.stop();
+        System.out.println("Invoke getNextLocalBe() " + invokeTimes
+                + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms");
     }
 }