Skip to content

Commit

Permalink
[NSE-782] backport master changes to 1.3.1 branch (oap-project#814)
Browse files Browse the repository at this point in the history
* [NSE-772] Code refactor for ColumnarBatchScan (oap-project#805)

* Override doCanonicalize in ColumnarBatchScanExec

* Refactor ColumnarBatchScan

* [NSE-778] Failed to find include file while running code gen (oap-project#779)

* fix can not find include file

* add maven test content back

Co-authored-by: PHILO-HE <feilong.he@intel.com>
Co-authored-by: Jacky Lee <lijunqing@baidu.com>
  • Loading branch information
3 people authored Mar 30, 2022
1 parent ad756df commit 9224df6
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,24 @@ private static void loadIncludeFromJar(String tmp_dir) throws IOException, Illeg
tmp_dir = System.getProperty("java.io.tmpdir");
}
final String folderToLoad = "include";
final URLConnection urlConnection = JniUtils.class.getClassLoader().getResource("include").openConnection();
if (urlConnection instanceof JarURLConnection) {
final JarFile jarFile = ((JarURLConnection) urlConnection).getJarFile();
extractResourcesToDirectory(jarFile, folderToLoad, tmp_dir + "/" + "nativesql_include");
// only find all include file in the jar that contains JniUtils.class
final String jarPath =
JniUtils.class.getProtectionDomain().getCodeSource().getLocation().getPath();
if (jarPath.endsWith(".jar")) {
extractResourcesToDirectory(
new JarFile(new File(jarPath)), folderToLoad, tmp_dir + "/" + "nativesql_include");
} else {
// For Maven test only
final URLConnection urlConnection =
JniUtils.class.getClassLoader().getResource("include").openConnection();
String path = urlConnection.getURL().toString();
if (urlConnection.getURL().toString().startsWith("file:")) {
// remove the prefix of "file:" from includePath
path = urlConnection.getURL().toString().substring(5);
}
final File folder = new File(path);
copyResourcesToDirectory(urlConnection,
tmp_dir + "/" + "nativesql_include", folder);
tmp_dir + "/" + "nativesql_include", folder);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import com.intel.oap.GazellePluginConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Literal, _}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read.{Scan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.vectorized.ColumnarBatch


class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan,
runtimeFilters: Seq[Expression])
extends ColumnarBatchScanExecBase(output, scan, runtimeFilters) {
val tmpDir: String = GazellePluginConfig.getConf.tmpFile
override def supportsColumnar(): Boolean = true
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_batchscan"),
"inputSize" -> SQLMetrics.createSizeMetric(sparkContext, "input size in bytes"))

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
val numOutputBatches = longMetric("numOutputBatches")
val scanTime = longMetric("scanTime")
val inputSize = longMetric("inputSize")
val inputColumnarRDD =
new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
true, scanTime, numInputBatches, inputSize, tmpDir)
inputColumnarRDD.map { r =>
numOutputRows += r.numRows()
numOutputBatches += 1
r
}
}

override def doCanonicalize(): ColumnarBatchScanExec = {
if (runtimeFilters == null) {
// For spark3.1.
new ColumnarBatchScanExec(output.map(QueryPlan.normalizeExpressions(_, output)), scan, null)
} else {
// For spark3.2.
new ColumnarBatchScanExec(
output.map(QueryPlan.normalizeExpressions(_, output)), scan,
QueryPlan.normalizePredicates(
runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
output))
}
}

override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBatchScanExec]

override def equals(other: Any): Boolean = other match {
case that: ColumnarBatchScanExec =>
(that canEqual this) && super.equals(that)
case _ => false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import com.intel.oap.sql.shims.SparkShimLoader

import org.apache.spark.{MapOutputStatistics, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
Expand All @@ -51,7 +50,6 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.spark.util.ShufflePartitionUtils

Expand Down Expand Up @@ -90,26 +88,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
case plan: BatchScanExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val runtimeFilters = SparkShimLoader.getSparkShims.getRuntimeFilters(plan)
new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters) {
// This method is a commonly shared implementation for ColumnarBatchScanExec.
// We move it outside of shim layer to break the cyclic dependency caused by
// ColumnarDataSourceRDD.
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
val numOutputBatches = longMetric("numOutputBatches")
val scanTime = longMetric("scanTime")
val inputSize = longMetric("inputSize")
val inputColumnarRDD =
new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
true, scanTime, numInputBatches, inputSize, tmpDir)
inputColumnarRDD.map { r =>
numOutputRows += r.numRows()
numOutputBatches += 1
r
}
}
}
new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters)
case plan: CoalesceExec =>
ColumnarCoalesceExec(plan.numPartitions, replaceWithColumnarPlan(plan.child))
case plan: InMemoryTableScanExec =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
import org.apache.spark.sql.execution.python.ColumnarArrowEvalPythonExec
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.vectorized.ColumnarBatch

case class RowGuard(child: SparkPlan) extends SparkPlan {
def output: Seq[Attribute] = child.output
Expand Down Expand Up @@ -81,26 +80,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
case plan: BatchScanExec =>
if (!enableColumnarBatchScan) return false
val runtimeFilters = SparkShimLoader.getSparkShims.getRuntimeFilters(plan)
new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters) {
// This method is a commonly shared implementation for ColumnarBatchScanExec.
// We move it outside of shim layer to break the cyclic dependency caused by
// ColumnarDataSourceRDD.
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
val numOutputBatches = longMetric("numOutputBatches")
val scanTime = longMetric("scanTime")
val inputSize = longMetric("inputSize")
val inputColumnarRDD =
new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
true, scanTime, numInputBatches, inputSize, tmpDir)
inputColumnarRDD.map { r =>
numOutputRows += r.numRows()
numOutputBatches += 1
r
}
}
}
new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters)
case plan: FileSourceScanExec =>
if (plan.supportsColumnar) {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.BasePythonRunnerChild
import org.apache.spark.util.Utils

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.connector.read.Scan

/** For spark 3.1, the runtimeFilters: Seq[Expression] is not introduced in BatchScanExec.
*/
abstract class ColumnarBatchScanExecBase(output: Seq[AttributeReference], @transient scan: Scan,
runtimeFilters: Seq[Expression])
extends BatchScanExec(output, scan) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.intel.oap.sql.shims.spark311

import com.intel.oap.execution.ColumnarBatchScanExec
import com.intel.oap.spark.sql.ArrowWriteQueue
import com.intel.oap.sql.shims.{ShimDescriptor, SparkShims}
import java.io.File
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.connector.read.Scan

/**
* This class is used to adapt to spark3.2 BatchScanExec with runtimeFilters.
*/
abstract class ColumnarBatchScanExecBase(output: Seq[AttributeReference], @transient scan: Scan,
runtimeFilters: Seq[Expression])
extends BatchScanExec(output, scan, runtimeFilters) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.intel.oap.sql.shims.spark321

import com.intel.oap.execution.ColumnarBatchScanExec
import com.intel.oap.spark.sql.ArrowWriteQueue
import com.intel.oap.sql.shims.{ShimDescriptor, SparkShims}
import java.io.File
Expand Down

0 comments on commit 9224df6

Please sign in to comment.