Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1a15f53
udf: replace function
Aug 13, 2020
7d072b9
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 14, 2020
391158f
udf: replace function
Aug 14, 2020
5eb52e1
udf: replace function
Aug 14, 2020
a79ea91
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 18, 2020
86f841f
udf: replace function
Aug 18, 2020
ade1afa
udf: replace function
Aug 18, 2020
1d95a49
udf: replace function
Aug 18, 2020
433eb87
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 24, 2020
c62d239
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 25, 2020
0a8db8c
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 25, 2020
02d3f86
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 1, 2020
41da0ba
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 8, 2020
de4e523
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 11, 2020
a863b0d
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 17, 2020
024c422
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 22, 2020
e1656c7
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 23, 2020
837e037
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 27, 2020
a36d3e7
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 17, 2020
43cfa2b
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 23, 2020
641efb5
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 24, 2020
5066131
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 26, 2020
dde044b
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 27, 2020
8f8c823
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 28, 2020
f7704ba
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 1, 2020
8726e7a
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 4, 2020
4dfc785
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 6, 2020
d4bd91c
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 11, 2020
2eb8ddf
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 12, 2020
5aa7afc
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 14, 2020
ce251b2
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 17, 2020
04c5899
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 18, 2020
2e614cf
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 24, 2020
ede2e00
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 25, 2020
0aed91f
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 26, 2020
bd28b73
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 30, 2020
e56e17d
Merge remote-tracking branch 'upstream/master' into str_replace
Dec 10, 2020
7d5d011
Merge remote-tracking branch 'upstream/master' into str_replace
Dec 17, 2020
57d8b4e
make buket join balance
Dec 17, 2020
aa6aba9
make buket join balance
Dec 17, 2020
b3819fb
abstract get host function
Dec 29, 2020
67e1c4f
rename function
Dec 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 34 additions & 33 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1252,12 +1252,12 @@ private void computeScanRangeAssignmentByColocate(
fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
}
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());

HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
//fill scanRangeParamsList
List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
if (!bucketSeqToAddress.containsKey(bucketSeq)) {
getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq);
getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost);
}

for(TScanRangeLocations location: locations) {
Expand All @@ -1275,50 +1275,51 @@ private void computeScanRangeAssignmentByColocate(
}
}

// randomly choose a backend from the TScanRangeLocations for a certain bucket sequence.
private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq) throws Exception {
int randomLocation = new Random().nextInt(seqLocation.locations.size());
// ensure bucket sequences are distributed evenly across hosts
private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq,
HashMap<TNetworkAddress, Long> assignedBytesPerHost) throws Exception {
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef);
selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, backendIdRef);
Backend backend = this.idToBackend.get(backendIdRef.getRef());
TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
}

/**
 * Picks the replica location whose host currently has the smallest counter in
 * {@code assignedBytesPerHost}, resolves it to a usable backend through
 * {@link SimpleScheduler#getLocation}, and bumps the counter of the host that was
 * finally chosen by one.
 *
 * <p>Despite the map's name, the value tracked per host here is incremented by a
 * fixed step of 1 per selection, i.e. it counts assignments.
 *
 * @param seqLocation          candidate replica locations for one scan range / bucket
 * @param assignedBytesPerHost per-host counters; every candidate host is registered
 *                             (with 0) via {@code findOrInsert}, and the chosen host's
 *                             counter is incremented
 * @param backendIdRef         out-parameter filled in by {@code SimpleScheduler.getLocation}
 * @return the location that was selected
 * @throws UserException propagated from {@code SimpleScheduler.getLocation}
 */
public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation,
                                                     HashMap<TNetworkAddress, Long> assignedBytesPerHost,
                                                     Reference<Long> backendIdRef) throws UserException {
    final long increment = 1L;
    long lowestCount = Long.MAX_VALUE;
    TScanRangeLocation leastLoaded = null;

    // Scan every candidate; strict '<' keeps the first one seen on ties.
    for (final TScanRangeLocation candidate : seqLocation.getLocations()) {
        final long currentCount = findOrInsert(assignedBytesPerHost, candidate.server, 0L);
        if (currentCount < lowestCount) {
            lowestCount = currentCount;
            leastLoaded = candidate;
        }
    }

    // The scheduler may substitute another location when the preferred one is unavailable.
    final TScanRangeLocation chosen = SimpleScheduler.getLocation(
            leastLoaded, seqLocation.locations, this.idToBackend, backendIdRef);

    // Account for the host that actually got the work, not the one we preferred.
    final long updatedCount = assignedBytesPerHost.containsKey(chosen.server)
            ? assignedBytesPerHost.get(chosen.server) + increment
            : increment;
    assignedBytesPerHost.put(chosen.server, updatedCount);
    return chosen;
}

private void computeScanRangeAssignmentByScheduler(
final ScanNode scanNode,
final List<TScanRangeLocations> locations,
FragmentScanRangeAssignment assignment) throws Exception {

HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
Long step = 1L;
for (TScanRangeLocations scanRangeLocations : locations) {
// assign this scan range to the host w/ the fewest assigned bytes
Long minAssignedBytes = Long.MAX_VALUE;
TScanRangeLocation minLocation = null;
for (final TScanRangeLocation location : scanRangeLocations.getLocations()) {
Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
if (assignedBytes < minAssignedBytes) {
minAssignedBytes = assignedBytes;
minLocation = location;
}
}
assignedBytesPerHost.put(minLocation.server,
assignedBytesPerHost.get(minLocation.server) + step);

Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostPort = SimpleScheduler.getHost(minLocation.backend_id,
scanRangeLocations.getLocations(), this.idToBackend, backendIdRef);
if (!execHostPort.hostname.equals(minLocation.server.hostname) ||
execHostPort.port != minLocation.server.port) {
assignedBytesPerHost.put(minLocation.server,
assignedBytesPerHost.get(minLocation.server) - step);
Long id = assignedBytesPerHost.get(execHostPort);
if (id == null) {
assignedBytesPerHost.put(execHostPort, 0L);
} else {
assignedBytesPerHost.put(execHostPort, id + step);
}
}
TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations, assignedBytesPerHost, backendIdRef);
Backend backend = this.idToBackend.get(backendIdRef.getRef());
TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
this.addressToBackendID.put(execHostPort, backendIdRef.getRef());

Map<Integer, List<TScanRangeParams>> scanRanges = findOrInsert(assignment, execHostPort,
Expand Down
32 changes: 32 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,38 @@ public static TNetworkAddress getHost(long backendId,
backends, locations.size()));
}

