Skip to content

Commit

Permalink
[fix](nereids) fix multi window projection issue temporarily (apache#…
Browse files Browse the repository at this point in the history
…24912)

Current multi-window plan generation has problem on the project sequence, for example:

+--LogicalWindow ( windowExpressions=[avg(sum_sales#115) WindowSpec(...) AS `avg_monthly_sales`apache#116, rank() WindowSpec(...) AS `rn`apache#117], ...)
and correspond physical plan is:

+--PhysicalWindow[6572]@16 ( windowFrameGroup=(Funcs=[avg(sum_sales#115) WindowSpec(...) AS `avg_monthly_sales`apache#116], ... )
    +--PhysicalWindow[6568]@29 ( windowFrameGroup=(Funcs=[rank() WindowSpec(...) AS `rn`apache#117], ...] )
If the final plan is generated as following:

MultiCastDataSinks
STREAM DATA SINK
  EXCHANGE ID: 20
  HASH_PARTITIONED: rn[apache#208], i_brand[apache#202], cc_name[apache#203], i_category[apache#201]
Before we eventually resolve the multi-window issue, we add a projection as following and force a mapping but this will not cover all potential problems.

MultiCastDataSinks
STREAM DATA SINK
  EXCHANGE ID: 20
  HASH_PARTITIONED: rn[apache#219], i_brand[apache#213], cc_name[apache#214], i_category[apache#212]
  PROJECTIONS: i_category[apache#184], i_brand[apache#185], cc_name[apache#186], d_year[apache#187], d_moy[apache#188], sum_sales[apache#189], avg_monthly_sales[apache#191], rn[apache#190]
  PROJECTION TUPLE: 20
  • Loading branch information
xzj7019 authored and vinlee19 committed Oct 7, 2023
1 parent 9a2a82e commit 464c5b8
Showing 1 changed file with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,23 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> d
keys.addAll(validOutputIds);
validOutputIds = keys;
}
if (inputFragment instanceof MultiCastPlanFragment) {
// TODO: remove this logic when we split to multi-window in logical window to physical window conversion
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
if (!(distribute.child() instanceof PhysicalProject)) {
List<Expr> projectionExprs = new ArrayList<>();
PhysicalCTEConsumer consumer = getCTEConsumerChild(distribute);
Preconditions.checkState(consumer != null, "consumer not found");
for (Slot slot : distribute.getOutput()) {
projectionExprs.add(ExpressionTranslator.translate(consumer.getProducerSlot(slot), context));
}
TupleDescriptor projectionTuple = generateTupleDesc(distribute.getOutput(), null, context);
dataStreamSink.setProjections(projectionExprs);
dataStreamSink.setOutputTupleDesc(projectionTuple);
}
}
DataPartition dataPartition = toDataPartition(distribute.getDistributionSpec(), validOutputIds, context);
PlanFragment parentFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition);
exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
Expand Down Expand Up @@ -2332,4 +2349,16 @@ private boolean isComplexDataType(DataType dataType) {
return dataType instanceof ArrayType || dataType instanceof MapType || dataType instanceof JsonType
|| dataType instanceof StructType;
}

private PhysicalCTEConsumer getCTEConsumerChild(PhysicalPlan root) {
if (root == null) {
return null;
} else if (root instanceof PhysicalCTEConsumer) {
return (PhysicalCTEConsumer) root;
} else if (root.children().size() != 1) {
return null;
} else {
return getCTEConsumerChild((PhysicalPlan) root.child(0));
}
}
}

0 comments on commit 464c5b8

Please sign in to comment.