@@ -22,7 +22,6 @@ import org.scalatest.Suite
import org.scalatest.Tag

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
@@ -57,7 +56,7 @@ trait CodegenInterpretedPlanTest extends PlanTest {
* Provides helper methods for comparing plans, but without the overhead of
* mandating a FunSuite.
*/
trait PlanTestBase extends PredicateHelper { self: Suite =>
trait PlanTestBase extends PredicateHelper with SQLHelper { self: Suite =>

// TODO(gatorsmile): remove this from PlanTest and all the analyzer rules
protected def conf = SQLConf.get
@@ -174,32 +173,4 @@ trait PlanTestBase extends PredicateHelper { self: Suite =>
plan1 == plan2
}
}

/**
* Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
* configurations.
*/
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val conf = SQLConf.get
val (keys, values) = pairs.unzip
val currentValues = keys.map { key =>
if (conf.contains(key)) {
Some(conf.getConfString(key))
} else {
None
}
}
(keys, values).zipped.foreach { (k, v) =>
if (SQLConf.staticConfKeys.contains(k)) {
throw new AnalysisException(s"Cannot modify the value of a static config: $k")
}
conf.setConfString(k, v)
}
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => conf.setConfString(key, value)
case (key, None) => conf.unsetConf(key)
}
}
}
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans

import java.io.File

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

trait SQLHelper {

/**
* Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
* configurations.
*/
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val conf = SQLConf.get
val (keys, values) = pairs.unzip
val currentValues = keys.map { key =>
if (conf.contains(key)) {
Some(conf.getConfString(key))
} else {
None
}
}
(keys, values).zipped.foreach { (k, v) =>
if (SQLConf.staticConfKeys.contains(k)) {
throw new AnalysisException(s"Cannot modify the value of a static config: $k")
}
conf.setConfString(k, v)
}
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => conf.setConfString(key, value)
case (key, None) => conf.unsetConf(key)
}
}
}

/**
* Generates a temporary path without creating the actual file/directory, then passes it to `f`. If
* a file/directory is created there by `f`, it will be deleted after `f` returns.
*/
protected def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}
}
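
For context, a minimal usage sketch of the new trait follows. It is illustrative only and not part of this diff; the suite name and the config key chosen below are assumptions.

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.internal.SQLConf

// Hypothetical suite mixing in SQLHelper (example only).
class SQLHelperUsageExampleSuite extends SparkFunSuite with SQLHelper {

  test("config is set for the block and restored afterwards") {
    // withSQLConf sets the given keys on SQLConf.get, runs the block, and then
    // restores the previous values (unsetting keys that were not set before).
    // Per the trait above, it rejects static configs with an AnalysisException.
    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
      assert(SQLConf.get.caseSensitiveAnalysis)
    }
  }

  test("temporary path is cleaned up") {
    // withTempPath hands the block a path that does not exist yet; anything the
    // block creates there is deleted recursively after the block returns.
    withTempPath { dir =>
      assert(!dir.exists())
      // ... create files under `dir` here ...
    }
  }
}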
@@ -19,25 +19,25 @@ package org.apache.spark.sql.execution.benchmark
import java.io.File

import scala.collection.JavaConverters._
import scala.util.{Random, Try}
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.execution.datasources.parquet.{SpecificParquetRecordReaderBase, VectorizedParquetRecordReader}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
import org.apache.spark.util.Utils


/**
* Benchmark to measure data source read performance.
* To run this:
* spark-submit --class <this class> <spark sql test jar>
*/
object DataSourceReadBenchmark {
object DataSourceReadBenchmark extends SQLHelper {
val conf = new SparkConf()
.setAppName("DataSourceReadBenchmark")
// Since `spark.master` always exists, overrides this value
@@ -54,27 +54,10 @@ object DataSourceReadBenchmark {
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")

def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

def withTempTable(tableNames: String*)(f: => Unit): Unit = {
try f finally tableNames.foreach(spark.catalog.dropTempView)
}

def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val (keys, values) = pairs.unzip
val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
(keys, values).zipped.foreach(spark.conf.set)
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => spark.conf.set(key, value)
case (key, None) => spark.conf.unset(key)
}
}
}
private def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
val testDf = if (partition.isDefined) {
df.write.partitionBy(partition.get)
@@ -19,16 +19,16 @@ package org.apache.spark.sql.execution.benchmark

import java.io.File

import scala.util.{Random, Try}
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType}
import org.apache.spark.util.Utils

/**
* Benchmark to measure read performance with Filter pushdown.
@@ -40,7 +40,7 @@ import org.apache.spark.util.Utils
* Results will be written to "benchmarks/FilterPushdownBenchmark-results.txt".
* }}}
*/
object FilterPushdownBenchmark extends BenchmarkBase {
object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper {

private val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
@@ -60,28 +60,10 @@ object FilterPushdownBenchmark extends BenchmarkBase {

private val spark = SparkSession.builder().config(conf).getOrCreate()

def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

def withTempTable(tableNames: String*)(f: => Unit): Unit = {
try f finally tableNames.foreach(spark.catalog.dropTempView)
}

def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val (keys, values) = pairs.unzip
val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
(keys, values).zipped.foreach(spark.conf.set)
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => spark.conf.set(key, value)
case (key, None) => spark.conf.unset(key)
}
}
}

private def prepareTable(
dir: File, numRows: Int, width: Int, useStringForValue: Boolean): Unit = {
import spark.implicits._
@@ -16,21 +16,19 @@
*/
package org.apache.spark.sql.execution.datasources.csv

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
* Benchmark to measure CSV read/write performance.
* To run this:
* spark-submit --class <this class> --jars <spark sql test jar>
*/
object CSVBenchmarks {
object CSVBenchmarks extends SQLHelper {
val conf = new SparkConf()

val spark = SparkSession.builder
@@ -40,12 +38,6 @@ object CSVBenchmarks {
.getOrCreate()
import spark.implicits._

def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
val benchmark = new Benchmark(s"Parsing quoted values", rowsNum)

@@ -21,16 +21,16 @@ import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
* The benchmarks aims to measure performance of JSON parsing when encoding is set and isn't.
* To run this:
* spark-submit --class <this class> --jars <spark sql test jar>
*/
object JSONBenchmarks {
object JSONBenchmarks extends SQLHelper {
val conf = new SparkConf()

val spark = SparkSession.builder
@@ -40,13 +40,6 @@ object JSONBenchmarks {
.getOrCreate()
import spark.implicits._

def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}


def schemaInferring(rowsNum: Int): Unit = {
val benchmark = new Benchmark("JSON schema inferring", rowsNum)

@@ -25,12 +25,12 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

abstract class CheckpointFileManagerTests extends SparkFunSuite {
abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {

def createManager(path: Path): CheckpointFileManager

@@ -88,12 +88,6 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
fm.delete(path) // should not throw exception
}
}

protected def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}
}

class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession {
@@ -40,7 +40,6 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.UninterruptibleThread
import org.apache.spark.util.Utils

@@ -167,18 +166,6 @@ private[sql] trait SQLTestUtilsBase
super.withSQLConf(pairs: _*)(f)
}

/**
* Generates a temporary path without creating the actual file/directory, then passes it to `f`. If
* a file/directory is created there by `f`, it will be deleted after `f` returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

/**
* Copy file in jar's resource to a temp file, then pass it to `f`.
* This function is used to make `f` can use the path of temp file(e.g. file:/), instead of