Skip to content

Commit

Permalink
[fix](nereids) adjust logical repeat property derive (apache#44360)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Problem Summary:
Before this pr, if the children of PhysicalRepeat is hash-distributed
based on a and b, and the common columns that can be extracted from the
PhysicalRepeat grouping sets are a, b, and c, the PhysicalRepeat will
output Any distribution property.
This pr allow PhysicalRepeat maintain child hash distribution property
in this situaltion.
  • Loading branch information
feiniaofeiafei authored Jan 9, 2025
1 parent af8a975 commit 60b4903
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import com.google.common.collect.Sets;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -372,22 +373,15 @@ public PhysicalProperties visitPhysicalRepeat(PhysicalRepeat<? extends Plan> rep
);
}
List<ExprId> orderedShuffledColumns = distributionSpecHash.getOrderedShuffledColumns();
if (!intersectGroupingKeys.isEmpty() && intersectGroupingKeys.size()
>= Sets.newHashSet(orderedShuffledColumns).size()) {
boolean hashColumnsChanged = false;
for (Expression intersectGroupingKey : intersectGroupingKeys) {
if (!(intersectGroupingKey instanceof SlotReference)) {
hashColumnsChanged = true;
break;
}
if (!(orderedShuffledColumns.contains(((SlotReference) intersectGroupingKey).getExprId()))) {
hashColumnsChanged = true;
break;
}
}
if (!hashColumnsChanged) {
return childrenOutputProperties.get(0);
Set<ExprId> intersectGroupingKeysId = new HashSet<>();
for (Expression key : intersectGroupingKeys) {
if (!(key instanceof SlotReference)) {
break;
}
intersectGroupingKeysId.add(((SlotReference) key).getExprId());
}
if (intersectGroupingKeysId.containsAll(orderedShuffledColumns)) {
return childrenOutputProperties.get(0);
}
}
output = PhysicalProperties.createAnyFromHash((DistributionSpecHash) childDistributionSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,4 +885,27 @@ void testRepeatReturnChild() {
PhysicalProperties result = deriver.getOutputProperties(null, groupExpression);
Assertions.assertEquals(child, result);
}

@Test
void testRepeatReturnChild2() {
SlotReference c1 = new SlotReference(
new ExprId(1), "c1", TinyIntType.INSTANCE, true, ImmutableList.of());
SlotReference c2 = new SlotReference(
new ExprId(2), "c2", TinyIntType.INSTANCE, true, ImmutableList.of());
SlotReference c3 = new SlotReference(
new ExprId(3), "c3", TinyIntType.INSTANCE, true, ImmutableList.of());
PhysicalRepeat<GroupPlan> repeat = new PhysicalRepeat<>(
ImmutableList.of(ImmutableList.of(c1, c2, c3), ImmutableList.of(c1, c2), ImmutableList.of(c1, c2)),
ImmutableList.of(c1, c2, c3),
logicalProperties,
groupPlan
);
GroupExpression groupExpression = new GroupExpression(repeat);
new Group(null, groupExpression, null);
PhysicalProperties child = PhysicalProperties.createHash(
ImmutableList.of(new ExprId(1)), ShuffleType.EXECUTION_BUCKETED);
ChildOutputPropertyDeriver deriver = new ChildOutputPropertyDeriver(Lists.newArrayList(child));
PhysicalProperties result = deriver.getOutputProperties(null, groupExpression);
Assertions.assertEquals(child, result);
}
}

0 comments on commit 60b4903

Please sign in to comment.