diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java index 849736ef51a737..928586ef4590ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java @@ -144,27 +144,28 @@ private Set findValidItems(Set requireOutputs) { *

* Example: * Given: - * - Initial slots: {{A, B, C}, {D, E}, {F, G}} - * - Required outputs: {A, D, F} - * - Valid functional dependencies: {A} -> {B}, {D, E} -> {G}, {F} -> {G} + * - Initial slots: {{A}, {B}, {C}, {D}, {E}} + * - Required outputs: {} + * - validItems: {A} -> {B}, {B} -> {C}, {C} -> {D}, {D} -> {A}, {A} -> {E} * * Process: - * 1. Start with minSlotSet = {{A, B, C}, {D, E}, {F, G}} + * 1. Start with minSlotSet = {{A}, {B}, {C}, {D}, {E}} * 2. For {A} -> {B}: * - Both {A} and {B} are in minSlotSet, so mark {B} for elimination - * 3. For {D, E} -> {G}: - * - Both {D, E} and {G} are in minSlotSet, so mark {G} for elimination - * 4. For {F} -> {G}: - * - Both {F} and {G} are in minSlotSet, but {G} is already marked for elimination - * 5. Remove eliminated slots: {B} and {G} + * 3. For {B} -> {C}: + * - Both {B} and {C} are in minSlotSet, so mark {C} for elimination + * 4. For {C} -> {D}: + * - Both {C} and {D} are in minSlotSet, so mark {D} for elimination + * 5. For {A} -> {E}: + * - Both {A} and {E} are in minSlotSet, so mark {E} for elimination * - * Result: {{A, C}, {D, E}, {F}} + * Result: {{A}} *

* * @param slots the initial set of slot sets to be reduced * @param requireOutputs the set of slots that must be preserved in the output * @return the minimal set of slot sets after applying all possible reductions - */ + */ public Set> eliminateDeps(Set> slots, Set requireOutputs) { Set> minSlotSet = Sets.newHashSet(slots); Set> eliminatedSlots = new HashSet<>(); @@ -183,6 +184,7 @@ public boolean isFuncDeps(Set dominate, Set dependency) { return items.contains(new FuncDepsItem(dominate, dependency)); } + // Also a bijection check: true only when each slot set determines the other. public boolean isCircleDeps(Set dominate, Set dependency) { return items.contains(new FuncDepsItem(dominate, dependency)) && items.contains(new FuncDepsItem(dependency, dominate)); @@ -201,16 +203,30 @@ public Map, Set>> getREdges() { } /** - * find the determinants of dependencies + * Finds all slot sets that have a bijective relationship with the given slot set. + * Given edges containing: + * {A} -> {{B}, {C}} + * {B} -> {{A}, {D}} + * {C} -> {{D}} + * When slot = {A}, returns {{B}} because {A} and {B} mutually determine each other. + * {C} is not returned because {C} does not determine {A} (one-way dependency only). 
*/ - public Set> findDeterminats(Set dependency) { - Set> determinants = new HashSet<>(); - for (FuncDepsItem item : items) { - if (item.dependencies.equals(dependency)) { - determinants.add(item.determinants); + public Set> findBijectionSlots(Set slot) { + Set> bijectionSlots = new HashSet<>(); + if (!edges.containsKey(slot)) { + return bijectionSlots; + } + for (Set dep : edges.get(slot)) { + if (!edges.containsKey(dep)) { + continue; + } + for (Set det : edges.get(dep)) { + if (det.equals(slot)) { + bijectionSlots.add(dep); + } } } - return determinants; + return bijectionSlots; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFk.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFk.java index 2aeb59ae9c73f9..1aba1c79ae159a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFk.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFk.java @@ -131,13 +131,41 @@ private LogicalAggregate eliminatePrimaryOutput(LogicalAggregate agg, Plan if (primary.getOutputSet().stream().noneMatch(aggInputs::contains)) { return agg; } + // Firstly, using fd to eliminate group by key. 
+ // group by primary_table_pk, primary_table_other + // -> group by primary_table_pk + Set> groupBySlots = new HashSet<>(); + Set validSlots = new HashSet<>(); + for (Expression expression : agg.getGroupByExpressions()) { + groupBySlots.add(expression.getInputSlots()); + validSlots.addAll(expression.getInputSlots()); + } + DataTrait dataTrait = child.getLogicalProperties().getTrait(); + FuncDeps funcDeps = dataTrait.getAllValidFuncDeps(validSlots); + Set foreignOutput = Sets.intersection(agg.getOutputSet(), foreign.getOutputSet()); + Set> minGroupBySlots = funcDeps.eliminateDeps(groupBySlots, foreignOutput); + Set removeExpression = new HashSet<>(); + for (Set slots : groupBySlots) { + if (!minGroupBySlots.contains(slots) && !foreignOutput.containsAll(slots)) { + removeExpression.add(slots.iterator().next()); + } + } + List minGroupBySlotList = new ArrayList<>(); + for (Expression expression : agg.getGroupByExpressions()) { + if (!removeExpression.contains(expression)) { + minGroupBySlotList.add(expression); + } + } + + // Secondly, put bijective slot into map: {primary_table_pk : foreign_table_fk} + // Bijective slots are mutually interchangeable within GROUP BY keys. 
+ // group by primary_table_pk equals group by foreign_table_fk Set primaryOutputSet = primary.getOutputSet(); Set primarySlots = Sets.intersection(aggInputs, primaryOutputSet); - DataTrait dataTrait = child.getLogicalProperties().getTrait(); - FuncDeps funcDeps = dataTrait.getAllValidFuncDeps(Sets.union(foreign.getOutputSet(), primary.getOutputSet())); HashMap primaryToForeignDeps = new HashMap<>(); + FuncDeps funcDepsForJoin = dataTrait.getAllValidFuncDeps(Sets.union(primaryOutputSet, foreign.getOutputSet())); for (Slot slot : primarySlots) { - Set> replacedSlotSets = funcDeps.findDeterminats(ImmutableSet.of(slot)); + Set> replacedSlotSets = funcDepsForJoin.findBijectionSlots(ImmutableSet.of(slot)); for (Set replacedSlots : replacedSlotSets) { if (primaryOutputSet.stream().noneMatch(replacedSlots::contains) && replacedSlots.size() == 1) { @@ -147,19 +175,21 @@ private LogicalAggregate eliminatePrimaryOutput(LogicalAggregate agg, Plan } } - Set newGroupBySlots = constructNewGroupBy(agg, primaryOutputSet, primaryToForeignDeps); + // Thirdly, construct new Agg below join. 
+ Set newGroupBySlots = constructNewGroupBy(minGroupBySlotList, primaryOutputSet, + primaryToForeignDeps); List newOutput = constructNewOutput( - agg, primaryOutputSet, primaryToForeignDeps, funcDeps, primary); + agg, primaryOutputSet, primaryToForeignDeps, funcDepsForJoin, primary); if (newGroupBySlots == null || newOutput == null) { return null; } return agg.withGroupByAndOutput(ImmutableList.copyOf(newGroupBySlots), ImmutableList.copyOf(newOutput)); } - private @Nullable Set constructNewGroupBy(LogicalAggregate agg, Set primaryOutputs, - Map primaryToForeignBiDeps) { + private @Nullable Set constructNewGroupBy(List gbyExpression, + Set primaryOutputs, Map primaryToForeignBiDeps) { Set newGroupBySlots = new HashSet<>(); - for (Expression expression : agg.getGroupByExpressions()) { + for (Expression expression : gbyExpression) { if (!(expression instanceof Slot)) { return null; } @@ -196,9 +226,7 @@ private LogicalAggregate eliminatePrimaryOutput(LogicalAggregate agg, Plan && expression.child(0).child(0) instanceof Slot) { // count(slot) can be rewritten by circle deps Slot slot = (Slot) expression.child(0).child(0); - if (primaryToForeignDeps.containsKey(slot) - && funcDeps.isCircleDeps( - ImmutableSet.of(slot), ImmutableSet.of(primaryToForeignDeps.get(slot)))) { + if (primaryToForeignDeps.containsKey(slot)) { expression = (NamedExpression) expression.rewriteUp(e -> e instanceof Slot ? primaryToForeignDeps.getOrDefault((Slot) e, (Slot) e) diff --git a/regression-test/data/nereids_rules_p0/agg_join_pkfk/agg_join_pkfk.out b/regression-test/data/nereids_rules_p0/agg_join_pkfk/agg_join_pkfk.out new file mode 100644 index 00000000000000..4a514373e805aa --- /dev/null +++ b/regression-test/data/nereids_rules_p0/agg_join_pkfk/agg_join_pkfk.out @@ -0,0 +1,36 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !not_push_down_shape -- +PhysicalResultSink +--hashAgg[GLOBAL] +----hashAgg[LOCAL] +------hashJoin[INNER_JOIN] hashCondition=((store_sales_test.ss_customer_sk = customer_test.c_customer_sk)) otherCondition=() +--------PhysicalOlapScan[store_sales_test] +--------PhysicalOlapScan[customer_test] + +-- !not_push_down_result -- +Smith John 2024-01-01 + +-- !push_down_shape -- +PhysicalResultSink +--hashJoin[INNER_JOIN] hashCondition=((store_sales_test.ss_customer_sk = customer_test.c_customer_sk)) otherCondition=() +----hashAgg[GLOBAL] +------hashAgg[LOCAL] +--------PhysicalOlapScan[store_sales_test] +----PhysicalOlapScan[customer_test] + +-- !push_down_result -- +John 1 2024-01-01 +John 2 2024-01-01 + +-- !push_down_with_count_shape -- +PhysicalResultSink +--hashJoin[INNER_JOIN] hashCondition=((store_sales_test.ss_customer_sk = customer_test.c_customer_sk)) otherCondition=() +----hashAgg[GLOBAL] +------hashAgg[LOCAL] +--------PhysicalOlapScan[store_sales_test] +----PhysicalOlapScan[customer_test] + +-- !push_down_with_count_result -- +John 1 2024-01-01 1 +John 2 2024-01-01 1 +