/**
 * Resolves a preferred scan-range location to one whose backend is available.
 *
 * <p>If the backend of {@code minLocation} passes {@code isAvailable}, that location is
 * returned. Otherwise the remaining {@code locations} are tried in list order and the
 * first one with an available backend is returned. In both cases the chosen backend id
 * is written to {@code backendIdRef}.
 *
 * @param minLocation  the preferred location
 * @param locations    all candidate locations for the scan range
 * @param backends     backend id to backend lookup
 * @param backendIdRef out-parameter receiving the id of the chosen backend
 * @return the preferred location, or the first available alternative
 * @throws UserException if {@code locations} or {@code backends} is empty, or no
 *                       candidate has an available backend
 */
public static TScanRangeLocation getLocation(TScanRangeLocation minLocation,
                                             List<TScanRangeLocation> locations,
                                             ImmutableMap<Long, Backend> backends,
                                             Reference<Long> backendIdRef)
        throws UserException {
    if (CollectionUtils.isEmpty(locations) || backends == null || backends.isEmpty()) {
        throw new UserException("scan range location or candidate backends is empty");
    }

    // Fast path: the preferred location is usable as-is.
    if (isAvailable(backends.get(minLocation.backend_id))) {
        backendIdRef.setRef(minLocation.backend_id);
        return minLocation;
    }

    // Fallback: choose the first alive backend among the other candidates
    // (in analysis stage, the locations are random).
    for (TScanRangeLocation candidate : locations) {
        if (candidate.backend_id != minLocation.backend_id
                && isAvailable(backends.get(candidate.backend_id))) {
            backendIdRef.setRef(candidate.backend_id);
            return candidate;
        }
    }

    // no backend returned
    throw new UserException("there is no scanNode Backend. " +
            getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()),
                    backends, locations.size()));
}

public static TNetworkAddress getHost(ImmutableMap<Long, Backend> backends,
Reference<Long> backendIdRef)
throws UserException {
Expand Down
61 changes: 61 additions & 0 deletions fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Maps;
import org.apache.commons.collections.map.HashedMap;

public class CoordinatorTest extends Coordinator {
static Planner planner = new Planner();
Expand Down Expand Up @@ -469,5 +471,64 @@ public void testComputeScanRangeAssignmentByScheduler() {
}
}
}

/**
 * Verifies that three consecutive bucket assignments over the same three-replica
 * location set are spread across the backends: after assigning bucket sequences
 * 1, 2 and 3, each host's counter must be exactly 1 and three hosts must be present.
 */
@Test
public void testGetExecHostPortForFragmentIDAndBucketSeq() {
    Coordinator coordinator = new Coordinator(context, analyzer, planner);
    PlanFragmentId planFragmentId = new PlanFragmentId(1);
    // Every OLAP table bucket has the same TScanRangeLocations; the backend ids are {0, 1, 2}.
    TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
    TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
    tScanRangeLocation0.backend_id = 0;
    tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
    TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
    tScanRangeLocation1.backend_id = 1;
    tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
    TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
    tScanRangeLocation2.backend_id = 2;
    tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
    tScanRangeLocations.locations = new ArrayList<>();
    tScanRangeLocations.locations.add(tScanRangeLocation0);
    tScanRangeLocations.locations.add(tScanRangeLocation1);
    tScanRangeLocations.locations.add(tScanRangeLocation2);

    // Create one alive backend per location, with a BE port matching the location's server port.
    Backend backend0 = new Backend(0, "0.0.0.0", 9060);
    backend0.setAlive(true);
    backend0.setBePort(9050);
    Backend backend1 = new Backend(1, "0.0.0.1", 9060);
    backend1.setAlive(true);
    backend1.setBePort(9050);
    Backend backend2 = new Backend(2, "0.0.0.2", 9060);
    backend2.setAlive(true);
    backend2.setBePort(9050);

    ImmutableMap<Long, Backend> idToBackend =
            new ImmutableMap.Builder<Long, Backend>().
                    put(0L, backend0).
                    put(1L, backend1).
                    put(2L, backend2).build();
    Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
    Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
    fragmentIdToSeqToAddressMap.put(planFragmentId, new HashedMap());
    Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap);

    // Assign three bucket sequences against the same shared counter map.
    HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
    Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
            planFragmentId, 1, assignedBytesPerHost);
    Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
            planFragmentId, 2, assignedBytesPerHost);
    Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
            planFragmentId, 3, assignedBytesPerHost);

    // Each host must have received exactly one bucket.
    List<String> hosts = new ArrayList<>();
    for (Map.Entry<TNetworkAddress, Long> entry : assignedBytesPerHost.entrySet()) {
        Assert.assertEquals(1L, entry.getValue().longValue());
        hosts.add(entry.getKey().hostname);
    }
    Assert.assertEquals(3, hosts.size());
}
}