Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ object ColumnPruning extends Rule[LogicalPlan] {
case p @ Project(_, a: Aggregate) if (a.outputSet -- p.references).nonEmpty =>
p.copy(
child = a.copy(aggregateExpressions = a.aggregateExpressions.filter(p.references.contains)))
case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty =>
case a @ Project(_, e @ Expand(_, _, grandChild))
if (e.outputSet -- a.references).nonEmpty =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is best to avoid spurious changes because it pollutes git blame. I can revert this while merging this time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Got it.

val newOutput = e.output.filter(a.references.contains(_))
val newProjects = e.projections.map { proj =>
proj.zip(e.output).filter { case (e, a) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,15 @@ case class Expand(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {

override def references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))

override def statistics: Statistics = {
val sizeInBytes = super.statistics.sizeInBytes * projections.length
Statistics(sizeInBytes = sizeInBytes)
}

override protected def validConstraints: Set[Expression] = Set.empty[Expression]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also going to add a comment here to explain why this is empty.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ class ColumnPruningSuite extends PlanTest {
Seq('a, 'b, 'c, 'aa.int, 'gid.int),
input)).analyze
val optimized = Optimize.execute(query)

val expected =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

Aggregate(
Seq('aa, 'gid),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,33 @@ class ConstraintPropagationSuite extends SparkFunSuite {
IsNotNull(resolveColumn(aliasedRelation.analyze, "a")))))
}

test("propagating constraints in expand") {
val tr = LocalRelation('a.int, 'b.int, 'c.int)

assert(tr.analyze.constraints.isEmpty)

// We add IsNotNull constraints for 'a, 'b and 'c into LocalRelation
// by creating notNullRelation.
val notNullRelation = tr.where('c.attr > 10 && 'a.attr < 5 && 'b.attr > 2)
verifyConstraints(notNullRelation.analyze.constraints,
ExpressionSet(Seq(resolveColumn(notNullRelation.analyze, "c") > 10,
IsNotNull(resolveColumn(notNullRelation.analyze, "c")),
resolveColumn(notNullRelation.analyze, "a") < 5,
IsNotNull(resolveColumn(notNullRelation.analyze, "a")),
resolveColumn(notNullRelation.analyze, "b") > 2,
IsNotNull(resolveColumn(notNullRelation.analyze, "b")))))

val expand = Expand(
Seq(
Seq('c, Literal.create(null, StringType), 1),
Seq('c, 'a, 2)),
Seq('c, 'a, 'gid.int),
Project(Seq('a, 'c),
notNullRelation))
verifyConstraints(expand.analyze.constraints,
ExpressionSet(Seq.empty[Expression]))
}

test("propagating constraints in aliases") {
val tr = LocalRelation('a.int, 'b.string, 'c.int)

Expand Down