Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ public List<List<PhysicalProperties>> visitPhysicalFilter(PhysicalFilter<? exten
DistributionSpec distributionSpec = originChildrenProperties.get(0).getDistributionSpec();
// process must shuffle
if (distributionSpec instanceof DistributionSpecMustShuffle) {
Plan child = filter.child();
Plan realChild = getChildPhysicalPlan(child);
Plan realChild = children.get(0).getPlan();
if (realChild instanceof PhysicalProject
|| realChild instanceof PhysicalFilter
|| realChild instanceof PhysicalLimit) {
Expand Down Expand Up @@ -320,19 +319,6 @@ private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash
}
}

private Plan getChildPhysicalPlan(Plan plan) {
if (!(plan instanceof GroupPlan)) {
return null;
}
GroupPlan groupPlan = (GroupPlan) plan;
if (groupPlan == null || groupPlan.getGroup() == null
|| groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
return null;
} else {
return groupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
}
}

private PhysicalOlapScan findDownGradeBucketShuffleCandidate(GroupPlan groupPlan) {
if (groupPlan == null || groupPlan.getGroup() == null
|| groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
Expand Down Expand Up @@ -602,8 +588,7 @@ public List<List<PhysicalProperties>> visitPhysicalProject(PhysicalProject<? ext
DistributionSpec distributionSpec = originChildrenProperties.get(0).getDistributionSpec();
// process must shuffle
if (distributionSpec instanceof DistributionSpecMustShuffle) {
Plan child = project.child();
Plan realChild = getChildPhysicalPlan(child);
Plan realChild = children.get(0).getPlan();
if (realChild instanceof PhysicalLimit) {
visit(project, context);
} else if (realChild instanceof PhysicalProject) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,44 +47,29 @@

public class ChildrenPropertiesRegulatorTest {

private List<GroupExpression> children;
private JobContext mockedJobContext;
private List<PhysicalProperties> originOutputChildrenProperties
= Lists.newArrayList(PhysicalProperties.MUST_SHUFFLE);

@BeforeEach
public void setUp() {
Group childGroup = Mockito.mock(Group.class);
Mockito.when(childGroup.getLogicalProperties()).thenReturn(Mockito.mock(LogicalProperties.class));
GroupExpression child = Mockito.mock(GroupExpression.class);
Mockito.when(child.getOutputProperties(Mockito.any())).thenReturn(PhysicalProperties.MUST_SHUFFLE);
Mockito.when(child.getOwnerGroup()).thenReturn(childGroup);
Map<PhysicalProperties, Pair<Cost, List<PhysicalProperties>>> lct = Maps.newHashMap();
lct.put(PhysicalProperties.MUST_SHUFFLE, Pair.of(Cost.zero(), Lists.newArrayList()));
Mockito.when(child.getLowestCostTable()).thenReturn(lct);
children = Lists.newArrayList(child);

mockedJobContext = Mockito.mock(JobContext.class);
Mockito.when(mockedJobContext.getCascadesContext()).thenReturn(Mockito.mock(CascadesContext.class));

}

@Test
public void testMustShuffleProjectProjectCanNotMerge() {
testMustShuffleProject(PhysicalProject.class, DistributionSpecExecutionAny.class, false);

}

@Test
public void testMustShuffleProjectProjectCanMerge() {
testMustShuffleProject(PhysicalProject.class, DistributionSpecMustShuffle.class, true);

}

@Test
public void testMustShuffleProjectFilter() {
testMustShuffleProject(PhysicalFilter.class, DistributionSpecMustShuffle.class, true);

}

@Test
Expand All @@ -111,6 +96,19 @@ public void testMustShuffleProject(Class<? extends Plan> childClazz,
Mockito.when(mockedGroupPlan.getGroup()).thenReturn(mockedGroup);
// let AbstractTreeNode's init happy
Mockito.when(mockedGroupPlan.getAllChildrenTypes()).thenReturn(new BitSet());

List<GroupExpression> children;
Group childGroup = Mockito.mock(Group.class);
Mockito.when(childGroup.getLogicalProperties()).thenReturn(Mockito.mock(LogicalProperties.class));
GroupExpression child = Mockito.mock(GroupExpression.class);
Mockito.when(child.getOutputProperties(Mockito.any())).thenReturn(PhysicalProperties.MUST_SHUFFLE);
Mockito.when(child.getOwnerGroup()).thenReturn(childGroup);
Map<PhysicalProperties, Pair<Cost, List<PhysicalProperties>>> lct = Maps.newHashMap();
lct.put(PhysicalProperties.MUST_SHUFFLE, Pair.of(Cost.zero(), Lists.newArrayList()));
Mockito.when(child.getLowestCostTable()).thenReturn(lct);
Mockito.when(child.getPlan()).thenReturn(mockedChild);
children = Lists.newArrayList(child);

PhysicalProject parentPlan = new PhysicalProject<>(Lists.newArrayList(), null, mockedGroupPlan);
GroupExpression parent = new GroupExpression(parentPlan);
parentPlan = parentPlan.withGroupExpression(Optional.of(parent));
Expand Down Expand Up @@ -157,6 +155,19 @@ private void testMustShuffleFilter(Class<? extends Plan> childClazz) {
Mockito.when(mockedGroupPlan.getGroup()).thenReturn(mockedGroup);
// let AbstractTreeNode's init happy
Mockito.when(mockedGroupPlan.getAllChildrenTypes()).thenReturn(new BitSet());

List<GroupExpression> children;
Group childGroup = Mockito.mock(Group.class);
Mockito.when(childGroup.getLogicalProperties()).thenReturn(Mockito.mock(LogicalProperties.class));
GroupExpression child = Mockito.mock(GroupExpression.class);
Mockito.when(child.getOutputProperties(Mockito.any())).thenReturn(PhysicalProperties.MUST_SHUFFLE);
Mockito.when(child.getOwnerGroup()).thenReturn(childGroup);
Map<PhysicalProperties, Pair<Cost, List<PhysicalProperties>>> lct = Maps.newHashMap();
lct.put(PhysicalProperties.MUST_SHUFFLE, Pair.of(Cost.zero(), Lists.newArrayList()));
Mockito.when(child.getLowestCostTable()).thenReturn(lct);
Mockito.when(child.getPlan()).thenReturn(mockedChild);
children = Lists.newArrayList(child);

GroupExpression parent = new GroupExpression(new PhysicalFilter<>(Sets.newHashSet(), null, mockedGroupPlan));
ChildrenPropertiesRegulator regulator = new ChildrenPropertiesRegulator(parent, children,
new ArrayList<>(originOutputChildrenProperties), null, mockedJobContext);
Expand Down