Skip to content

Commit

Permalink
[Enhancement](multi-catalog) support short-circuit read for hive cata…
Browse files Browse the repository at this point in the history
…log.
  • Loading branch information
王翔宇 committed Aug 9, 2023
1 parent 886f5af commit e3c1ead
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

public class FederationBackendPolicy {
private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class);
private final List<Backend> backends = Lists.newArrayList();
private final Map<String, Backend> backendMap = Maps.newHashMap();
private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
private final Random random = new Random(System.currentTimeMillis());
private ConsistentHash<TScanRangeLocations, Backend> consistentHash;

Expand Down Expand Up @@ -96,9 +95,7 @@ public void init(BeSelectionPolicy policy) throws UserException {
if (backends.isEmpty()) {
throw new UserException("No available backends");
}
for (Backend backend : backends) {
backendMap.put(backend.getHost(), backend);
}
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
int virtualNumber = Math.max(Math.min(512 / backends.size(), 32), 2);
consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
new BackendHash(), backends, virtualNumber);
Expand All @@ -116,15 +113,17 @@ public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) {

// Try to find a local BE, if not exists, use `getNextBe` instead
public Backend getNextLocalBe(List<String> hosts) {
List<Backend> candidateBackends = hosts.stream()
.map(backendMap::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(candidateBackends)) {
return getNextBe();
List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size());
for (String host : hosts) {
List<Backend> backends = backendMap.get(host);
if (CollectionUtils.isNotEmpty(backends)) {
candidateBackends.add(backends.get(random.nextInt(backends.size())));
}
}
LOG.debug("Find {} BEs which belong(s) to the same node with the file split.", candidateBackends.size());
return candidateBackends.get(random.nextInt(candidateBackends.size()));

return CollectionUtils.isEmpty(candidateBackends)
? getNextBe()
: candidateBackends.get(random.nextInt(candidateBackends.size()));
}

public int numBackends() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,16 @@ public void setUp() {

SystemInfoService service = new SystemInfoService();

for (int i = 0; i < 200; i++) {
for (int i = 0; i < 190; i++) {
Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050);
backend.setAlive(true);
service.addBackend(backend);
}
for (int i = 0; i < 10; i++) {
Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1." + i, 9051);
backend.setAlive(true);
service.addBackend(backend);
}
for (int i = 0; i < 10; i++) {
Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." + i, 9050);
backend.setAlive(false);
Expand Down Expand Up @@ -87,7 +92,7 @@ public void testGetNextLocalBe() throws UserException {
int backendNum = 200;
int invokeTimes = 1000000;
Assertions.assertEquals(policy.numBackends(), backendNum);
List<String> localHosts = Arrays.asList("192.168.1.197", "192.168.1.198", "192.168.1.199");
List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2");
Stopwatch sw = Stopwatch.createStarted();
for (int i = 0; i < invokeTimes; i++) {
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
Expand Down

0 comments on commit e3c1ead

Please sign in to comment.