Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions sql/core/benchmarks/CSVBenchmark-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
================================================================================================
Benchmark to measure CSV read/write performance
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Parsing quoted values: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
One quoted string 64733 / 64839 0.0 1294653.1 1.0X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Select 1000 columns 185609 / 189735 0.0 185608.6 1.0X
Select 100 columns 50195 / 51808 0.0 50194.8 3.7X
Select one column 39266 / 39293 0.0 39265.6 4.7X
count() 10959 / 11000 0.1 10958.5 16.9X

OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Select 10 columns + count() 24637 / 24768 0.4 2463.7 1.0X
Select 1 column + count() 20026 / 20076 0.5 2002.6 1.2X
count() 3754 / 3877 2.7 375.4 6.6X

Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,31 @@
*/
package org.apache.spark.sql.execution.datasources.csv

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._

/**
* Benchmark to measure CSV read/write performance.
* To run this:
* spark-submit --class <this class> --jars <spark sql test jar>
* To run this benchmark:
* {{{
* 1. without sbt:
* bin/spark-submit --class <this class> --jars <spark core test jar>,
* <spark catalyst test jar> <spark sql test jar>
* 2. build/sbt "sql/test:runMain <this class>"
* 3. generate result:
* SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
* Results will be written to "benchmarks/CSVBenchmark-results.txt".
* }}}
*/
object CSVBenchmarks extends SQLHelper {
val conf = new SparkConf()

val spark = SparkSession.builder
.master("local[1]")
.appName("benchmark-csv-datasource")
.config(conf)
.getOrCreate()

object CSVBenchmark extends SqlBasedBenchmark {
import spark.implicits._

def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
val benchmark = new Benchmark(s"Parsing quoted values", rowsNum)
val benchmark = new Benchmark(s"Parsing quoted values", rowsNum, output = output)

withTempPath { path =>
val str = (0 until 10000).map(i => s""""$i"""").mkString(",")
Expand All @@ -56,20 +57,13 @@ object CSVBenchmarks extends SQLHelper {
ds.filter((_: Row) => true).count()
}

/*
Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz

Parsing quoted values: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------
One quoted string 30273 / 30549 0.0 605451.2 1.0X
*/
benchmark.run()
}
}

def multiColumnsBenchmark(rowsNum: Int): Unit = {
val colsNum = 1000
val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)
val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum, output = output)

withTempPath { path =>
val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
Expand Down Expand Up @@ -98,23 +92,14 @@ object CSVBenchmarks extends SQLHelper {
ds.count()
}

/*
Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz

Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------
Select 1000 columns 81091 / 81692 0.0 81090.7 1.0X
Select 100 columns 30003 / 34448 0.0 30003.0 2.7X
Select one column 24792 / 24855 0.0 24792.0 3.3X
count() 24344 / 24642 0.0 24343.8 3.3X
*/
benchmark.run()
}
}

def countBenchmark(rowsNum: Int): Unit = {
val colsNum = 10
val benchmark = new Benchmark(s"Count a dataset with $colsNum columns", rowsNum)
val benchmark =
new Benchmark(s"Count a dataset with $colsNum columns", rowsNum, output = output)

withTempPath { path =>
val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
Expand All @@ -137,22 +122,15 @@ object CSVBenchmarks extends SQLHelper {
ds.count()
}

/*
Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz

Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------
Select 10 columns + count() 12598 / 12740 0.8 1259.8 1.0X
Select 1 column + count() 7960 / 8175 1.3 796.0 1.6X
count() 2332 / 2386 4.3 233.2 5.4X
*/
benchmark.run()
}
}

def main(args: Array[String]): Unit = {
quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
multiColumnsBenchmark(rowsNum = 1000 * 1000)
countBenchmark(10 * 1000 * 1000)
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runBenchmark("Benchmark to measure CSV read/write performance") {
quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
multiColumnsBenchmark(rowsNum = 1000 * 1000)
countBenchmark(10 * 1000 * 1000)
}
}
}