Skip to content

Commit

Permalink
[Enhancement](multi-catalog) support short-circuit read for hive catalog.
Browse files Browse the repository at this point in the history
  • Loading branch information
王翔宇 committed Aug 8, 2023
1 parent 5ea66c0 commit 3d6e8ca
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ public void createScanRangeLocations() throws UserException {
params.setProperties(locationProperties);
}

boolean enableSqlCache = ConnectContext.get().getSessionVariable().enableFileCache;
boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties);
List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
Expand Down Expand Up @@ -313,6 +315,7 @@ public void createScanRangeLocations() throws UserException {
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}

// external data lake table
if (fileSplit instanceof IcebergSplit) {
// TODO: extract all data lake split to factory
Expand All @@ -323,14 +326,19 @@ public void createScanRangeLocations() throws UserException {
HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
}

Backend selectedBackend;
if (enableSqlCache) {
// Use consistent hash to assign the same scan range into the same backend among different queries
selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
} else if (enableShortCircuitRead) {
// Try to find a local BE if enable hdfs short circuit read
selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
} else {
selectedBackend = backendPolicy.getNextBe();
}

curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
// Use consistent hash to assign the same scan range into the same backend among different queries
Backend selectedBackend = ConnectContext.get().getSessionVariable().enableFileCache
? backendPolicy.getNextConsistentBe(curLocations)
: (HdfsResource.enableShortCircuitRead(locationProperties)
? backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()))
: backendPolicy.getNextBe());
location.setBackendId(selectedBackend.getId());
location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
curLocations.addToLocations(location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;

import com.google.common.base.Stopwatch;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
Expand All @@ -32,30 +33,27 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class FederationBackendPolicyTest {
@Mocked
private Env env;

@Before
public void setUp() {
Backend backend1 = new Backend(1L, "192.168.1.1", 9050);
backend1.setAlive(true);

Backend backend2 = new Backend(2L, "192.168.1.2", 9050);
backend2.setAlive(true);

Backend backend3 = new Backend(3L, "192.168.1.3", 9050);
backend3.setAlive(true);

Backend backend4 = new Backend(4L, "192.168.1.4", 9050);
backend4.setAlive(false);

SystemInfoService service = new SystemInfoService();
service.addBackend(backend1);
service.addBackend(backend2);
service.addBackend(backend3);
service.addBackend(backend4);

for (int i = 0; i < 200; i++) {
Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050);
backend.setAlive(true);
service.addBackend(backend);
}
for (int i = 0; i < 10; i++) {
Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." + i, 9050);
backend.setAlive(false);
service.addBackend(backend);
}

new MockUp<Env>() {
@Mock
Expand All @@ -70,20 +68,32 @@ public SystemInfoService getCurrentSystemInfo() {
public void testGetNextBe() throws UserException {
FederationBackendPolicy policy = new FederationBackendPolicy();
policy.init();
Assertions.assertTrue(policy.numBackends() == 3);
for (int i = 0; i < policy.numBackends(); i++) {
Assertions.assertNotEquals(policy.getNextBe().getHost(), "192.168.1.4");
int backendNum = 200;
int invokeTimes = 10000;
Assertions.assertEquals(policy.numBackends(), backendNum);
Stopwatch sw = Stopwatch.createStarted();
for (int i = 0; i < invokeTimes; i++) {
Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2."));
}
sw.stop();
System.out.println("Invoke getNextBe() " + invokeTimes
+ " times cost [" + sw.elapsed(TimeUnit.SECONDS) + "] seconds");
}

@Test
public void testGetNextLocalBe() throws UserException {
FederationBackendPolicy policy = new FederationBackendPolicy();
policy.init();
Assertions.assertTrue(policy.numBackends() == 3);
List<String> localHosts = Arrays.asList("192.168.1.3");
for (int i = 0; i < 100; i++) {
Assertions.assertEquals(policy.getNextLocalBe(localHosts).getHost(), "192.168.1.3");
int backendNum = 200;
int invokeTimes = 10000;
Assertions.assertEquals(policy.numBackends(), backendNum);
List<String> localHosts = Arrays.asList("192.168.1.197", "192.168.1.198", "192.168.1.199");
Stopwatch sw = Stopwatch.createStarted();
for (int i = 0; i < invokeTimes; i++) {
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
}
sw.stop();
System.out.println("Invoke getNextLocalBe() " + invokeTimes
+ " times cost [" + sw.elapsed(TimeUnit.SECONDS) + "] seconds");
}
}

0 comments on commit 3d6e8ca

Please sign in to comment.