Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1260,31 +1260,41 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, P
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
if (CollectionUtils.isNotEmpty(dataStreamSink.getConjuncts())
|| CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
String errMsg = "generate invalid plan \n" + filter.treeString();
LOG.warn(errMsg);
throw new AnalysisException(errMsg);
}
filter.getConjuncts().stream()
.map(e -> ExpressionTranslator.translate(e, context))
.forEach(dataStreamSink::addConjunct);
return inputFragment;
}

PlanNode planNode = inputFragment.getPlanRoot();
Plan child = filter.child();
while (child instanceof PhysicalLimit) {
child = ((PhysicalLimit<?>) child).child();
}
if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode
// this means we have filter->limit->project, need a SelectNode
|| child instanceof PhysicalProject) {
// the three nodes don't support conjuncts, need create a SelectNode to filter data
// the three nodes don't support conjuncts, need create a SelectNode to filter data
if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode) {
SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), planNode);
selectNode.setNereidsId(filter.getId());
addConjunctsToPlanNode(filter, selectNode, context);
addPlanRoot(inputFragment, selectNode, filter);
} else {
if (!(filter.child(0) instanceof AbstractPhysicalJoin)) {
// already have filter on this node, we should not override it, so need a new node
if (!planNode.getConjuncts().isEmpty()
// already have project on this node, filter need execute after project, so need a new node
|| CollectionUtils.isNotEmpty(planNode.getProjectList())
// already have limit on this node, filter need execute after limit, so need a new node
|| planNode.hasLimit()) {
planNode = new SelectNode(context.nextPlanNodeId(), planNode);
planNode.setNereidsId(filter.getId());
addPlanRoot(inputFragment, planNode, filter);
}
addConjunctsToPlanNode(filter, planNode, context);
updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter);
}
}
updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter);
// in ut, filter.stats may be null
if (filter.getStats() != null) {
inputFragment.getPlanRoot().setCardinalityAfterFilter((long) filter.getStats().getRowCount());
Expand Down Expand Up @@ -1893,8 +1903,15 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
}

PlanFragment inputFragment = project.child(0).accept(this, context);

PlanNode inputPlanNode = inputFragment.getPlanRoot();
// this means already have project on this node, filter need execute after project, so need a new node
if (CollectionUtils.isNotEmpty(inputPlanNode.getProjectList())) {
SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode);
selectNode.setNereidsId(project.getId());
addPlanRoot(inputFragment, selectNode, project);
inputPlanNode = selectNode;
}

List<Expr> projectionExprs = null;
List<Expr> allProjectionExprs = Lists.newArrayList();
List<Slot> slots = null;
Expand Down Expand Up @@ -1931,6 +1948,11 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
if (CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
String errMsg = "generate invalid plan \n" + project.treeString();
LOG.warn(errMsg);
throw new AnalysisException(errMsg);
}
TupleDescriptor projectionTuple = generateTupleDesc(slots, null, context);
dataStreamSink.setProjections(projectionExprs);
dataStreamSink.setOutputTupleDesc(projectionTuple);
Expand Down