Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1a15f53
udf: replace function
Aug 13, 2020
7d072b9
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 14, 2020
391158f
udf: replace function
Aug 14, 2020
5eb52e1
udf: replace function
Aug 14, 2020
a79ea91
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 18, 2020
86f841f
udf: replace function
Aug 18, 2020
ade1afa
udf: replace function
Aug 18, 2020
1d95a49
udf: replace function
Aug 18, 2020
433eb87
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 24, 2020
c62d239
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 25, 2020
0a8db8c
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 25, 2020
02d3f86
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 1, 2020
41da0ba
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 8, 2020
de4e523
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 11, 2020
a863b0d
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 17, 2020
024c422
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 22, 2020
e1656c7
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 23, 2020
837e037
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 27, 2020
a36d3e7
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 17, 2020
43cfa2b
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 23, 2020
641efb5
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 24, 2020
5066131
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 26, 2020
dde044b
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 27, 2020
8f8c823
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 28, 2020
f7704ba
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 1, 2020
8726e7a
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 4, 2020
4dfc785
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 6, 2020
d4bd91c
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 11, 2020
2eb8ddf
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 12, 2020
5aa7afc
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 14, 2020
ce251b2
Merge remote-tracking branch 'upstream/master' into str_replace
Nov 17, 2020
e73b53e
update
Nov 17, 2020
399aefa
optimize tablet select strategy
Nov 17, 2020
fb7bfa7
update
Nov 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,17 @@ private void computeScanRangeAssignmentByScheduler(
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostPort = SimpleScheduler.getHost(minLocation.backend_id,
scanRangeLocations.getLocations(), this.idToBackend, backendIdRef);
if (!execHostPort.hostname.equals(minLocation.server.hostname) ||
execHostPort.port != minLocation.server.port) {
assignedBytesPerHost.put(minLocation.server,
assignedBytesPerHost.get(minLocation.server) - scanRangeLength);
Long id = assignedBytesPerHost.get(execHostPort);
if (id == null) {
assignedBytesPerHost.put(execHostPort, 0L);
} else {
assignedBytesPerHost.put(execHostPort, id+scanRangeLength);
}
}
this.addressToBackendID.put(execHostPort, backendIdRef.getRef());

Map<Integer, List<TScanRangeParams>> scanRanges = findOrInsert(assignment, execHostPort,
Expand Down
82 changes: 82 additions & 0 deletions fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,5 +387,87 @@ public void testComputeBucketShuffleJoinInstanceParam() {
Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params);
Assert.assertEquals(3, params.instanceExecParams.size());
}

@Test
public void testComputeScanRangeAssignmentByScheduler() {
Coordinator coordinator = new Coordinator(context, analyzer, planner);
PlanFragmentId planFragmentId = new PlanFragmentId(1);
int scanNodeId = 1;
Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>());
fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId);

TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
OlapTable olapTable = new OlapTable();
HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(66, new ArrayList<>());
Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo);
tupleDescriptor.setTable(olapTable);

OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test");
// each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2}
TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
tScanRangeLocation0.backend_id = 0;
tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
tScanRangeLocation1.backend_id = 1;
tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
tScanRangeLocation2.backend_id = 2;
tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
tScanRangeLocations.locations = new ArrayList<>();
tScanRangeLocations.locations.add(tScanRangeLocation0);
tScanRangeLocations.locations.add(tScanRangeLocation1);
tScanRangeLocations.locations.add(tScanRangeLocation2);

TScanRangeLocations tScanRangeLocations1 = new TScanRangeLocations();
TScanRangeLocation tScanRangeLocation3 = new TScanRangeLocation();
tScanRangeLocation3.backend_id = 0;
tScanRangeLocation3.server = new TNetworkAddress("0.0.0.0", 9050);
TScanRangeLocation tScanRangeLocation4 = new TScanRangeLocation();
tScanRangeLocation4.backend_id = 1;
tScanRangeLocation4.server = new TNetworkAddress("0.0.0.1", 9050);
TScanRangeLocation tScanRangeLocation5 = new TScanRangeLocation();
tScanRangeLocation5.backend_id = 2;
tScanRangeLocation5.server = new TNetworkAddress("0.0.0.2", 9050);
tScanRangeLocations1.locations = new ArrayList<>();
tScanRangeLocations1.locations.add(tScanRangeLocation3);
tScanRangeLocations1.locations.add(tScanRangeLocation4);
tScanRangeLocations1.locations.add(tScanRangeLocation5);

olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode,
new DataPartition(TPartitionType.UNPARTITIONED)));

// init all backend
Backend backend0 = new Backend(0, "0.0.0.0", 9060);
backend0.setAlive(false);
backend0.setBePort(9050);
Backend backend1 = new Backend(1, "0.0.0.1", 9060);
backend1.setAlive(true);
backend1.setBePort(9050);
Backend backend2 = new Backend(2, "0.0.0.2", 9060);
backend2.setAlive(true);
backend2.setBePort(9050);

ImmutableMap<Long, Backend> idToBackend =
new ImmutableMap.Builder<Long, Backend>().
put(0l, backend0).
put(1l, backend1).
put(2l, backend2).build();
Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment();
List<TScanRangeLocations> locations = new ArrayList<>();
locations.add(tScanRangeLocations);
locations.add(tScanRangeLocations1);
Deencapsulation.invoke(coordinator, "computeScanRangeAssignmentByScheduler",
olapScanNode, locations, assignment);
for (Map.Entry entry:assignment.entrySet()) {
Map<Integer, List<TScanRangeParams>> addr = (HashMap<Integer, List<TScanRangeParams>>) entry.getValue();
for (Map.Entry item:addr.entrySet()) {
List<TScanRangeParams> params = (List<TScanRangeParams>) item.getValue();
Assert.assertTrue(params.size() == 2);
}
}
}
}