Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,28 @@ private Set<FuncDepsItem> findValidItems(Set<Slot> requireOutputs) {
* <p>
* Example:
* Given:
* - Initial slots: {{A, B, C}, {D, E}, {F, G}}
* - Required outputs: {A, D, F}
* - Valid functional dependencies: {A} -> {B}, {D, E} -> {G}, {F} -> {G}
* - Initial slots: {{A}, {B}, {C}, {D}, {E}}
* - Required outputs: {}
* - validItems: {A} -> {B}, {B} -> {C}, {C} -> {D}, {D} -> {A}, {A} -> {E}
*
* Process:
* 1. Start with minSlotSet = {{A, B, C}, {D, E}, {F, G}}
* 1. Start with minSlotSet = {{A}, {B}, {C}, {D}, {E}}
* 2. For {A} -> {B}:
* - Both {A} and {B} are in minSlotSet, so mark {B} for elimination
* 3. For {D, E} -> {G}:
* - Both {D, E} and {G} are in minSlotSet, so mark {G} for elimination
* 4. For {F} -> {G}:
* - Both {F} and {G} are in minSlotSet, but {G} is already marked for elimination
* 5. Remove eliminated slots: {B} and {G}
* 3. For {B} -> {C}:
* - Both {B} and {C} are in minSlotSet, so mark {C} for elimination
* 4. For {C} -> {D}:
* - Both {C} and {D} are in minSlotSet, so mark {D} for elimination
* 4. For {D} -> {E}:
* - Both {D} and {E} are in minSlotSet, so mark {E} for elimination
*
* Result: {{A, C}, {D, E}, {F}}
* Result: {{A}}
* </p>
*
* @param slots the initial set of slot sets to be reduced
* @param requireOutputs the set of slots that must be preserved in the output
* @return the minimal set of slot sets after applying all possible reductions
*/
*/
public Set<Set<Slot>> eliminateDeps(Set<Set<Slot>> slots, Set<Slot> requireOutputs) {
Set<Set<Slot>> minSlotSet = Sets.newHashSet(slots);
Set<Set<Slot>> eliminatedSlots = new HashSet<>();
Expand All @@ -183,6 +184,7 @@ public boolean isFuncDeps(Set<Slot> dominate, Set<Slot> dependency) {
return items.contains(new FuncDepsItem(dominate, dependency));
}

// 这个也是判断是否为双射的
public boolean isCircleDeps(Set<Slot> dominate, Set<Slot> dependency) {
return items.contains(new FuncDepsItem(dominate, dependency))
&& items.contains(new FuncDepsItem(dependency, dominate));
Expand All @@ -201,16 +203,30 @@ public Map<Set<Slot>, Set<Set<Slot>>> getREdges() {
}

/**
* find the determinants of dependencies
* Finds all slot sets that have a bijective relationship with the given slot set.
* Given edges containing:
* {A} -> {{B}, {C}}
* {B} -> {{A}, {D}}
* {C} -> {{A}}
* When slot = {A}, returns {{B}} because {A} and {B} mutually determine each other.
* {C} is not returned because {C} does not determine {A} (one-way dependency only).
*/
public Set<Set<Slot>> findDeterminats(Set<Slot> dependency) {
Set<Set<Slot>> determinants = new HashSet<>();
for (FuncDepsItem item : items) {
if (item.dependencies.equals(dependency)) {
determinants.add(item.determinants);
public Set<Set<Slot>> findBijectionSlots(Set<Slot> slot) {
Set<Set<Slot>> bijectionSlots = new HashSet<>();
if (!edges.containsKey(slot)) {
return bijectionSlots;
}
for (Set<Slot> dep : edges.get(slot)) {
if (!edges.containsKey(dep)) {
continue;
}
for (Set<Slot> det : edges.get(dep)) {
if (det.equals(slot)) {
bijectionSlots.add(dep);
}
}
}
return determinants;
return bijectionSlots;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,41 @@ private LogicalAggregate<?> eliminatePrimaryOutput(LogicalAggregate<?> agg, Plan
if (primary.getOutputSet().stream().noneMatch(aggInputs::contains)) {
return agg;
}
// Firstly, using fd to eliminate group by key.
// group by primary_table_pk, primary_table_other
// -> group by primary_table_pk
Set<Set<Slot>> groupBySlots = new HashSet<>();
Set<Slot> validSlots = new HashSet<>();
for (Expression expression : agg.getGroupByExpressions()) {
groupBySlots.add(expression.getInputSlots());
validSlots.addAll(expression.getInputSlots());
}
DataTrait dataTrait = child.getLogicalProperties().getTrait();
FuncDeps funcDeps = dataTrait.getAllValidFuncDeps(validSlots);
Set<Slot> foreignOutput = Sets.intersection(agg.getOutputSet(), foreign.getOutputSet());
Set<Set<Slot>> minGroupBySlots = funcDeps.eliminateDeps(groupBySlots, foreignOutput);
Set<Expression> removeExpression = new HashSet<>();
for (Set<Slot> slots : groupBySlots) {
if (!minGroupBySlots.contains(slots) && !foreignOutput.containsAll(slots)) {
removeExpression.add(slots.iterator().next());
}
}
List<Expression> minGroupBySlotList = new ArrayList<>();
for (Expression expression : agg.getGroupByExpressions()) {
if (!removeExpression.contains(expression)) {
minGroupBySlotList.add(expression);
}
}

// Secondly, put bijective slot into map: {primary_table_pk : foreign_table_fk}
// Bijective slots are mutually interchangeable within GROUP BY keys.
// group by primary_table_pk equals group by foreign_table_fk
Set<Slot> primaryOutputSet = primary.getOutputSet();
Set<Slot> primarySlots = Sets.intersection(aggInputs, primaryOutputSet);
DataTrait dataTrait = child.getLogicalProperties().getTrait();
FuncDeps funcDeps = dataTrait.getAllValidFuncDeps(Sets.union(foreign.getOutputSet(), primary.getOutputSet()));
HashMap<Slot, Slot> primaryToForeignDeps = new HashMap<>();
FuncDeps funcDepsForJoin = dataTrait.getAllValidFuncDeps(Sets.union(primaryOutputSet, foreign.getOutputSet()));
for (Slot slot : primarySlots) {
Set<Set<Slot>> replacedSlotSets = funcDeps.findDeterminats(ImmutableSet.of(slot));
Set<Set<Slot>> replacedSlotSets = funcDepsForJoin.findBijectionSlots(ImmutableSet.of(slot));
for (Set<Slot> replacedSlots : replacedSlotSets) {
if (primaryOutputSet.stream().noneMatch(replacedSlots::contains)
&& replacedSlots.size() == 1) {
Expand All @@ -147,19 +175,21 @@ private LogicalAggregate<?> eliminatePrimaryOutput(LogicalAggregate<?> agg, Plan
}
}

Set<Expression> newGroupBySlots = constructNewGroupBy(agg, primaryOutputSet, primaryToForeignDeps);
// Thirdly, construct new Agg below join.
Set<Expression> newGroupBySlots = constructNewGroupBy(minGroupBySlotList, primaryOutputSet,
primaryToForeignDeps);
List<NamedExpression> newOutput = constructNewOutput(
agg, primaryOutputSet, primaryToForeignDeps, funcDeps, primary);
agg, primaryOutputSet, primaryToForeignDeps, funcDepsForJoin, primary);
if (newGroupBySlots == null || newOutput == null) {
return null;
}
return agg.withGroupByAndOutput(ImmutableList.copyOf(newGroupBySlots), ImmutableList.copyOf(newOutput));
}

private @Nullable Set<Expression> constructNewGroupBy(LogicalAggregate<?> agg, Set<Slot> primaryOutputs,
Map<Slot, Slot> primaryToForeignBiDeps) {
private @Nullable Set<Expression> constructNewGroupBy(List<? extends Expression> gbyExpression,
Set<Slot> primaryOutputs, Map<Slot, Slot> primaryToForeignBiDeps) {
Set<Expression> newGroupBySlots = new HashSet<>();
for (Expression expression : agg.getGroupByExpressions()) {
for (Expression expression : gbyExpression) {
if (!(expression instanceof Slot)) {
return null;
}
Expand Down Expand Up @@ -196,9 +226,7 @@ private LogicalAggregate<?> eliminatePrimaryOutput(LogicalAggregate<?> agg, Plan
&& expression.child(0).child(0) instanceof Slot) {
// count(slot) can be rewritten by circle deps
Slot slot = (Slot) expression.child(0).child(0);
if (primaryToForeignDeps.containsKey(slot)
&& funcDeps.isCircleDeps(
ImmutableSet.of(slot), ImmutableSet.of(primaryToForeignDeps.get(slot)))) {
if (primaryToForeignDeps.containsKey(slot)) {
expression = (NamedExpression) expression.rewriteUp(e ->
e instanceof Slot
? primaryToForeignDeps.getOrDefault((Slot) e, (Slot) e)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !not_push_down_shape --
PhysicalResultSink
--hashAgg[GLOBAL]
----hashAgg[LOCAL]
------hashJoin[INNER_JOIN] hashCondition=((store_sales_test.ss_customer_sk = customer_test.c_customer_sk)) otherCondition=()
--------PhysicalOlapScan[store_sales_test]
--------PhysicalOlapScan[customer_test]

-- !not_push_down_result --
Smith John 2024-01-01

-- !push_down_shape --
PhysicalResultSink
--hashJoin[INNER_JOIN] hashCondition=((store_sales_test.ss_customer_sk = customer_test.c_customer_sk)) otherCondition=()
----hashAgg[GLOBAL]
------hashAgg[LOCAL]
--------PhysicalOlapScan[store_sales_test]
----PhysicalOlapScan[customer_test]

-- !push_down_result --
John 1 2024-01-01
John 2 2024-01-01

-- !push_down_with_count_shape --
PhysicalResultSink
--hashJoin[INNER_JOIN] hashCondition=((store_sales_test.ss_customer_sk = customer_test.c_customer_sk)) otherCondition=()
----hashAgg[GLOBAL]
------hashAgg[LOCAL]
--------PhysicalOlapScan[store_sales_test]
----PhysicalOlapScan[customer_test]

-- !push_down_with_count_result --
John 1 2024-01-01 1
John 2 2024-01-01 1