diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala
index 8b72fcf08..37657c4b4 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala
@@ -308,3 +308,81 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
     throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
   }
 }
+
+//TODO(): consolidate locallimit and globallimit
+case class ColumnarLocalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
+  // updating nullability to make all the children consistent
+
+  buildCheck()
+
+  def buildCheck(): Unit = {
+    for (child <- children) {
+      for (schema <- child.schema) {
+        try {
+          ConverterUtils.checkIfTypeSupported(schema.dataType)
+        } catch {
+          case e: UnsupportedOperationException =>
+            throw new UnsupportedOperationException(
+              s"${schema.dataType} is not supported in ColumnarLocalLimitExec")
+        }
+      }
+    }
+  }
+
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  //override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def supportsColumnar = true
+  override def output: Seq[Attribute] = child.output
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    child.executeColumnar().mapPartitions { iter =>
+      iter.take(limit)
+    }
+  }
+
+  protected override def doExecute()
+      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
+  }
+}
+
+case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
+  // updating nullability to make all the children consistent
+
+  buildCheck()
+
+  def buildCheck(): Unit = {
+    for (child <- children) {
+      for (schema <- child.schema) {
+        try {
+          ConverterUtils.checkIfTypeSupported(schema.dataType)
+        } catch {
+          case e: UnsupportedOperationException =>
+            throw new UnsupportedOperationException(
+              s"${schema.dataType} is not supported in ColumnarGlobalLimitExec")
+        }
+      }
+    }
+  }
+
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  //override def outputPartitioning: Partitioning = child.outputPartitioning
+  //override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+
+  override def supportsColumnar = true
+  override def output: Seq[Attribute] = child.output
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    child.executeColumnar().mapPartitions { iter =>
+      iter.take(limit)
+    }
+  }
+
+  protected override def doExecute()
+      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
+  }
+}
\ No newline at end of file
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
index 6a7f5b641..d123b563b 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala
@@ -107,6 +107,14 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
       val children = plan.children.map(replaceWithColumnarPlan)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
       ColumnarUnionExec(children)
+    case plan: LocalLimitExec =>
+      val child = replaceWithColumnarPlan(plan.child)
+      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+      ColumnarLocalLimitExec(plan.limit, child)
+    case plan: GlobalLimitExec =>
+      val child = replaceWithColumnarPlan(plan.child)
+      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+      ColumnarGlobalLimitExec(plan.limit, child)
     case plan: ExpandExec =>
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
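
Note on the doExecuteColumnar implementations above: Iterator.take(limit) on an iterator of ColumnarBatch caps the number of batches a partition emits, not the number of rows. A row-count-aware variant could look roughly like the sketch below. This is an illustration only, not part of the patch: the helper name takeRows is assumed, and it simply stops pulling further batches once limit rows have been seen, without slicing the final batch.

import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative sketch: stop consuming input batches once `limit` rows have been produced.
def takeRows(iter: Iterator[ColumnarBatch], limit: Int): Iterator[ColumnarBatch] =
  new Iterator[ColumnarBatch] {
    private var rowCount = 0
    override def hasNext: Boolean = rowCount < limit && iter.hasNext
    override def next(): ColumnarBatch = {
      val batch = iter.next()
      rowCount += batch.numRows()
      batch // the last batch may overshoot `limit`; slicing it is deliberately left out here
    }
  }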
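
The new cases in ColumnarPreOverrides can be exercised with any query whose physical plan contains LocalLimitExec and GlobalLimitExec, for example a limit that is not the final operator. The snippet below is a usage sketch only; it assumes a SparkSession named spark with the columnar plugin enabled for the session (e.g. through spark.sql.extensions), and the query itself is illustrative.

// Usage sketch: a limit in the middle of a query plans as GlobalLimit(LocalLimit(...)).
val df = spark.range(0, 1000).toDF("id")
  .filter("id > 10")
  .limit(5)
  .selectExpr("id + 1 AS id_plus_one")

// With the rule cases added above, the physical plan should report
// ColumnarLocalLimitExec / ColumnarGlobalLimitExec in place of the row-based
// LocalLimitExec / GlobalLimitExec nodes.
df.explain()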