
commit 2a412a4 (parent 9bc56ff)
implement columnar limit

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
zhouyuan committed Nov 24, 2021

Showing 2 changed files with 86 additions and 0 deletions.
@@ -308,3 +308,81 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
  }
}

// TODO: consolidate ColumnarLocalLimitExec and ColumnarGlobalLimitExec
case class ColumnarLocalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
  // verify up front that every child output type is supported by the
  // columnar backend

  buildCheck()

  def buildCheck(): Unit = {
    for (child <- children) {
      for (schema <- child.schema) {
        try {
          ConverterUtils.checkIfTypeSupported(schema.dataType)
        } catch {
          case e: UnsupportedOperationException =>
            throw new UnsupportedOperationException(
              s"${schema.dataType} is not supported in ColumnarLocalLimitExec", e)
        }
      }
    }
  }

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

  //override def outputPartitioning: Partitioning = child.outputPartitioning

  override def supportsColumnar = true
  override def output: Seq[Attribute] = child.output

  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    // LocalLimit caps each partition at `limit` rows; take(limit) on the
    // batch iterator would count batches rather than rows, so count rows
    // explicitly and truncate the final batch.
    child.executeColumnar().mapPartitions { iter =>
      new Iterator[ColumnarBatch] {
        private var rowCount = 0
        override def hasNext: Boolean = rowCount < limit && iter.hasNext
        override def next(): ColumnarBatch = {
          val batch = iter.next()
          val remaining = limit - rowCount
          if (batch.numRows > remaining) {
            batch.setNumRows(remaining)
          }
          rowCount += batch.numRows
          batch
        }
      }
    }
  }

  protected override def doExecute()
      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
    throw new UnsupportedOperationException("This operator doesn't support doExecute().")
  }
}
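To make the row accounting above concrete, here is a standalone sketch (plain Scala, no Spark; the object and method names are illustrative, not part of this commit) that models each batch by its row count and shows the per-partition truncation:

object LocalLimitSketch {
  // Emit batch sizes until `limit` rows have been produced,
  // truncating the final batch.
  def limitRows(batchSizes: Iterator[Int], limit: Int): Iterator[Int] =
    new Iterator[Int] {
      private var rowCount = 0
      def hasNext: Boolean = rowCount < limit && batchSizes.hasNext
      def next(): Int = {
        val rows = math.min(batchSizes.next(), limit - rowCount)
        rowCount += rows
        rows
      }
    }

  def main(args: Array[String]): Unit = {
    // Three batches of 4 rows each with limit = 10 yield 4, 4, 2.
    println(limitRows(Iterator(4, 4, 4), 10).toList) // List(4, 4, 2)
  }
}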

case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
  // verify up front that every child output type is supported by the
  // columnar backend

  buildCheck()

  def buildCheck(): Unit = {
    for (child <- children) {
      for (schema <- child.schema) {
        try {
          ConverterUtils.checkIfTypeSupported(schema.dataType)
        } catch {
          case e: UnsupportedOperationException =>
            throw new UnsupportedOperationException(
              s"${schema.dataType} is not supported in ColumnarGlobalLimitExec", e)
        }
      }
    }
  }

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

  //override def outputPartitioning: Partitioning = child.outputPartitioning
  //override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil

  override def supportsColumnar = true
  override def output: Seq[Attribute] = child.output

  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    // The row accounting below is per partition; correct global-limit
    // semantics rely on the input having already been shuffled into a
    // single partition, as the AllTuples requirement of the replaced
    // GlobalLimitExec arranges during planning.
    child.executeColumnar().mapPartitions { iter =>
      new Iterator[ColumnarBatch] {
        private var rowCount = 0
        override def hasNext: Boolean = rowCount < limit && iter.hasNext
        override def next(): ColumnarBatch = {
          val batch = iter.next()
          val remaining = limit - rowCount
          if (batch.numRows > remaining) {
            batch.setNumRows(remaining)
          }
          rowCount += batch.numRows
          batch
        }
      }
    }
  }

  protected override def doExecute()
      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
    throw new UnsupportedOperationException("This operator doesn't support doExecute().")
  }
}
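Why the single-partition assumption in ColumnarGlobalLimitExec matters: a per-partition take is exactly LocalLimit's contract, but it overshoots for a global limit unless all rows sit in one partition. A standalone sketch (plain Scala; names are illustrative):

object GlobalLimitSketch {
  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
    val limit = 2

    // Per-partition take: correct for LocalLimit...
    println(partitions.map(_.take(limit)).flatten) // 6 rows, not 2
    // ...but a global limit must first gather rows into one partition.
    println(partitions.flatten.take(limit)) // 2 rows
  }
}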
@@ -107,6 +107,14 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
      val children = plan.children.map(replaceWithColumnarPlan)
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
      ColumnarUnionExec(children)
    case plan: LocalLimitExec =>
      val child = replaceWithColumnarPlan(plan.child)
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
      ColumnarLocalLimitExec(plan.limit, child)
    case plan: GlobalLimitExec =>
      val child = replaceWithColumnarPlan(plan.child)
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
      ColumnarGlobalLimitExec(plan.limit, child)
    case plan: ExpandExec =>
      val child = replaceWithColumnarPlan(plan.child)
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
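For reference, a minimal end-to-end sketch of how the rule above would kick in. It assumes a local Spark session with the columnar plugin jars and settings supplied externally; the object name, app name, and plan shape commentary are illustrative and not part of this commit:

import org.apache.spark.sql.SparkSession

object ColumnarLimitDemo {
  def main(args: Array[String]): Unit = {
    // Only standard Spark APIs below; the columnar plugin is assumed
    // to be enabled via spark-submit configuration.
    val spark = SparkSession.builder()
      .appName("columnar-limit-demo")
      .master("local[2]")
      .getOrCreate()

    // A non-terminal LIMIT is planned as LocalLimitExec (per partition)
    // plus GlobalLimitExec (after a single-partition shuffle); with the
    // rule above both nodes are replaced by their Columnar* counterparts.
    // Exact plan shape can vary by Spark version.
    val limited = spark.range(0, 1000, 1, 4).limit(10).selectExpr("id * 2 AS doubled")
    limited.explain() // look for ColumnarLocalLimitExec / ColumnarGlobalLimitExec
    assert(limited.count() == 10)

    spark.stop()
  }
}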
