From 7e54702a7854e693b6000b843519336b0e4538d9 Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Date: Fri, 19 Jul 2024 15:55:56 +0200 Subject: [PATCH] Fix colocated join when no mapping project is used --- .../rules/PinotRelDistributionTraitRule.java | 21 ++++++- .../queries/ExplainPhysicalPlans.json | 57 ++++++++++++++++++- 2 files changed, 74 insertions(+), 4 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java index 821d1cf5ad9a..66247284fb13 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java @@ -29,17 +29,23 @@ import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.mapping.IntPair; +import org.apache.calcite.util.mapping.Mapping; +import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable; import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate; import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; import org.apache.pinot.query.planner.plannode.AggregateNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -52,6 +58,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule { public static final PinotRelDistributionTraitRule INSTANCE = new PinotRelDistributionTraitRule(PinotRuleUtils.PINOT_REL_FACTORY); + private static final Logger LOGGER = LoggerFactory.getLogger(PinotRelDistributionTraitRule.class); public PinotRelDistributionTraitRule(RelBuilderFactory factory) { super(operand(RelNode.class, any())); @@ -110,10 +117,22 @@ private static RelDistribution deriveDistribution(RelNode node) { LogicalProject project = (LogicalProject) node; try { if (inputRelDistribution != null) { - return inputRelDistribution.apply(project.getMapping()); + Mappings.TargetMapping mapping = + Project.getPartialMapping(input.getRowType().getFieldCount(), project.getProjects()); + // Note(gonzalo): In Calcite 1.37 mapping.getTargetOpt will fail in what it looks like a Calcite bug. + // Therefore here we apply a workaround and create a new map where the same elements (extracted with + // iterator, which actually work) are added to the new mapping. + // See https://lists.apache.org/thread/qz18qxrfp5bqldnoln2tg4582g402zyv + Mapping actualMapping = Mappings.create(MappingType.PARTIAL_FUNCTION, input.getRowType().getFieldCount(), + project.getRowType().getFieldCount()); + for (IntPair intPair : mapping) { + actualMapping.set(intPair.source, intPair.target); + } + return inputRelDistribution.apply(actualMapping); } } catch (Exception e) { // ... skip; + LOGGER.debug("Failed to derive distribution from input for node: {}", node, e); } } else if (node instanceof LogicalFilter) { assert inputs.size() == 1; diff --git a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json index f72f8c18523b..d842fac5c4c4 100644 --- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json +++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json @@ -109,7 +109,7 @@ }, { "description": "explain plan with join with colocated tables, both-sided on partition key", - "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0", + "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM (select col2, col3 + col2 as mySum from a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */) as a JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1", "output": [ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n", "├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", @@ -131,8 +131,7 @@ " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n", " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n", " └── [3]@localhost:1|[1] PROJECT\n", - " └── [3]@localhost:1|[1] FILTER\n", - " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n", + " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n", "" ] }, @@ -449,6 +448,58 @@ " └── [5]@localhost:1|[0] TABLE SCAN (b) null\n", "" ] + }, + { + "description": "explain plan with colocated join and a projection that is not mapping", + "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM (select col2, col3 + col2 as mySum from a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */) as a JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1", + "output": [ + "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n", + "├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "├── [1]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "├── [1]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "└── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n", + " └── [1]@localhost:1|[1] PROJECT\n", + " └── [1]@localhost:1|[1] JOIN\n", + " ├── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n", + " │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n", + " │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n", + " │ └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n", + " │ └── [2]@localhost:1|[1] PROJECT\n", + " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n", + " └── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n", + " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n", + " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n", + " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n", + " └── [3]@localhost:1|[1] PROJECT\n", + " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n" + ] + }, + { + "description": "explain plan with colocated join and a projection that doesn't keep the key column", + "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM (select col3 as col2, col3 + col2 as mySum from a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */) as a JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1", + "output": [ + "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n", + "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n", + " └── [1]@localhost:2|[0] PROJECT\n", + " └── [1]@localhost:2|[0] JOIN\n", + " ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " │ └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n", + " │ └── [2]@localhost:1|[1] PROJECT\n", + " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n", + " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n", + " └── [3]@localhost:1|[1] PROJECT\n", + " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n" + ] } ] }