Skip to content

Commit

Permalink
Fix colocated join when no mapping project is used
Browse files Browse the repository at this point in the history
  • Loading branch information
gortiz committed Jul 19, 2024
1 parent 205a75d commit 7e54702
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,23 @@
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.mapping.IntPair;
import org.apache.calcite.util.mapping.Mapping;
import org.apache.calcite.util.mapping.MappingType;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -52,6 +58,7 @@
public class PinotRelDistributionTraitRule extends RelOptRule {
public static final PinotRelDistributionTraitRule INSTANCE =
new PinotRelDistributionTraitRule(PinotRuleUtils.PINOT_REL_FACTORY);
private static final Logger LOGGER = LoggerFactory.getLogger(PinotRelDistributionTraitRule.class);

public PinotRelDistributionTraitRule(RelBuilderFactory factory) {
super(operand(RelNode.class, any()));
Expand Down Expand Up @@ -110,10 +117,22 @@ private static RelDistribution deriveDistribution(RelNode node) {
LogicalProject project = (LogicalProject) node;
try {
if (inputRelDistribution != null) {
return inputRelDistribution.apply(project.getMapping());
Mappings.TargetMapping mapping =
Project.getPartialMapping(input.getRowType().getFieldCount(), project.getProjects());
// Note(gonzalo): In Calcite 1.37 mapping.getTargetOpt will fail in what looks like a Calcite bug.
// Therefore here we apply a workaround and create a new mapping where the same elements (extracted with the
// iterator, which actually works) are added to the new mapping.
// See https://lists.apache.org/thread/qz18qxrfp5bqldnoln2tg4582g402zyv
Mapping actualMapping = Mappings.create(MappingType.PARTIAL_FUNCTION, input.getRowType().getFieldCount(),
project.getRowType().getFieldCount());
for (IntPair intPair : mapping) {
actualMapping.set(intPair.source, intPair.target);
}
return inputRelDistribution.apply(actualMapping);
}
} catch (Exception e) {
// ... skip;
LOGGER.debug("Failed to derive distribution from input for node: {}", node, e);
}
} else if (node instanceof LogicalFilter) {
assert inputs.size() == 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
},
{
"description": "explain plan with join with colocated tables, both-sided on partition key",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM (select col2, col3 + col2 as mySum from a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */) as a JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1",
"output": [
"[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
Expand All @@ -131,8 +131,7 @@
" ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
" └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
" └── [3]@localhost:1|[1] PROJECT\n",
" └── [3]@localhost:1|[1] FILTER\n",
" └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
" └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
""
]
},
Expand Down Expand Up @@ -449,6 +448,58 @@
" └── [5]@localhost:1|[0] TABLE SCAN (b) null\n",
""
]
},
{
"description": "explain plan with colocated join and a projection that is not mapping",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM (select col2, col3 + col2 as mySum from a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */) as a JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1",
"output": [
"[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
"├── [1]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
"├── [1]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
" └── [1]@localhost:1|[1] PROJECT\n",
" └── [1]@localhost:1|[1] JOIN\n",
" ├── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
" │ └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
" │ └── [2]@localhost:1|[1] PROJECT\n",
" │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
" └── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
" ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
" ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
" └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
" └── [3]@localhost:1|[1] PROJECT\n",
" └── [3]@localhost:1|[1] TABLE SCAN (b) null\n"
]
},
{
"description": "explain plan with colocated join and a projection that doesn't keep the key column",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM (select col3 as col2, col3 + col2 as mySum from a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */) as a JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1",
"output": [
"[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
" └── [1]@localhost:2|[0] PROJECT\n",
" └── [1]@localhost:2|[0] JOIN\n",
" ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
" │ └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
" │ └── [2]@localhost:1|[1] PROJECT\n",
" │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
" └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
" ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
" ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
" └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
" └── [3]@localhost:1|[1] PROJECT\n",
" └── [3]@localhost:1|[1] TABLE SCAN (b) null\n"
]
}
]
}
Expand Down

0 comments on commit 7e54702

Please sign in to comment.